Keith Yang
yang@keitheis.org
@keitheis
PyCon Taiwan 2016
Reactive Programming in Python
Map a iterable, i.e., a list, to a function.
In [1]: str_map = map(str, [1, 2, 3])
In [2]: list(str_map)
Out[2]: ['1', '2', '3']
Create small anonymous function inline.
In [1]: multiple_7 = lambda x: x * 7
In [2]: type(multiple_7)
Out[2]: function
In [3]: multiple_7(6)
Out[3]: 42
Function is first-class object in Python.
In [1]: list(map(lambda x: x * 2, [1, 2, 3]))
Out[1]: [2, 4, 6]
“Everything can be a stream!”
The mantra of Reactive Programming.
In brief a sequence of events.
for in if
( expression for expr in sequence1
if condition1
for expr2 in sequence2
if condition2
for expr3 in sequence3 ...
if condition3
for exprN in sequenceN
if conditionN )
Visual messive!
# Imperactive style.
from pathlib import Path
from zipfile import ZipFile
files_stream = (
ZipFile(file.name)
for file in Path(".").iterdir()
if file.name.lower().endswith('.zip')
)
zipped_content_list = []
for a_zipfile in files_stream:
for item_name in a_zipfile.namelist():
zipped_content_list.append(item_name)
from pathlib import Path
from zipfile import ZipFile
from functional import seq
zipped_content_list = (
seq(Path(".").iterdir())
.filter(lambda item: item.is_file())
.filter(
lambda file: file.name.lower().endswith('.zip')
).map(ZipFile)
.flat_map(lambda a_zipfile: a_zipfile.namelist())
)
from pathlib import Path
from zipfile import ZipFile
from functional import pseq
zipped_content_list = (
pseq(Path(".").iterdir())
.filter(lambda item: item.is_file())
.filter(
lambda file: file.name.lower().endswith('.zip')
).map(ZipFile)
.flat_map(lambda a_zipfile: a_zipfile.namelist())
)
pseq is new in PyFunctional 0.7
db.query(users) \
.filter(role == Cafe.customer) \
.filter(cups_bought >= 100_000_000) \
.update(state="caffeinated")
let rec qsort = function
| [] -> []
| pivot :: rest ->
let is_less x = x < pivot in
let left, right = List.partition is_less rest in
qsort left @ [pivot] @ qsort right;;
num_stream = [1, 2, 3, 4, 5, 6]
evens1 = []
for n in numbers:
if (n % 2) == 0:
evens1.append(n) # Imperactive
evens2 = [n for n in num_stream if (n % 2) == 0]
# List comprehension result: [2, 4, 6]
from functional import seq
def is_even(num): # Big business logic!
return (num % 2) == 0
evens3 = seq(num_stream).filter(is_even)
# [2, 4, 6]
functional
module is provided by PyFunctional
kernels = [
f.name for f in folder_path.iterdir()
if f.is_file() and f.name.startswith('vmlinuz')
]
# To add one more filter to exclude rescue kernel?
kernels = seq(folder_path.iterdir()) \
.filter(lambda f: f.is_file()) \
.filter(lambda f: f.name.startswith('vmlinuz')) \
.filter(lambda f: 'rescue' not in k.name) \
.map(lambda f: f.name)
import rx
num_stream = [1, 2, 3, 4]
num_flow = rx.Observable.from_(num_stream)
num_flow.subscribe(print)
In [1]: import rx
In [2]: num_stream = [1, 2, 3, 4]
In [3]: num_flow = rx.Observable.from_(num_stream)
In [4]: num_flow.subscribe(print)
1
2
3
4
Out[4]: <rx.disposables.AnonymousDisposable
.AnonymousDisposable at 0x1041fcac8>
chars_flow = rx.Observable.from_(["a", "b", "c"])
numbers_flow = rx.Observable.from_([1, 2, 3, 4])
# or rx.Observable.range(1, 4)
printable_flow = numbers_flow.map(
lambda num: num * 2
).merge(chars_flow)
printable_flow.subscribe(print)
# a 2 b 4 c 6 8 10
most.fromEvent('mousemove', document)
.map(function(event) {
return event.clientX + ',' + event.clientY;
})
.startWith('move the mouse, please')
.observe(function(xy_str) {
document.body.textContent = xy_str;
});
Most.js - Monadic reactive streams
auto num_flow = rxcpp::observable<>::create(
[](rxcpp::subscriber<int> s){
for(i=0; i<=5; i++)
s.on_next(i);
s.on_completed();
});
num_flow.subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
Parallel execution is concurrent by definition, but concurrency is not necessarily parallelism.
concurrent.futures
?import time
import concurrent.futures
import rx
num_stream = [1, 2, 3, 4, 5]
def work_slowly(data):
time.sleep(1)
return data * 2
with concurrent.futures.ProcessPoolExecutor(5) as worker:
rx.Observable.from_(num_stream) \
.flat_map(
lambda num: worker.submit(work_slowly, num)
).subscribe(print)
A bite of asynchronous coding mixed with RxPy.
with concurrent.futures.ProcessPoolExecutor(5) as worker:
rx.Observable.from_(num_stream) \
.flat_map(
lambda num: worker.submit(work_slowly, num)
).subscribe(print)
Observable model allows you to treat streams of Asynchronous events with the same sort of operations that you use for collections of data items like arrays. It frees you from callbacks, and thereby makes your code more readable and less prone to bugs.
To support receiving events via push, an Observable/Observer pair connect via subscription. The Observable represents the stream of data and can be subscribed to by an Observer.
An Rx Observable is the async “dual” of an Iterable. By “dual”, it means that the Observable provides all the functionality of an Iterable except in the reverse flow of data: it is push instead of pull.
Pull (Iterable) | Push (Observable) |
---|---|
it.next() | on_next(it) |
raise Exception | on_error(Exception) |
returns | on_completed() |
I want values,
inside each event.
To compose asynchronous streams
together concurrently.
str_lists = [["6", "03"], ["42"]]
list1 = []
for str_list in str_lists:
for var in str_list:
list1.append(int(var))
# [6, 3, 42]
This example is much simplified for imagination purpose.
from functional import seq
def int_list(iterable):
return [int(var) for var in iterable]
list2 = seq(str_lists).flat_map(int_list)
# [6, 3, 42]
For a function that takes a event and returns Observable stream, in order to continue the operations of upstream Observable, we usually want to flatMap the sequence of returned Observable stream to just one Observable stream.
Observable<Integer> num_flow = Observable
.range(1, 6)
.doOnEach(debug("Did"));
num_flow.subscribe(num -> System.out.println(num));
Observable<Integer> num_flow = Observable
.range(1, 6)
.flatMap(n -> Observable
.range(n, 3)
.subscribeOn(Schedulers.computation())
.doOnEach(debug("Did"))
);
num_flow.subscribe(num -> System.out.println(num));
RxComputationThreadPool-3|Did: >3
RxComputationThreadPool-1|Did: >1
RxComputationThreadPool-2|Did: >4
RxComputationThreadPool-3|Did: ->2
RxComputationThreadPool-1|Did: >7
RxComputationThreadPool-2|Did: ->6
RxComputationThreadPool-3|Did: -->5
RxComputationThreadPool-3|Did: --->|
import time
import rx
def work_slowly(value):
def go_make_big(subscriber):
subscriber.on_next(make_big(value))
subscriber.on_completed()
return rx.Observable.create(go_make_big)
def make_big(value):
print("Make big {}".format(value))
time.sleep(1)
print("Done for big {}".format(value))
return value * 2
scheduler = rx.concurrency.NewThreadScheduler()
rx.Observable.from_([1, 2, 3, 4]) \
.flat_map(lambda num: work_slowly(num) \
.subscribe_on(scheduler)) \
.reduce(lambda a, b: a + b) \
.subscribe(print)
time.sleep(1.01)
Make big 1
Make big 2
Make big 3
Make big 4
Done for big 2
Done for big 1
Done for big 3
Done for big 4
20
rx.concurrency.Scheduler
import time
import rx
def work_slowly(x):
print('Processing ' + str(x))
time.sleep(1)
rx.Observable.range(1, 3) \
.select_many(lambda i: rx.Observable.start(
lambda: work_slowly(i),
scheduler=rx.concurrency.Scheduler.timeout)
) \
.observe_on(rx.concurrency.Scheduler.event_loop) \
.subscribe(print)
time.sleep(1.01) # wait to complete
PyToolz does not implement parallel processing systems. It does however provide parallel algorithms that can extend existing parallel systems. Our general solution is to build algorithms that operate around a user-supplied parallel map function.
from toolz.curried import map
from toolz import frequencies, compose, concat, merge_with
def stem(word):
word.lower().rstrip(",.!)-*_?:;$'-"").lstrip("-*'"(_$'")
wordcount = compose(frequencies, map(stem), concat, map(str.split), open)
filenames = ['Book_%d.txt'%i for i in range(10000)]
# Advance to Multiprocessing map for heavy computation on single machine
# from multiprocessing import Pool
# p = Pool(8)
# pmap = p.map
# Finish with distributed parallel map for big data
from IPython.parallel import Client
p = Client()[:]
pmap = p.map_sync
total = merge_with(sum, pmap(wordcount, filenames))
from flexx import app, ui, event
class Example(ui.Widget):
def init(self):
self.count = 0
with ui.HBox():
self.button = ui.Button(text='Click me', flex=0)
self.label = ui.Label(flex=1)
@event.connect('button.mouse_down')
def _handle_click(self, down):
if down:
self.count += 1
self.label.text = 'clicked %i times' % self.count
main = app.launch(Example)
app.run()
This example requires flexx > 0.3.1
@event.connect('button.mouse_down')
def _handle_click(self, down):
...
event.connect
subscribes mouse-down event(
button.mouse_down
)
to _handle_click
function.
Python Reactive Programming is written by Dag Brattli, RxPy author