It just so happens that the @akkateam released Akka 2.2. Akka 2.2 includes, amongst others, the new IO. In this post, I will show how to use the 1.2 version of Spray Client with the new Akka IO. I am going to build a simple repository crawler, including tests. You will see just how simple & powerful it is to use the new Akka IO and how nicely Spray integrates into it.

Alongside Scala 2.10.2, we will be using the following libraries in our example:

"com.typesafe.akka" %% "akka-actor"   % "2.2.0",
"com.typesafe.akka" %% "akka-testkit" % "2.2.0",
"io.spray"           % "spray-client" % "1.2-20130705",
"org.specs2"        %% "specs2"       % "2.0"

As motivation, you can see that the new Akka IO and Spray Client can make the network interfaces and CPUs work hard enough.

Working hard

Repository scanner

In a synchronous and blocking world, a repository crawler would make a GET request to the root URL of the repository and get the response. Then, for every "directory" link, it would append the link's location to the root URL and make another GET request, ..., until we reach a "file" link, which then represents the artifact. However, executing all this serially would take a very long time. Let's see if we can speed up the process using Akka IO and Spray Client.

We will do the same thing, except that we use non-blocking IO and we will run some of the requests in paralel in an actor. Our actor will receive the Scan message, and it will start by making the initial GET request to some baseUrl. When it receives the response, it will then make further GET requests to process each directory or file and descend further or store the discovered artifact. Let's see what it looks like in code:

object PlainRepositoryScanner {
  case object Scan
}

class PlainRepositoryScanner(baseUrl: String, dependencyStorage: ActorRef) extends Actor {
  require(baseUrl.endsWith("/"), "The URL must end with /")
  require(baseUrl.startsWith("http://") || baseUrl.startsWith("https://"), 
  	"The URL must start with http:// or https://")

  import scala.concurrent.duration._
  import context.dispatcher
  import PlainRepositoryScanner._

  def receive: Receive = {
    case Scan => descend(Nil)
    case elements: Elements => descend(elements)
  }

  private implicit val timeout = Timeout(10.seconds)
  private type Elements = List[String]

  private val pipeline = sendReceive ~> unmarshal[Elements]
  private implicit object StringUnmarshaller extends Unmarshaller[Elements] {
  	...
  }

  def descend(elements: Elements): Unit = {
    def prepareUrl(elements: Elements): String = ...
    def processResponse: PartialFunction[Try[Elements], Unit] = ...

    val url = prepareUrl(elements)
    pipeline(Get(url)) onComplete processResponse
  }

}

After the initial require sanity checks, we define the receive function. When it receives the Scan message, we descend with empty current [path] elements. To make the code clearer, I define type alias for Elements to be simply List[String]. Once that's out of the way, I construct the Spray Client pipeline: sendReceive ~> unmarshal[Elements]. The pipeline, when it completes is going to take the response and pass it to the instance of the Unmarshaller for type Elements.

The heavy lifting happens in the descend method: its overall structure is

def descend(elements: Elements): Unit = {
  def prepareUrl(elements: Elements): String = ...
  def processResponse: PartialFunction[Try[Elements], Unit] = ...

  val url = prepareUrl(elements)
  pipeline(Get(url)) onComplete processResponse
}

The pipeline makes a GET request to the url constructed from the current elements; when the I/O operation completes, it gets applied to the partial function that the processResponse returns. In that partial function, we decide whether to descend further or whether we have found our artifact.

Implementing the descend function

Let's get on with implementing the inner functions of the descend function, starting with the rough outline.

def descend(elements: Elements): Unit = {
  def prepareUrl(elements: Elements): String = baseUrl + elements.mkString("")

  def processResponse: PartialFunction[Try[Elements], Unit] = {
    case Success(newElements) =>
      // either descend or store artifact
    case Failure(exception) =>
      // report scan error
  }

  val url = prepareUrl(elements)
  pipeline(Get(url)) onComplete processResponse(elements)
}

So far, so good. We can make the HTTP requests and receive a response that is unmarshalled to instance of Elements. To complete our work, we need to decide whether the link is follows down, out, or whether it is an artifact.

def descend(elements: Elements): Unit = {
  def prepareUrl(elements: Elements): String = baseUrl + elements.mkString("")

  def dropLastSlash(s: String): String = {
    val lastSlash = s.lastIndexOf('/')
    if (lastSlash != -1) s.substring(0, lastSlash) else s
  }

  def processResponse: PartialFunction[Try[Elements], Unit] = {
    case Success(newElements) =>
      newElements foreach {
        case element if element startsWith "?" =>
        // stay on the same page; skip
        case element if !element.startsWith("/") && !element.startsWith("http") && element.endsWith("/") =>
          // this is another 'directory' on the same 'page'
          self ! elements :+ element
        case element if !element.startsWith("/") && !element.startsWith("http") && element.endsWith(".jar") =>
          // this is a .jar; not a directory
          val (rawVersion::rawArtifactId::rawGroupId) = elements.reverse
          val groupId = rawGroupId.reverse.map(dropLastSlash) mkString "."
          val version = dropLastSlash(rawVersion)
          val artifactId = dropLastSlash(rawArtifactId)
          dependencyStorage ! (baseUrl, groupId, artifactId, versionId)
        case x =>
          // some unknown form
      }
    case Failure(exception) =>
      // report scan error
  }

  val url = prepareUrl(elements)
  pipeline(Get(url)) onComplete processResponse(elements)
}

This is rather crude version, but it demonstrates what we are doing quite clearly. The elements are the path elements, for example List(com, typesafe, akka, akka-actor, 2.2.0, akka-actor_2.2.0.jar), to turn them into Maven-style artifact, we simply ignore the last element (the jar), reverse and pattern match.

val (rawVersion::rawArtifactId::rawGroupId) = elements.reverse

And thus, rawVersion is "2.2.0"; rawArtifactId is "akka-actor", and rawGroupId is List(akka, typesafe, com). From this, we can easily construct the artifact. (For now, it is a tuple containing its elements.)

Implementing Unmarshaller[Elements]

To complete our actor, we must implement the instance of the Unmarshaller typeclass for the type Elements. In Scala-speak this means completing the private implicit object StringUnmarshaller. I shall be lazy and abuse Scala's regular expressions to pick out all targets of a HTML elements.

private implicit object StringUnmarshaller extends Unmarshaller[Elements] {
  val AHrefRegex = """<a href="([^"]*)">[^<]*</a>""".r
  def apply(entity: HttpEntity): Deserialized[Elements] = {
    val body = entity.asString
    val matches = AHrefRegex.findAllMatchIn(body)
    val hrefs = matches.map(_.group(1)).toList
    Right(hrefs)
  }
}

Testing

It is tempting to just take some live repository and start firing requests at it. Apart from being rude (how would you feel if I started hitting your repository with hundreds of requests?), we would not be able to make any sensible assertions. It turns out that we'll have to implement our own repository server for the tests alone. The good news is that it won't be that hard.

Let's start with the simplified version of the spec, though. (I leave the complete implementation, which will assert that the discovered artifacts match the served ones as exercise for the readers. That's how kind I am!)

class PlainRepositoryScannerSpec extends TestKit(ActorSystem()) with SpecificationLike {
  sequential
  import PlainRepositoryScanner._

  "Plain repository" >> {
    val count = 5000
    val port = 12345
    val scanner = system.actorOf(
    	Props(new PlainRepositoryScanner(s"http://localhost:$port/", testActor)))

    "scan" in {
      val repository = Repository(Artifacts.generateArtifacts(3, count), port)
      scanner ! Scan
      receiveN(5000, FiniteDuration(30, TimeUnit.SECONDS))
      repository.stop()
      success
    }
  }

}

The test is pretty straight-forward. We construct our PlainRepositoryScanner, giving it a reference to our test-only repository server and a reference to the actor that will receive the discovered artifacts. Because we will want to examine the received messages, we can pass in the testActor. In the body of the example, we start the test-only repository, which serves count number of artifacts in a hierarchy three levels deep, bound to TCP port port. Then we send the Scan message to our scanner and give it up to 30 seconds to discover all count artifacts.

And that's all there is to it!

Test repository

If you are desperate to find out how I've implemented the Repository, I give you its full source here:

sealed trait Dependency {
  def groupId: GroupId
  def artifactId: ArtefactId
}
sealed trait VersionedDependency extends Dependency {
  def version: Version
}


class Repository private(system: ActorSystem, port: Int, artifacts: List[VersionedDependency],
                         elementsToBody: List[String] => String) {

  val blackHoleActor = system.actorOf(Props(new Actor {
    def receive: Receive = Actor.emptyBehavior
  }))

  private class Service extends Actor {
    def vdToUris(vd: VersionedDependency): List[String] = {
      val marker = '\ufffe'
      val path = vd.groupId.replace('.', marker) + marker + vd.artifactId + marker + vd.version.version
      val jar  = vd.artifactId + "_" + vd.version.version + ".jar"
      path.split(marker).map(_ + "/").toList :+ jar
    }
    val paths = artifacts.map(vdToUris)

    def receive: Receive = {
      case _: Http.Connected =>
        sender ! Http.Register(self)
      case HttpRequest(HttpMethods.GET, Uri.Path(requestPath), _, _, _) =>
        val requestPathSegments = if (requestPath == "/") Nil else requestPath.split("/").toList.map(_ + "/").tail
        val elements = 
        	paths.filter(_.startsWith(requestPathSegments)).
        	flatMap(_.slice(requestPathSegments.length, requestPathSegments.length + 1)).
        	distinct
        val body = elementsToBody(elements)
        sender ! HttpResponse(entity = HttpEntity(body))
      case _ =>
    }
  }

  private val service = 
    system.actorOf(Props(new Service).withRouter(RoundRobinRouter(nrOfInstances = 50)))
  private val io = IO(Http)(system)
  io.tell(Http.Bind(service, "localhost", port = port), blackHoleActor)

  def stop(): Unit = {
    io.tell(Http.Unbind, blackHoleActor)
    system.stop(service)
    system.stop(io)
  }
}

object Repository {

  private def trivialElementsToBody(elements: List[String]): String = {
    val builder = new mutable.StringBuilder()

    elements.foreach(element =>
      builder ++= s"""<a href="$element">$element</a>\n"""
    )

    builder.toString()
  }

  def apply(artifacts: List[VersionedDependency], port: Int)(implicit system: ActorSystem): Repository = {
    new Repository(system, port, artifacts, trivialElementsToBody)
  }

}