Tuesday, September 7, 2010

Scala Generators

An implementation of generators on top of Scala's delimited continuations.

In my previous post I described a library that supports coroutines on top of Scala's delimited continuations capability. In this post I show how you can easily create generators on top of that coroutine library (net.jimmc.scoroutine). This is a second example of the kind of interesting construct that can be built on top of Scala's delimited continuations.

As with my previous post on coroutines, you don't need to understand reset and shift if you just want to use the Generator class shown here to write and use your own generators. But, as with coroutines, you should have a basic understanding of continuation-passing style (CPS) code and its restrictions when writing generators.

Generators

A generator is a routine that produces values like an iterator but is structured as a function. The generated values are returned by calling a special function, typically called yield, with each value that is generated. In our case, since yield is a reserved word in Scala, we will use yld instead.

Generators and coroutines are closely related. Depending on the implementation, generators and coroutines may be almost the same thing or fairly different, but in any case, if you have either one, you can implement the other one on top of it. Since we already have coroutines in the net.jimmc.scoroutine library described in my previous post, we will implement generators on top of coroutines using that library.

You can think of this approach as using the Producer-Consumer pattern, where we set up a generator as the producer and we allow the main code to act as the consumer. We create a generic Generator class that does the following:
  • Creates a CoScheduler that we use to control the generator.
  • Creates a CoQueue buffer into which we will place the generated values.
  • Provides convenience functions yld (in place of the reserved word yield) and generate.
  • Provides next and hasNext functions for the consuming code to call from a non-CPS context, and so that a Generator can be used as an Iterator.
This is all simple and straightforward. Here is the code for Generator:

package net.jimmc.scoroutine

import scala.collection.Iterator
import scala.util.continuations._

/** Generic generator class.
 */
class Generator[T] extends Iterator[T] {
    val sched = new DefaultCoScheduler
    val buf = new CoQueue[T](sched,1)

    /** Subclass calls this method to generate values.
     * @param body The code for your generator.
     */
    def generate(body: => Unit @suspendable) {
        sched.addRoutine("gen") { body }
        sched.run
    }

    /** Yield the next generated value.
     * Call this code from your generator to deliver the next value.
     */
    protected def yld(x:T):Unit @suspendable = {
        buf.blockingEnqueue(x)
    }

    /** Retrieve the next generated value.
     * Call this from your main code.
     */
    def next:T = {
        sched.run
        buf.dequeue
    }

    /** True if there is another value to retrieve.
     * Call this from your main code.
     */
    def hasNext:Boolean = {
        sched.run
        !buf.dequeueBlocker.isBlocked
    }
}
We are not concerning ourselves with performance here, so we are simply using the available DefaultCoScheduler as our scheduler. As a future optimization, we could develop a scheduler optimized for a single coroutine and use that as our scheduler for simple generators that fit that criterion. We could go further and use neither a scheduler nor CoQueue, packaging all of the functionality directly into the Generator class; but we are using the more expedient approach of using those two pieces, since we already have them and are familiar with their use from our experience with coroutines.

Integers Generator

Here is how we would use our generic Generator class to create a generator that will generate integers up to a specified maximum value:
import net.jimmc.scoroutine.Generator

class IntGen(max:Int) extends Generator[Int] {
    generate {
        var x = 1
        while (x<=max) {
            yld(x)
            x = x + 1
        }
    }
}
The one catch to remember here is that the body of the generate call is CPS code, so as with the body of a coroutine, there are some restrictions on what control constructs we can use. Thus we use a while loop with a var rather than a for loop, since the latter does not yet work with the continuations compiler plugin.

Given the above generator class, here is a simple GenInts object with a main function that creates an instance of that generator, then calls it to print out its values:
object GenInts {
    def main(args:Array[String]) = {
        val gen = new IntGen(4)
        for (i <- gen)
            println(i)
    }
}
Alternatively, we could replace the for loop with direct calls to hasNext and next:
object GenInts {
    def main(args:Array[String]) = {
        val gen = new IntGen(4)
        while (gen.hasNext)
            println(gen.next)
    }
}
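
For contrast, here is a rough sketch of the same sequence written as an ordinary hand-coded Iterator, with no continuations involved. The class name IntIter is hypothetical and not part of the scoroutine library. The point is only to show that the loop state, which IntGen keeps in a local var inside a while loop, has to become a field that is checked and updated across separate hasNext and next calls.

```scala
// Hypothetical plain-Iterator counterpart of IntGen (not part of scoroutine).
class IntIter(max: Int) extends Iterator[Int] {
  // The loop variable becomes a field, because there is no loop to hold it.
  private var x = 1

  def hasNext: Boolean = x <= max

  def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    val result = x
    x += 1
    result
  }
}

object IntIterDemo {
  def main(args: Array[String]): Unit = {
    for (i <- new IntIter(4))
      println(i)
  }
}
```

For a loop this small the two forms are about equally readable. The generator form starts to pay off when the producing code has nested loops or recursion, as in the examples that follow.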

Primes Generator

It is possible to code up a generator using shift and reset directly. But because our coroutine library implements a scheduler to which new coroutines can be added at any time, building generators on top of that library gives you the ability to create generators that include dynamic filter pipelines.

The example I use for this is the Sieve of Eratosthenes, a method of calculating primes in which, each time a prime is found, it is added to a list of prime divisors that are used for testing each new candidate. In this GenPrimes example, I create a new filter for each prime and add it to the pipeline. You can do this much more efficiently in Scala using a Stream, but this example illustrates the technique of dynamically building a pipeline within a generator.
import scala.util.continuations._

import net.jimmc.scoroutine.CoQueue
import net.jimmc.scoroutine.Generator

object GenPrimes {
    def main(args:Array[String]) = {
        val gen = new PrimeGen()
        for (i <- gen) {
            println("Prime: "+i)
        }
    }
}

class PrimeGen extends Generator[Int] {
    val bufSize = 1
    val out1 = new CoQueue[Int](sched,bufSize)

    sched.addRoutine("prime2")(nextPrime(2,out1))
    generate {
        def gen(n:Int):Unit @suspendable = {
            out1.blockingEnqueue(n)
            gen(n+1)
        }
        gen(2)
    }

    def nextPrime(p:Int, in:CoQueue[Int]):Unit @suspendable = {
        var out:Option[CoQueue[Int]] = None
        yld(p)
        def sieve():Unit @suspendable = {
            val n = in.blockingDequeue()
            if ((n%p)!=0) {
                if (!out.isDefined) {
                    out = Some(new CoQueue[Int](sched,bufSize))
                    val rName = "prime"+n
                    sched.addRoutine(rName)(nextPrime(n,out.get))
                }
                out.get.blockingEnqueue(n)
            } else {
                in.dequeueBlocker.waitUntilNotBlocked
            }
            sieve()
        }
        sieve()
    }
}
This example starts by setting up two coroutines: the addRoutine call sets up the first filter in the pipeline, which reads values from the out1 queue and filters out all numbers divisible by 2. The generate call sets up the other initial coroutine, which generates every integer and feeds it into the first filter in the pipeline. We start off this counting generator with the first prime number, 2.

The nextPrime function is called each time we see a new prime. It starts by outputting its prime parameter value p as a value of the PrimeGen generator. It then goes into a loop reading its input buffer and looking for values that are not divisible by its prime number. The first time it finds one (when out is not yet defined) it registers (with a call to addRoutine) a new coroutine based on a new instance of nextPrime that uses our output as its input. It then passes each candidate prime along to that next filter in the sieve pipeline.

You can tell this is CPS code because of the suspendable annotations, which are a cue that the code might not behave quite as you expect. For example, the gen function within the body of the generate call is recursive, so you might think it would cause a stack overflow. But since we are in a CPS function and the call to blockingEnqueue is a call to a CPS function, the recursive call to gen is turned into a continuation and executed later from the scheduler, so it is in fact not recursive. Likewise the recursive call to sieve is not really recursive for the same reason.
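
The effect can be illustrated with a toy analogy in plain Scala. This is not how the continuations plugin or the scoroutine scheduler is actually implemented, and the names DeferredRecursion, pending, and count are hypothetical. The sketch only shows why a call that looks recursive does not grow the stack when each "recursive" step is handed to a scheduler loop instead of being invoked directly.

```scala
import scala.collection.mutable

// Toy stand-in for a scheduler (an assumption for illustration only):
// a queue of steps waiting to be run.
object DeferredRecursion {
  private val pending = mutable.Queue[() => Unit]()
  private var steps = 0

  // Looks recursive, but the next call to gen is only queued here, not made.
  def gen(n: Int, limit: Int): Unit = {
    steps += 1
    if (n < limit) pending.enqueue(() => gen(n + 1, limit))
  }

  // The scheduler loop runs each queued step from the same shallow stack depth.
  def run(): Unit = while (pending.nonEmpty) pending.dequeue()()

  // Runs gen from 1 to limit and reports how many steps were executed.
  def count(limit: Int): Int = {
    steps = 0
    gen(1, limit)
    run()
    steps
  }
}
```

Calling DeferredRecursion.count(1000000) completes without a stack overflow, because each invocation of gen returns before the next one starts.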

Another CPS detail is the call to waitUntilNotBlocked. It would seem to be functionally unnecessary, since the first thing in the sieve function is a call to blockingDequeue. However, this is the same attempt to avoid blocking as discussed in my previous post; without this call our code will not work.
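
For comparison, here is a sketch of the Stream-based approach mentioned at the start of this section. It uses only the standard library, with no continuations and no scoroutine classes. The object name StreamPrimes is hypothetical. On Scala 2.13 and later, Stream is deprecated in favor of LazyList, which supports the same operations used here.

```scala
object StreamPrimes {
  // Keep the head as a prime, then lazily filter its multiples out of the rest.
  // Each recursive step adds one more filter, much like the pipeline above.
  def sieve(s: Stream[Int]): Stream[Int] =
    s.head #:: sieve(s.tail.filter(_ % s.head != 0))

  // Infinite, lazily evaluated stream of primes.
  val primes: Stream[Int] = sieve(Stream.from(2))

  def main(args: Array[String]): Unit = {
    for (p <- primes.take(10))
      println("Prime: " + p)
  }
}
```

Like PrimeGen, this builds one filter per prime found, but the laziness of the stream takes the place of the scheduler and queues.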

Same Fringe

The SameFringe problem has been called the "killer application" for coroutines. Given two trees, they have the same fringe if the leaves of the two trees, read from left to right, are the same.

With coroutines, or in this case generators, the simple solution to this problem is to create a generator that takes a tree and returns the sequence of leaves of that tree, then compare the outputs of two of those generators on the two trees to be compared.

We start with a simple tree definition:
sealed abstract class Tree[T]
case class Branch[T](left:Tree[T], right:Tree[T]) extends Tree[T]
case class Leaf[T](x:T) extends Tree[T]
Given this tree definition, we write a generator that walks a tree and yields all of the leaves:
import scala.util.continuations._
import net.jimmc.scoroutine.Generator

class TreeFringe[T](tree:Tree[T]) extends Generator[T] {
    generate {
        def walk(t:Tree[T]):Unit @suspendable = {
            t match {
                case Leaf(x) => yld(x)
                case Branch(left,right) => walk(left); walk(right)
            }
        }
        walk(tree)
    }
}
Since our generators implement the Iterator trait, we can compare two generators as two iterators with this little piece of code, making the assumption that the tree leaf values are never null:
    def sameFringe[T](tree1:Tree[T], tree2:Tree[T]):Boolean = {
        !((new TreeFringe(tree1)).zipAll(new TreeFringe(tree2),null,null).
            exists(p=>p._1!=p._2))
    }
Alternatively, we could use this more verbose version:
    def sameFringe[T](tree1:Tree[T], tree2:Tree[T]):Boolean = {
        val fringe1 = new TreeFringe(tree1)
        val fringe2 = new TreeFringe(tree2)
        while(fringe1.hasNext && fringe2.hasNext) {
            if (fringe1.next != fringe2.next)
                return false
        }
        !(fringe1.hasNext || fringe2.hasNext)
    }
We add a SameFringe object with a main method that creates some test trees, prints out the leaves of each tree using our generator, then calls our sameFringe method to check for equality.
object SameFringe {
    def main(args:Array[String]) = {
        val t1 = Branch(Branch(Leaf(1),Leaf(2)),Leaf(3))
        val t2 = Branch(Leaf(1),Branch(Leaf(2),Leaf(3)))
        val t3 = Branch(Leaf(1),Branch(Leaf(2),Leaf(4)))
        println("t1:"); for (x <- (new TreeFringe(t1))) println(x)
        println("t2:"); for (x <- (new TreeFringe(t2))) println(x)
        println("t3:"); for (x <- (new TreeFringe(t3))) println(x)
        println("sameFringe(t1,t2)="+sameFringe(t1,t2))
        println("sameFringe(t1,t3)="+sameFringe(t1,t3))
    }
    //include the sameFringe method in this object
}
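
To see what the generator is saving us, here is a sketch of the same fringe walk written as a plain Iterator, without continuations. The names FringeIter and SameFringeDemo are hypothetical. Because there is no suspended walk function to remember where we are in the tree, the iterator has to keep its own explicit stack of subtrees still to be visited. The tree definition is repeated so that the sketch compiles on its own. The comparison uses the standard Iterator method sameElements, which stops at the first mismatch and does not need the assumption about null leaf values.

```scala
sealed abstract class Tree[T]
case class Branch[T](left: Tree[T], right: Tree[T]) extends Tree[T]
case class Leaf[T](x: T) extends Tree[T]

// Hypothetical hand-written fringe iterator (not part of scoroutine).
class FringeIter[T](tree: Tree[T]) extends Iterator[T] {
  // Subtrees still to be visited, leftmost first.
  private var stack: List[Tree[T]] = List(tree)

  // Every subtree contains at least one leaf, so a non-empty stack
  // always means another value is available.
  def hasNext: Boolean = stack.nonEmpty

  def next(): T = stack match {
    case Leaf(x) :: rest =>
      stack = rest
      x
    case Branch(l, r) :: rest =>
      stack = l :: r :: rest
      next()
    case Nil =>
      throw new NoSuchElementException("next on empty fringe")
  }
}

object SameFringeDemo {
  def sameFringe[T](tree1: Tree[T], tree2: Tree[T]): Boolean =
    new FringeIter(tree1).sameElements(new FringeIter(tree2))

  def main(args: Array[String]): Unit = {
    val t1 = Branch(Branch(Leaf(1), Leaf(2)), Leaf(3))
    val t2 = Branch(Leaf(1), Branch(Leaf(2), Leaf(3)))
    val t3 = Branch(Leaf(1), Branch(Leaf(2), Leaf(4)))
    println("sameFringe(t1,t2)=" + sameFringe(t1, t2))
    println("sameFringe(t1,t3)=" + sameFringe(t1, t3))
  }
}
```

The walk function in TreeFringe is a direct recursive description of the traversal. The version here has to turn that recursion into manual stack bookkeeping.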

More Possibilities

Some other possible uses for generators or coroutines:
  • Pipelines: A sequence of tasks can operate on a stream of data, with each task reading data from an input queue and writing to an output queue which is the input queue of the next task in the sequence.
  • Fan-out: A single producer with multiple consumers can be implemented by using a fan-out coroutine that reads from its input queue and writes the same data to multiple output queues, each of which is the input queue to one of the multiple consumers.
  • Fan-in: Multiple producers can use a single shared output queue so that the coroutine using that queue as its input queue receives data from multiple sources. If you stick with a single-thread scheduler, you don't have to worry about synchronization or other concurrent access issues on the shared queue. By combining Pipelines, Fan-out and Fan-in, we can create arbitrary networks of communicating coroutines.
  • State machines: For any situation in which a task has to maintain state based on one or more inputs, a coroutine or generator can be used to allow some of that state to be stored as the location of current execution in the code, which often makes the code simpler to write and maintain.
  • Parsers: A parser is a typical example of a producer that reads an input stream and maintains state. As the parser collects input characters (which could be provided by another coroutine in a pipeline) and resolves them into tokens, it writes them to its output queue where the tokens are available to the routine handling the next level of analysis.
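
As a very rough illustration of the pipeline idea, here is a sketch in which each stage is a function from an input Iterator to an output Iterator. This is a pull-based stand-in using only the standard library, not the queue-connected coroutines described above, and the names PipelineSketch, numbers, evens, squares, and firstSquares are all hypothetical.

```scala
object PipelineSketch {
  // Stage 1: producer of an unbounded sequence of integers.
  def numbers(from: Int): Iterator[Int] = Iterator.from(from)

  // Stage 2: filter that passes only even values downstream.
  def evens(in: Iterator[Int]): Iterator[Int] = in.filter(_ % 2 == 0)

  // Stage 3: transformer that squares each value it receives.
  def squares(in: Iterator[Int]): Iterator[Int] = in.map(n => n * n)

  // Connect the stages; values are computed only as the consumer asks for them.
  def firstSquares(k: Int): List[Int] =
    squares(evens(numbers(1))).take(k).toList
}
```

With coroutines and CoQueue buffers, each stage would instead be its own routine registered with the scheduler, which is what makes fan-out and fan-in straightforward to add.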
