Streams represent an infinite sequence of data, say S = s0, s1, s2, .... A stream typically has a "next" function that, when invoked, returns the next element.
Simple streams can be thought of as functions of the natural numbers. For example, the stream of even numbers is s_i = 2i.
Other streams, like the Fibonacci sequence, compute the current element from previous elements of the stream. For Fibonacci the window is 2 elements, but it could be any constant size, or even all the past data produced by the stream. However, streams are "causal" systems: an element cannot depend on "future" elements.
In general, an element of a stream can be written as s_i = f(i, s_(i-1), ..., s_(i-N)) for a window of size N, or even s_i = f(i, s_(i-1), ..., s_0). Fibonacci, for instance, is the case N = 2 with s_i = s_(i-1) + s_(i-2).
The usual list functions, such as map, filter and zip, have stream versions. For example, map_stream takes a mapping function and an input stream, and returns the mapped output stream.
Some advantages of streams:
- They can represent infinite sequences.
- They are lazy: elements are computed only when requested, so they use less memory than a fully materialized sequence.
- Prefetching can be packed inside a stream to build performant data readers, and stream-transforming functions can arrange such readers into a reactive pipeline (e.g., TensorFlow data loaders).
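The prefetching idea can be sketched with a Python generator (the yield construct introduced next) and the standard threading and queue modules. This is a minimal sketch assuming a simple thread-based design; prefetch and its buffer_size parameter are hypothetical names, and the code is not meant to mirror any particular library's implementation.

```python
import queue
import threading
from itertools import count, islice

_DONE = object()  # sentinel that marks the end of a finite source

def prefetch(source, buffer_size=4):
    """Hypothetical helper: yield the items of `source` while a background
    thread reads up to `buffer_size` items ahead. Errors raised by `source`
    are not forwarded to the consumer in this sketch."""
    buf = queue.Queue(maxsize=buffer_size)

    def producer():
        for item in source:
            buf.put(item)  # blocks while the buffer is full
        buf.put(_DONE)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buf.get()  # blocks until the producer has an item ready
        if item is _DONE:
            return
        yield item

print(list(prefetch(range(5), buffer_size=2)))  # [0, 1, 2, 3, 4]
print(list(islice(prefetch(count()), 3)))       # [0, 1, 2]
```

For an infinite source, the producer thread simply stays blocked once the consumer stops asking; it is a daemon thread, so it does not keep the program alive.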
We can write streams in Python using "yield", like so. Functions that "yield" instead of "return" are called generators.
def nat_nums(start):
    while True:
        yield start
        start += 1

f = nat_nums(5)
print(next(f))  # 5
print(next(f))  # 6
print(next(f))  # 7
This is fairly straightforward; however, it needs language support in the form of the yield construct. In the next section we will construct streams without such support.
Streams can also be implemented using thunks (zero-argument functions), like this:
def nat_nums_from(start):
    def helper():
        return start, nat_nums_from(start+1)
    return helper

nxt0 = nat_nums_from(5)  # nxt0 has start=5 in its closure
n0, nxt1 = nxt0()  # nxt1 has start=6 in its closure
n1, nxt2 = nxt1()  # nxt2 has start=7 in its closure
n2, nxt3 = nxt2()  # nxt3 has start=8 in its closure
print(n0, n1, n2)  # 5 6 7
We wish to construct a function of type T, where T = () -> (V, T) and V is the type of the elements. Note that the type is recursive (compare it with recursive data types like lists and trees). Being a thunk, the function cannot take any arguments, and a pure function that takes no arguments always returns the same value, so it is essentially a constant. Which constant it returns, however, can depend on what is in its closure. We generally think of arguments as the inputs to a function, but depending on where the function was constructed, its environment (closure) can also be an input. In other words, you can curry a function to create a thunk:
def create_const_func(x):
    def helper():
        return x
    return helper
Note the similarity with nat_nums_from. Besides the current value, helper just needs to return the next stream. It could return itself, which would match the type, but then the stream would be constant. The only other option is to create a new stream, which helper does by calling nat_nums_from(start+1).
helper needs the value of start but must take zero arguments, so the only way for it to get start is to be nested inside nat_nums_from and read start from its closure.
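To see that the closure really acts as helper's input, we can inspect it through Python's function attributes __code__.co_freevars and __closure__. The second half of this sketch illustrates the currying idea with functools.partial, which fixes the argument of an ordinary one-argument function (nat_step, a hypothetical name) and stores it inside the partial object, much as a closure stores start.

```python
from functools import partial

def nat_nums_from(start):
    def helper():
        return start, nat_nums_from(start + 1)
    return helper

# The closure is helper's input: `start` is its only free variable
# (nat_nums_from is looked up as a global name, not captured).
thunk = nat_nums_from(5)
print(thunk.__code__.co_freevars)          # ('start',)
print(thunk.__closure__[0].cell_contents)  # 5

# Partial application: fixing the only argument of nat_step leaves a
# zero-argument callable, i.e. a thunk of type () -> (value, thunk).
def nat_step(start):
    return start, partial(nat_step, start + 1)

nxt = partial(nat_step, 5)
a, nxt = nxt()
b, nxt = nxt()
print(a, b)  # 5 6
```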
Mutually recursive functions are typically defined side by side (not nested) and recurse by calling each other, for example (for non-negative integers):
is_even = lambda x: True if x == 0 else is_odd(x - 1)
is_odd = lambda x: False if x == 0 else is_even(x - 1)
nat_nums_from and helper, in contrast, are mutually recursive through call-and-construct rather than call-and-call:
- helper calls nat_nums_from.
- nat_nums_from constructs helper but does not call it. However, since helper is nested inside nat_nums_from, nat_nums_from passes inputs to helper through the closure: it does not call helper, but whenever helper is eventually called, the value of start supplied by nat_nums_from comes into play.
A very natural way of expressing natural numbers (forgive the pun) is:
def helper(i):
    return i, helper(i+1)
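The only problem is that it does not terminate: computing helper(i) first requires computing helper(i+1), and so on forever (in Python it ends with a RecursionError). To make it lazy, we just wrap the second return value in a thunk: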
def helper(i):
    return i, lambda: helper(i+1)

a, nxt = helper(3)
b, nxt = nxt()
c, nxt = nxt()
d, nxt = nxt()
print(a, b, c, d)  # 3 4 5 6
If you are willing to overlook that the first call (helper(3)) looks different from the subsequent calls (nxt()), a single function is enough to implement streams.
To make the first call look like the others, you can add an extra function layer that hides it:
def natnum():
    def helper(i):
        return i+1, lambda: helper(i+1)
    return 0, lambda: helper(0)

a, natnum = natnum()  # every call now has the same form: value, nxt = nxt()
b, natnum = natnum()
print(a, b)  # 0 1
But now you have lost the ability to define the start point, which you can regain by wrapping natnum in another function that accepts a start value.
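A minimal sketch of that wrapper, using a hypothetical name natnum_from. The outer function only captures start; the inner natnum is the same homogeneous thunk as before. With three nested layers, it ends up with essentially the same shape as nat_nums_from.

```python
def natnum_from(start):
    """Return a stream (a thunk) whose first element is `start`."""
    def natnum():
        def helper(i):
            return i + 1, lambda: helper(i + 1)
        return start, lambda: helper(start)
    return natnum

nxt = natnum_from(5)
a, nxt = nxt()
b, nxt = nxt()
c, nxt = nxt()
print(a, b, c)  # 5 6 7
```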
Many streams can be thought of as functions of the natural numbers. The following constructor builds such a stream from a mapping function:
def nat_num_mapper(mapper):
    def stream(nat_num):
        def helper():
            return mapper(nat_num), stream(nat_num+1)
        return helper
    return stream(0)

even_stream = nat_num_mapper(lambda x: 2*x)
n0, nxt = even_stream()
n1, nxt = nxt()
n2, nxt = nxt()
print(n0, n1, n2)  # 0 2 4
Such a stream has exactly one element for each natural number, so it is a countably infinite ("aleph-0") sequence.
The next stream generates each element from a fixed-size window into the past. As an example, the Fibonacci sequence is implemented below:
def fixed_window_stream(ini_vals, combine_fn):
    def stream(past_window):
        def helper():
            curr_val = combine_fn(past_window)
            return curr_val, stream(past_window[1:] + [curr_val])
        return helper
    return stream(ini_vals)

# Fibonacci is a window of length 2, with "sum" as the combining function.
# The initial window values (0 and 1) are not emitted themselves.
fibonacci = fixed_window_stream([0, 1], sum)
n0, nxt = fibonacci()
n1, nxt = nxt()
n2, nxt = nxt()
n3, nxt = nxt()
n4, nxt = nxt()
print(n0, n1, n2, n3, n4)  # 1 2 3 5 8
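The general form s_i = f(i, s_(i-1), ..., s_0), where an element may depend on its index and on all past data, can be sketched in the same style. history_stream and take are hypothetical names; combine_fn receives the index i of the new element and the list of every element before it. Note that the history grows with each step (and is copied each time), so this trades memory for generality.

```python
def history_stream(ini_vals, combine_fn):
    """Element s_i = combine_fn(i, history), where history holds every
    element before s_i, including the initial values."""
    def stream(history):
        def helper():
            curr_val = combine_fn(len(history), history)  # len(history) == i
            return curr_val, stream(history + [curr_val])
        return helper
    return stream(list(ini_vals))

def take(n, stream):
    """Collect the first n elements of a thunk stream into a list."""
    items = []
    for _ in range(n):
        item, stream = stream()
        items.append(item)
    return items

# s_i = s_(i-1) + i, starting from s_0 = 0: the triangular numbers
triangular = history_stream([0], lambda i, past: past[-1] + i)
print(take(4, triangular))  # [1, 3, 6, 10]

# s_i = 1 + (sum of all previous elements), starting from s_0 = 1: powers of two
powers = history_stream([1], lambda i, past: 1 + sum(past))
print(take(4, powers))  # [2, 4, 8, 16]
```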
Let's write map_stream. It accepts a mapping function and a stream, and returns a new stream, so its signature is def map_stream(mapper, in_stream).
Since it returns a stream, map_stream must contain a stream-building function (out_stream), which in turn contains a helper thunk. Putting that together, we have:
def map_stream(mapper, in_stream):
    def out_stream(stream):
        item, nxt = stream()  # pull one element from the input stream
        def helper():
            return mapper(item), out_stream(nxt)
        return helper
    return out_stream(in_stream)

strm = nat_nums_from(5)
mapped_stream = map_stream(lambda x: 10*x, strm)
n0, nxt1 = mapped_stream()
a, nxt_strm = strm()  # let's use the input stream a couple of times
b, _ = nxt_strm()
n1, nxt2 = nxt1()
n2, nxt3 = nxt2()
print(n0, n1, n2)  # 50 60 70
print(a, b)  # 5 6
Note that even though we pulled elements from the input stream in the middle, the output stream was unaffected (why?).
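The other list functions mentioned at the start, filter and zip, can be sketched the same way. filter_stream, zip_stream and take are hypothetical names. Unlike map_stream above, these helpers touch their input only when the returned thunk is called. filter_stream loops instead of recursing while it skips elements, and it never returns if no later element satisfies the predicate.

```python
def nat_nums_from(start):
    def helper():
        return start, nat_nums_from(start + 1)
    return helper

def filter_stream(pred, in_stream):
    """Stream of the elements of in_stream that satisfy pred."""
    def helper():
        stream = in_stream
        while True:
            item, stream = stream()
            if pred(item):
                return item, filter_stream(pred, stream)
    return helper

def zip_stream(s1, s2):
    """Stream of pairs (a, b) taken element-wise from s1 and s2."""
    def helper():
        a, nxt1 = s1()
        b, nxt2 = s2()
        return (a, b), zip_stream(nxt1, nxt2)
    return helper

def take(n, stream):
    """Collect the first n elements of a thunk stream into a list."""
    items = []
    for _ in range(n):
        item, stream = stream()
        items.append(item)
    return items

evens = filter_stream(lambda x: x % 2 == 0, nat_nums_from(0))
print(take(4, evens))  # [0, 2, 4, 6]
pairs = zip_stream(nat_nums_from(0), nat_nums_from(10))
print(take(3, pairs))  # [(0, 10), (1, 11), (2, 12)]
```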
Consider the finite-state machine (FSM) shown below:
                 a/x
        0 -------------> 1
          <-------------
                 b/y

    self-loops: b/m on state 0, a/n on state 1
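It has two states, 0 and 1. It accepts two inputs, a and b, and produces four possible outputs: x, y, n and m. An edge labelled i/o means: on input i, output o and follow the edge. For example, in state 0 the input a outputs x and moves to state 1, while the input b outputs m and stays in state 0.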
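A crude, non-error-checked implementation of this FSM represents each state as a function that takes an input and returns the output together with the next state: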
def state0(inp):
    # indexing a tuple with a bool picks element 0 (False) or 1 (True)
    return ('m', 'x')[inp == 'a'], state1 if inp == 'a' else state0

def state1(inp):
    return ('y', 'n')[inp == 'a'], state1 if inp == 'a' else state0
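The crude version treats any input other than 'a' as if it were 'b'. An error-checked alternative could drive the same kind of state functions from a transition table. This is a sketch only; TRANSITIONS and make_state are hypothetical names. The rest of this section keeps using the original state0 and state1.

```python
# Hypothetical table-driven encoding of the same FSM:
# (state, input) -> (output, next state)
TRANSITIONS = {
    (0, 'a'): ('x', 1),
    (0, 'b'): ('m', 0),
    (1, 'a'): ('n', 1),
    (1, 'b'): ('y', 0),
}

def make_state(name):
    """Build a state function in the same style as state0/state1:
    it maps an input to (output, next state function)."""
    def state(inp):
        if (name, inp) not in TRANSITIONS:
            raise ValueError(f"state {name}: unexpected input {inp!r}")
        out, next_name = TRANSITIONS[(name, inp)]
        return out, make_state(next_name)
    return state

s0 = make_state(0)
out, s = s0('a')
print(out)  # x
try:
    s('c')
except ValueError as err:
    print(err)  # state 1: unexpected input 'c'
```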
To run this on a sequence of inputs, we could do something like:
inputs = ['a', 'a', 'b', 'b']
next_state = state0
for inp in inputs:
    out, next_state = next_state(inp)
    print(out)  # prints x, n, y, m (one per line)
Next, we could pack this into a recursive function that accepts a state and a list of inputs, like so:
def fsm_runner(state, inputs):
    if len(inputs) == 0:
        return
    else:
        out, next_state = state(inputs[0])
        print(out)
        fsm_runner(next_state, inputs[1:])

fsm_runner(state0, inputs)  # prints x, n, y, m (one per line)
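Note that fsm_runner is inherently finite: it stops only when the input list is exhausted, so it cannot consume an infinite input. It is also too eager: it processes every input and prints every output as soon as it is called, instead of producing outputs on demand.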
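For comparison, a lazy runner can be written as a generator (run_fsm is a hypothetical name). It produces one output per input only when asked, so it also works on an infinite input such as itertools.repeat('a'). The state functions are repeated so the sketch is self-contained.

```python
from itertools import islice, repeat

def state0(inp):
    return ('m', 'x')[inp == 'a'], state1 if inp == 'a' else state0

def state1(inp):
    return ('y', 'n')[inp == 'a'], state1 if inp == 'a' else state0

def run_fsm(state, inputs):
    """Lazily yield one output per input; works for finite or infinite iterables."""
    for inp in inputs:
        out, state = state(inp)
        yield out

print(list(run_fsm(state0, ['a', 'a', 'b', 'b'])))    # ['x', 'n', 'y', 'm']
print(list(islice(run_fsm(state0, repeat('a')), 4)))  # ['x', 'n', 'n', 'n']
```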
Here is a simple stream representing the sequence a, a, a, a...
def in_stream():
    def helper():
        return 'a', helper  # the next stream is helper itself, so the stream is constant
    return 'a', helper
# in_stream = nat_num_mapper(lambda x : 'a') # if you want to use the stream constructors from the previous section
A modified version, fsm_runner_stream, accepts the initial state and an input stream, and returns a stream that produces the outputs when called:
def fsm_runner_stream(state, in_stream):
    inp, nxt_stream = in_stream()
    out, next_state = state(inp)
    def helper():
        return out, fsm_runner_stream(next_state, nxt_stream)
    return helper

out_stream = fsm_runner_stream(state0, in_stream)
out0, out_stream = out_stream()
out1, out_stream = out_stream()
out2, out_stream = out_stream()
out3, out_stream = out_stream()
print(out0, out1, out2, out3)  # prints x n n n
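Thunk streams and Python generators can also be bridged. Below is a minimal sketch of a hypothetical adapter, to_iter, that turns any thunk stream from this section into a generator, so that standard tools such as itertools.islice apply to it. Going the other way (generator to thunk stream) is harder: iterators are stateful, so a faithful thunk would need to memoize its result so that calling it twice returns the same element.

```python
from itertools import islice

def nat_nums_from(start):
    def helper():
        return start, nat_nums_from(start + 1)
    return helper

def to_iter(stream):
    """Adapt a thunk stream (() -> (value, next_stream)) into a generator."""
    while True:
        value, stream = stream()
        yield value

s = nat_nums_from(5)
print(list(islice(to_iter(s), 3)))  # [5, 6, 7]
print(s()[0])                       # 5 (the original stream is untouched)
```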