
PA 9: Asynchrony with Rx

Dealing with asynchrony is hard. As asynchronous programs grow, they turn into a maze of callbacks that is hard to maintain and extend. For this project, you will implement a library for reactive programming similar to Rx from Microsoft. Rx is a library built around two core patterns, Observable and Enumerable, that allow programmers to write code in a data-flow fashion. Slides from the lectures on the topic are here.


Monday, April 22, 2013 11:59 PM

Suggested Steps

  1. Read the Rx tutorial and understand Rx. The lecture slides elaborate on SelectMany which might be a bit terse in the tutorial.  Optionally, download Microsoft Rx and write sample programs.
  2. Implement the Rx library following the list of functions we give below.  For each function, write a few test programs.
  3. Implement the simple twitter application.  Once that is working, move on to the advanced one.
  4. Design your own reactive TML webpage. The page needs to combine your bQuery and Rx implementations. Describe your page in a README file.
  5. Think about how Rx differs from bQuery.

Using Rx to write Twitter Applications

This section gives examples of programs that you will be able to write with Rx. 

A simple twitter application:

We want a little program that keeps asking Twitter for its list of most recent tweets. Assume that we have a library of two data sources: a timer that ticks at regular intervals, and a function that returns a "channel" (i.e., an observable) through which a web server pushes its response. With Rx, we can compose the two components as if we were wiring them in a dataflow program, in one line of code:

def tweets = timer:observe(createTweetObservable):flatten():map(simplifyTweet)

The timer, modeled as an observable source, drives the whole program.  It sends regular requests (ticks) to a "dataflow block" createTweetObservable, which returns for every tick an observable connected to the server.  

Through that observable, the server will push its response, which is a list of recent tweets.  

The function observe turns the observable of observables (i.e., a stream of channels connected to the server, each channel containing a single response) into an observable of responses, each of which is a list of tweets. That is, observe "unwraps" each observable, transforming the observable into the first element emitted from it.

Next, flatten converts an observable of iterables (like a list of tweets) into an observable of the elements of those iterables, akin to the flatten procedure introduced in CS61A. Therefore, it turns the observable containing lists of tweets into an observable containing all the tweets.

Finally, map throws out from each response those fields of a tweet that we are not interested in, keeping only the user name and the tweet text.  The variable tweets (defined in the code sample above) contains a reference to an observable with these cleaned-up tweets.

Here is the rest of the program:

def tweetUrl = ""
def createTweetObservable(ev) { Stream.fromHttpRequest(tweetUrl) }
def simplifyTweet(tweet) { {, text=tweet.text} }

You can construct a timer observable with the static factory method, Stream.timer(milliseconds). This is described below in the section titled "Rx", which lists all methods you must implement.
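
To make the data flow of the pipeline above concrete, here is a minimal sketch in Python rather than in the 164 language. It is not the required implementation: the Stream class is a simplified model, the timer is driven by hand, the server is faked with a list of pending channels, and the "user" and "id" fields of a tweet are hypothetical.

```python
class Stream(object):
    """Simplified push-based stream; subscribers are plain callbacks."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, fun):
        self.subscribers.append(fun)

    def push(self, element):
        for fun in list(self.subscribers):
            fun(element)

    def map(self, fun):
        out = Stream()
        self.subscribe(lambda x: out.push(fun(x)))
        return out

    def observe(self, fun):
        # fun returns a stream; only its first element is forwarded.
        out = Stream()

        def on_element(x):
            seen = []  # non-empty once the inner stream has emitted

            def on_inner(y):
                if not seen:
                    seen.append(True)
                    out.push(y)

            fun(x).subscribe(on_inner)

        self.subscribe(on_element)
        return out

    def flatten(self):
        out = Stream()

        def on_iterable(items):
            for item in items:
                out.push(item)

        self.subscribe(on_iterable)
        return out


# Hypothetical stand-ins for Stream.timer and Stream.fromHttpRequest.
timer = Stream()
pending = []  # channels whose response has not "arrived" yet


def create_tweet_observable(tick):
    channel = Stream()
    pending.append(channel)
    return channel


def simplify_tweet(tweet):
    # Keep only the fields we care about (field names are assumptions).
    return {"user": tweet["user"], "text": tweet["text"]}


tweets = timer.observe(create_tweet_observable).flatten().map(simplify_tweet)
received = []
tweets.subscribe(received.append)

timer.push("tick")  # a request goes out; nothing has arrived yet
pending.pop(0).push([  # the fake server answers with a list of tweets
    {"user": "ann", "text": "hello", "id": 1},
    {"user": "bob", "text": "world", "id": 2},
])
```

After the fake response is pushed, received holds the two simplified tweets in order, which mirrors how observe, flatten, and map each transform the stream in turn.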


Compare the Rx code above with an equivalent program written using AJAX call-backs.  Write the latter and compare their readability.

Advanced Twitter application:

We now want to extend the application so that a new tweet shows up only when the user clicks on a button.  Each time she clicks, one tweet is displayed; remaining tweets (if any) wait for more clicks in the observable.  If there are no tweets, the click is queued.  

We will use the zip primitive to synchronize clicks and tweets. Again, what synchronization do we want? When there are no tweets, a click is queued; when a tweet arrives and there is no pending click, the tweet waits for a click. In functional programming, zip is a function that turns two lists into a list of pairs. This is what we are doing with the stream of clicks and the stream of tweets. The zip function outputs an element only if both of its inputs are available. You can view a demo below.
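
Separately from the demo, the queuing behavior described above can be sketched in Python. This is one possible model under stated assumptions, not the required 164 implementation: zip keeps one queue per input and emits a pair as soon as both queues are non-empty. The click and tweet values are made up for illustration.

```python
from collections import deque


class Stream(object):
    """Simplified push-based stream with a queue-based zip."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, fun):
        self.subscribers.append(fun)

    def push(self, element):
        for fun in list(self.subscribers):
            fun(element)

    def zip(self, other):
        out = Stream()
        left, right = deque(), deque()  # elements waiting for a partner

        def emit_ready():
            # Emit pairs while both sides have something queued.
            while left and right:
                out.push((left.popleft(), right.popleft()))

        def on_left(x):
            left.append(x)
            emit_ready()

        def on_right(y):
            right.append(y)
            emit_ready()

        self.subscribe(on_left)
        other.subscribe(on_right)
        return out


clicks = Stream()
tweets = Stream()
shown = []
clicks.zip(tweets).subscribe(lambda pair: shown.append(pair[1]))

clicks.push("click-1")  # no tweet yet: the click is queued
tweets.push("t1")       # pairs with the queued click and is displayed
tweets.push("t2")       # no pending click: the tweet waits
tweets.push("t3")       # also waits
clicks.push("click-2")  # releases the oldest waiting tweet, t2
```

At the end of this run, shown contains "t1" and "t2", while "t3" is still waiting for a third click.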



Rx

Your library must implement the following functions. The return types of the functions are denoted in [brackets].

  1. [void] stream:subscribe(fun)
    Register fun as a callback. fun will be called on each element coming out of the stream. It is possible to subscribe to a single stream multiple times.
  2. [void] stream:push(element)
    Inject element into a stream, causing it to be emitted from the stream.
  3. [void] stream:pushMany(elist)
    Inject a list of elements individually into a stream.
  4. [Stream] stream:map(fun)
    Apply fun on every element. The return value of this method is a new stream, whose emitted values are those of the original stream with fun applied on them.
  5. [Stream] stream:observe(fun)   
    Constructs a new stream, similar to map(), except that the mapper function must return an observable stream; the first element emitted by that observable is pushed into the stream returned by this method.

    def a = Stream:new({}); def b = a:observe(createTweetObservable)
    # When a emits "hi", createTweetObservable("hi") is invoked, which returns
    # an observable stream. When Twitter sends back the response, the stream
    # created by createTweetObservable will emit the response, and then
    # b will emit the response.
    # createTweetObservable is defined above in the Twitter example.
    # It returns an observable that will emit one element, the response of an HTTP
    # request. The HTTP response will be emitted from the observable when it arrives
    # from the web.

  6. [Stream] stream:flatten()   
    Constructs a new stream from the current stream. You can assume that the current stream emits iterables (e.g., lists). flatten pushes the elements of these iterables into the returned stream.
    • type signature of flatten: Stream[Iterable[T]] -> Stream[T]
  7. [Stream] stream:filter(fun)
    Filters a stream using predicate fun. The return value of this method is a new stream that only emits elements emitted by the original stream and that pass the filter predicate (i.e., fun(element) is truthy).
  8. [void] stream:consume(stream0) 
    Plugs stream0 into the current stream; every time a value is emitted from stream0, the current stream should emit the value as well. It is acceptable if this method returns the current stream in order to support call-chaining, but note that this method injects the elements from stream0 into the current stream, rather than constructing a new stream. This method is similar to multiplex, except that it modifies the current stream.
      def a = Stream:new({}); def b = Stream:new({});
      a:consume(b); b:push(1)  # b will emit 1, and then a will emit 1. 
  9. [Stream] stream:multiplex(stream0)
    Constructs and returns a new stream that emits an element whenever the current stream or stream0 emits an element.
      def a = Stream:new({}); def b = Stream:new({}); def c = a:multiplex(b)
      a:push(1) # a will emit 1, and then c will emit 1.
    Works much like tee in a terminal (see man tee).
  10. [Stream] stream:zip(stream0)
    Merges two streams together. The return value is a stream of pairs; each pair holds one element from each of the two input streams. Contrast this with multiplex, which emits an element whenever either of the two input streams does so. zip, on the other hand, emits an element only after both the current stream and stream0 have each emitted an element.
      def a = Stream:new({}); def b = Stream:new({}); def c = a:zip(b)
      a:push(2); a:push(3) # a will emit 2 and 3, but c emits nothing so far.
      b:push("x")          # b will emit "x", and c will emit a pair (2, "x").
      b:push("y")          # b will emit "y" and c will emit the pair (3, "y").
  11. [Stream] Stream.timer(ms)   
    A static factory method to construct a timer stream, which asynchronously fires a timer event at the specified interval. You can build this from window:setInterval(ms) in browser/window.164.
  12. [Stream] Stream.fromDOMEvent(node, event)   
    A static factory method to construct a stream out of DOM events. This is similar to node:pass5_BindEventListeners() in browser/node.164, except that Stream.fromDOMEvent is not limited just to "onclick" event handlers. You can build this from node:addProxyEventListener(event, callback, context) in browser/node.164.
  13. [Stream] Stream.fromHttpRequest(url)   
    A static factory method to construct a stream that will emit the response from a remote HTTP server when it asynchronously arrives some time in the future. You can build this from window:http(url, callback) in browser/window.164.
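
As a rough illustration of how several of the functions above fit together, here is a sketch in Python rather than in the 164 language. It models only subscribe, push, pushMany (spelled push_many here), map, filter, consume, and multiplex, and it dispatches synchronously; your rx.164 may be structured quite differently.

```python
class Stream(object):
    """Simplified model of a stream: a list of subscriber callbacks."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, fun):
        # The same stream can be subscribed to any number of times.
        self.subscribers.append(fun)

    def push(self, element):
        for fun in list(self.subscribers):
            fun(element)

    def push_many(self, elements):
        for element in elements:
            self.push(element)

    def map(self, fun):
        out = Stream()
        self.subscribe(lambda x: out.push(fun(x)))
        return out

    def filter(self, fun):
        out = Stream()

        def on_element(x):
            if fun(x):  # forward only elements that pass the predicate
                out.push(x)

        self.subscribe(on_element)
        return out

    def consume(self, other):
        # Inject other's elements into this stream; no new stream is made.
        other.subscribe(self.push)
        return self

    def multiplex(self, other):
        # New stream that emits whenever either input emits.
        out = Stream()
        self.subscribe(out.push)
        other.subscribe(out.push)
        return out


a = Stream()
b = Stream()
merged = a.filter(lambda n: n % 2 == 0).map(lambda n: n * n).multiplex(b)
log = []
merged.subscribe(log.append)

a.push_many([1, 2, 3, 4])  # only 2 and 4 pass the filter, then get squared
b.push("x")                # arrives through the other multiplex input

c = Stream()
c_log = []
c.subscribe(c_log.append)
c.consume(b)
b.push("y")                # emitted by b, then by c (and by merged)
```

Note the difference the sketch makes visible: multiplex returns a fresh stream and leaves both inputs untouched, whereas consume changes what the current stream emits.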


The following files contain helper functions you might find useful

  • window.164


  1. Your Rx implementation, stored in pa9/browser/rx.164
  2. The advanced twitter feed reader, stored in pa9/tests/rx/twitter.tml and pa9/tests/rx/twitter.164
    The advanced shout app, stored in pa9/tests/rx/lastfm_adv.tml and pa9/tests/rx/lastfm_adv.164
  3. Your submission for the contest of the most impressive reactive web page, stored in pa9/contest.tml



Starter-kit

You can fetch the starter-kit by running the following Mercurial commands:

  1. hg pull https://<your username>
  2. hg merge or hg update, whichever Mercurial tells you to run.


Support for Unicode strings must be added to your interpreter in order to handle non-ASCII characters, which often show up in tweets. In our interpreter, it was sufficient to make the following changes:
  1. Replace all isinstance(o, str) by isinstance(o, basestring)
  2. When performing string concatenation (the operator '+' is defined on strings), convert string to unicode first:
    unicode(op1) + unicode(op2)
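
The two changes above might look roughly like this in isolation. The helper names is_string and concat are hypothetical and will not match your interpreter's structure; the try/except fallback is only there so the sketch also runs on Python 3, where basestring and unicode do not exist.

```python
try:
    string_types = basestring  # Python 2: covers both str and unicode
    text_type = unicode
except NameError:
    string_types = str         # Python 3 fallback, for illustration only
    text_type = str


def is_string(value):
    # Change 1: test against basestring instead of str.
    return isinstance(value, string_types)


def concat(op1, op2):
    # Change 2: convert both operands to unicode before concatenating.
    return text_type(op1) + text_type(op2)
```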


Teamwork and Repositories

You will work in the same team. Furthermore, we will reuse the same repository we used for all the PAs so far. See the Starter-kit section.

If you have changed partners, please email with the following information:

  • Your and your partner's names and instructional accounts.
  • The URL of the bitbucket repository that you will use. You may create a new one by following the same steps as in PA1

Errata and Bug fixes

Once a bug is discovered in the starter-kit, we will publish a corrected version on Bitbucket. You can then update your starter-kit (and get the bugfix) with the following commands:

  1. hg pull https://<your username>
  2. hg merge or hg update, whichever Mercurial tells you to run.



This assignment will be graded manually on a small set of test cases. Make sure your implementation is general enough and that you do not bake in support for the three tests included with the starter-kit.

Furthermore, you should not rely on our infrastructure any more. Make sure you use your own parser/interpreter/browser. Your submission must include your parser/interpreter/browser. Use hg add.


The Submission Itself

You will submit by tagging the revision you want us to grade with the following tag: "PA9_SUBMIT". Case matters. To do so, please follow the instructions below:

  1. Make sure you pass all the tests included with the starter-kit. To do so, all debugging print statements must be removed.
  2. Please don't forget to add all your files to the repository (use hg add). This includes the parser code, the grammar, the interpreter, the browser and the test files. Do NOT add any *.pyc files.
  3. Commit any outstanding modifications with "hg commit -m message_here"
  4. Commit with  "hg commit -m message_here"
  5. Add the tag with the command "hg tag PA9_SUBMIT"
  6. Publish the project on the server with  "hg push"
  7. Go to<username>/cs164/changesets
    You should see the tag in the timeline. If you do, you are all set.

Do not forget steps 4 & 5, otherwise your tag will not be visible to our autograder.

If you have discovered a last-minute bug, you can remove the tag as follows:

  1. Run "hg tag --remove PA9_SUBMIT"
  2. Publish with  "hg push"

Then you can keep working on the project and add the tag again when you are ready to submit.