Monday, August 15, 2011

Folding Stream with Scala

Introduction
Couple of weeks ago, I read a well known paper on expressiveness of fold (Hutton, 1999). The paper is a very interesting paper that explains how fold is a very powerful construct.

The codes in the paper are written in Haskell and therefore assumes laziness. This means that the execution of function in Haskell is delayed until it is really needed. This is not the case for Scala, where the operation is executed eagerly, or in other word, it's strict instead of lazy.

Stream is the implementation of lazy list in Scala. Using Stream, the elements are only evaluated when they are needed. So, we may expect that Stream is close to Haskell's list.

The use of scala Stream and fold right are the topic of this post.

Fold Right
Let's back to fold. The paper I mentioned above (Hutton, 1999) showed how expressive fold construct, especially foldRight. The objective of this post is to show the same things as the paper, but in Scala.

Let's see the problem we may encounter. The following Haskell code works:
   1 Prelude> foldr (||) False (repeat True)
   2 True

But not the following Scala code:
   1 Stream.continually(true).foldRight(false)(_ || _)

because the code above throws StackOverflowError.

The reason of the error is because Stream.foldRight is not actually lazy. When I read the article and wondered how the non-laziness of Stream.foldRight could be solved, I received Tony Morris' tweet telling that scalaz fixes the non-laziness. The following is the reimplementation of lazy foldRight for Stream inspired by Foldr scalaz implementation:

1 
2 def foldr[A, B]( combine: (A, =>B) => B, base: B )(xs: Stream[A]): B = {
3    if (xs.isEmpty) 
4      base
5    else 
6      combine(xs.head,  foldr(combine, base)(xs.tail))
7 }

The following code works without StackOverflowError:

   1 def combine(x: Boolean, y: =>Boolean) = x || y
   2 foldr(combine, false)(Stream.continually(true))  

The key of the foldr implementation above is in thetype definition of combine parameter: (A, =>B) instead of (A,B). This makes the second parameter evaluated lazily.

Some Fold Right Uses
The following shows some familiar functions implemented using foldr.
   1 def sum(xs: Stream[Int]) = {
   2     def combine(x: Int, y: =>Int) = x + y
   3     foldr(combine, 0)(xs)
   4 }
   5 def product(xs: Stream[Int]) = {
   6     def combine(x: Int, y: =>Int) = x * y
   7     foldr(combine, 1)(xs)
   8 }
   9 def and(xs: Stream[Boolean]) = {
  10    def combine(x: Boolean, y: =>Boolean) = x && y
  11    foldr(combine, true)(xs)  
  12 }
  13 def or(xs: Stream[Boolean]) = {
  14    def combine(x: Boolean, y: =>Boolean) = x || y
  15    foldr(combine, false)(xs)  
  16 }

Using Fold Right to Implement Scala Collection Functions
As in (Hutton, 1999), the following section shows how Fold Right can be used to implement some functions. At the first time, let's see length, filter, reverse, flatten, and map implementations.
   1 def map[A,B](f:A=>B, xs:Stream[A]) = {
   2    def combine(x:A, xs: =>Stream[B]) = f(x)  #:: xs
   3    val base:Stream[B] = Stream.empty
   4    foldr(combine, base)(xs)
   5 }
   6 def length[A](xs: Stream[A]) = {
   7     def combine(x: A, len: =>Int) = len + 1
   8     foldr(combine, 0)(xs)
   9 }
  10 def reverse[A](xs: Stream[A]) = {
  11     def combine(x: A, xs: =>Stream[A]) = Stream.concat(xs, Stream(x))
  12     val base:Stream[A] = Stream.empty
  13     foldr(combine, base)(xs)
  14 }
  15 def filter[A](p:A=>Boolean, xs: Stream[A]) = {
  16    def combine(x: A, xs: =>Stream[A]) = if (p(x)) x #:: xs else xs 
  17    foldr(combine , Stream.empty)(xs)
  18 }
  19 def filterNot[A](p:A=>Boolean, xs: Stream[A]) = {
  20    def combine(x: A, xs: =>Stream[A]) = if (p(x)) xs else x #:: xs 
  21    val base:Stream[A] = Stream.empty  
  22    foldr(combine, base)(xs)
  23 }
  24 def flatten[A](xss: Stream[Stream[A]]) = {
  25    def combine(xs: Stream[A], ys: =>Stream[A]) = xs #::: ys
  26    val base: Stream[A] = Stream.empty
  27    foldr(combine, base)(xss)
  28 }
  29 def partition[A](p:A=>Boolean, xs:Stream[A]) = {
  30    (filter(p, xs), filterNot(p, xs))
  31 }
  32 

Note that, the functions like flatten, filter, and map, work with infinite Streams. See the following examples:
   1 scala> map( (_:Int)+1, Stream.from(1))
   2 res2: Stream[Int] = Stream(2, ?)
   3 
   4 scala> res2.take(4).toList
   5 res3: List[Int] = List(2, 3, 4, 5)
   6 
   7 scala> filter( (_:Int) % 2 ==0, Stream.from(10)).take(5).toList
   8 res5: List[Int] = List(10, 12, 14, 16, 18)
   9 
  10 scala> partition( (_:Int) % 2 == 0, Stream.from(20))
  11 res6: (scala.collection.immutable.Stream[Int], Stream[Int]) = (Stream(20, ?),Stream(21, ?))
  12 
  13 scala>  flatten(Stream.continually(Stream(1, 3, 4))).take(10).toList
  14 res13: List[Int] = List(1, 3, 4, 1, 3, 4, 1, 3, 4, 1)

Implementing dropWhile and break
One of the most interesting part of (Hutton, 1999) is the dropWhile implementation using foldr. The implementation is shown in the following code:

   1 def dropWhile[A](pred: A => Boolean, xs: Stream[A])= {
   2    def combine(x: A, xs: =>(Stream[A], Stream[A])) = 
   3       if (pred(x)) 
   4          (xs._1, x #:: xs._2) 
   5       else 
   6          (x #:: xs._2, x #:: xs._2)
   7    val base:(Stream[A], Stream[A]) = (Stream.empty, Stream.empty)
   8    foldr(combine, base)(xs) _1
   9 }

Instead of directly producing the result, a tuple of Streams is returned. The first Stream is the result and the second one is used for bookkeeping purpose.
The following illustrates an execution of dropWhile:

   1 dropWhile( (_:Int)<5, Stream(1,2,7,3,4,5,6,7,8,9,10)) 
   2 10 => ( (10), (10))
   3 9  => ( (9,10), (9,10))
   4 8  => ( (8,9,10), (8,9,10))
   5 7  => ( (7,8,9,10), (7,8,9,10))
   6 6  => ( (6,7,8,9,10), (6,7,8,9,10))
   7 5  => ( (5,6,7,8,9,10), (5,6,7,8,9,10))
   8 4  => ( (5,6,7,8,9,10), (4,5,6,7,8,9,10))
   9 3  => ( (5,6,7,8,9,10), (3,4,5,6,7,8,9,10))
  10 7  => ( (7, 3,4,5,6,7,8,9,10), (7,3,4,5,6,7,8,9,10))
  11 2  => ( (7, 3,4,5,6,7,8,9,10), (2, 7,3,4,5,6,7,8,9,10))
  12 1  => ( (7, 3,4,5,6,7,8,9,10), (1, 2, 7,3,4,5,6,7,8,9,10)) 

What surprising is that the solution actually works for infinite Stream too as illustrated in the following code:
   1 scala> dropWhile( (_:Int)<5, Stream.from(1)).take(10).toList
   2 res17: List[Int] = List(5, 6, 7, 8, 9, 10, 11, 12, 13, 14)

The last interesting function implemented here is break. The  equivalent of Haskell's break in Scala collection is called span (Update, Oct 24 thanks to oxbow). The break function creates a tuple of two lists separated at condition boundary. Example:

   1 scala> val (a,b) = break( (_:Int)<5, Stream.from(1))
   2 a: Stream[Int] = Stream(1, ?)
   3 b: Stream[Int] = Stream(5, ?)
   4 
   5 scala> a.toList
   6 res22: List[Int] = List(1, 2, 3, 4)
   7 
   8 scala> b.take(5).toList
   9 res23: List[Int] = List(5, 6, 7, 8, 9)
  10 

The implementation of break here, of course, highly inspired from dropWhile one:
   1 def break[A](pred: A=>Boolean, xs: Stream[A]) = {
   2    def combine(x: A, xs: =>(Stream[A], Stream[A], Stream[A])) =
   3       if (pred(x)) 
   4         (xs._1, x #:: xs._2, x #:: xs._3)             
   5       else
   6         (x #:: xs._3, Stream.empty, x #:: xs._3) 
   7              
   8    val base:(Stream[A], Stream[A], Stream[A]) = (Stream.empty, Stream.empty, Stream.empty)
   9    
  10    val result = foldr(combine, base)(xs) 
  11    (result._2, result._1)
  12 }

Summary 
In this blog post, I showed the result of playing around with Stream and fold right. First, I rewrote the implementation of foldRight, showed the use of foldRight, expressed some collection functions using foldRight, and finally provided dropWhile and break.

The blog post does not have intention to overcome the non-laziness of Scala language. Scala is not a lazy language, and I think it's not appropriate to use lazy evaluation as the main stream of the codes.

Pope (Pope, 2010 ?) in Monad Reader 6 discussed further the implementation of dropWhile. He presented two further solutions based on higher order function composition. He also presented the fold implementation using what is so called fix function. See http://en.wikibooks.org/wiki/Haskell/Fix_and_recursion to have an idea of what Fix function is.

References 

4 comments:

oxbow said...

scala does have Haskell's "break" - it is called "span"

voidmainargs said...

@oxbow thanks for the information. The blog is updated accordingly.

walter said...

what is the reason of currying stream here:

def foldr[A, B]( combine: (A, =>B) => B, base: B )(xs: Stream[A]): B ...

vs.

def foldr[A, B]( combine: (A, =>B) => B, base: B, xs: Stream[A]): B

Thank you,
walter

Unknown said...

walter: I don't think there's any good reason for that. What would be better would be:

def foldr[A, B](xs: Stream[A], base: B)( combine: (A, =>B) => B): B

This allows Scala type inference to figure out what A is from the value of xs, and what B is from the value of base, so that you don't need to specify those types when writing combine. But that requires currying: to infer type parameters, Scala takes the types of all arguments in a parameter list; the results of type inference can be used then only in *next* parameter list.
This way, you could just write all the function arguments of foldr as anonymous functions, without any type declaration for them.