Core Streams

This document takes you through how to build basic streams and push data through them. We start with map and accumulate, talk about emitting data, then discuss flow control and finally back pressure. Examples are used throughout.

Map, emit, and sink

Stream.emit(x[, asynchronous]) Push data into the stream at this point
map(upstream, func, *args, **kwargs) Apply a function to every element in the stream
sink(upstream, func, *args, **kwargs) Apply a function on every element

You can create a basic pipeline by instantiating a Stream object and then calling methods like map, accumulate, and sink.

from streamz import Stream

def increment(x):
    return x + 1

source = Stream()
source.map(increment).sink(print)

The map and sink methods both take a function and apply that function to every element in the stream. The map method returns a new stream with the modified elements while sink is typically used at the end of a stream for final actions.

To push data through our pipeline we call emit:

>>> source.emit(1)
2
>>> source.emit(2)
3
>>> source.emit(10)
11

As we can see, whenever we push data in at the source, our pipeline calls increment on that data and then calls print on the result, so the incremented values are printed to the screen.
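The push model described above can be sketched in a few lines of plain Python. This is only a toy illustration: MiniStream, _Map, and _Sink are hypothetical names invented here, and the code is not how streamz itself is implemented. It shows how one emit call at the source travels through each attached node in turn.

```python
class MiniStream:
    """Toy push-based stream node (hypothetical; not part of streamz)."""

    def __init__(self):
        self.downstreams = []

    def emit(self, x):
        # Push x to every node attached to this one.
        for node in self.downstreams:
            node.update(x)

    def update(self, x):
        # A plain node simply forwards what it receives.
        self.emit(x)

    def map(self, func):
        node = _Map(func)
        self.downstreams.append(node)
        return node

    def sink(self, func):
        node = _Sink(func)
        self.downstreams.append(node)
        return node


class _Map(MiniStream):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def update(self, x):
        # Transform the element, then pass the result downstream.
        self.emit(self.func(x))


class _Sink(MiniStream):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def update(self, x):
        # End of the line: call func for its side effect only.
        self.func(x)


def increment(x):
    return x + 1


results = []
source = MiniStream()
source.map(increment).sink(results.append)

for value in (1, 2, 10):
    source.emit(value)

print(results)  # [2, 3, 11]
```

Collecting into a list instead of printing, as done here, is also a convenient way to test a pipeline.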

Often we call emit from some other continuous process, like reading lines from a file:

import json

data = []

source = Stream()
source.map(json.loads).sink(data.append)

# the with block closes the file once every line has been emitted
with open('myfile.json') as f:
    for line in f:
        source.emit(line)

Accumulating State

accumulate(upstream, func[, start, …]) Accumulate results with previous state

Map and sink both pass data directly through a stream: one piece of data comes in and either one or zero pieces go out. Accumulate allows you to track some state within the pipeline. It takes an accumulation function that receives the previous state and the new element and returns the new state; by default that state is also the value emitted downstream. In the following example we make an accumulator that keeps a running total of the elements seen so far.

def add(x, y):
    return x + y

source = Stream()
source.accumulate(add).sink(print)

>>> source.emit(1)
1
>>> source.emit(2)
3
>>> source.emit(3)
6
>>> source.emit(4)
10

The accumulation function above is particularly simple: the state that we store and the value that we emit are the same. In more complex situations we might want to keep around different state than we emit. In that case the function returns a pair of the new state and the value to emit, and we pass returns_state=True. For example, let's count the number of distinct elements that we have seen so far.

def num_distinct(state, new):
    state.add(new)
    return state, len(state)

source = Stream()
source.accumulate(num_distinct, returns_state=True, start=set()).sink(print)

>>> source.emit('cat')
1
>>> source.emit('dog')
2
>>> source.emit('cat')
2
>>> source.emit('mouse')
3

Accumulators allow us to build many interesting operations.
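To make the difference between stored state and emitted value concrete, here is a plain-Python model of the behaviour described above. The helper accumulate_model and the running_mean example are hypothetical and written for illustration; they run without streamz and are not its implementation. The model always takes an explicit start value.

```python
def accumulate_model(items, func, start, returns_state=False):
    """Yield what an accumulate step would emit for each incoming item.

    Hypothetical helper mirroring the description in the text.
    """
    state = start
    for item in items:
        result = func(state, item)
        if returns_state:
            # func returned (new_state, value_to_emit)
            state, emitted = result
        else:
            # func returned the new state, which is also emitted
            state = emitted = result
        yield emitted


def add(x, y):
    return x + y


def running_mean(state, new):
    # State is (total, count); the emitted value is the mean so far.
    total, count = state
    total, count = total + new, count + 1
    return (total, count), total / count


totals = list(accumulate_model([1, 2, 3, 4], add, start=0))
means = list(
    accumulate_model([2, 4, 9], running_mean, start=(0, 0), returns_state=True)
)

print(totals)  # [1, 3, 6, 10]
print(means)   # [2.0, 3.0, 5.0]
```

The running mean needs both a total and a count as state but emits only a single number, which is the situation returns_state=True is meant for.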

Flow Control

buffer(upstream, n[, loop]) Allow results to pile up at this point in the stream
flatten([upstream, upstreams, stream_name, …]) Flatten streams of lists or iterables into a stream of elements
partition(upstream, n, **kwargs) Partition stream into tuples of equal size
sliding_window(upstream, n, **kwargs) Produce overlapping tuples of size n
union(*upstreams, **kwargs) Combine multiple streams into one
unique(upstream[, history, key]) Avoid sending through repeated elements

You can batch and slice streams into streams of batches in various ways with operations like partition, buffer, and sliding_window.

source = Stream()
source.sliding_window(3).sink(print)

>>> source.emit(1)
>>> source.emit(2)
>>> source.emit(3)
(1, 2, 3)
>>> source.emit(4)
(2, 3, 4)
>>> source.emit(5)
(3, 4, 5)
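The difference between overlapping and non-overlapping batches can be sketched in plain Python. The two generator functions below are hypothetical models written for this comparison, not streamz code. The sliding window model reproduces the output shown above, and the partition model assumes that an incomplete final batch is held back until it fills.

```python
from collections import deque


def sliding_window_model(items, n):
    """Yield overlapping tuples of the last n items (hypothetical model)."""
    window = deque(maxlen=n)  # the oldest item drops out automatically
    for item in items:
        window.append(item)
        if len(window) == n:
            yield tuple(window)


def partition_model(items, n):
    """Yield non-overlapping tuples of exactly n items (hypothetical model)."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == n:
            yield tuple(batch)
            batch = []
    # Any leftover items stay in the batch and are not yielded.


windows = list(sliding_window_model(range(1, 6), 3))
batches = list(partition_model(range(1, 8), 3))

print(windows)  # [(1, 2, 3), (2, 3, 4), (3, 4, 5)]
print(batches)  # [(1, 2, 3), (4, 5, 6)]
```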

Branching and Joining

combine_latest(*upstreams, **kwargs) Combine multiple streams together to a stream of tuples
zip(*upstreams, **kwargs) Combine streams together into a stream of tuples
zip_latest(lossless, *upstreams, **kwargs) Combine multiple streams together to a stream of tuples

You can branch multiple streams off of a single stream. Elements that go into the input will pass through to both output streams.

def increment(x):
    return x + 1

def decrement(x):
    return x - 1

source = Stream()
a = source.map(increment).sink(print)
b = source.map(decrement).sink(print)
b.visualize(rankdir='LR')

[Figure: a branching stream]

>>> source.emit(1)
0
2
>>> source.emit(10)
9
11

Similarly, you can combine multiple streams together with operations like zip, which emits once both streams have provided a new element, or combine_latest, which emits whenever either stream provides a new element.

source = Stream()
a = source.map(increment)
b = source.map(decrement)
c = a.zip(b).map(sum).sink(print)

>>> source.emit(10)
20  # 9 + 11

[Figure: a branching and zipped stream]

This branching and combining is where Python iterators break down, and projects like streamz start becoming valuable.
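The difference between zip and combine_latest is easiest to see when the two inputs do not arrive in lockstep. The sketch below models both in plain Python over a list of (stream index, value) arrival events. The function names and the event format are assumptions made for this illustration; the code is not taken from streamz, and the combine_latest model assumes nothing is emitted until every input has produced at least one value.

```python
from collections import deque


def zip_model(events, n_streams):
    """Pair up elements in arrival order, one from each stream."""
    buffers = [deque() for _ in range(n_streams)]
    for index, value in events:
        buffers[index].append(value)
        if all(buffers):  # every stream has an unpaired element waiting
            yield tuple(buf.popleft() for buf in buffers)


def combine_latest_model(events, n_streams):
    """Emit the latest value of every stream whenever any stream updates."""
    missing = object()
    latest = [missing] * n_streams
    for index, value in events:
        latest[index] = value
        if all(v is not missing for v in latest):
            yield tuple(latest)


# Stream 0 produces twice before stream 1 produces anything.
events = [(0, "a1"), (0, "a2"), (1, "b1"), (1, "b2"), (0, "a3")]

zipped = list(zip_model(events, 2))
combined = list(combine_latest_model(events, 2))

print(zipped)    # [('a1', 'b1'), ('a2', 'b2')]
print(combined)  # [('a2', 'b1'), ('a2', 'b2'), ('a3', 'b2')]
```

In this model zip never drops or repeats an element, while combine_latest may skip values (a1 is never emitted) and repeat others.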

Processing Time and Back Pressure

delay(upstream, interval[, loop]) Add a time delay to results
rate_limit(upstream, interval, **kwargs) Limit the flow of data
timed_window(upstream, interval[, loop]) Emit a tuple of collected results every interval

Time-based flow control depends on having an active Tornado event loop. Tornado is active by default within a Jupyter notebook, but otherwise you will need to learn at least a little about asynchronous programming in Python to use these features. Learning async programming is not mandatory; the rest of the project will work fine without Tornado.

You can control the flow of data through your stream over time. For example you may want to batch all elements that have arrived in the last minute, or slow down the flow of data through sensitive parts of the pipeline, particularly when they may be writing to slow resources like databases.

Streamz helps you do this both with operations like delay, rate_limit, and timed_window, and by passing Tornado futures back through the pipeline. As data moves forward through the pipeline, futures that signal work completed move backwards. In this way you can reliably avoid buildup of data in slower parts of your pipeline.

Let's consider the following example that reads JSON data from a file and inserts it into a database using an async-aware insertion function.

async def write_to_database(...):
    ...

# build pipeline
source = Stream()
source.map(json.loads).sink(write_to_database)

async def process_file(fn):
    with open(fn) as f:
        for line in f:
            await source.emit(line)  # wait for pipeline to clear

As we call the write_to_database function on our parsed JSON data it produces a future for us to signal that the writing process has finished. Streamz will ensure that this future is passed all the way back to the source.emit call, so that user code at the start of our pipeline can await on it. This allows us to avoid buildup even in very large and complex streams. We always pass futures back to ensure responsiveness.

But maybe we don't mind having a few messages in memory at once. This helps steady the flow of data so that we can continue to work even if our sources or sinks become less productive for brief periods. We might add a buffer just before writing to the database.

source.map(json.loads).buffer(100).sink(write_to_database)
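The effect of a bounded buffer in front of a slow sink can be illustrated with the standard library alone. The sketch below is an analogy built on asyncio.Queue, not a description of how streamz implements buffer or back pressure. The names run_pipeline and slow_writer, the buffer size of 5, and the 1 ms sleep standing in for a database write are all assumptions made for the example.

```python
import asyncio


async def run_pipeline(n_items, buffer_size):
    queue = asyncio.Queue(maxsize=buffer_size)  # the bounded buffer
    written = []
    max_pending = 0

    async def slow_writer():
        while True:
            item = await queue.get()
            await asyncio.sleep(0.001)  # stand-in for a slow database write
            written.append(item)
            queue.task_done()

    writer = asyncio.create_task(slow_writer())

    for i in range(n_items):
        # put() suspends the producer while the buffer is full, so
        # pending items can never exceed buffer_size.
        await queue.put(i)
        max_pending = max(max_pending, queue.qsize())

    await queue.join()  # wait until the writer has drained the buffer
    writer.cancel()
    return written, max_pending


written, max_pending = asyncio.run(run_pipeline(n_items=20, buffer_size=5))

print(len(written))      # 20
print(max_pending <= 5)  # True
```

The producer runs ahead of the writer by at most the buffer size and then waits, which is the same trade-off the buffer step makes: a little memory in exchange for smoother flow.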

And if we are pulling from an API with known limits then we might want to introduce an artificial rate limit of one element every 10 ms.

source.rate_limit(0.010).map(json.loads).buffer(100).sink(write_to_database)

Operations like these (and more) allow us to shape the flow of data through our pipelines.

Modifying and Cleaning up Streams

When you call Stream you create a stream. When you call any method on a Stream, like Stream.map, you also create a stream. All operations can be chained together. Additionally, as discussed in the section on Branching, you can split multiple streams off of any point. Streams will pass their outputs on to all downstream streams so that anyone can hook in at any point, and get a full view of what that stream is producing.

If you delete a part of a stream then it will stop getting data. Streamz follows normal Python garbage collection semantics, so once all references to a stream have been lost those operations will no longer occur. The one exception to this is sink, which is intended to be used with side effects and will stick around even without a reference.

Note

Sink streams store themselves in streamz.core._global_sinks. You can remove them permanently by clearing that collection.

>>> source.map(print)      # no reference is kept, so this node is collected and does nothing
>>> source.sink(print)     # this stays active even without a reference
>>> s = source.map(print)  # this works too because we have a handle to s
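The three cases above can be modelled in plain Python with weak references. This is a toy sketch under the assumption that a stream holds its downstream nodes weakly and that sinks are kept alive by a global registry, as the note describes. The Node class, its attach method, and the keep_alive flag are hypothetical and are not streamz APIs.

```python
import gc
import weakref

_global_sinks = set()  # toy stand-in for the registry mentioned in the note


class Node:
    """Toy stream node that holds its children by weak reference."""

    def __init__(self, func=None, keep_alive=False):
        self.func = func
        self.downstreams = weakref.WeakSet()
        if keep_alive:
            # A strong reference here keeps the node alive without a handle.
            _global_sinks.add(self)

    def attach(self, func, keep_alive=False):
        child = Node(func, keep_alive)
        self.downstreams.add(child)
        return child

    def emit(self, x):
        for child in list(self.downstreams):
            child.func(x)


seen = []
source = Node()

source.attach(lambda x: seen.append(("dropped", x)))  # no handle kept
source.attach(lambda x: seen.append(("sink", x)), keep_alive=True)
handle = source.attach(lambda x: seen.append(("held", x)))

gc.collect()  # make sure the unreferenced node is really gone
source.emit(1)

print(sorted(seen))  # [('held', 1), ('sink', 1)]
```

Clearing _global_sinks in this model would let the sink node be collected as well, which corresponds to the cleanup described in the note.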

Recursion and Feedback

By connecting sources to sinks you can create feedback loops. As an example, here is a tiny web crawler:

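import requests
from streamz import Stream

# get_list_of_links is not defined here; supply your own function that
# returns the list of URLs found in a page's content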
source = Stream()

pages = source.unique()
content = pages.map(requests.get).map(lambda x: x.content)
links = content.map(get_list_of_links).flatten()
links.sink(source.emit)  # pipe new links back into pages

pages.sink(print)

>>> source.emit('http://github.com')
http://github.com
http://github.com/features
http://github.com/business
http://github.com/explore
http://github.com/pricing
...
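The crawler only terminates because unique drops pages that have already been seen. The following plain-Python sketch models that feedback loop on a small in-memory link graph so that it can be run without network access. FAKE_WEB and crawl are hypothetical names invented for the example, and the order in which a real pipeline prints pages may differ from this model.

```python
# Hypothetical link graph: page -> links found on that page.
FAKE_WEB = {
    "a": ["b", "c"],
    "b": ["a", "d"],  # links back to "a", forming a cycle
    "c": ["d"],
    "d": [],
}


def crawl(start):
    """Model of the feedback loop: every discovered link is emitted again."""
    seen = set()
    visited = []

    def emit(url):
        if url in seen:  # plays the role of unique()
            return
        seen.add(url)
        visited.append(url)  # plays the role of pages.sink(print)
        for link in FAKE_WEB.get(url, []):  # map(...) followed by flatten()
            emit(link)  # plays the role of links.sink(source.emit)

    emit(start)
    return visited


visited = crawl("a")
print(visited)  # ['a', 'b', 'd', 'c']
```

Without the seen check, the cycle between "a" and "b" would make emit recurse until Python raises RecursionError.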

Performance

Streamz adds a few microseconds of overhead to each normal Python operation.

from streamz import Stream

source = Stream()

def inc(x):
    return x + 1

source.sink(inc)

In [5]: %timeit source.emit(1)
100000 loops, best of 3: 3.19 µs per loop

In [6]: %timeit inc(1)
10000000 loops, best of 3: 91.5 ns per loop

You may want to avoid pushing millions of individual elements per second through a stream. However, you can avoid performance issues by collecting lots of data into single elements, for example by pushing through Pandas dataframes instead of individual integers and strings. This will be faster regardless, just because projects like NumPy and Pandas can be much faster than Python generally.
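A rough back-of-the-envelope calculation shows why batching helps. The sketch below takes the 3.19 µs per emit measured above as a fixed per-call cost, which is an assumption: the real figure varies by machine and pipeline. The helper emit_overhead is hypothetical.

```python
OVERHEAD_PER_EMIT = 3.19e-6  # seconds, taken from the %timeit result above


def emit_overhead(n_elements, batch_size=1):
    """Return (number of emit calls, estimated total overhead in seconds)."""
    n_emits = -(-n_elements // batch_size)  # ceiling division
    return n_emits, n_emits * OVERHEAD_PER_EMIT


single_calls, single_seconds = emit_overhead(1_000_000)
batched_calls, batched_seconds = emit_overhead(1_000_000, batch_size=10_000)

print(f"{single_calls} emits, about {single_seconds:.6f} s")
# 1000000 emits, about 3.190000 s
print(f"{batched_calls} emits, about {batched_seconds:.6f} s")
# 100 emits, about 0.000319 s
```

Under this assumption, pushing a million elements one at a time costs a few seconds of pure overhead, while pushing the same data as 100 batches makes the overhead negligible.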

In the following example we pass filenames through a stream, convert them to Pandas dataframes, and then map pandas-level functions on those dataframes. For operations like this Streamz adds virtually no overhead.

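from glob import glob
import pandas as pd

# add is the two-argument addition function defined earlier
source = Stream()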
s = source.map(pd.read_csv).map(lambda df: df.value.sum()).accumulate(add)

for fn in glob('data/2017-*-*.csv'):
    source.emit(fn)

Streamz provides higher-level APIs for situations just like this one. You may want to read further about collections.