Tuesday 31 January 2012

Dataflow Concurrency, STM and Message-Passing Concurrency - Alternative options for writing concurrent programs

Introduction

Last year Venkat Subramaniam wrote a book called Programming Concurrency on the JVM. In it he outlines concurrency techniques using different JVM programming languages such as Java, Clojure, Scala, Groovy and JRuby, and discusses three concurrency styles: the synchronization model of the JDK, Software Transactional Memory and actor-based concurrency. It is an excellent book for anyone who wants to learn about the alternatives available in different JVM-based programming languages. In this article I will describe Shared State Concurrency and the problems associated with it, and briefly outline how these problems can be solved using alternative approaches such as Dataflow concurrency, Software Transactional Memory and Message-passing concurrency. First, let me talk about Shared State Concurrency and its problems.

Shared State Concurrency

One of the characteristics we want in our programs is the ability to deal with several independent activities, each of which executes at its own pace. This is called "concurrency". There should be no interference between the activities unless we want them to communicate. How do we introduce concurrency into a program? We do it by creating threads. A thread is simply an executing program, and a program can have more than one thread. All threads share the same underlying computer and may access a set of shared passive objects. Threads coordinate with each other when accessing these shared objects by means of coarse-grained atomic actions such as locks, monitors and transactions. Everything described so far in this paragraph falls under Shared State Concurrency. A formal definition is given below:

"Shared State Concurrency is concurrency among two or more processes (here, a process is a flow of control), which have some shared state between them, which both processes can read to and write from." [1]

Shared State Concurrency adds explicit state in the form of cells, which are a kind of mutable variable.

Why is Shared State Concurrency hard?
  • Execution consists of multiple threads, all executing independently and all accessing shared cells.
  • It requires lots of synchronization primitives (locks, mutexes, semaphores, etc.) to protect mutable state.
  • It is much more difficult to model a system and prove that it is correct.
In Shared State Concurrency we need to protect mutable state with locks. Writing a correct program using locks is not easy, for the following reasons:
  • enforcing lock ordering
  • locks do not compose
  • taking too few locks or too many locks
  • taking the wrong locks
  • taking locks in the wrong order
  • error recovery and debugging are hard
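To make the lock-ordering point concrete, here is a minimal sketch in plain Java. The Account class, its id field and the transfer method are hypothetical names invented for this illustration. The sketch assumes every account has a unique id and always takes the lock of the account with the smaller id first, so two opposite transfers cannot each hold one lock while waiting for the other.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockOrderingDemo {
    // Always lock the account with the smaller id first. Two transfers in
    // opposite directions then request the locks in the same order.
    static void transfer(Account from, Account to, long amount) {
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;
        first.lock.lock();
        try {
            second.lock.lock();
            try {
                from.balance -= amount;
                to.balance += amount;
            } finally {
                second.lock.unlock();
            }
        } finally {
            first.lock.unlock();
        }
    }

    // Runs transfers in both directions concurrently and returns the total balance.
    static long runDemo() {
        Account a = new Account(1, 1000);
        Account b = new Account(2, 1000);
        Thread t1 = new Thread(() -> { for (int i = 0; i < 10_000; i++) transfer(a, b, 1); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < 10_000; i++) transfer(b, a, 1); });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return a.balance + b.balance;
    }

    public static void main(String[] args) {
        System.out.println("total = " + runDemo()); // prints "total = 2000"
    }
}

// Hypothetical account type used only for this illustration.
class Account {
    final int id;                                  // unique id defines the global lock order
    final ReentrantLock lock = new ReentrantLock();
    long balance;                                  // guarded by lock

    Account(int id, long balance) {
        this.id = id;
        this.balance = balance;
    }
}
```

Even in this small case the business logic (two lines that move money) is buried under locking code, and the ordering rule has to be respected by every other method that touches two accounts.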
As Java programmers, we start our concurrency journey with the concepts outlined above. We mix concurrency primitives such as locks and synchronized blocks into our business logic. The result is a lot of concurrency-related plumbing that makes programs difficult to understand and difficult to unit test. Java is an object-oriented (OO) programming language. OO typically unifies identity and state, i.e. an object (identity) is a pointer to the memory that contains the value of its state. The problem lies in this unification of identity and value, which are not the same thing. In OO, state is everywhere, and there is no way to observe a stable state without blocking others from changing it, so we need to guard the state [2]. Immutable state is fine; mutable state is the problem. We need a better way of dealing with mutable state. You can read more about this at http://clojure.org/state .

Alternate approaches

If you want to solve your concurrency problems with what the JDK offers, you should read Java Concurrency in Practice by Brian Goetz, which is the bible on dealing with concurrency in Java, and use the java.util.concurrent API. If you are open to other JVM-based concurrency solutions, you will be happy to know that there are alternatives:
  • Dataflow Concurrency (Dataflow - Single Assignment variables)
  • Software Transactional Memory (Managed References)
  • Message-Passing Concurrency (Actors/Active Objects/Agents)

Dataflow Concurrency

"Operations (in Dataflow programs) consist of 'black boxes' with inputs and outputs, all of which are always explicitly defined. They run as soon as all of their inputs become valid, as opposed to when the program encounters them. Whereas a traditional program essentially consists of a series of statements saying "do this, now do this", a dataflow program is more like a series of workers on an assembly line, who will do their assigned task as soon as the materials arrive. This is why dataflow languages are inherently parallel; the operations have no hidden state to keep track of, and the operations are all 'ready' at the same time." [3]

The core idea:
  • The number of assignments allowed per variable is limited to one
  • A variable (Dataflow variable) can be assigned a value only once in its lifetime, while the number of reads is unlimited
  • Reads that precede the write are postponed (blocked) until the value is set by a write (bind) action

    The beauty of Dataflow Concurrency is its determinism. A dataflow program always behaves in the same way. If you execute the program once and it gives you 10, it will give you that value no matter how many times you run it. If it throws an exception once, it will always throw that exception. This model is also called the declarative or data-driven concurrency model. In a declarative concurrent model, nondeterminism is not visible to the programmer for two reasons:
    • Dataflow variables can be bound to only one value.
    • Any operation that needs the value of a variable has no choice but to wait until the variable is bound. If we allowed operations that could choose whether to wait or not, the nondeterminism would become visible. [1]
    Three Operations:
    • Create a dataflow variable
                val x = new DataFlowVariable[Int]
    • Wait for the variable to be bound
                x()
    • Bind the variable
               x << 5
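The three operations can also be sketched in plain Java on top of java.util.concurrent.CompletableFuture. This is only an illustration of the idea, not the API shown above: the DataflowVariable class and its bind and get methods are hypothetical names, and the sketch assumes that a second bind should be rejected with an exception.

```java
import java.util.concurrent.CompletableFuture;

class DataflowDemo {
    // Hypothetical single-assignment variable built on CompletableFuture.
    static final class DataflowVariable<T> {
        private final CompletableFuture<T> value = new CompletableFuture<>();

        // Bind: succeeds only the first time; any later bind is rejected.
        void bind(T v) {
            if (!value.complete(v)) {
                throw new IllegalStateException("already bound");
            }
        }

        // Read: blocks the caller until some thread has bound the variable.
        T get() {
            return value.join();
        }
    }

    static int runDemo() {
        DataflowVariable<Integer> x = new DataflowVariable<>();
        DataflowVariable<Integer> y = new DataflowVariable<>();
        DataflowVariable<Integer> z = new DataflowVariable<>();

        // This thread is started before x and y are bound, so its reads
        // wait until both values are available.
        Thread adder = new Thread(() -> z.bind(x.get() + y.get()));
        adder.start();

        x.bind(5);
        y.bind(7);
        return z.get(); // waits for the adder thread to bind z
    }

    public static void main(String[] args) {
        System.out.println("z = " + runDemo()); // prints "z = 12"
    }
}
```

Whatever order the threads are scheduled in, z can only ever be bound to 12, which is the determinism described above.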

    Dataflow variables are monotonic: they can be bound to just one value, and once bound, the value does not change. Mutable variables, on the other hand, are non-monotonic. They can be assigned any number of times to values that have no relation to each other. Threads that share a mutable variable cannot make any assumption about its content, because at any time the content can be completely different from the previous content [1]. Dataflow variables do not suffer from this problem.

    Dataflow concurrency also does not introduce race conditions. It works in an on-demand, lazy way. There are a few limitations, though:
    • The code cannot have side effects such as exceptions, I/O (files, println, sockets), time, etc.
    • It is not general purpose. It is generally good for workflow-like dependent processes such as business processes.
    You can read more about this at http://gpars.codehaus.org/Dataflow.

    Software Transactional Memory

    You might have heard about the ACID properties of database management systems. With STM you get ACID without the D: atomicity, consistency and isolation in memory, without durability. Since everything (begin, commit and rollback) happens in memory, it can be very fast. A formal definition is given below:

    "software transactional memory (STM) is a concurrency control mechanism analogous to database transactions for controlling access to shared memory in concurrent computing." [4]

    STM effectively turns the Java heap into a transactional data set. A block of actions is executed as a transaction that is atomic, consistent and isolated. Atomic means that either all of the changes made within the transaction take effect or none of them do. Within the block a series of reads and writes can be executed, and these reads and writes logically occur at a single instant in time. Isolated means that while our thread is executing the block, other threads cannot see the changes it makes until it exits the block, and our thread does not see changes made by other threads. Consistent means that each new value can be checked with a validator function before the transaction is allowed to commit. Other threads see only committed changes. When a transaction runs into a conflict, it is automatically retried.
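The read, compute, commit and retry cycle can be illustrated in plain Java. The sketch below is not an STM library: it swaps in a single java.util.concurrent.atomic.AtomicReference holding an immutable snapshot, and uses compareAndSet as the commit step. The Balances class and the transfer method are hypothetical names invented for this illustration. A real STM generalizes the same idea to many references per transaction.

```java
import java.util.concurrent.atomic.AtomicReference;

class OptimisticRetryDemo {
    // Immutable snapshot of two balances (hypothetical example state).
    static final class Balances {
        final long a;
        final long b;

        Balances(long a, long b) {
            this.a = a;
            this.b = b;
        }
    }

    // Reads a snapshot, computes a new one, and commits only if no other
    // thread committed in the meantime. On a conflict the step is retried.
    static void transfer(AtomicReference<Balances> ref, long amount) {
        while (true) {
            Balances before = ref.get();
            Balances after = new Balances(before.a - amount, before.b + amount);
            if (ref.compareAndSet(before, after)) {
                return; // commit succeeded, both fields changed together
            }
            // Another thread committed first: loop and retry on fresh state.
        }
    }

    static Balances runDemo() {
        AtomicReference<Balances> ref = new AtomicReference<>(new Balances(1000, 1000));
        Thread t1 = new Thread(() -> { for (int i = 0; i < 10_000; i++) transfer(ref, 1); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < 10_000; i++) transfer(ref, -1); });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ref.get();
    }

    public static void main(String[] args) {
        Balances result = runDemo();
        System.out.println("a=" + result.a + " b=" + result.b); // prints "a=1000 b=1000"
    }
}
```

No thread ever observes a state in which money has left one balance but not yet arrived in the other, because readers only see whole committed snapshots.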

    Transactions can nest and compose:

    atomic {
        .........
        atomic {
            ......
        }
    }

    When you write atomic, the STM runtime executes the block for you without deadlocks or races.

    You will find STM implementations for many programming languages, including Scala, Java, Haskell, Clojure, Python, Common Lisp, Perl and C/C++. Akka ships an STM implementation, and there is also ScalaSTM. A full list is available at http://en.wikipedia.org/wiki/Software_transactional_Memory

    Here is a simple example in Akka. More examples can be found at http://akka.io/docs/akka/1.2/scala/stm.html . In Akka, a Ref is a transactional reference that can only be changed inside a transaction, which is delimited by atomic. Updates to a Ref are atomic and isolated.

    import akka.stm._

    val ref = Ref(0)

    atomic {
        ref.set(5)
    }

    // -> 0 (the block evaluates to the previous value of ref)

    atomic {
        println(ref.get)
    }

    // prints 5

    Though STM allows us to write simple and beautiful code, it is not entirely problem free. Jonas Bonér of Akka (now Typesafe) mentioned in his presentation http://www.slideshare.net/jboner/state-youre-doing-it-wrong-javaone-2009 - "high contention (many transaction collisions) can lead to: potential bad performance and too high latency, progress can not be guaranteed (e.g. live locking), Fairness is not maintained, implementation details hidden in black box".

    Message-Passing Concurrency

    In Message-Passing Concurrency two or more processes (here, a process is a flow of control) communicate with each other by passing messages. There are no shared regions between these processes. All computation is done inside processes, and the only way to exchange data is through asynchronous message passing.

    Message-Passing Concurrency was popularized by the Erlang language. Two other languages, Oz and Occam, also implement it.
    The actor model implements Message-Passing Concurrency. It enforces that there is no shared state. Actors can run on different threads, and the only way one actor can influence another is by sending it a message and having it update its own state internally. It is not possible to reach into an actor and tamper with its state. This provides very good isolation of state and makes state management explicit. It removes many of the problems that we normally have with mutable shared state in a concurrent setting.

    This is how actors work in Akka:
    1. Each actor has a mailbox. It can send and receive messages.
    2. A scheduler can wake up an actor, resume it, and have it process the messages in its mailbox.
    3. Since an actor has a mailbox, it can still receive messages while it is passive; they are put in the mailbox.
    4. An actor processes a certain number of messages before it is suspended, and another actor can then use the thread that the suspended actor was just using.
    Actors are extremely lightweight. They consume only memory; they do not sit on a thread. In a thread-based model, by contrast, threads are heavyweight: only a limited number of threads can run on the JVM, while it is possible to run millions of actors because an actor is constrained only by memory. Actors never do anything unless they are told to.
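The mailbox idea can be sketched in plain Java with a blocking queue. This is a deliberately simplified illustration and not how Akka is implemented: the LoggingActor class, its tell method and the "stop" sentinel message are hypothetical, and the sketch dedicates one thread to the actor, whereas Akka's scheduler multiplexes many actors over a small pool of threads as described in the list above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MailboxDemo {
    // Hypothetical minimal actor: private state, a mailbox and one worker thread.
    static final class LoggingActor {
        private static final String STOP = "stop"; // sentinel message, an assumption of this sketch
        private final BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
        private final List<String> log = new ArrayList<>(); // touched only by the worker thread
        private final Thread worker = new Thread(this::run);

        void start() {
            worker.start();
        }

        // Fire-and-forget send: enqueue the message and return immediately.
        void tell(String message) {
            mailbox.add(message);
        }

        // Processes one message at a time, so the state needs no locks.
        private void run() {
            try {
                while (true) {
                    String message = mailbox.take(); // waits while the mailbox is empty
                    if (message.equals(STOP)) {
                        return;
                    }
                    log.add("received " + message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // Sends the stop message, waits for the worker to finish, then returns its log.
        List<String> stopAndGetLog() {
            tell(STOP);
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return log;
        }
    }

    static List<String> runDemo() {
        LoggingActor actor = new LoggingActor();
        actor.start();
        for (String message : Arrays.asList("a", "b", "c")) {
            actor.tell(message);
        }
        return actor.stopAndGetLog();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "[received a, received b, received c]"
    }
}
```

The sender never touches the actor's state directly. It can only put messages in the mailbox, and the actor decides how each one changes its own state.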

    Here is the preferred way of sending a message to an actor:

    actor ! "Hello"

    Example in Akka:

    import akka.actor.Actor

    class MyActor extends Actor {
      def receive = {
        case "test" => println("received test")
        case _ => println("received unknown message")
      }
    }

    val myActor = Actor.actorOf[MyActor]
    myActor.start()

    myActor ! "test"


    Output: received test

    In the example above we created an actor called MyActor by extending the Actor class and implementing its receive method. The receive method pattern matches on the messages that this actor accepts, and since we also want the actor to handle unknown messages, it has a default case for them. We then send the message "test" to our actor in a fire-and-forget fashion.


    Using the Actor model it is possible to avoid:
    1. Race conditions
    2. Deadlocks
    3. Starvation
    4. Live locks

    References:
    1. Concepts, Techniques, and Models of Computer Programming by Peter Van Roy and Seif Haridi, 2003
    2. http://clojure.org/state
    3. http://www.gpars.org/guide/guide/7.%20Dataflow%20Concurrency.html
    4. http://en.wikipedia.org/wiki/Software_transactional_memory
    5. http://www.slideshare.net/jboner/state-youre-doing-it-wrong-javaone-2009
    6. http://akka.io