Last active
December 15, 2015 13:19
-
-
Save zyga/5266468 to your computer and use it in GitHub Desktop.
Toy example for producer/consumer pattern that may either talk bytes or strings
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# Producers
def text_producer() -> str:
    """Yield a few sample text strings, the last one with Polish letters.

    The ``str`` annotation is not the real return type (this is a
    generator); it is the tag that ``mediator`` reads from
    ``__annotations__['return']`` to learn what kind of data is produced.
    """
    samples = (
        "I write",
        "some text",
        "some of which zażółć gęślą jaźń is in Polish",
    )
    for sample in samples:
        yield sample
def byte_producer() -> bytes:
    """Yield a single four-byte sample payload.

    The ``bytes`` annotation is the tag that ``mediator`` reads from
    ``__annotations__['return']``; it is not the generator's real type.
    """
    payload = b'\0\1\2\3'
    yield payload
# Consumers
def byte_consumer() -> bytes:
    """Coroutine-style sink that prints every value sent to it.

    Prime it with ``next()`` before the first ``send()``.  Sending ``None``
    ends the generator (the caller's ``send`` then raises
    ``StopIteration``).  Any other value must be ``bytes``; the value is
    printed first, so a wrong-typed value is still echoed before the
    assertion fails.

    The ``bytes`` annotation is the tag read by ``mediator``.
    """
    received = yield
    while True:
        print("Byte consumer got value {!r}".format(received))
        if received is None:
            return
        assert isinstance(received, bytes)
        received = yield
def text_consumer() -> str:
    """Coroutine-style sink that prints every value sent to it.

    Prime it with ``next()`` before the first ``send()``.  Sending ``None``
    ends the generator (the caller's ``send`` then raises
    ``StopIteration``).  Any other value must be ``str``; the value is
    printed first, so a wrong-typed value is still echoed before the
    assertion fails.

    The ``str`` annotation is the tag read by ``mediator``.
    """
    received = yield
    while True:
        print("Text consumer got value {!r}".format(received))
        if received is None:
            return
        assert isinstance(received, str)
        received = yield
# Pushers
def push_passthru(producer, consumer):
    """Forward every item from *producer* to *consumer* unchanged.

    *producer* is any iterable; *consumer* is any object with a ``send``
    method (for a generator, it must already be primed).
    """
    for item in producer:
        consumer.send(item)
def push_text2bytes(producer, consumer, encoding='UTF-8'):
    """Encode each text item from *producer* and send the bytes on.

    *producer* is an iterable of ``str``; *consumer* is any object with a
    ``send`` method (for a generator, it must already be primed).  Each
    item is encoded with *encoding* and forwarded as one ``bytes`` value.
    """
    for text in producer:
        assert isinstance(text, str)
        encoded = text.encode(encoding)
        consumer.send(encoded)
def push_bytes2text(producer, consumer, encoding='UTF-8'):
    """Decode each bytes chunk from *producer* and send the text on.

    *producer* is an iterable of ``bytes``; *consumer* is any object with a
    ``send`` method (for a generator, it must already be primed).

    Decoding is incremental: a multi-byte character split across two
    chunks is buffered and delivered once its remaining bytes arrive,
    instead of raising ``UnicodeDecodeError`` on the first chunk.  One
    ``str`` is still sent per input chunk; it may be empty when the chunk
    held only the beginning of a character.

    Raises:
        UnicodeDecodeError: if the data is invalid for *encoding*, or if
            the stream ends in the middle of a character.
        LookupError: if *encoding* is unknown or has no incremental decoder.
    """
    import codecs

    decoder = codecs.getincrementaldecoder(encoding)()
    for chunk in producer:
        assert isinstance(chunk, bytes)
        consumer.send(decoder.decode(chunk))
    # Flush the decoder: this reports a truncated trailing character and
    # forwards anything a stateful codec may still be holding back.
    tail = decoder.decode(b'', final=True)
    if tail:
        consumer.send(tail)
# Converter factory
def mediator(producer, consumer):
    """Choose the pusher function that connects *producer* to *consumer*.

    Both arguments are the (uncalled) functions; their ``return``
    annotations are used as data-kind tags.  Equal tags select
    ``push_passthru``.  Otherwise a ``str`` producer selects
    ``push_text2bytes`` and every other producer falls through to
    ``push_bytes2text``.

    Raises ``KeyError`` if either function has no ``return`` annotation.
    """
    produced = producer.__annotations__['return']
    consumed = consumer.__annotations__['return']
    if produced == consumed:
        return push_passthru
    return push_text2bytes if produced == str else push_bytes2text
def smart_pipe(producer, consumer):
    """Instantiate *producer* and *consumer* and pump all data through.

    The pusher is picked by ``mediator`` from the two functions'
    annotations, so text/bytes conversion happens automatically when the
    two sides disagree.
    """
    transfer = mediator(producer, consumer)
    source = producer()
    drain = consumer()
    # Advance the consumer to its first ``yield`` so it can accept send().
    next(drain)
    transfer(source, drain)
# Example application
def main():
    """Command-line demo: pipe the chosen producer into the chosen consumer.

    Takes two positional arguments, ``producer`` and ``consumer``, each
    either ``text`` or ``byte``.
    """
    import argparse

    producers = {
        'text': text_producer,
        'byte': byte_producer,
    }
    consumers = {
        'text': text_consumer,
        'byte': byte_consumer,
    }

    parser = argparse.ArgumentParser()
    parser.add_argument("producer", choices=list(producers))
    parser.add_argument("consumer", choices=list(consumers))
    args = parser.parse_args()

    smart_pipe(producers[args.producer], consumers[args.consumer])
# Run the command-line demo only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment