Reactive Programming in Python

Keith Yang

What is Reactive?

Game
is reactive.

Presentation?
Not...

... if you fall asleep

Reactive Programming
in Python


Keith Yang

yang@keitheis.org
@keitheis
PyCon Taiwan 2016

Keith Yang

Software Engineering Artisan

@Taipeipy & @PyConTW

Outline

I. What & Why

II. Dive into & play with

III. Thumbs up

Reactive Programming in Python

I. What is
Reactive Programming?

My hand... tell me!

Seen in web applications

I mean...
programming?

Functional
Python
101

Map

Map a function over an iterable, e.g., a list.

In [1]: str_map = map(str, [1, 2, 3])

In [2]: list(str_map)
Out[2]: ['1', '2', '3']

Lambda

Create a small anonymous function inline.

In [1]: multiple_7 = lambda x: x * 7

In [2]: type(multiple_7)
Out[2]: function

In [3]: multiple_7(6)
Out[3]: 42

Functions are first-class objects in Python.
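Being first-class means a function can be stored in a data structure, passed as an argument, and returned from another function, just like an int or a str. A small stdlib-only sketch; `apply_twice`, `make_multiplier`, and `operations` are illustrative names, not from any library:

```python
def apply_twice(func, value):
    # A function received as an ordinary argument.
    return func(func(value))

def make_multiplier(n):
    # A function created at runtime and returned (a closure over n).
    return lambda x: x * n

# Functions stored in a dict and looked up by name.
operations = {"double": make_multiplier(2), "to_str": str}

print(apply_twice(make_multiplier(7), 6))           # 294
print(operations["to_str"](42))                     # 42 (as a str)
print(list(map(operations["double"], [1, 2, 3])))   # [2, 4, 6]
```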

Map & Lambda

In [1]: list(map(lambda x: x * 2, [1, 2, 3]))

Out[1]: [2, 4, 6]

Programming Paradigms

Declarative > Dataflow > Reactive

Category of Reactive Programming Paradigm

Relatives:

(Events) Stream

Stream: Core Spirit of Reactive

“Everything can be a stream!”

The mantra of Reactive Programming.

In brief, a sequence of events.

Stream

Streams with for ... in ... if

( expression for expr in sequence1
    if condition1
    for expr2 in sequence2
    if condition2
    for expr3 in sequence3 ...
    if condition3
    for exprN in sequenceN
    if conditionN )
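A concrete instance of the template above, with two `for` clauses and two `if` clauses; the word list is made up for illustration:

```python
words = ["stream", "map", "reactive"]

# Reads like nested loops: for each word longer than 3 chars,
# for each char in that word, keep it if it is a vowel.
vowel_stream = (
    (word, char)
    for word in words
    if len(word) > 3
    for char in word
    if char in "aeiou"
)

vowel_pairs = list(vowel_stream)  # the generator is lazy until consumed
print(vowel_pairs)
# [('stream', 'e'), ('stream', 'a'), ('reactive', 'e'),
#  ('reactive', 'a'), ('reactive', 'i'), ('reactive', 'e')]
```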

Visually messy!

# Imperative style.
from pathlib import Path
from zipfile import ZipFile

files_stream = (
    ZipFile(file.name)
    for file in Path(".").iterdir()
    if file.is_file() and file.name.lower().endswith('.zip')
)

zipped_content_list = []
for a_zipfile in files_stream:
    with a_zipfile:  # close each archive once its names are read
        for item_name in a_zipfile.namelist():
            zipped_content_list.append(item_name)

Rewrite with PyFunctional

from pathlib import Path
from zipfile import ZipFile
from functional import seq

zipped_content_list = (
    seq(Path(".").iterdir())
    .filter(lambda item: item.is_file())
    .filter(
      lambda file: file.name.lower().endswith('.zip')
    ).map(ZipFile)
    .flat_map(lambda a_zipfile: a_zipfile.namelist())
    .to_list()  # seq is lazy; materialize it as a list
)

Parallel with PyFunctional

from pathlib import Path
from zipfile import ZipFile
from functional import pseq

zipped_content_list = (
    pseq(Path(".").iterdir())
    .filter(lambda item: item.is_file())
    .filter(
      lambda file: file.name.lower().endswith('.zip')
    ).map(ZipFile)
    .flat_map(lambda a_zipfile: a_zipfile.namelist())
    .to_list()  # pseq is lazy too; materialize it as a list
)

pseq is new in PyFunctional 0.7

Feels like an SQL query

# SQLAlchemy-style query; User and Cafe are assumed models.
session.query(User) \
    .filter(User.role == Cafe.customer) \
    .filter(User.cups_bought >= 100_000_000) \
    .update({"state": "caffeinated"})

Better?
It depends.

Why Reactive Programming

Aannndddd...

Functional

Quicksort Example

Quicksort logic in OCaml

let rec qsort = function
    | [] -> []
    | pivot :: rest ->
        let is_less x = x < pivot in
        let left, right = List.partition is_less rest in
        qsort left @ [pivot] @ qsort right;;
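A rough Python translation of the OCaml version, for comparison. It is a sketch that favors clarity over speed: every call builds new lists, and deep recursion on large or already-sorted inputs can hit Python's recursion limit.

```python
def qsort(items):
    """Quicksort shaped like the OCaml version: no in-place mutation."""
    if not items:                                  # | [] -> []
        return []
    pivot, *rest = items                           # | pivot :: rest ->
    left = [x for x in rest if x < pivot]          # List.partition is_less rest
    right = [x for x in rest if not x < pivot]
    return qsort(left) + [pivot] + qsort(right)    # qsort left @ [pivot] @ qsort right

print(qsort([3, 6, 1, 8, 2, 9, 2]))  # [1, 2, 2, 3, 6, 8, 9]
```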

Code Smell

num_stream = [1, 2, 3, 4, 5, 6]

evens1 = []
for n in num_stream:
    if (n % 2) == 0:
        evens1.append(n)  # Imperative

evens2 = [n for n in num_stream if (n % 2) == 0]

# List comprehension result: [2, 4, 6]

Try
PyFunctional

Code Smell (Cont.)

Functional Reactive Programming (FRP)

from functional import seq

def is_even(num):  # Big business logic!
    return (num % 2) == 0

evens3 = seq(num_stream).filter(is_even)
# [2, 4, 6]

The functional module is provided by PyFunctional.

Real life example: find_kernels()

kernels = [
    f.name for f in folder_path.iterdir()
    if f.is_file() and f.name.startswith('vmlinuz')
]
# Need one more filter to exclude the rescue kernel?
kernels = seq(folder_path.iterdir()) \
    .filter(lambda f: f.is_file()) \
    .filter(lambda f: f.name.startswith('vmlinuz')) \
    .filter(lambda f: 'rescue' not in f.name) \
    .map(lambda f: f.name) \
    .to_list()

A = B + C

Spreadsheet

Spreadsheet is “Reactive”
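In a spreadsheet, A = B + C means A is recomputed whenever B or C changes, without anyone calling it again. A minimal plain-Python sketch of that idea; the `Cell` class and its attributes are hypothetical, not from any library:

```python
class Cell:
    """A value that pushes changes to the cells that depend on it."""

    def __init__(self, value=None, formula=None, inputs=()):
        self._value = value
        self._formula = formula          # callable computing value from inputs
        self._inputs = list(inputs)
        self._dependents = []
        for cell in self._inputs:
            cell._dependents.append(self)  # register for change notifications
        if formula is not None:
            self._recompute()

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        self._value = new_value
        self._notify()

    def _recompute(self):
        self._value = self._formula(*(cell.value for cell in self._inputs))
        self._notify()

    def _notify(self):
        # Push the change to every dependent cell.
        for dependent in self._dependents:
            dependent._recompute()


b = Cell(1)
c = Cell(2)
a = Cell(formula=lambda x, y: x + y, inputs=(b, c))  # A = B + C
print(a.value)  # 3
b.value = 10
print(a.value)  # 12
```

This naive propagation recomputes eagerly and can recompute a cell more than once when dependencies form a diamond; real reactive libraries schedule updates to avoid such glitches.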

Recap: What Reactive Programming

Recap: Why Reactive Programming

II. Dive into Reactive

Stream Fun

  1. Similar workloads
  2. Merge
  3. Re-use

An example powered by RxPy (rx)

import rx
num_stream = [1, 2, 3, 4]

num_flow = rx.Observable.from_(num_stream)
num_flow.subscribe(print)

In [1]: import rx

In [2]: num_stream = [1, 2, 3, 4]

In [3]: num_flow = rx.Observable.from_(num_stream)

In [4]: num_flow.subscribe(print)
1
2
3
4
Out[4]: <rx.disposables.AnonymousDisposable
    .AnonymousDisposable at 0x1041fcac8>

Merge

chars_flow = rx.Observable.from_(["a", "b", "c"])
numbers_flow = rx.Observable.from_([1, 2, 3, 4])
# or rx.Observable.range(1, 4)

printable_flow = numbers_flow.map(
    lambda num: num * 2
).merge(chars_flow)

printable_flow.subscribe(print)
# a 2 b 4 c 6 8 (the exact interleaving depends on the scheduler)
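Merge forwards values from every source as they arrive, so the output order follows time rather than source order. Assuming each event carries a timestamp (the ones below are made up), `heapq.merge` can reproduce that interleaving for streams that are already recorded and time-ordered. A live Rx merge, unlike this sketch, does not need to know future events in advance.

```python
import heapq

# (time_in_ms, value) pairs; each stream is already ordered by time.
chars_events = [(0, "a"), (20, "b"), (40, "c")]
numbers_events = [(10, 1), (30, 2), (50, 3), (60, 4)]

# Like numbers_flow.map(lambda num: num * 2), keeping the timestamps.
doubled = ((t, n * 2) for t, n in numbers_events)

# Order by timestamp only, so str and int values are never compared.
merged = [value for _, value in
          heapq.merge(chars_events, doubled, key=lambda event: event[0])]
print(merged)  # ['a', 2, 'b', 4, 'c', 6, 8]
```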

Most.js - Live Demo

Mouse position X, Y =

most.fromEvent('mousemove', document)
    .map(function(event) {
        return event.clientX + ',' + event.clientY;
    })
    .startWith('move the mouse, please')
    .observe(function(xy_str) {
        document.body.textContent = xy_str;
    });

Most.js - Monadic reactive streams

ReactiveX

ReactiveX (Cont.)

Even in C++: RxCpp

auto num_flow = rxcpp::observable<>::create(
    [](rxcpp::subscriber<int> s){
        for (int i = 0; i <= 5; i++)
            s.on_next(i);
        s.on_completed();
    });

num_flow.subscribe(
    [](int v){printf("OnNext: %d\n", v);},
    [](){printf("OnCompleted\n");});

Abstract away

Focus
on biz logic

Concurrent

Parallel

Parallel execution is concurrent by definition, but concurrency is not necessarily parallelism.
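Concurrency without parallelism can happen on a single thread: two tasks both make progress by taking turns, although only one runs at any instant. A minimal round-robin sketch using generators as tasks; `task` and `run_round_robin` are illustrative names:

```python
from collections import deque

def task(name, steps):
    # Each yield hands control back to the scheduler loop.
    for i in range(steps):
        yield f"{name} step {i}"

def run_round_robin(*tasks):
    """Interleave generator-based tasks on one thread, one step at a time."""
    queue = deque(tasks)
    log = []
    while queue:
        current = queue.popleft()
        try:
            log.append(next(current))
            queue.append(current)   # not finished: back of the line
        except StopIteration:
            pass                    # finished: drop it
    return log

print(run_round_robin(task("A", 2), task("B", 3)))
# ['A step 0', 'B step 0', 'A step 1', 'B step 1', 'B step 2']
```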

Asynchronous Programming is Ha...Ha...Hard

How about concurrent.futures?

import time
import concurrent.futures
import rx

num_stream = [1, 2, 3, 4, 5]

def work_slowly(data):
    time.sleep(1)
    return data * 2

if __name__ == "__main__":  # required where worker processes are spawned (Windows, macOS)
    with concurrent.futures.ProcessPoolExecutor(5) as worker:
        rx.Observable.from_(num_stream) \
            .flat_map(
                lambda num: worker.submit(work_slowly, num)
            ).subscribe(print)

A bite of asynchronous coding mixed with RxPy.
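For comparison, here is the same fan-out with only the standard library. `as_completed` hands results over in completion order, but the caller still pulls them in a loop rather than being called back, which is the part Rx's `flat_map` and `subscribe` take care of. This sketch swaps in a thread pool and a shorter sleep (both assumptions) so it runs anywhere without the `__main__` guard a process pool needs:

```python
import concurrent.futures
import time

num_stream = [1, 2, 3, 4, 5]

def work_slowly(data):
    time.sleep(0.1)
    return data * 2

with concurrent.futures.ThreadPoolExecutor(5) as worker:
    futures = [worker.submit(work_slowly, num) for num in num_stream]
    # Yields each future as it finishes, not in submission order.
    results = [f.result() for f in concurrent.futures.as_completed(futures)]

print(sorted(results))  # [2, 4, 6, 8, 10]
```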

Observable, flat_map, subscribe


Why & What
Observables

Why Observables

The Observable model allows you to treat streams of asynchronous events with the same sort of operations that you use for collections of data items like arrays. It frees you from callbacks, and thereby makes your code more readable and less prone to bugs.

What Observables

To support receiving events via push, an Observable/Observer pair connect via subscription. The Observable represents the stream of data and can be subscribed to by an Observer.
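A deliberately tiny sketch of an Observable/Observer pair connected by subscription, to show the push direction. `Observer`, `Observable`, `from_iterable`, and `Collector` are hypothetical stand-ins, not RxPy's classes:

```python
class Observer:
    """Receives pushed events; subclasses override what they need."""
    def on_next(self, value): ...
    def on_error(self, error): ...
    def on_completed(self): ...

class Observable:
    def __init__(self, subscribe_func):
        # subscribe_func(observer) produces events by calling observer methods.
        self._subscribe_func = subscribe_func

    def subscribe(self, observer):
        # Subscribing connects the pair and starts the flow of events.
        self._subscribe_func(observer)

def from_iterable(iterable):
    def produce(observer):
        for item in iterable:
            observer.on_next(item)   # push each item to the observer
        observer.on_completed()      # then signal the end of the stream
    return Observable(produce)

class Collector(Observer):
    def __init__(self):
        self.received = []
        self.done = False
    def on_next(self, value):
        self.received.append(value)
    def on_completed(self):
        self.done = True

collector = Collector()
from_iterable([1, 2, 3, 4]).subscribe(collector)
print(collector.received, collector.done)  # [1, 2, 3, 4] True
```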

What Observables (Cont.)

An Rx Observable is the async “dual” of an Iterable. “Dual” means that the Observable provides all the functionality of an Iterable, except that data flows in the reverse direction: it is push instead of pull.

Observable vs. Iterable

Pull (Iterable)          Push (Observable)
next(it)                 on_next(value)
raise Exception          on_error(exception)
return (StopIteration)   on_completed()
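Each row of the table can be read as code. This sketch drives iterators both ways: `pull_all` calls `next()` itself, while `push_all` hands every outcome to `on_next`, `on_error`, and `on_completed` callbacks. The function names and the `flaky` generator are illustrative:

```python
def pull_all(iterable):
    """Pull: the consumer asks for each value."""
    it = iter(iterable)
    values = []
    while True:
        try:
            values.append(next(it))       # next(it)
        except StopIteration:             # iteration ended (return)
            return values
        # Any other exception simply propagates to the caller (raise).

def push_all(iterable, on_next, on_error, on_completed):
    """Push: the producer delivers each outcome to a callback."""
    it = iter(iterable)
    while True:
        try:
            value = next(it)
        except StopIteration:
            on_completed()                # on_completed()
            return
        except Exception as error:
            on_error(error)               # on_error(exception)
            return
        on_next(value)                    # on_next(value)

def flaky():
    yield 1
    yield 2
    raise ValueError("sensor offline")

events = []
push_all(flaky(),
         on_next=lambda v: events.append(("next", v)),
         on_error=lambda e: events.append(("error", str(e))),
         on_completed=lambda: events.append(("completed",)))

print(pull_all([1, 2, 3]))  # [1, 2, 3]
print(events)  # [('next', 1), ('next', 2), ('error', 'sensor offline')]
```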

flat_map

I want the values
inside each event,
to compose asynchronous streams
together concurrently.

Imagine the behavior of flat_map()

str_lists = [["6", "03"], ["42"]]

list1 = []
for str_list in str_lists:
    for var in str_list:
        list1.append(int(var))

# [6, 3, 42]

This example is heavily simplified, for illustration only.

flat_map in PyFunctional

from functional import seq

def int_list(iterable):
    return [int(var) for var in iterable]

list2 = seq(str_lists).flat_map(int_list)
# [6, 3, 42]
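Without PyFunctional, the standard library expresses the same map-then-flatten step with `itertools.chain.from_iterable`; a sketch reusing the data above:

```python
from itertools import chain

str_lists = [["6", "03"], ["42"]]

def int_list(iterable):
    return [int(var) for var in iterable]

# map() yields one list per element; chain.from_iterable flattens them lazily.
list3 = list(chain.from_iterable(map(int_list, str_lists)))
print(list3)  # [6, 3, 42]
```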

Another common use of flat_map

When a function takes an event and returns an Observable, a plain map over the upstream Observable produces an Observable of Observables. To keep chaining operators on a single stream, we usually flat_map instead, which flattens (merges) the returned Observables into one Observable.
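A synchronous sketch of the difference between map and flat_map when the function returns an Observable. The `Observable` class, `fetch_words`, and the sample lines are hypothetical; the class has no error or completion handling, and because everything is synchronous, flattening here is plain concatenation, whereas real Rx merges inner streams as their events arrive.

```python
class Observable:
    """Hypothetical synchronous push stream: subscribe(on_next) emits all values."""

    def __init__(self, subscribe):
        self.subscribe = subscribe

    @staticmethod
    def from_iterable(items):
        def subscribe(on_next):
            for item in items:
                on_next(item)
        return Observable(subscribe)

    def map(self, func):
        def subscribe(on_next):
            self.subscribe(lambda value: on_next(func(value)))
        return Observable(subscribe)

    def flat_map(self, func):
        def subscribe(on_next):
            # func(value) returns an inner Observable; subscribing it with the
            # same downstream on_next flattens all inner streams into one.
            self.subscribe(lambda value: func(value).subscribe(on_next))
        return Observable(subscribe)


def fetch_words(line):
    # Stands in for an operation that returns an Observable per event.
    return Observable.from_iterable(line.split())

lines = Observable.from_iterable(["reactive python", "pycon taiwan"])

nested, flat = [], []
lines.map(fetch_words).subscribe(nested.append)    # an Observable of Observables
lines.flat_map(fetch_words).subscribe(flat.append)
print(flat)  # ['reactive', 'python', 'pycon', 'taiwan']
```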

Parallel with Rx Schedulers

Rx is concurrency-agnostic: it does not introduce concurrency on its own.
You pick a scheduler to decide where (on which thread or event loop) the work executes.
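To make "the scheduler decides where work runs" concrete, here is a sketch with two hypothetical schedulers sharing one `schedule(action)` method: one runs the action immediately on the calling thread, the other on a new thread. The class names echo RxPy's ideas, but this is not its API.

```python
import threading

class ImmediateScheduler:
    """Runs each action right away on the calling thread."""
    def schedule(self, action):
        action()

class NewThreadScheduler:
    """Runs each action on its own new thread."""
    def __init__(self):
        self.threads = []

    def schedule(self, action):
        thread = threading.Thread(target=action)
        self.threads.append(thread)
        thread.start()

    def join(self):
        # Wait for every scheduled action to finish.
        for thread in self.threads:
            thread.join()

def emit_all(items, on_next, scheduler):
    # Same pipeline logic for any scheduler; only *where* it runs changes.
    for item in items:
        scheduler.schedule(
            lambda item=item: on_next(item, threading.current_thread().name))

seen_immediate, seen_threaded = [], []
emit_all([1, 2, 3], lambda v, name: seen_immediate.append((v, name)),
         ImmediateScheduler())

threaded = NewThreadScheduler()
emit_all([1, 2, 3], lambda v, name: seen_threaded.append((v, name)), threaded)
threaded.join()

print(seen_immediate)         # 1, 2, 3 in order, all on the calling thread
print(sorted(seen_threaded))  # 1, 2, 3, each tagged with a worker thread's name
```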

An RxJava example

Observable<Integer> num_flow = Observable
    .range(1, 6)
    .doOnEach(debug("Did"));

num_flow.subscribe(num -> System.out.println(num));

Parallel with RxJava

Observable<Integer> num_flow = Observable
    .range(1, 6)
    .flatMap(n -> Observable
        .range(n, 3)
        .subscribeOn(Schedulers.computation())
        .doOnEach(debug("Did"))
    );
num_flow.subscribe(num -> System.out.println(num));

What the parallel output looks like

RxComputationThreadPool-3|Did: >3
RxComputationThreadPool-1|Did: >1
RxComputationThreadPool-2|Did: >4
RxComputationThreadPool-3|Did: ->2
RxComputationThreadPool-1|Did: >7
RxComputationThreadPool-2|Did: ->6
RxComputationThreadPool-3|Did: -->5
RxComputationThreadPool-3|Did: --->|

Back to RxPy

rx.concurrency.NewThreadScheduler

import time
import rx

def work_slowly(value):
    def go_make_big(subscriber):
        subscriber.on_next(make_big(value))
        subscriber.on_completed()
    return rx.Observable.create(go_make_big)

def make_big(value):
    print("Make big {}".format(value))
    time.sleep(1)
    print("Done for big {}".format(value))
    return value * 2

scheduler = rx.concurrency.NewThreadScheduler()
rx.Observable.from_([1, 2, 3, 4]) \
    .flat_map(lambda num: work_slowly(num)
                          .subscribe_on(scheduler)) \
    .reduce(lambda a, b: a + b) \
    .subscribe(print)

# Keep the main thread alive until the worker threads finish.
time.sleep(2)

Output when using NewThreadScheduler

Make big 1
Make big 2
Make big 3
Make big 4
Done for big 2
Done for big 1
Done for big 3
Done for big 4
20

With rx.concurrency.Scheduler

import time
import rx

def work_slowly(x):
    print('Processing ' + str(x))
    time.sleep(1)

rx.Observable.range(1, 3) \
    .select_many(lambda i: rx.Observable.start(
            lambda: work_slowly(i),
            scheduler=rx.concurrency.Scheduler.timeout)
    ) \
    .observe_on(rx.concurrency.Scheduler.event_loop) \
    .subscribe(print)

time.sleep(1.01)  # wait to complete

RxPy Schedulers

In RxPY you can choose to run fully asynchronously or you may decide to schedule work and timeouts using threads.
RxPY also comes with batteries included, and has a number of Python specific mainloop schedulers to make it easier for you to use RxPY with your favorite Python framework.

RxPy Schedulers (Cont.)

Some
more things

Parallel Algorithms in PyToolz

PyToolz does not implement parallel processing systems. It does however provide parallel algorithms that can extend existing parallel systems. Our general solution is to build algorithms that operate around a user-supplied parallel map function.

from toolz.curried import map
from toolz import frequencies, compose, concat, merge_with

def stem(word):
    # Lower-case the word and strip surrounding punctuation.
    return word.lower().rstrip(",.!)-*_?:;$'-\"").lstrip("-*'\"(_$'")

wordcount = compose(frequencies, map(stem), concat, map(str.split), open)
filenames = ['Book_%d.txt' % i for i in range(10000)]

# Step up to a multiprocessing map for heavy computation on a single machine
# from multiprocessing import Pool
# p = Pool(8)
# pmap = p.map

# Finish with a distributed parallel map for big data
# (IPython.parallel now lives in the separate ipyparallel package)
from ipyparallel import Client
p = Client()[:]
pmap = p.map_sync

total = merge_with(sum, pmap(wordcount, filenames))
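The core idea, an algorithm written around a user-supplied map, can be tried with the standard library alone. A sketch assuming in-memory texts instead of files; `wordcount` and `total_counts` are illustrative, and summing `Counter` objects stands in for `merge_with(sum, ...)`. A thread pool keeps it portable; for CPU-heavy work a process pool's `map` would be the better plug-in.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from operator import add

def stem(word):
    # Same stripping rule as the slide's stem().
    return word.lower().rstrip(",.!)-*_?:;$'-\"").lstrip("-*'\"(_$'")

def wordcount(text):
    return Counter(stem(word) for word in text.split())

def total_counts(texts, pmap=map):
    """Count words across texts; pmap can be any map-like callable."""
    return reduce(add, pmap(wordcount, texts), Counter())

texts = ["Hello, world!", "hello again", "World... hello?"]

serial = total_counts(texts)
with ThreadPoolExecutor(4) as pool:
    parallel = total_counts(texts, pmap=pool.map)

print(serial == parallel, serial["hello"])  # True 3
```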

Flexx
Python UI toolkit

UI toolkit based on web technology

from flexx import app, ui, event

class Example(ui.Widget):
    def init(self):
        self.count = 0
        with ui.HBox():
            self.button = ui.Button(text='Click me', flex=0)
            self.label = ui.Label(flex=1)

    @event.connect('button.mouse_down')
    def _handle_click(self, down):
        if down:
            self.count += 1
            self.label.text = 'clicked %i times' % self.count

main = app.launch(Example)
app.run()

This example requires flexx > 0.3.1

Reactive concept in flexx

@event.connect('button.mouse_down')
def _handle_click(self, down):
    ...

event.connect subscribes the _handle_click method to the mouse-down event (button.mouse_down).
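The same subscribe-by-decorator idea in a few lines of plain Python. `EventSource` with its `connect` and `emit` methods is hypothetical, not how Flexx is implemented; Flexx resolves names like 'button.mouse_down' against widget attributes, while this sketch uses a flat event name.

```python
from collections import defaultdict

class EventSource:
    def __init__(self):
        self._handlers = defaultdict(list)

    def connect(self, event_name):
        """Decorator: subscribe the decorated function to event_name."""
        def decorator(handler):
            self._handlers[event_name].append(handler)
            return handler
        return decorator

    def emit(self, event_name, *args):
        # Push the event to every subscribed handler.
        for handler in self._handlers[event_name]:
            handler(*args)

button = EventSource()
clicks = []

@button.connect("mouse_down")
def handle_click(down):
    if down:
        clicks.append("clicked %i times" % (len(clicks) + 1))

button.emit("mouse_down", True)
button.emit("mouse_down", False)
button.emit("mouse_down", True)
print(clicks)  # ['clicked 1 times', 'clicked 2 times']
```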

III. Summary

Try functional
& async with Rx


References

Python Reactive Programming is written by Dag Brattli, RxPy author

References (Cont.)

Q&A

Thank You!