https://rxpy.readthedocs.io/en/latest/get_started.html
Reactive Extensions (Rx) is a library for composing and processing asynchronous event/message streams.
A data stream is represented by an Observable, which emits a series of items (the actual event values), and we can apply pipable query operators to the stream.
The query operators are implemented as functions and can be chained into a pipeline of operations. Thus you can filter, map,
reduce, compose and perform time-based operations on the stream of events easily.
We can also parameterize concurrency in data streams using Schedulers, and use an Observer to subscribe to an Observable and receive its events.
#pip install rx #install the library
Create Observable
To create an Observable, you can use the create() factory and pass it the function that generates the events. The push_five_strings()
function below simulates such a generator. It can send three kinds of notification:
on_next() #emits an event
on_completed() #the Observable completes
on_error() #an error occurs
Note that these are methods of the observer: create() passes the subscribing observer into the generation function, which pushes events by calling observer.on_next() and so on.
In reality, the generation function could be listening to a sensor, monitoring a temperature, etc.
The generated events are pushed out by the Observable to downstream consumers.
from rx import create
def push_five_strings(observer, scheduler):
    observer.on_next("Alpha")
    observer.on_next("Beta")
    observer.on_next("Gamma")
    observer.on_next("Delta")
    observer.on_next("Epsilon")
    observer.on_completed()
source = create(push_five_strings)
source.subscribe(
    on_next = lambda i: print("Received {0}".format(i)),
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),
)
The 'source' here is an Observable created by the create() factory; it produces a sequence of events.
When the Observable (source) is subscribed to, the generation function push_five_strings() is called and emits the sequence of events.
The subscribe() call specifies three callback functions (as lambdas) to handle next, error and completed separately.
Note there are many other Observable factories, such as if_then, case, concat, etc., each with different logic for creating a stream.
In fact, if the Observable only emits a fixed sequence of items, it can be simplified further by using the of() factory, which
emits the items passed to it as arguments.
from rx import of
source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
source.subscribe(
    on_next = lambda i: print("Received {0}".format(i)),
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),
)
We also don't have to handle next, error and completed separately: we can ignore error and completed and handle next only.
Here is the concise version:
from rx import of
source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
source.subscribe(lambda value: print("Received {0}".format(value)))
Operators and pipe
You can feed an Observable into a pipe where a number of operators are used to transform the events.
Below we use the 'map' operator in a pipe to convert each event to its length, and the 'filter' operator
to keep only lengths >= 5.
Note every operator yields a new Observable, so there is the original source Observable, a second one from map, and
a third one from filter.
from rx import of, operators as op
source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
composed = source.pipe(
    op.map(lambda s: len(s)),
    op.filter(lambda i: i >= 5)
)
composed.subscribe(lambda value: print("Received {0}".format(value)))
The intermediate Observables yielded by the operators do not need names of their own unless something subscribes to the intermediate results.
If they are not needed, the code can be simplified as:
from rx import of, operators as op
of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
    op.map(lambda s: len(s)),
    op.filter(lambda i: i >= 5)
).subscribe(lambda value: print("Received {0}".format(value)))
Here of(), pipe() and subscribe() are chained into a single statement. Pretty concise.