Python concurrency talk

Python concurrency: From iterators to async/await

Global Interpreter Lock (GIL)

The GIL lets only one thread execute Python bytecode at a time, so splitting CPU-bound work across threads does not make it faster. The first snippet below counts to 100,000,000 in a single thread; the second splits the same count across two threads and takes roughly the same time (often a little longer, because of the switching overhead).

import time

COUNT = 100000000

start_time = time.time()

def count(num):
    counter = 0
    while counter < num:
        counter += 1

count(COUNT)

return "time: %s seconds" % (time.time() - start_time)
import threading
import time

start_time = time.time()
COUNT = 100000000

def count(num):
    counter = 0
    while counter < num:
        counter += 1

t1 = threading.Thread(target=count, args=(COUNT//2,))
t2 = threading.Thread(target=count, args=(COUNT//2,))

t1.start()
t2.start()
t1.join()
t2.join()

return "time: %s seconds" % (time.time() - start_time)
import time

start_time = time.time()

def nap(duration):
    time.sleep(duration)
    print("Nap duration: {}".format(duration))

nap(2)
nap(2)
nap(2)

return "time: %s seconds" % (time.time() - start_time)
import time
import threading

start_time = time.time()

def nap(duration):
    time.sleep(duration)
    print("Nap duration: {}".format(duration))

t1 = threading.Thread(target=nap, args=(2,))
t2 = threading.Thread(target=nap, args=(2,))
t3 = threading.Thread(target=nap, args=(2,))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

return "time: %s seconds" % (time.time() - start_time)

Iterators & Generators

Iterators provide a way to fetch items lazily, which is useful when scanning sequences that don't fit in memory. An iterator must provide __next__ and __iter__ methods.

class Ip:
    def __init__(self, ip):
        self.ip = ip
        self.octets = ip.split('.')

    def __repr__(self):
        return 'IP(%s)' % self.ip

    def __iter__(self):
        return IpIterator(self.octets)

class IpIterator:
    def __init__(self, octets):
        self.octets = octets
        self.index = 0

    def __next__(self):
        try:
            octet = self.octets[self.index]
        except IndexError:
            raise StopIteration()
        self.index += 1
        return octet

    def __iter__(self):
        return self

    def __len__(self):
        return 4

ip = Ip('192.168.12.4')

for octet in ip:
    print(octet)

it = iter(Ip('192.167.1.3'))
for i in range(len(it)):
    octet = next(it)  # Same as it.__next__()
    print(octet)
192
168
12
4
192
167
1
3

Generators return an intermediate result but maintain the function’s local state, so execution can resume where it left off.

def geo_prog(begin, ratio, limit=None):
    while limit is None or begin < limit:
        yield begin
        begin *= ratio

prog = geo_prog(1, 3, 100)
print(prog)

for i in prog:
    print(i)

# Generators share the same interface as iterators
del prog
prog = geo_prog(1, 4)
for i in range(5):
    print(prog.__next__())
<generator object geo_prog at 0x10c4b5728>
1
3
9
27
81
1
4
16
64
256
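
Since a generator is itself an iterator, the Ip class from the previous section can drop the separate IpIterator class and write __iter__ as a generator function. A minimal sketch of that rewrite (not part of the original talk):

class Ip:
    def __init__(self, ip):
        self.ip = ip
        self.octets = ip.split('.')

    def __repr__(self):
        return 'IP(%s)' % self.ip

    def __iter__(self):
        # A generator function: calling it returns an iterator automatically
        for octet in self.octets:
            yield octet

for octet in Ip('192.168.12.4'):
    print(octet)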

Coroutines

Coroutines use the same interface as generators, but they are driven with the .send() method: .send(value) resumes the coroutine and makes value the result of the yield expression inside it.

def average():
    total = 0.0
    count = 0
    avg = None
    while True:
        element = yield avg
        total += element
        count += 1
        avg = total/count

avg = average()
next(avg)  # prime the coroutine: advance it to the first yield

for n in range(1, 11):
    print(avg.send(n))
1.0
1.5
2.0
2.5
3.0
3.5
4.0
4.5
5.0
5.5
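
The next(avg) call above primes the coroutine, advancing it to the first yield so it is ready to receive values. A small decorator can do the priming automatically; a sketch (the primed name is made up for illustration):

from functools import wraps

def primed(func):
    """Call next() on a new coroutine so it is ready to accept .send()."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        coro = func(*args, **kwargs)
        next(coro)  # advance to the first yield
        return coro
    return wrapper

@primed
def average():
    total = 0.0
    count = 0
    avg = None
    while True:
        element = yield avg
        total += element
        count += 1
        avg = total/count

avg = average()      # already primed, no explicit next() needed
print(avg.send(1))   # 1.0
print(avg.send(2))   # 1.5

Coroutines can also model multi-step processes. The next example models a development team working through tickets, driven from outside by sending it the current time: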
from collections import namedtuple

Event = namedtuple('Event', 'time team description')

def dev_process(team, tickets, start_time=0):
    time = yield Event(start_time, team, 'TO DO')
    for i in range(tickets):
        time = yield Event(time, team, 'IN PROGRESS')
        time = yield Event(time, team, 'IN QA')
        time = yield Event(time, team, 'DONE')

    yield Event(time, team, 'ALL DONE')

dev = dev_process('pod 1', 2, 0)
todo = next(dev)
in_progress = dev.send(todo.time + 1)
in_qa = dev.send(in_progress.time + 7)
done = dev.send(in_qa.time + 4)

print(todo, in_progress, in_qa, done, sep='\n')
Event(time=0, team='pod 1', description='TO DO')
Event(time=1, team='pod 1', description='IN PROGRESS')
Event(time=8, team='pod 1', description='IN QA')
Event(time=12, team='pod 1', description='DONE')
The same kind of coroutine can drive a discrete event simulation. The Simulator below keeps a priority queue of events, pops the earliest one, and sends the corresponding team's process the time of its next event.

from queue import PriorityQueue
from random import randint
from collections import namedtuple

Event = namedtuple('Event', 'time team description')

def dev_process(team, tickets, start_time=0):
    time = yield Event(start_time, team, 'TO DO')
    for i in range(tickets):
        time = yield Event(time, team, 'IN PROGRESS')
        time = yield Event(time, team, 'IN QA')
        time = yield Event(time, team, 'DONE')

    yield Event(time, team, 'ALL DONE')

teams = {1: dev_process(team=1, tickets=2, start_time=0),
         2: dev_process(team=2, tickets=3, start_time=3),
         3: dev_process(team=3, tickets=1, start_time=4)}

class Simulator:
    def __init__(self, procs):
        self.events = PriorityQueue()
        self.procs = procs

    def run(self, end_time):
        for key, proc in sorted(self.procs.items()):
            event = next(proc)
            self.events.put(event)

        time = 0
        while time < end_time:
            if self.events.empty():
                break

            event = self.events.get()
            time, proc_id, past_action = event
            print('team: ', proc_id, proc_id * '    ', event)
            active_proc = self.procs[proc_id]
            next_time = time + randint(1, 5)
            try:
                next_event = active_proc.send(next_time)
            except StopIteration:
                del self.procs[proc_id]
            else:
                self.events.put(next_event)
        else:
            # while/else: runs only when the time limit is reached before the queue empties
            print('%s events pending' % self.events.qsize())

sim = Simulator(teams)
sim.run(20)
team:  1      Event(time=0, team=1, description='TO DO')
team:  1      Event(time=1, team=1, description='IN PROGRESS')
team:  2          Event(time=3, team=2, description='TO DO')
team:  1      Event(time=4, team=1, description='IN QA')
team:  3              Event(time=4, team=3, description='TO DO')
team:  1      Event(time=5, team=1, description='DONE')
team:  2          Event(time=6, team=2, description='IN PROGRESS')
team:  1      Event(time=8, team=1, description='IN PROGRESS')
team:  2          Event(time=9, team=2, description='IN QA')
team:  3              Event(time=9, team=3, description='IN PROGRESS')
team:  1      Event(time=10, team=1, description='IN QA')
team:  3              Event(time=11, team=3, description='IN QA')
team:  2          Event(time=13, team=2, description='DONE')
team:  1      Event(time=14, team=1, description='DONE')
team:  2          Event(time=15, team=2, description='IN PROGRESS')
team:  3              Event(time=16, team=3, description='DONE')
team:  1      Event(time=18, team=1, description='ALL DONE')
team:  2          Event(time=18, team=2, description='IN QA')
team:  3              Event(time=18, team=3, description='ALL DONE')
team:  2          Event(time=21, team=2, description='DONE')
1 events pending

asyncio

asyncio implements concurrency with coroutines coordinated by an event loop: when a coroutine waits on I/O it yields control back to the loop, and the loop resumes it once the result is ready.
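
Before the HTTP example, here is the nap example from the threading section rewritten for asyncio, in the same generator-based style used below (a sketch, assuming a 2017-era Python; @asyncio.coroutine has since been removed from the standard library). The three naps share a single thread, and the event loop switches between them while they wait.

import asyncio
import time

@asyncio.coroutine
def nap(duration):
    yield from asyncio.sleep(duration)  # yield control to the event loop while waiting
    print("Nap duration: {}".format(duration))

start_time = time.time()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([nap(2), nap(2), nap(2)]))
loop.close()

print("time: %s seconds" % (time.time() - start_time))  # about 2 seconds, one thread

The HTTP example below applies the same pattern to real network I/O with aiohttp.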

import json
import asyncio
import aiohttp

@asyncio.coroutine
def get_pokemon(number):
    url = 'http://pokeapi.co/api/v2/pokemon/{}'.format(number)
    resp = yield from aiohttp.request('GET', url)
    data = yield from resp.read()
    return data

@asyncio.coroutine
def download_pokemon(number):
    pokemon = yield from get_pokemon(number)
    poke_json = json.loads(pokemon)
    return poke_json

def download_pokemons(pokemon_numbers):
    loop = asyncio.get_event_loop()
    downloads = [download_pokemon(number) for number in sorted(pokemon_numbers)]
    wait = asyncio.wait(downloads)
    response, _ = loop.run_until_complete(wait)
    loop.close()
    return response

poke_tasks = download_pokemons([4, 5, 6])
for task in poke_tasks:
    pokemon = task.result()
    print(pokemon['name'])
charmeleon
charizard
charmander

async/await

async and await are keywords added to the language in Python 3.5 by PEP 492, which states:

This proposal aims to answer that need by making writing explicitly asynchronous, concurrent Python code easier and more Pythonic.

It is proposed to make coroutines a proper standalone concept in Python, and introduce new supporting syntax. The ultimate goal is to help establish a common, easily approachable, mental model of asynchronous programming in Python and make it as close to synchronous programming as possible.
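
The nap coroutine in the new syntax needs only async def and await; a minimal sketch, under the same 2017-era assumptions as the generator-based version above:

import asyncio
import time

async def nap(duration):
    await asyncio.sleep(duration)
    print("Nap duration: {}".format(duration))

start_time = time.time()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([nap(2), nap(2), nap(2)]))
loop.close()

print("time: %s seconds" % (time.time() - start_time))

Rewriting the Pokémon downloader with the new syntax: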

import json
import asyncio
import aiohttp

async def get_pokemon(number):
    url = 'http://pokeapi.co/api/v2/pokemon/{}'.format(number)
    resp = await aiohttp.request('GET', url)
    data = await resp.read()
    return data

async def download_pokemon(number):
    pokemon = await get_pokemon(number)
    poke_json = json.loads(pokemon)
    return poke_json

def download_pokemons(pokemon_numbers):
    loop = asyncio.get_event_loop()
    downloads = [download_pokemon(number) for number in sorted(pokemon_numbers)]
    wait = asyncio.wait(downloads)
    response, _ = loop.run_until_complete(wait)
    loop.close()
    return response

poke_tasks = download_pokemons([4, 5, 6])
for task in poke_tasks:
    pokemon = task.result()
    print(pokemon['name'])
charizard
charmander
charmeleon
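
Both HTTP examples reflect 2017-era APIs. In current Python and aiohttp the same program would use asyncio.run, asyncio.gather and an explicit ClientSession; a sketch of that modern form (an assumption about current APIs, not part of the original talk, written for aiohttp 3.x):

import asyncio
import aiohttp

async def get_pokemon(session, number):
    url = 'https://pokeapi.co/api/v2/pokemon/{}'.format(number)
    async with session.get(url) as resp:
        return await resp.json()  # resp.json() decodes the body for us

async def download_pokemons(numbers):
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(get_pokemon(session, n) for n in sorted(numbers)))

pokemons = asyncio.run(download_pokemons([4, 5, 6]))
for pokemon in pokemons:
    print(pokemon['name'])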