Chapter 30 of Programming in Scala, First Edition
Actors and Concurrency
by Martin Odersky, Lex Spoon, and Bill Venners
December 10, 2008

Sometimes it helps in designing a program to specify that things happen independently, in parallel, concurrently. Java includes support for concurrency, and although this support is sufficient, it turns out to be quite difficult to get right in practice as programs get larger and more complex. Scala augments Java's native support by adding actors. Actors provide a concurrency model that is easier to work with and can, therefore, help you avoid many of the difficulties of using Java's native concurrency model. This chapter will show you the basics of how to use Scala's actors library and provide an extended example that transforms the single-threaded circuit simulation code of Chapter 18 into a multi-threaded version.

30.1 Trouble in paradise

The Java platform comes with a built-in threading model based on shared data and locks. Each object is associated with a logical monitor, which can be used to control multi-threaded access to data. To use this model, you decide what data will be shared by multiple threads and mark as "synchronized" sections of the code that access, or control access to, the shared data. The Java runtime employs a locking mechanism to ensure that only one thread at a time enters synchronized sections guarded by the same lock, thereby enabling you to orchestrate multi-threaded access to the shared data.
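
As a rough illustration of the shared data and locks model described above, the sketch below guards a counter with its own monitor. The names Counter and countWith are hypothetical and chosen only for this example; plain Java threads are used, not actors.

```scala
object SharedCounterDemo {
  // Hypothetical shared state; every access goes through the object's monitor.
  final class Counter {
    private var count = 0
    def increment(): Unit = this.synchronized { count += 1 }
    def current: Int = this.synchronized { count }
  }

  // Starts `threads` threads that each call increment() `perThread` times,
  // waits for all of them, and returns the final count.
  def countWith(threads: Int, perThread: Int): Int = {
    val counter = new Counter
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = for (_ <- 1 to perThread) counter.increment()
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    counter.current
  }

  def main(args: Array[String]): Unit =
    println(countWith(threads = 4, perThread = 1000)) // 4000
}
```

Without the synchronized blocks, concurrent increments could be lost and the final count could fall short of 4000.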

Unfortunately, programmers have found it very difficult to reliably build robust multi-threaded applications using the shared data and locks model, especially as applications grow in size and complexity. The problem is that at each point in the program, you must reason about what data you are modifying or accessing that might be modified or accessed by other threads, and what locks are being held. At each method call, you must reason about what locks it will try to hold, and convince yourself that it will not deadlock while trying to obtain them. Compounding the problem, the locks you reason about are not fixed at compile time, because the program is free to create new locks at run time as it progresses.

Making things worse, testing is not reliable with multi-threaded code. Since threads are non-deterministic, you might successfully test a program one thousand times, yet still the program could go wrong the first time it runs on a customer's machine. With shared data and locks, you must get the program correct through reason alone.

Moreover, you can't solve the problem by over-synchronizing either. It can be just as problematic to synchronize everything as it is to synchronize nothing. The problem is that new lock operations remove possibilities for race conditions, but simultaneously add possibilities for deadlocks. A correct lock-using program must have neither race conditions nor deadlocks, so you cannot play it safe by overdoing it in either direction.

Java 5 introduced java.util.concurrent, a library of concurrency utilities that provides higher level abstractions for concurrent programming. Using the concurrency utilities makes multi-threaded programming far less error prone than rolling your own abstractions with Java's low-level synchronization primitives. Nevertheless, the concurrent utilities are also based on the shared data and locks model, and as a result do not solve the fundamental difficulties of using that model.

Scala's actors library does address the fundamental problem by providing an alternative, share-nothing, message-passing model that programmers tend to find much easier to reason about. Actors are a good first tool of choice when designing concurrent software, because they can help you avoid the deadlocks and race conditions that are easy to fall into when using the shared data and locks model.

30.2 Actors and message passing

An actor is a thread-like entity that has a mailbox for receiving messages. To implement an actor, you subclass scala.actors.Actor and implement the act method. An example is shown in Listing 30.1. This actor doesn't do anything with its mailbox. It just prints a message five times and quits.

    import scala.actors._

    object SillyActor extends Actor {
      def act() {
        for (i <- 1 to 5) {
          println("I'm acting!")
          Thread.sleep(1000)
        }
      }
    }
Listing 30.1 - A simple actor.

You start an actor by invoking its start method, similar to the way you start a Java thread:

  scala> SillyActor.start()
  I'm acting!
  res4: scala.actors.Actor = SillyActor$@1945696
  
  scala> I'm acting!
  I'm acting!
  I'm acting!
  I'm acting!

Notice that the "I'm acting!" output is interleaved with the Scala shell's output. This interleaving is due to the SillyActor actor running independently from the thread running the shell. Actors run independently from each other, too. For example, given this second actor:

  import scala.actors._

  object SeriousActor extends Actor {
    def act() {
      for (i <- 1 to 5) {
        println("To be or not to be.")
        Thread.sleep(1000)
      }
    }
  }

You could run two actors at the same time, like this:

  scala> SillyActor.start(); SeriousActor.start()
  res3: scala.actors.Actor = SeriousActor$@1689405
  
  scala> To be or not to be.
  I'm acting!
  To be or not to be.
  I'm acting!
  To be or not to be.
  I'm acting!
  To be or not to be.
  I'm acting!
  To be or not to be.
  I'm acting!

You can also create an actor using a utility method named actor in object scala.actors.Actor:

  scala> import scala.actors.Actor._

  scala> val seriousActor2 = actor {
           for (i <- 1 to 5)
             println("That is the question.")
           Thread.sleep(1000)
         }

  scala> That is the question.
  That is the question.
  That is the question.
  That is the question.
  That is the question.

The val definition above creates an actor that executes the actions defined in the block following the actor method. The actor starts immediately when it is defined. There is no need to call a separate start method.

All well and good. You can create actors and they run independently. How do they work together, though? How do they communicate without using shared memory and locks? Actors communicate by sending each other messages. You send a message by using the ! method, like this:

  scala> SillyActor ! "hi there"
Nothing happens in this case, because SillyActor is too busy acting to process its messages, and so the "hi there" message sits in its mailbox unread. Listing 30.2 shows a new, more sociable, actor that waits for a message in its mailbox and prints out whatever it receives. It receives a message by calling receive, passing in a partial function.[1]

    val echoActor = actor {
      while (true) {
        receive {
          case msg =>
            println("received message: "+ msg)
        }
      }
    }
Listing 30.2 - An actor that calls receive.

When an actor sends a message, it does not block, and when an actor receives a message, it is not interrupted. The sent message waits in the receiving actor's mailbox until the actor calls receive. You can see this behavior illustrated here:

  scala> echoActor ! "hi there"
  received message: hi there
  
  scala> echoActor ! 15

  scala> received message: 15

As discussed in Section 15.7, a partial function (an instance of trait PartialFunction) is not a full function—i.e., it might not be defined over all input values. In addition to an apply method that takes one argument, a partial function offers an isDefinedAt method, which also takes one argument. The isDefinedAt method will return true if the partial function can "handle" the passed value. Such values are safe to pass to apply. If you pass a value to apply for which isDefinedAt would return false, however, apply will throw an exception.
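
The behavior of isDefinedAt and apply can be tried out without any actors. The following is a minimal sketch using only the standard library; the name onlyInts is made up for this example.

```scala
object PartialFunctionDemo {
  // A partial function that is defined only for Int inputs.
  val onlyInts: PartialFunction[Any, String] = {
    case x: Int => "Got an Int: " + x
  }

  def main(args: Array[String]): Unit = {
    println(onlyInts.isDefinedAt(12))      // true
    println(onlyInts.isDefinedAt("hello")) // false
    println(onlyInts(12))                  // Got an Int: 12

    // Calling apply where isDefinedAt is false throws scala.MatchError.
    val result =
      try onlyInts("hello")
      catch { case _: MatchError => "no case matched" }
    println(result)                        // no case matched
  }
}
```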

An actor will only process messages matching one of the cases in the partial function passed to receive. For each message in the mailbox, receive will first invoke isDefinedAt on the passed partial function to determine whether it has a case that will match and handle the message. The receive method will choose the first message in the mailbox for which isDefinedAt returns true, and pass that message to the partial function's apply method. The partial function's apply method will handle the message. For example, echoActor's apply method will print "received message: " followed by the message object's toString result. If the mailbox contains no message for which isDefinedAt returns true, the actor on which receive was invoked will block until a matching message arrives.
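
The selection rule just described can be modeled in a few lines. This is only a conceptual sketch, not the library's implementation: the mailbox is a plain ListBuffer, receiveFirst is a hypothetical helper, and where a real actor would block, the sketch simply returns None.

```scala
import scala.collection.mutable.ListBuffer

object MailboxSketch {
  // Removes and handles the first message the handler is defined for.
  // Returns None where a real actor would block waiting for a match.
  def receiveFirst[R](mailbox: ListBuffer[Any])(
      handler: PartialFunction[Any, R]): Option[R] = {
    val index = mailbox.indexWhere(handler.isDefinedAt)
    if (index < 0) None
    else Some(handler(mailbox.remove(index)))
  }

  def main(args: Array[String]): Unit = {
    val mailbox = ListBuffer[Any]("hello", 3.14, 12)
    val result = receiveFirst(mailbox) { case x: Int => "Got an Int: " + x }
    println(result)  // Some(Got an Int: 12)
    println(mailbox) // ListBuffer(hello, 3.14)
  }
}
```

Note that the non-matching messages stay in the mailbox in their original order; they are skipped, not discarded.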

For example, here is an actor that handles only messages of type Int:

  scala> val intActor = actor {
           receive {
             case x: Int => // I only want Ints
               println("Got an Int: "+ x)
           }
         }
  intActor: scala.actors.Actor =
    scala.actors.Actor$$anon$1@34ba6b

If you send a String or Double, for example, the intActor will silently ignore the message:

  scala> intActor ! "hello"

  scala> intActor ! Math.Pi

But if you pass an Int, you'll get a response printed out:

  scala> intActor ! 12
  Got an Int: 12

30.3 Treating native threads as actors

The actor subsystem manages one or more native threads for its own use. So long as you work with an explicit actor that you define, you do not need to think much about how they map to threads.

The other direction is also supported by the subsystem: every native thread is also usable as an actor. However, you cannot use the object returned by Thread.currentThread directly, because it does not have the necessary methods. Instead, you should use Actor.self if you want to view the current thread as an actor.

This facility is especially useful for debugging actors from the interactive shell. Here's an example:

  scala> import scala.actors.Actor._
  import scala.actors.Actor._
  
  scala> self ! "hello"

  scala> self.receive { case x => x }
  res6: Any = hello

The receive method returns the value computed by the partial function passed to it. In this case, the partial function returns the message itself, and so the received message ends up being printed out by the interpreter shell.

If you use this technique, it is better to use a variant of receive called receiveWithin. You can then specify a timeout in milliseconds. If you use receive in the interpreter shell, then the receive will block the shell until a message arrives. In the case of self.receive, this could mean waiting forever! Instead, use receiveWithin with some timeout value:

  scala> self.receiveWithin(1000) { case x => x } // wait a sec!
  res7: Any = TIMEOUT

30.4 Better performance through thread reuse

Actors are implemented on top of normal Java threads. As described so far, in fact, every actor must be given its own thread, so that all the act methods get their turn.

Unfortunately, despite their light-sounding name, threads are not all that cheap in Java. Threads consume enough memory that typical Java virtual machines, which can host millions of objects, can have only thousands of threads. Worse, switching threads often takes hundreds if not thousands of processor cycles. If you want your program to be as efficient as possible, then it is important to be sparing with thread creation and switching.

To help you conserve threads, Scala provides an alternative to the usual receive method called react. Like receive, react takes a partial function. Unlike receive, however, react does not return after it finds and processes a message. Its result type is Nothing. It evaluates the message handler and that's it.[2]

Because the react method does not need to return, the implementation does not need to preserve the call stack of the current thread. Instead, the library can reuse the current thread for the next actor that wakes up. This approach is so effective that if every actor in a program uses react instead of receive, only a single thread is necessary in principle to host all of the program's actors (to be sure, if your computer has several processor cores, the actors subsystem will use enough threads to utilize all cores when it can).


In practice, programs will need at least a few calls to receive, but you should try to use react whenever possible so as to conserve threads.

Because react does not return, the message handler you pass it must now both process that message and arrange to do all of the actor's remaining work. A common way to do this is to have a top-level work method—such as act itself—that the message handler calls when it finishes. Listing 30.3 shows an example that uses this approach.

    object NameResolver extends Actor {
      import java.net.{InetAddress, UnknownHostException}

      def act() {
        react {
          case (name: String, actor: Actor) =>
            actor ! getIp(name)
            act()
          case "EXIT" =>
            println("Name resolver exiting.")
            // quit
          case msg =>
            println("Unhandled message: "+ msg)
            act()
        }
      }

      def getIp(name: String): Option[InetAddress] = {
        try {
          Some(InetAddress.getByName(name))
        } catch {
          case _:UnknownHostException => None
        }
      }
    }
Listing 30.3 - An actor that calls react.

The actor shown in Listing 30.3 waits for strings that are host names, and if there is one, returns an IP address for that host name. Here's an example:

  scala> NameResolver.start()
  res0: scala.actors.Actor = NameResolver$@90d6c5
  
  scala> NameResolver ! ("www.scala-lang.org", self)

  scala> self.receiveWithin(0) { case x => x }
  res2: Any = Some(www.scala-lang.org/128.178.154.102)

  scala> NameResolver ! ("wwwwww.scala-lang.org", self)

  scala> self.receiveWithin(0) { case x => x }
  res4: Any = None

Writing an actor to use react instead of receive is challenging, but pays off in performance. Because react does not return, the calling actor's call stack can be discarded, freeing up the thread's resources for a different actor. At the extreme, if all of the actors of a program use react, then they can be implemented on a single thread.

This coding pattern, in which the message handler calls act again when it finishes, is so common with event-based actors that there is special support in the library for it. The Actor.loop function executes a block of code repeatedly, even if the code calls react. NameResolver's act method can be rewritten to use loop as shown in Listing 30.4. The one difference in behavior between this act method and that of Listing 30.3 is that this one does not handle "EXIT" by quitting. Instead, this actor will loop and react to messages forever.

    def act() { 
      loop {
        react {
          case (name: String, actor: Actor) =>
            actor ! getIp(name)
          case msg =>
            println("Unhandled message: " + msg)
        }
      }
    }
Listing 30.4 - An actor's act method that uses loop.

How react works

A return type of Nothing indicates a function will never return normally, but instead will always complete abruptly with an exception. And indeed, this is true of react. The actual implementation of react is not as simple as the following description, and subject to change, but conceptually you can think of react as working like this:

When you call start on an actor, the start method will in some way arrange things such that some thread will eventually call act on that actor. If that act method invokes react, the react method will look in the actor's mailbox for a message the passed partial function can handle. (It does this the same way as receive, by passing candidate messages to the partial function's isDefinedAt method.) If it finds a message that can be handled, react will schedule the handling of that message for later execution and throw an exception. If it doesn't find one, it will place the actor in "cold storage," to be resurrected if and when it gets more messages in its mailbox, and throw an exception. In either case, react will complete abruptly with this exception, and so will act. The thread that invoked act will catch the exception, forget about the actor, and move on to other duties.

This is why if you want react to handle more than the first message, you'll need to call act again from inside your partial function, or use some other means to get react invoked again.
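
The conceptual description above can be modeled with a small single-threaded sketch. It is emphatically not the library's implementation: MiniActor, Suspend, step, and demo are hypothetical names, the mailbox lookup is deferred to the scheduler step for simplicity, and everything runs on the calling thread. What it does show is how react can complete abruptly with an exception while still getting the handler run later.

```scala
import scala.collection.mutable.ListBuffer

object ReactSketch {
  // Control-flow exception used only to unwind the stack (hypothetical name).
  final class Suspend extends RuntimeException

  final class MiniActor {
    private val mailbox = ListBuffer[Any]()
    // Handler saved by the most recent react call, if any.
    private var pending: Option[PartialFunction[Any, Unit]] = None

    def send(msg: Any): Unit = mailbox += msg

    // Saves the handler and completes abruptly; never returns normally.
    def react(handler: PartialFunction[Any, Unit]): Nothing = {
      pending = Some(handler)
      throw new Suspend
    }

    // Runs the actor's body until its first react call unwinds it.
    def start(body: => Unit): Unit =
      try body catch { case _: Suspend => () }

    // One scheduler step: run the saved handler on the first matching
    // message. Returns false if the actor stays in "cold storage".
    def step(): Boolean = pending match {
      case Some(handler) =>
        val index = mailbox.indexWhere(handler.isDefinedAt)
        if (index < 0) false
        else {
          pending = None
          val msg = mailbox.remove(index)
          try handler(msg) catch { case _: Suspend => () }
          true
        }
      case None => false
    }
  }

  // Sends two messages to an actor whose handler re-invokes act.
  def demo(): List[String] = {
    val log = ListBuffer[String]()
    val actor = new MiniActor
    def act(): Unit = actor.react {
      case s: String =>
        log += s
        act() // call act again, as in Listing 30.3
    }
    actor.start(act())
    actor.send("one")
    actor.send("two")
    while (actor.step()) {}
    log.toList
  }

  def main(args: Array[String]): Unit =
    println(demo()) // List(one, two)
}
```

If the handler did not call act again, pending would stay empty after the first message and "two" would never be handled, which mirrors the point made in the text.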

30.5 Good actors style

At this point you have seen everything you need to write your own actors. Simply using these methods takes you only so far, however. The point of them is that they support an actors style of concurrent programming. To the extent you can write in this style, your code will be easier to debug and will have fewer deadlocks and race conditions. This section provides some guidelines for programming in an actors style.

Actors should not block

A well written actor does not block while processing a message. The problem is that while the actor blocks, some other actor might make a request on it that it could handle. If the actor is blocked on the first request, it will not even notice the second request. In the worst case, a deadlock can even result, with multiple actors blocked as they each wait for some other blocked actor to respond.

Instead of blocking, the actor should arrange for some message to arrive designating that action is ready to be taken. Often this rearrangement will require the help of other actors. For example, instead of calling Thread.sleep directly and blocking the current actor, you could create a helper actor that sleeps and then sends a message back when enough time has elapsed:

  actor {
    Thread.sleep(time)
    mainActor ! "WAKEUP"
  }

This helper actor does indeed block, but since it will never receive a message, it is OK in this case. The main actor remains available to answer new requests. The emoteLater method, shown in Listing 30.5, demonstrates the use of this idiom. It creates a new actor that will do the sleep so that the main actor does not block. To ensure that it sends the "Emote" message to the correct actor, it is careful to evaluate self in the scope of the main actor instead of the scope of the helper actor.

    val sillyActor2 = actor {
      def emoteLater() {
        val mainActor = self
        actor {
          Thread.sleep(1000)
          mainActor ! "Emote"
        }
      }
  
      var emoted = 0
      emoteLater()

      loop {
        react {
          case "Emote" =>
            println("I'm acting!")
            emoted += 1
            if (emoted < 5)
              emoteLater()
          case msg =>
            println("Received: "+ msg)
        }
      }
    }
Listing 30.5 - An actor that uses a helper actor to avoid blocking itself.

Because this actor does not block in sleep—its helper actor does—it can continue to do other work while waiting for its next time to emote. Unlike the earlier silly actor, this one will continue to print out messages while it waits for its next input:

  scala> sillyActor2 ! "hi there"
  scala> Received: hi there
  I'm acting!
  I'm acting!
  I'm acting!

Communicate with actors only via messages

The key way the actors model addresses the difficulties of the shared data and locks model is by providing a safe space—the actor's act method—where you can think sequentially. Put another way, actors allow you to write a multi-threaded program as a bunch of independent single-threaded programs that communicate with each other via asynchronous messaging. This simplification works, however, only so long as messages are the only way you let your actors communicate.[3]

For example, a GoodActor could include a reference to itself in a message to a BadActor, to identify itself as the source of that message. If BadActor invokes some arbitrary method on GoodActor instead of sending it a message with !, however, problems may ensue. The invoked method might read private instance data in GoodActor, which may have been written by a different thread. As a result, you would need to ensure that both the BadActor thread's reading of the instance data and the GoodActor thread's writing of it are synchronized on the same lock. The GoodActor's private instance data has become shared data that must be guarded by a lock. As soon as you go around the message passing scheme between actors, therefore, you drop back down into the shared data and locks model, with all the difficulties you were trying to avoid in the first place by using the actors model.

On the other hand, this does not mean that you should never go around message passing. Although shared data and locks is very difficult to get right, it is not impossible. One difference between Scala's approach to actors and that of Erlang, in fact, is that Scala gives you the option to combine the actors and shared data and locks models in the same program.

As an example, imagine you wanted multiple actors to share a common mutable map. Since the map is mutable, the pure actors approach would be to create an actor that "owns" the mutable map and define a set of messages that allows other actors to access it. You could define a message for putting a key-value pair into the shared map, getting a value given a key, and so on, for all the operations you need to do on the map. In addition, you'd need to define messages for sending asynchronous responses to actors that made queries of the map. Another option, however, is to pass a thread-safe map, such as ConcurrentHashMap from the Java Concurrency Utilities, in a message to multiple actors, and let those actors use that map directly.

Although it would be far easier and safer to implement a shared map via actors than to implement something like ConcurrentHashMap yourself, since ConcurrentHashMap already exists, you may judge it easier and as low risk to use that than to implement your own shared map with an actor. This would also mean that your responses from the shared map could be synchronous, whereas with actors they would need to be asynchronous. Scala's actors library gives you the choice.
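
As a small sketch of the second option, the code below lets several workers write directly to one ConcurrentHashMap. Plain threads stand in for the actors here so that the example needs nothing beyond the JDK; fill and the key format are made up for illustration.

```scala
import java.util.concurrent.ConcurrentHashMap

object SharedMapDemo {
  // Each worker writes `perWorker` distinct keys into the same map.
  def fill(workers: Int, perWorker: Int): ConcurrentHashMap[String, Int] = {
    val shared = new ConcurrentHashMap[String, Int]()
    val threads = (0 until workers).map { w =>
      new Thread(new Runnable {
        def run(): Unit =
          for (i <- 0 until perWorker) shared.put("w" + w + "-" + i, i)
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    shared
  }

  def main(args: Array[String]): Unit =
    println(fill(workers = 4, perWorker = 100).size) // 400
}
```

Reads and writes on the map are synchronous, which is the convenience the text mentions. The cost is that the map is now shared state that lives outside any single actor.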

If you're considering shared data and locks

When considering whether to combine the actors model with the shared data and locks model, it is helpful to recall the words of Harry Callahan, played by Clint Eastwood in the 1971 movie Dirty Harry:

I know what you're thinking. "Did he fire six shots or only five?" Well, to tell you the truth, in all this excitement I kind of lost track myself. But being as this is a .44 Magnum, the most powerful handgun in the world, and would blow your head clean off, you've got to ask yourself one question: Do I feel lucky? Well, do ya, punk?

Prefer immutable messages

Because Scala's actors model provides what amounts to a single-threaded environment inside each actor's act method, you need not worry about whether the objects you use in the implementation of this method are thread-safe. You can use unsynchronized, mutable objects to your heart's content in an act method, for example, because each act method is effectively confined to one thread.[4] This is why the actors model is called a share-nothing model: the data is confined to one thread rather than being shared by many.

There is one exception to the share-nothing rule, however: the data inside objects used to send messages between actors is "shared" by multiple actors. As a result, you do have to worry about whether message objects are thread-safe. And in general, they should be.

The best way to ensure that message objects are thread-safe is to only use immutable objects for messages. Instances of any class that has only val fields, which themselves refer only to immutable objects, are immutable. An easy way to define such message classes, of course, is as case classes. So long as you don't explicitly add var fields to a case class and ensure the val fields are all immutable types, your case class will by definition be immutable. It will also be convenient for pattern matching in the partial functions passed to react or receive. You can also use as messages instances of regular (non-case) immutable classes that you define. Or you can use instances of the many immutable classes provided in the Scala API, such as tuples, strings, lists, immutable sets and maps, and so on.
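
The caveat about val fields that refer to mutable objects is easy to overlook. The following sketch, with hypothetical message classes SafeReading and LeakyReading, shows how a case class holding an array still exposes the sender's later writes, while one holding a List does not.

```scala
object MessageImmutabilityDemo {
  // Only immutable field types: safe to share between actors.
  case class SafeReading(sensor: String, values: List[Int])
  // A val field, but it refers to a mutable array: not actually immutable.
  case class LeakyReading(sensor: String, values: Array[Int])

  // Builds both messages from the same buffer, then mutates the buffer,
  // and returns what each message now reports as its first value.
  def firstValuesAfterMutation(): (Int, Int) = {
    val buffer = Array(1, 2, 3)
    val safe = SafeReading("s1", buffer.toList)
    val leaky = LeakyReading("s1", buffer)
    buffer(0) = 99 // the sender keeps writing to its buffer
    (safe.values.head, leaky.values(0))
  }

  def main(args: Array[String]): Unit =
    println(firstValuesAfterMutation()) // (1,99)
}
```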

Now, if an actor sends a mutable, unsynchronized object as a message, and never reads or writes that object thereafter, it would work, but it's just asking for trouble. A future maintainer may not realize the object is shared and write to it, thereby creating a hard-to-find concurrency bug.

In general, it is best to arrange your data such that every unsynchronized, mutable object is "owned," and therefore accessed by, only one actor. You can arrange for objects to be transferred from one actor to another if you like, but you need to make sure that at any given time, it is clear which actor owns the object and is allowed to access it. In other words, when you design an actors-based system, you need to decide which parts of mutable memory are assigned to which actor. All other actors that access a mutable data structure must send messages to the data structure's owner and wait for a message to come back with a reply.

If you do find you have a mutable object you want to continue using as well as send in a message to another actor, you should make and send a copy of it instead. While you're at it, you may want to make it immutable. For example, because arrays are mutable and unsynchronized, any array you use should be accessed by one actor at a time. If you want to continue using an array as well as send it to another actor, you should send a copy. For example, if the array itself holds only immutable objects, you can make a copy with arr.clone. But you should also consider using arr.toList, and send the resulting immutable list instead.

Immutable objects are convenient in many cases, but they really shine for parallel systems, because they are the easiest, lowest risk way to design thread-safe objects. When you design a program that might involve parallelism in the future, whether using actors or not, you should try especially hard to make data structures immutable.

Make messages self-contained

When you return a value from a method, the caller is in a good position to remember what it was doing before it called the method. It can take the response value and then continue whatever it was doing.

With actors, things are trickier. When one actor makes a request of another, the response might not come for a long time. The calling actor should not block, but should continue to do any other work it can while it waits for the response. A difficulty, then, is interpreting the response when it finally does come back. Can the actor remember what it was doing when it made the request?

One way to simplify the logic of an actors program is to include redundant information in the messages. If the request is an immutable object, you can even cheaply include a reference to the request in the return value! For example, the IP-lookup actor would be better if it returned the host name in addition to the IP address found for it. It would make this actor slightly longer, but it should simplify the logic of any actor making requests on it:

  def act() { 
    loop {
      react {
        case (name: String, actor: Actor) =>
          actor ! (name, getIp(name))
      }
    }
  }

Another way to increase redundancy in the messages is to make a case class for each kind of message. While such a wrapper is not strictly necessary in many cases, it makes an actors program much easier to understand. Imagine a programmer looking at a send of a string, for example:

  lookerUpper ! ("www.scala-lang.org", self)

It can be difficult to figure out which actors in the code might respond. It is much easier if the code looks like this:

  case class LookupIP(hostname: String, requester: Actor)

  lookerUpper ! LookupIP("www.scala-lang.org", self)

Now, the programmer can search for LookupIP in the source code, probably finding very few possible responders. Listing 30.6 shows an updated name-resolving actor that uses case classes instead of tuples for its messages.

    import scala.actors._
    import scala.actors.Actor._
    import java.net.{InetAddress, UnknownHostException}

    case class LookupIP(name: String, respondTo: Actor)
    case class LookupResult(
      name: String,
      address: Option[InetAddress]
    )

    object NameResolver2 extends Actor {

      def act() {
        loop {
          react {
            case LookupIP(name, actor) =>
              actor ! LookupResult(name, getIp(name))
          }
        }
      }

      def getIp(name: String): Option[InetAddress] = {
        // As before (in Listing 30.3)
      }
    }
Listing 30.6 - An actor that uses case classes for messages.

30.6 A longer example: Parallel discrete event simulation

As a longer example, suppose you wanted to parallelize the discrete event simulation of Chapter 18. Each participant in the simulation could run as its own actor, thus allowing you to speed up a simulation by using more processors. This section will walk you through the process, using code based on a parallel circuit simulator developed by Philipp Haller.

Overall Design

Most of the design from Chapter 18 works fine for both sequential and parallel discrete event simulation. There are events, and they happen at designated times; processing an event can cause new events to be scheduled; and so forth. Likewise, a circuit simulation can be implemented as a discrete event simulation by making gates and wires participants in the simulation, and changes in the wires the events of the simulation. The one thing that would be nice to change would be to run the events in parallel. How can the design be rearranged to make this happen?

The key idea is to make each simulated object an actor. Each event can then be processed by the actor where most of that event's state lies. For circuit simulation, the update of a gate's output can be processed by the actor corresponding to that gate. With this arrangement, events will naturally be handled in parallel.

In code, it is likely that there will be some common behavior between different simulated objects. It makes sense, then, to define a trait Simulant that can be mixed into any class to make it a simulated object. Wires, gates, and other simulated objects can mix in this trait.

  trait Simulant extends Actor
  class Wire extends Simulant

So far so good, but there are a few design issues to work out, several of which do not have a single, obviously best answer. For this chapter, we present a reasonable choice for each design issue that keeps the code concise. There are other solutions possible, though, and trying them out would make for good practice for anyone wanting experience programming with actors.

The first design issue is to figure out how to make the simulation participants stay synchronized with the simulated time. That is, participant A should not race ahead and process an event at time tick 100 until all other actors have finished with time tick 99. To see why this is essential, imagine for a moment that simulant A is working at time 90 while simulant B is working at time 100. It might be that participant A is about to send a message that changes B's state at time 91. B will not learn this until too late, because it has already processed times 92 to 99. To avoid this problem, the design approach used in this chapter is that no simulant should process events for time n until all other simulants are finished with time n-1.

That decision raises a new question, though: how do simulants know when it's safe to move forward? A straightforward approach is to have a "clock" actor that keeps track of the current time and tells the simulation participants when it is time to move forward. To keep the clock from moving forward before all simulants are ready, the clock can ping actors at carefully chosen times to make sure they have received and processed all messages for the current time tick. There will be Ping messages that the clock sends the simulants, and Pong messages that the simulants send back when they are ready for the clock to move forward.

  case class Ping(time: Int)
  case class Pong(time: Int, from: Actor)

Note that these messages could be defined as having no fields. However, the time and from fields add a little bit of redundancy to the system. The time field holds the time of a ping, and it can be used to connect a Pong with its associated Ping. The from field is the sender of a Pong. The sender of a Ping is always the clock, so it does not have a from field. All of this information is unnecessary if the program is behaving perfectly, but it can simplify the logic in some places, and it can greatly help in debugging if the program has any errors.

One question that arises is how a simulant knows it has finished with the current time tick. Simulants should not respond to a Ping until they have finished all the work for that tick, but how do they know? Maybe another actor has made a request to it that has not yet arrived. Maybe a message one actor has sent another has not been processed yet.

It simplifies the answer to this question to add two constraints. First, assume that simulants never send each other messages directly, but instead only schedule events on each other. Second, they never post events for the current time tick, but only for times at least one tick into the future. These two constraints are significant, but they appear tolerable for a typical simulation. After all, there is normally some non-zero propagation delay whenever two components of a system interact with each other. Further, at worst, time ticks can be made to correspond to shorter time intervals, and information that will be needed in the future can be sent ahead of time.

Other arrangements are possible. Simulants could be allowed to send messages directly to each other. However, if they do so, then there would need to be a more sophisticated mechanism for deciding when it is safe for an actor to send back a Pong. Each simulant should delay responding to a Ping until any other simulants it has made requests to are finished processing those requests. To ensure this property, you would need the simulants to pass each other some extra information. For now, assume that simulants don't communicate with each other except via the simulation's agenda.

Given that decision, there may as well be a single agenda of work items, and that agenda may as well be held by the clock actor. That way, the clock can wait to send out pings until it has sent out requests for all work items at the current time. Actors then know that whenever they receive a Ping, they have already received from the clock all work items that need to happen at the current time tick. It is thus safe when an actor receives a Ping to immediately send back a Pong, because no more work will be arriving during the current time tick. Taking this approach, a Clock has the following state:

  class Clock extends Actor {
    private var running = false
    private var currentTime = 0
    private var agenda: List[WorkItem] = List()
  }

The final design issue to work out is how a simulation is set up to begin with. A natural approach is to create the simulation with the clock stopped, add all the simulants, connect them all together, and then start the clock. The subtlety is that you need to be absolutely sure that everything is connected before you start the clock running! Otherwise, some parts of the simulation will start running before they are fully formed.

How do you know when the simulation is fully assembled and ready to start? There are again multiple ways to approach this problem. The simple way adopted in this chapter is to avoid sending messages to actors while setting the simulation up. That way, once the last method call returns, you know that the simulation is entirely constructed. The resulting coding pattern is that you use regular method calls to set the simulation up, and you use actor message sends while the simulation is running.

Given the preceding decisions, the rest of the design is straightforward. A WorkItem can be defined much like in Chapter 18, in that it holds a time and an action. For the parallel simulation, however, the action itself has a different encoding. In Chapter 18, actions are represented as zero-argument functions. For parallel simulation, it is more natural to use a target actor and a message to be sent to that actor:

  case class WorkItem(time: Int, msg: Any, target: Actor)
Likewise, the afterDelay method for scheduling a new work item becomes an AfterDelay message that can be sent to the clock. Just as with the WorkItem class, the zero-argument action function is replaced by a message and a target actor:
  case class AfterDelay(delay: Int, msg: Any, target: Actor)
Finally, it will prove useful to have messages requesting the simulation to start and stop:
  case object Start
  case object Stop

That's it for the overall design. There is a Clock class holding a current time and an agenda, and the clock only advances the time after it has pinged all of its simulants to be sure they are ready. There is a Simulant trait for simulation participants, and these communicate with their fellow simulants by sending work items to the clock to add to its agenda. The next section looks at how to implement these core classes.

Implementing the simulation framework

There are two things that need implementing for the core framework: the Clock class and the Simulant trait. Consider the Clock class, first. The necessary state of a clock is as follows:

  class Clock extends Actor {
    private var running = false
    private var currentTime = 0
    private var agenda: List[WorkItem] = List()
    private var allSimulants: List[Actor] = List()
    private var busySimulants: Set[Actor] = Set.empty
A clock starts out with running set to false. Once the simulation is fully initialized, the clock will be sent the Start message and running will become true. This way, the simulation stays frozen until all of its pieces have been connected together as desired. It also means that, since all of the simulants are also frozen, it is safe to use regular method calls to set things up instead of needing to use actor message sends.

A clock may as well go ahead and start running as an actor once it is created. This is safe, because it will not actually do anything until it receives a Start message:

  start()

A clock also keeps track of the current time (currentTime), the list of participants managed by this clock (allSimulants), and the set of participants that are still working on the current time tick (busySimulants). A list is used to hold allSimulants, because it is only iterated through, but a set is used for busySimulants because items will be removed from it in an unpredictable order. Once the simulator starts running, it will only advance to a new time when busySimulants is empty, and whenever it advances the clock, it will set busySimulants to allSimulants.
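The bookkeeping just described can be modelled without any actors at all. The following single-threaded sketch is only an illustration: the TickBarrier class and the use of plain strings as simulant names are assumptions made for this example, not part of the chapter's Clock.

```scala
// Single-threaded model of the clock's busy-set bookkeeping.
// TickBarrier and the String simulant names are hypothetical.
final class TickBarrier(allSimulants: List[String]) {
  private var busySimulants: Set[String] = Set.empty

  // Called when the clock advances: every simulant becomes busy again.
  def startTick(): Unit = {
    busySimulants = allSimulants.toSet
  }

  // Called when a Pong arrives; a set copes with any arrival order.
  def pong(sim: String): Unit = {
    require(busySimulants.contains(sim), s"unexpected Pong from $sim")
    busySimulants -= sim
  }

  // The clock may only move to the next tick once nobody is busy.
  def canAdvance: Boolean = busySimulants.isEmpty
}
```

The list is only ever traversed, while the set shrinks one element at a time as replies come in, which is the reason given above for choosing the two different collection types.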

To set up a simulation, there is going to be a need for a method to add new simulants to a clock. It may as well be added right now:

  def add(sim: Simulant) {
    allSimulants = sim :: allSimulants
  }

That's the state of a clock. Now look at its activity. Its main loop alternates between two responsibilities: advancing the clock, and responding to messages. Once the clock advances, it can only advance again when at least one message has been received, so it is safe to define the main loop as an alternation between these two activities:

  def act() {
    loop {
      if (running && busySimulants.isEmpty)
        advance()

      reactToOneMessage()
    }
  }
The advancement of time has a few parts beyond simply incrementing the currentTime. First, if the agenda is empty, and the simulation is not just starting, then the simulation should exit. Second, assuming the agenda is non-empty, all work items for time currentTime should now take place. Third, all simulants should be placed in busySimulants and sent Pings. The clock will not advance again until all Pings have been responded to:
  def advance() {
    if (agenda.isEmpty && currentTime > 0) {
      println("** Agenda empty.  Clock exiting at time "+
              currentTime+".")
      self ! Stop
      return
    }

    currentTime += 1
    println("Advancing to time "+currentTime)

    processCurrentEvents()
    for (sim <- allSimulants)
      sim ! Ping(currentTime)

    busySimulants = Set.empty ++ allSimulants
  }
Processing the current events is simply a matter of processing all events at the top of the agenda whose time is currentTime:
  private def processCurrentEvents() {
    val todoNow = agenda.takeWhile(_.time <= currentTime)

    agenda = agenda.drop(todoNow.length)

    for (WorkItem(time, msg, target) <- todoNow) {
      assert(time == currentTime)
      target ! msg
    }
  }
There are three steps in this method. First, the items that need to occur at the current time are selected using takeWhile and saved into the val todoNow. Second, those items are dropped from the agenda by using drop. Finally, the items to do now are looped through, and each item's message is sent to its target. The assert is included just to check that the scheduler's logic is sound.
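The takeWhile and drop steps can be tried out in isolation. The sketch below is a simplified illustration, not the chapter's code: Item is a hypothetical stand-in for WorkItem whose target is a plain string, and split is a helper name invented for the example.

```scala
object AgendaSplit {
  // Simplified stand-in for WorkItem; the target is a String, not an Actor.
  case class Item(time: Int, msg: String, target: String)

  // Splits a time-sorted agenda into (items due now, items still pending).
  def split(agenda: List[Item], currentTime: Int): (List[Item], List[Item]) = {
    val todoNow = agenda.takeWhile(_.time <= currentTime)
    val rest = agenda.drop(todoNow.length)
    (todoNow, rest)
  }
}
```

This only works because the agenda is kept sorted by time, so the due items always form a prefix of the list. The standard span method on List produces the same pair in a single pass, should you prefer it.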

Given this groundwork, handling the messages that a clock can receive is straightforward. An AfterDelay message causes a new item to be added to the work queue. A Pong causes a simulant to be removed from the set of busy simulants. Start causes the simulation to begin, and Stop causes the clock to stop:

  def reactToOneMessage() {
    react {
      case AfterDelay(delay, msg, target) =>
        val item = WorkItem(currentTime + delay, msg, target)
        agenda = insert(agenda, item)

      case Pong(time, sim) =>
        assert(time == currentTime)
        assert(busySimulants contains sim)
        busySimulants -= sim

      case Start => running = true

      case Stop =>
        for (sim <- allSimulants)
          sim ! Stop
        exit()
    }
  }
The insert method, not shown, is exactly like that of Listing 18.8. It inserts its argument into the agenda while being careful to keep the agenda sorted.
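For readers without Chapter 18 at hand, a sorted insert along these lines might look as follows. This is a sketch written from the description above, not a verbatim copy of Listing 18.8, and Item is again a hypothetical simplification of WorkItem with a string target.

```scala
object AgendaInsert {
  // Simplified stand-in for WorkItem; the target is a String, not an Actor.
  case class Item(time: Int, msg: String, target: String)

  // Inserts item before the first entry with a strictly later time,
  // so items scheduled for the same time keep their arrival order.
  def insert(agenda: List[Item], item: Item): List[Item] =
    if (agenda.isEmpty || item.time < agenda.head.time) item :: agenda
    else agenda.head :: insert(agenda.tail, item)
}
```

The recursion is not tail recursive, which is acceptable for the short agendas of a small simulation but would deserve a second look for very long ones.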

That's the complete implementation of Clock. Now consider how to implement Simulant. Boiled down to its essence, a Simulant is any actor that understands and cooperates with the simulation messages Stop and Ping. Its act method can therefore be as simple as this:

  def act() {
    loop {
      react {
        case Stop => exit()
        case Ping(time) =>
          if (time == 1) simStarting()
          clock ! Pong(time, self)
        case msg => handleSimMessage(msg)
      }
    }
  }
Whenever a simulant receives Stop, it exits. If it receives a Ping, it responds with a Pong. If the Ping is for time 1, then simStarting is called before the Pong is sent back, allowing subclasses to define behavior that should happen when the simulation starts running. Any other message must be interpreted by subclasses, so it defers to an abstract handleSimMessage method.
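The order of the cases matters here, because the final pattern matches any message at all. The following sketch replaces the side effects with descriptive strings so the dispatch can be checked on its own; the classify function and the locally defined Stop and Ping are assumptions made for the example.

```scala
object SimulantDispatch {
  // Local copies of the two framework messages, for illustration only.
  case object Stop
  case class Ping(time: Int)

  // Returns a label for the branch that would handle msg, instead of
  // performing the side effects of the real act method.
  def classify(msg: Any): String = msg match {
    case Stop => "exit"
    case Ping(time) =>
      if (time == 1) "simStarting, then Pong" else "Pong"
    case _ => "handleSimMessage" // catch-all, so it must come last
  }
}
```

If the catch-all case were listed first, Stop and Ping would never reach their own branches.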

There are two abstract members of a simulant: handleSimMessage and clock. A simulant must know its clock so that it can reply to Ping messages and schedule new work items. Putting it all together, the Simulant trait is as shown in Listing 30.7.

    trait Simulant extends Actor {
      val clock: Clock
      def handleSimMessage(msg: Any)
      def simStarting() { }
      def act() {
        loop {
          react {
            case Stop => exit()
            case Ping(time) =>
              if (time == 1) simStarting()
              clock ! Pong(time, self)
            case msg => handleSimMessage(msg)
          }
        }
      }
      start()
    }
Listing 30.7 - The Simulant trait.

Note that a simulant goes ahead and starts running the moment it is created. This is safe and convenient, because it will not actually do anything until its clock sends it a message, and that should not happen until the simulation starts and the clock receives a Start message.

That completes the framework for parallel event simulation. Like its sequential cousin in Chapter 18, it takes surprisingly little code.

Implementing a circuit simulation

Now that the simulation framework is complete, it's time to work on the implementation of circuits. A circuit has a number of wires and gates, which will be simulants, and a clock for managing the simulation. A wire holds a boolean signal—either high (true) or low (false). Gates are connected to a number of wires, some of which are inputs and others outputs. Gates compute a signal for their output wires based on the state of their input wires.

Since the wires, gates, etc., of a circuit are only used for that particular circuit, their classes can be defined as members of a Circuit class, just as with the currency objects of Section 20.9. The overall Circuit class will therefore have a number of members:

  class Circuit {
    val clock = new Clock
    // simulation messages
    // delay constants
    // Wire and Gate classes and methods
    // misc. utility methods
  }
Now look at each of these members, one group at a time. First, there are the simulation messages. Once the simulation starts running, wires and gates can only communicate via message sends, so they will need a message type for each kind of information they want to send each other. There are only two such kinds of information. Gates need to tell their output wires to change state, and wires need to inform the gates they are inputs to whenever their state changes:
  case class SetSignal(sig: Boolean)
  case class SignalChanged(wire: Wire, sig: Boolean)
Next, there are several delays that must be chosen. Any work item scheduled with the simulation framework—including propagation of a signal to or from a wire—must be scheduled at some time in the future. It is unclear what the precise delays should be, so those delays are worth putting into vals. This way, they can be easily adjusted in the future:
  val WireDelay = 1
  val InverterDelay = 2
  val OrGateDelay = 3
  val AndGateDelay = 3
At this point it is time to look at the Wire and Gate classes. Consider wires, first. A wire is a simulant that has a current signal state (high or low) and a list of gates that are observing that state. It mixes in the Simulant trait, so it also needs to specify a clock to use:
  class Wire(name: String, init: Boolean) extends Simulant {
    def this(name: String) { this(name, false) }
    def this() { this("unnamed") }

    val clock = Circuit.this.clock
    clock.add(this)

    private var sigVal = init
    private var observers: List[Actor] = List()
The class also needs a handleSimMessage method to specify how it should respond to simulation messages. The only message a wire should receive is SetSignal, the message for changing a wire's signal. The response should be that if the signal is different from the current signal, the current state changes, and the new signal is propagated:
  def handleSimMessage(msg: Any) {
    msg match {
      case SetSignal(s) =>
        if (s != sigVal) {
          sigVal = s
          signalObservers()
        }
    }
  }
  
  def signalObservers() {
    for (obs <- observers)
      clock ! AfterDelay(
        WireDelay,
        SignalChanged(this, sigVal),
        obs)
  }
The above code shows how changes in a wire's signal are propagated to any gates watching it. It's also important to pass the initial state of a wire to any observing gates. This only needs to be done once, when the simulation starts up. After that, gates can simply store the result of the most recent SignalChanged they have received. Sending out the initial signal when the simulation starts is as simple as providing a simStarting() method:
  override def simStarting() { signalObservers() }
There are now just a few more odds and ends about wires. Wires need a method for connecting new gates, and they could use a nice toString method:
  def addObserver(obs: Actor) {
    observers = obs :: observers
  }
  
  override def toString = "Wire("+ name +")"
That is everything you need for wires. Now consider gates, the other major class of objects in a circuit. There are three fundamental gates that would be nice to define: And, Or, and Not. All of these share a lot of behavior, so it is worth defining an abstract Gate class to hold the commonality.

A difficulty in defining this Gate class is that some gates have two input wires (And, Or) while others have just one (Not). It would be possible to model this difference explicitly. However, it simplifies the code to think of all gates as having two inputs, where Not gates simply ignore their second input. The ignored second input can be set to some dummy wire that never changes state from false:

  private object DummyWire extends Wire("dummy")
Given this trick, the gate class will come together straightforwardly. It mixes in the Simulant trait, and its one constructor accepts two input wires and one output wire:
  abstract class Gate(in1: Wire, in2: Wire, out: Wire)
      extends Simulant {
There are two abstract members of Gate that specific subclasses will have to fill in. The most obvious is that different kinds of gates compute a different function of their inputs. Thus, there should be an abstract method for computing an output based on inputs:
  def computeOutput(s1: Boolean, s2: Boolean): Boolean
Second, different kinds of gates have different propagation delays. Thus, the delay of the gate should be an abstract val:
  val delay: Int
The delay could be a def, but making it a val encodes the fact that a particular gate's delay should never change.

Because Gate mixes in Simulant, it is required to specify which clock it is using. As with Wire, Gate should specify the clock of the enclosing Circuit. For convenience, the Gate can go ahead and add itself to the clock when it is constructed:

  val clock = Circuit.this.clock
  clock.add(this)
Similarly, it makes sense to go ahead and connect the gate to the two input wires, using regular method calls:
  in1.addObserver(this)
  in2.addObserver(this)
The only local state of a gate is the most recent signal on each of its input wires. This state needs to be stored, because wires only send a signal when the state changes. If one input wire changes, only that one wire's state will be sent to the gate, but the new output will need to be computed from both wires' states:
  var s1, s2 = false
Now look at how gates respond to simulation messages. There is only one message they need to handle, and that's the SignalChanged message indicating that one of the input wires has changed. When a SignalChanged arrives, two things need to be done. First, the local notion of the wire states needs to be updated according to the change. Second, the new output needs to be computed and then sent out to the output wire with a SetSignal message:

  def handleSimMessage(msg: Any) {
    msg match {
      case SignalChanged(w, sig) =>
        if (w == in1)
          s1 = sig
        if (w == in2)
          s2 = sig
        clock ! AfterDelay(delay,
            SetSignal(computeOutput(s1, s2)),
            out)
    }
  }
Given this abstract Gate class, it is now easy to define specific kinds of gates. As with the sequential simulation in Chapter 18, the gates can be created as side effects of calling some utility methods. All the methods need to do is create a Gate and fill in the appropriate delay and output computation. Everything else is common to all gates and is handled in the Gate class:
  def orGate(in1: Wire, in2: Wire, output: Wire) = 
    new Gate(in1, in2, output) {
      val delay = OrGateDelay
      def computeOutput(s1: Boolean, s2: Boolean) = s1 || s2 
    }
  
  def andGate(in1: Wire, in2: Wire, output: Wire) = 
    new Gate(in1, in2, output) {
      val delay = AndGateDelay
      def computeOutput(s1: Boolean, s2: Boolean) = s1 && s2 
    }
In the case of Not gates, a dummy wire will be specified as the second input. This is an implementation detail from the point of view of a caller creating a Not gate, so the inverter method only takes one input wire instead of two:
  def inverter(input: Wire, output: Wire) = 
    new Gate(input, DummyWire, output) {
      val delay = InverterDelay
      def computeOutput(s1: Boolean, ignored: Boolean) = !s1
    }
At this point the library can simulate circuits, but, as described in Chapter 18, it is useful to add a wire-probing utility so that you can watch the circuit evolve. Without such a utility, the simulation would have no way to know which wires are worth logging and which are more like implementation details.

Define a probe method that takes a Wire as an argument and then prints out a line of text whenever that wire's signal changes. The method can be implemented by simply making a new simulant that connects itself to a specified wire. This simulant can respond to SignalChanged messages by printing out the new signal:

  def probe(wire: Wire) = new Simulant {
    val clock = Circuit.this.clock
    clock.add(this)
    wire.addObserver(this)
    def handleSimMessage(msg: Any) {
      msg match {
        case SignalChanged(w, s) =>
           println("signal "+ w +" changed to "+ s)
      }
    }
  }

That is the bulk of the Circuit class. Callers should create an instance of Circuit, create a bunch of wires and gates, call probe on a few wires of interest, and then start the simulation running. The one piece missing is how the simulation is started, and that can be as simple as sending the clock a Start message:

  def start() { clock ! Start }

    trait Adders extends Circuit {
      def halfAdder(a: Wire, b: Wire, s: Wire, c: Wire) {
        val d, e = new Wire
        orGate(a, b, d)
        andGate(a, b, c)
        inverter(c, e)
        andGate(d, e, s)
      }

      def fullAdder(a: Wire, b: Wire, cin: Wire,
          sum: Wire, cout: Wire) {
        val s, c1, c2 = new Wire
        halfAdder(a, cin, s, c1)
        halfAdder(b, s, sum, c2)
        orGate(c1, c2, cout)
      }
    }
Listing 30.8 - Adder components.

More complicated circuit components can be built from methods, just as explained in Chapter 18. For instance, Listing 30.8 shows again the half adder and full adder components that were introduced there. Their implementation stays the same, but as a small variation they are now packaged in a trait, named Adders, whereas in Chapter 18 they were contained in an abstract class. Because the trait is marked as extending Circuit, it can directly access members of Circuit such as Wire and orGate. Using the trait then looks like this:

  val circuit = new Circuit with Adders
This circuit variable holds a circuit that has all of the methods of Circuit and all of the methods of Adders. Note that with this coding pattern, based on a trait instead of a class, you set the stage to provide multiple component sets. Users mix in whichever component sets they plan to use, like this:
  val circuit =
    new Circuit 
      with Adders 
      with Multiplexers
      with FlipFlops
      with MultiCoreProcessors
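
The mix-in pattern itself can be tried with a few stub definitions. In the sketch below, everything is a deliberately tiny stand-in invented for the example: this Circuit has nothing to do with the simulation class above, and the member names gateNames, adderNames, and muxNames are hypothetical.

```scala
object MixinDemo {
  // Stub base class standing in for the real Circuit.
  class Circuit {
    def gateNames: List[String] = List("and", "or", "not")
  }

  // Each component set extends Circuit, so it can use Circuit's members.
  trait Adders extends Circuit {
    def adderNames: List[String] = List("halfAdder", "fullAdder")
  }

  trait Multiplexers extends Circuit {
    // Uses gateNames, a member inherited from Circuit.
    def muxNames: List[String] = gateNames.map(g => "mux-from-" + g)
  }

  // The caller picks whichever component sets it needs.
  val circuit = new Circuit with Adders with Multiplexers
}
```

The resulting object has the members of Circuit and of every trait that was mixed in, without any of the traits needing to know about the others.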

Trying it all out

That's the whole framework. It includes a simulation framework, a circuit simulation class, and a small library of standard adder components. Here is a simple demo object that uses it:

  object Demo {
    def main(args: Array[String]) {
      val circuit = new Circuit with Adders
      import circuit._

      val ain = new Wire("ain", true)
      val bin = new Wire("bin", false)
      val cin = new Wire("cin", true)
      val sout = new Wire("sout")
      val cout = new Wire("cout")

      probe(ain)
      probe(bin)
      probe(cin)
      probe(sout)
      probe(cout)

      fullAdder(ain, bin, cin, sout, cout)

      circuit.start()
    }
  }
This example creates a circuit that includes the Adders trait. It immediately imports all of the circuit's members, thus allowing easy access to methods like probe and fullAdder. Without the import, it would be necessary to write circuit.probe(ain) instead of just probe(ain).

The example then creates five wires. Three will be used as inputs (ain, bin, and cin), and two will be used as outputs (sout, cout). The three input wires are given arbitrary initial signals of true, false, and true. These inputs correspond to adding 1 to 0, with a carry in of 1.

The probe method gets applied to all five externally visible wires, so any changes in their state can be observed as the simulation runs. Finally the wires are plugged into a full adder, and the simulation is started. The output of the simulation is as follows:

  Advancing to time 1
  Advancing to time 2
  signal Wire(cout) changed to false
  signal Wire(cin) changed to true
  signal Wire(ain) changed to true
  signal Wire(sout) changed to false
  signal Wire(bin) changed to false
  Advancing to time 3
  Advancing to time 4
  Advancing to time 5
  Advancing to time 6
  Advancing to time 7
  Advancing to time 8
  Advancing to time 9
  Advancing to time 10
  signal Wire(cout) changed to true
  Advancing to time 11
  Advancing to time 12
  Advancing to time 13
  Advancing to time 14
  Advancing to time 15
  Advancing to time 16
  Advancing to time 17
  Advancing to time 18
  signal Wire(sout) changed to true
  Advancing to time 19
  Advancing to time 20
  Advancing to time 21
  signal Wire(sout) changed to false
  ** Agenda empty.  Clock exiting at time 21.
As expected, with inputs of 1, 0, and 1 (true, false and true), the outputs are a carry of 1 and sum of 0 (cout is true, and sout is false).
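
The final values can be cross-checked without running the simulation by evaluating the same wiring as plain Boolean functions. This sketch ignores delays and intermediate transitions entirely and computes only the settled outputs; the AdderLogic object and its tuple-returning signatures are assumptions made for the example.

```scala
object AdderLogic {
  // Settled outputs of the half adder wiring: returns (sum, carry).
  def halfAdder(a: Boolean, b: Boolean): (Boolean, Boolean) = {
    val d = a || b // orGate(a, b, d)
    val c = a && b // andGate(a, b, c)
    val e = !c     // inverter(c, e)
    (d && e, c)    // andGate(d, e, s)
  }

  // Settled outputs of the full adder wiring: returns (sum, cout).
  def fullAdder(a: Boolean, b: Boolean, cin: Boolean): (Boolean, Boolean) = {
    val (s, c1) = halfAdder(a, cin)
    val (sum, c2) = halfAdder(b, s)
    (sum, c1 || c2) // orGate(c1, c2, cout)
  }
}
```

Calling AdderLogic.fullAdder(true, false, true) yields (false, true), matching the last reported values of sout and cout in the trace.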

30.7 Conclusion [link]

Concurrent programming gives you great power. It lets you simplify your code, and it lets you take advantage of multiple processors. It is therefore unfortunate that the most widely used concurrency primitives, threads, locks, and monitors, are such a minefield of deadlocks and race conditions.

The actors style provides a way out of the minefield, letting you write concurrent programs without having such a great risk of deadlocks and race conditions. This chapter has introduced several fundamental constructs for working with actors in Scala, including how to create actors, how to send and receive messages, and how to conserve threads with react, among other nuts and bolts. It then showed you how to use these constructs as part of a general actors style.


Footnotes for Chapter 30:

[1] As described in Section 15.7, a partial function literal is expressed as a series of match alternatives or "cases." It looks like a match expression without the match keyword.

[2] Behind the scenes, react will throw an exception after it's done.

[3] Another benefit is that a message send ensures the message object is safely published to other threads, as described in Goetz, et al., Java Concurrency in Practice, p. 49.

[4] When using react, different messages could potentially be handled by different threads, but if so they will be handled sequentially and with sufficient synchronization to allow you to program under the simplifying assumption that each act method is confined to a single thread.





Copyright © 1996-2019 Artima, Inc. All Rights Reserved. - Privacy Policy - Terms of Use