Spray and Akka IO
July 8, 2013
It just so happens that the @akkateam released Akka 2.2, which includes, among other things, the new IO. In this post, I will show how to use the 1.2 version of Spray Client with the new Akka IO. I am going to build a simple repository crawler, including tests. You will see just how simple and powerful the new Akka IO is to use, and how nicely Spray integrates with it.
Alongside Scala 2.10.2, we will be using the following libraries in our example:
"com.typesafe.akka" %% "akka-actor" % "2.2.0",
"com.typesafe.akka" %% "akka-testkit" % "2.2.0",
"io.spray" % "spray-client" % "1.2-20130705",
"org.specs2" %% "specs2" % "2.0"
As motivation: the new Akka IO and Spray Client can keep both the network interfaces and the CPUs thoroughly busy.
Repository scanner
In a synchronous and blocking world, a repository crawler would make a GET request to the root URL of the repository and get the response. Then, for every "directory" link, it would append the link's location to the root URL and make another GET request, ..., until we reach a "file" link, which then represents the artifact. However, executing all this serially would take a very long time. Let's see if we can speed up the process using Akka IO and Spray Client.
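To make the serial approach concrete, here is a minimal sketch of such a blocking crawler. It is only an illustration: the `listings` map is a made-up stand-in for real GET requests, and the names `SerialCrawlSketch`, `fetch` and `crawl` are hypothetical.

```scala
object SerialCrawlSketch {
  // Hypothetical stand-in for "GET the URL and parse its links": a fixed in-memory listing.
  val listings: Map[String, List[String]] = Map(
    "/" -> List("com/"),
    "/com/" -> List("example/"),
    "/com/example/" -> List("demo/"),
    "/com/example/demo/" -> List("1.0/", "2.0/"),
    "/com/example/demo/1.0/" -> List("demo_1.0.jar"),
    "/com/example/demo/2.0/" -> List("demo_2.0.jar")
  )

  // Blocking fetch: in a real crawler every call would wait for a network round trip.
  def fetch(url: String): List[String] = listings.getOrElse(url, Nil)

  // Depth-first, one request at a time: "directory" links recurse, "file" links are collected.
  def crawl(url: String): List[String] =
    fetch(url).flatMap { link =>
      if (link.endsWith("/")) crawl(url + link)
      else List(url + link)
    }
}
```

Every call to `fetch` has to finish before the next one starts, so the total time grows with the number of pages in the repository.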
We will do the same thing, except that we use non-blocking IO and run some of the requests in parallel in an actor. Our actor will receive the Scan message, and it will start by making the initial GET request to some baseUrl. When it receives the response, it will then make further GET requests to process each directory or file and descend further or store the discovered artifact. Let's see what it looks like in code:
// Imports assumed for Akka 2.2 and Spray 1.2
import akka.actor.{Actor, ActorRef}
import akka.util.Timeout
import scala.util.Try
import spray.client.pipelining._
import spray.httpx.unmarshalling.Unmarshaller

object PlainRepositoryScanner {
  case object Scan
}

class PlainRepositoryScanner(baseUrl: String, dependencyStorage: ActorRef) extends Actor {
  require(baseUrl.endsWith("/"), "The URL must end with /")
  require(baseUrl.startsWith("http://") || baseUrl.startsWith("https://"),
    "The URL must start with http:// or https://")

  import scala.concurrent.duration._
  import context.dispatcher
  import PlainRepositoryScanner._

  def receive: Receive = {
    case Scan => descend(Nil)
    case elements: Elements => descend(elements)
  }

  private implicit val timeout = Timeout(10.seconds)
  private type Elements = List[String]
  private val pipeline = sendReceive ~> unmarshal[Elements]

  private implicit object StringUnmarshaller extends Unmarshaller[Elements] {
    ...
  }

  def descend(elements: Elements): Unit = {
    def prepareUrl(elements: Elements): String = ...
    def processResponse: PartialFunction[Try[Elements], Unit] = ...

    val url = prepareUrl(elements)
    pipeline(Get(url)) onComplete processResponse
  }
}
After the initial require sanity checks, we define the receive function. When it receives the Scan message, we descend with empty current [path] elements. To make the code clearer, I define a type alias Elements for List[String]. Once that's out of the way, I construct the Spray Client pipeline: sendReceive ~> unmarshal[Elements]. When the pipeline completes, it takes the response and passes it to the instance of the Unmarshaller for the type Elements.
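The idea behind chaining the two stages can be sketched without Spray at all. The following is an analogy built on plain Scala futures, not Spray's implementation of ~>; `andThenMap`, `fakeSendReceive` and `fakeUnmarshal` are hypothetical names, and the response body is made up.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PipelineSketch {
  type Elements = List[String]

  // Chain an asynchronous step with a plain transformation of its eventual result.
  def andThenMap[A, B, C](f: A => Future[B])(g: B => C)(implicit ec: ExecutionContext): A => Future[C] =
    a => f(a).map(g)

  // Hypothetical stand-in for sending a request: always "responds" with two lines.
  val fakeSendReceive: String => Future[String] = url => Future.successful("com/\norg/")

  // Hypothetical stand-in for unmarshalling: one element per line of the body.
  val fakeUnmarshal: String => Elements = body => body.split("\n").toList

  val pipeline: String => Future[Elements] = andThenMap(fakeSendReceive)(fakeUnmarshal)

  // Blocks only so that the sketch can show a result; the actor never blocks like this.
  def run(url: String): Elements = Await.result(pipeline(url), 1.second)
}
```

The resulting `pipeline` is just a function from a request to a future of the unmarshalled value, which is the shape the actor works with.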
The heavy lifting happens in the descend method. Its overall structure is:
def descend(elements: Elements): Unit = {
  def prepareUrl(elements: Elements): String = ...
  def processResponse: PartialFunction[Try[Elements], Unit] = ...

  val url = prepareUrl(elements)
  pipeline(Get(url)) onComplete processResponse
}
The pipeline makes a GET request to the url constructed from the current elements; when the I/O operation completes, its result is passed to the partial function that processResponse returns. In that partial function, we decide whether to descend further or whether we have found our artifact.
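Here is a small sketch of that pattern using only the standard library: a method that returns a PartialFunction[Try[Elements], Unit], handed to onComplete. The `Promise` is there only so the sketch can report what the handler saw; `OnCompleteSketch`, `describe` and `run` are hypothetical names.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object OnCompleteSketch {
  type Elements = List[String]

  // Returns the completion handler as a value, in the same shape as processResponse.
  def describe(done: Promise[String]): PartialFunction[Try[Elements], Unit] = {
    case Success(elements)  => done.success("found " + elements.size + " links")
    case Failure(exception) => done.success("scan failed: " + exception.getMessage)
  }

  // Registers the handler and waits for its verdict (waiting is for demonstration only).
  def run(response: Future[Elements]): String = {
    val done = Promise[String]()
    response onComplete describe(done)
    Await.result(done.future, 1.second)
  }
}
```

Because a PartialFunction is also an ordinary function, onComplete accepts it directly; both the Success and the Failure case are handled in one place.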
Implementing the descend function
Let's get on with implementing the inner functions of the descend function, starting with the rough outline.
def descend(elements: Elements): Unit = {
  def prepareUrl(elements: Elements): String = baseUrl + elements.mkString("")

  def processResponse: PartialFunction[Try[Elements], Unit] = {
    case Success(newElements) =>
      // either descend or store artifact
    case Failure(exception) =>
      // report scan error
  }

  val url = prepareUrl(elements)
  // processResponse takes no arguments; it returns the handler itself
  pipeline(Get(url)) onComplete processResponse
}
So far, so good. We can make the HTTP requests and receive a response that is unmarshalled to an instance of Elements. To complete our work, we need to decide whether each link leads further down, leads out of the current page, or points to an artifact.
def descend(elements: Elements): Unit = {
  def prepareUrl(elements: Elements): String = baseUrl + elements.mkString("")

  // "akka/" becomes "akka"; strings without a slash are returned unchanged
  def dropLastSlash(s: String): String = {
    val lastSlash = s.lastIndexOf('/')
    if (lastSlash != -1) s.substring(0, lastSlash) else s
  }

  def processResponse: PartialFunction[Try[Elements], Unit] = {
    case Success(newElements) =>
      newElements foreach {
        case element if element startsWith "?" =>
          // stay on the same page; skip
        case element if !element.startsWith("/") && !element.startsWith("http") && element.endsWith("/") =>
          // this is another 'directory' on the same 'page'
          self ! elements :+ element
        case element if !element.startsWith("/") && !element.startsWith("http") && element.endsWith(".jar") =>
          // this is a .jar; not a directory
          // (the pattern below assumes at least version and artifactId directories above the jar)
          val (rawVersion::rawArtifactId::rawGroupId) = elements.reverse
          val groupId = rawGroupId.reverse.map(dropLastSlash) mkString "."
          val version = dropLastSlash(rawVersion)
          val artifactId = dropLastSlash(rawArtifactId)

          dependencyStorage ! (baseUrl, groupId, artifactId, version)
        case x =>
          // some unknown form
      }
    case Failure(exception) =>
      // report scan error
  }

  val url = prepareUrl(elements)
  pipeline(Get(url)) onComplete processResponse
}
This is a rather crude version, but it demonstrates what we are doing quite clearly. The full path to an artifact is a list of path elements, for example List(com, typesafe, akka, akka-actor, 2.2.0, akka-actor_2.2.0.jar). To turn it into a Maven-style artifact, we simply ignore the last element (the jar, which is the element currently being matched), reverse the remaining elements, and pattern match:
val (rawVersion::rawArtifactId::rawGroupId) = elements.reverse
And thus, rawVersion is "2.2.0"; rawArtifactId is "akka-actor", and rawGroupId is List(akka, typesafe, com), which we reverse once more and join with dots. From this, we can easily construct the artifact. (For now, it is a tuple containing its elements.)
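The reverse-and-match step can be tried on its own. This is a sketch rather than the scanner's code: it returns an Option instead of failing on paths that are too short, and its `dropLastSlash` only strips a trailing slash. `CoordinatesSketch` and `toCoordinates` are hypothetical names.

```scala
object CoordinatesSketch {
  // Strip a trailing "/" from a path element, e.g. "akka/" becomes "akka".
  def dropLastSlash(s: String): String =
    if (s.endsWith("/")) s.substring(0, s.length - 1) else s

  // elements holds the directories leading to the jar, outermost first.
  // Returns (groupId, artifactId, version), or None when the path is too short.
  def toCoordinates(elements: List[String]): Option[(String, String, String)] =
    elements.reverse match {
      case rawVersion :: rawArtifactId :: rawGroupId if rawGroupId.nonEmpty =>
        // rawGroupId is innermost-first here, so reverse it back before joining
        val groupId = rawGroupId.reverse.map(dropLastSlash).mkString(".")
        Some((groupId, dropLastSlash(rawArtifactId), dropLastSlash(rawVersion)))
      case _ =>
        None
    }
}
```

For the path com/, typesafe/, akka/, akka-actor/, 2.2.0/ this yields the group com.typesafe.akka, the artifact akka-actor and the version 2.2.0.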
Implementing Unmarshaller[Elements]
To complete our actor, we must implement the instance of the Unmarshaller typeclass for the type Elements. In Scala-speak this means completing the private implicit object StringUnmarshaller. I shall be lazy and abuse Scala's regular expressions to pick out the href targets of all the a elements in the HTML.
private implicit object StringUnmarshaller extends Unmarshaller[Elements] {
  val AHrefRegex = """<a href="([^"]*)">[^<]*</a>""".r

  def apply(entity: HttpEntity): Deserialized[Elements] = {
    val body = entity.asString
    val matches = AHrefRegex.findAllMatchIn(body)
    val hrefs = matches.map(_.group(1)).toList
    Right(hrefs)
  }
}
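The regular expression can be exercised without any Spray types. A minimal sketch, with `HrefSketch` and `hrefs` as hypothetical names and a made-up page body:

```scala
object HrefSketch {
  // Same expression as in StringUnmarshaller: captures the href of simple <a> elements.
  val AHrefRegex = """<a href="([^"]*)">[^<]*</a>""".r

  // Collect the first capture group of every match, in document order.
  def hrefs(body: String): List[String] =
    AHrefRegex.findAllMatchIn(body).map(_.group(1)).toList
}
```

Note how narrow the expression is: an anchor with any attribute before href, or with nested markup inside it, is not matched. That is good enough for plain directory listings, but it is the price of being lazy.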
Testing
It is tempting to just take some live repository and start firing requests at it. Apart from being rude (how would you feel if I started hitting your repository with hundreds of requests?), we would not be able to make any sensible assertions. It turns out that we'll have to implement our own repository server for the tests alone. The good news is that it won't be that hard.
Let's start with the simplified version of the spec, though. (I leave the complete implementation, which will assert that the discovered artifacts match the served ones, as an exercise for the reader. That's how kind I am!)
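// Imports assumed for Akka 2.2 TestKit and specs2 2.0
import akka.actor.{ActorSystem, Props}
import akka.testkit.TestKit
import java.util.concurrent.TimeUnit
import org.specs2.mutable.SpecificationLike
import scala.concurrent.duration.FiniteDuration

class PlainRepositoryScannerSpec extends TestKit(ActorSystem()) with SpecificationLike {
  sequential

  import PlainRepositoryScanner._

  "Plain repository" >> {
    val count = 5000
    val port = 12345
    val scanner = system.actorOf(
      Props(new PlainRepositoryScanner(s"http://localhost:$port/", testActor)))

    "scan" in {
      val repository = Repository(Artifacts.generateArtifacts(3, count), port)
      scanner ! Scan
      // expect one message per generated artifact
      receiveN(count, FiniteDuration(30, TimeUnit.SECONDS))
      repository.stop()
      success
    }
  }
}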
The test is pretty straightforward. We construct our PlainRepositoryScanner, giving it the URL of our test-only repository server and a reference to the actor that will receive the discovered artifacts. Because we will want to examine the received messages, we pass in the testActor. In the body of the example, we start the test-only repository, which serves count artifacts in a hierarchy three levels deep, bound to TCP port port. Then we send the Scan message to our scanner and give it up to 30 seconds to discover all count artifacts.
And that's all there is to it!
Test repository
If you are desperate to find out how I've implemented the Repository, I give you its full source here:
sealed trait Dependency {
  def groupId: GroupId
  def artifactId: ArtefactId
}

sealed trait VersionedDependency extends Dependency {
  def version: Version
}

class Repository private(system: ActorSystem, port: Int, artifacts: List[VersionedDependency],
                         elementsToBody: List[String] => String) {
  val blackHoleActor = system.actorOf(Props(new Actor {
    def receive: Receive = Actor.emptyBehavior
  }))

  private class Service extends Actor {
    def vdToUris(vd: VersionedDependency): List[String] = {
      val marker = '\ufffe'
      val path = vd.groupId.replace('.', marker) + marker + vd.artifactId + marker + vd.version.version
      val jar = vd.artifactId + "_" + vd.version.version + ".jar"
      path.split(marker).map(_ + "/").toList :+ jar
    }

    val paths = artifacts.map(vdToUris)

    def receive: Receive = {
      case _: Http.Connected =>
        sender ! Http.Register(self)
      case HttpRequest(HttpMethods.GET, Uri.Path(requestPath), _, _, _) =>
        val requestPathSegments = if (requestPath == "/") Nil else requestPath.split("/").toList.map(_ + "/").tail
        val elements =
          paths.filter(_.startsWith(requestPathSegments)).
            flatMap(_.slice(requestPathSegments.length, requestPathSegments.length + 1)).
            distinct
        val body = elementsToBody(elements)
        sender ! HttpResponse(entity = HttpEntity(body))
      case _ =>
    }
  }

  private val service =
    system.actorOf(Props(new Service).withRouter(RoundRobinRouter(nrOfInstances = 50)))
  private val io = IO(Http)(system)
  io.tell(Http.Bind(service, "localhost", port = port), blackHoleActor)

  def stop(): Unit = {
    io.tell(Http.Unbind, blackHoleActor)
    system.stop(service)
    system.stop(io)
  }
}

object Repository {
  private def trivialElementsToBody(elements: List[String]): String = {
    val builder = new mutable.StringBuilder()
    elements.foreach(element =>
      builder ++= s"""<a href="$element">$element</a>\n"""
    )
    builder.toString()
  }

  def apply(artifacts: List[VersionedDependency], port: Int)(implicit system: ActorSystem): Repository = {
    new Repository(system, port, artifacts, trivialElementsToBody)
  }
}
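The core of the Service actor is the step that turns a request path into the next level of links. Here is a sketch of that step in isolation, with no actors or HTTP involved; `ListingSketch`, `segments` and `listing` are hypothetical names, and the `paths` are made-up data in the shape that vdToUris produces.

```scala
object ListingSketch {
  // One list of path elements per artifact: directories end in "/", the last element is the jar.
  val paths: List[List[String]] = List(
    List("com/", "example/", "demo/", "1.0/", "demo_1.0.jar"),
    List("com/", "example/", "demo/", "2.0/", "demo_2.0.jar"),
    List("org/", "sample/", "tool/", "0.1/", "tool_0.1.jar")
  )

  // "/com/example/" becomes List("com/", "example/"); "/" becomes Nil.
  def segments(requestPath: String): List[String] =
    if (requestPath == "/") Nil
    else requestPath.split("/").toList.map(_ + "/").tail

  // Keep the artifacts under the requested path and take the next element of each, once.
  def listing(requestPath: String): List[String] = {
    val prefix = segments(requestPath)
    paths.filter(_.startsWith(prefix))
      .flatMap(_.slice(prefix.length, prefix.length + 1))
      .distinct
  }
}
```

Requesting "/" lists the top-level directories, each deeper request lists one more level, and a version directory lists its single jar, which is exactly what the scanner needs to walk the hierarchy.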