##Part 1
#iteration
class countdown_iter(object):
    def __init__(self, start):
        self.count = start
    def __iter__(self):
        return self
    def next(self):
        if self.count <= 0:
            raise StopIteration
        r = self.count
        self.count -= 1
        return r
#Calling a generator function creates a generator object.
#However, it does not start running the function.
#The function only executes on next()
def countdown_gen(n):
    while n > 0:
        yield n
        n -= 1
c = countdown_gen(5)
for i in c:
    print i
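#The hand-written iterator behaves identically; a quick sketch comparing the two
for i in countdown_iter(5):
    print i    #prints 5 4 3 2 1, same as countdown_gen(5)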
##Part 2
#Doesn't create large temporary lists
#The stages chain together like a pipeline. The key: think big...
wwwlog = open("access-log")
bytecolumn = (line.rsplit(None, 1)[1] for line in wwwlog)
bytes = (int(x) for x in bytecolumn if x != '-')
print "Total", sum(bytes)
import os
import fnmatch
import gzip, bz2
import re
def gen_find(filepat, top):
    for path, dirlist, filelist in os.walk(top):
        for name in fnmatch.filter(filelist, filepat):
            yield os.path.join(path, name)
def gen_open(filenames):
    for name in filenames:
        if name.endswith(".gz"):
            yield gzip.open(name)
        elif name.endswith(".bz2"):
            yield bz2.BZ2File(name)
        else:
            yield open(name)
def gen_cat(sources):
    for s in sources:
        for item in s:
            yield item
def gen_grep(pat, lines):
    patc = re.compile(pat)
    for line in lines:
        if patc.search(line):
            yield line
def field_map(dictseq, name, func):
    for d in dictseq:
        d[name] = func(d[name])
        yield d
def lines_from_dir(filepat, dirname):
    names = gen_find(filepat, dirname)
    files = gen_open(names)
    lines = gen_cat(files)
    return lines
#Assumed Apache Common Log Format pattern; it yields the nine columns named below
logpats = r'(\S+) (\S+) (\S+) \[(.*?)\] "(\S+) (\S+) (\S+)" (\S+) (\S+)'
logpat = re.compile(logpats)

def apache_log(lines):
    groups = (logpat.match(line) for line in lines)
    tuples = (g.groups() for g in groups if g)
    colnames = ('host', 'referrer', 'user', 'datetime', 'method',
                'request', 'proto', 'status', 'bytes')
    log = (dict(zip(colnames, t)) for t in tuples)
    log = field_map(log, "bytes", lambda s: int(s) if s != '-' else 0)
    log = field_map(log, "status", int)
    return log
lines = lines_from_dir("access-log*", "www")
#the beauty of generators is that you can plug filters in at almost any stage
lines = (line for line in lines if 'robots.txt' in line)
log = apache_log(lines)
for r in log:
    print r
#Some other queries (note: a generator pipeline is single-use, so each query
#below needs a freshly built log)
stat404 = set(r['request'] for r in log if r['status'] == 404)
large = (r for r in log if r['bytes'] > 1000000)
hosts = set(r['host'] for r in log)    #collect all unique host IP addresses
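#A hedged sketch of one way to handle that: rebuild the pipeline per query
#(the fresh_log helper is my own name, not from the original notes)
def fresh_log():
    return apache_log(lines_from_dir("access-log*", "www"))
hosts = set(r['host'] for r in fresh_log())
total = sum(r['bytes'] for r in fresh_log())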
##Part 3
##Processing Infinite Data
#tail -f
import time
def follow(thefile):
    thefile.seek(0, 2)    #Go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)    #Sleep briefly
            continue
        yield line
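#A minimal usage sketch, assuming some other process is appending to "access-log"
logfile = open("access-log")
for line in follow(logfile):
    print line,    #behaves like 'tail -f'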
##Part 4
##Feeding the Pipeline
import socket
def receive_connections(addr):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(addr)
    s.listen(5)
    while True:
        client = s.accept()
        yield client
#Example:
for c, a in receive_connections(("", 9000)):
    c.send("Hello World\n")
    c.close()
def receive_messages(addr, maxsize):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind(addr)
    while True:
        msg = s.recvfrom(maxsize)
        yield msg
#Example:
for msg, addr in receive_messages(("", 10000), 1024):
    print msg, "from", addr
#I/O Multiplexing
import select
def gen_events(socks):
    while True:
        rdr, wrt, err = select.select(socks, socks, socks, 0.1)
        for r in rdr:
            yield "read", r
        for w in wrt:
            yield "write", w
        for e in err:
            yield "error", e
import threading
clientset = []
def acceptor(sockset, addr):
    for c, a in receive_connections(addr):
        sockset.append(c)
acc_thr = threading.Thread(target=acceptor, args=(clientset, ("", 12000)))
acc_thr.setDaemon(True)
acc_thr.start()
for evt, s in gen_events(clientset):
    if evt == "read":
        data = s.recv(1024)
        if not data:
            print "Closing", s
            s.close()
            clientset.remove(s)
        else:
            print s, data
#Consuming a Queue
def consume_queue(thequeue):
    while True:
        item = thequeue.get()
        if item is StopIteration:
            break
        yield item
import Queue, threading
def consumer(q):
    for item in consume_queue(q):
        print "Consumed", item
    print "Done"
in_q = Queue.Queue()
con_thr = threading.Thread(target=consumer, args=(in_q,))
con_thr.start()
for i in xrange(100):
    in_q.put(i)
in_q.put(StopIteration)
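#A hedged companion sketch for the producer side: push any generator into a
#queue so a pipeline can cross thread boundaries (helper name is my own)
def sendto_queue(source, thequeue):
    for item in source:
        thequeue.put(item)
    thequeue.put(StopIteration)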
##Part 5
##Extending the Pipeline
#Multiple consumers
#Broadcasting
def broadcast(source, consumers):
    for item in source:
        for c in consumers:
            c.send(item)
#Sadly, inside a consumer it is not possible to continue the same processing-pipeline idea
#You can do this with threads or distributed processes, however
#Consumer Thread
import Queue, threading
class ConsumerThread(threading.Thread):
    def __init__(self, target):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.in_queue = Queue.Queue()
        self.target = target
    def send(self, item):
        self.in_queue.put(item)
    def generate(self):
        while True:
            item = self.in_queue.get()
            yield item
    def run(self):
        self.target(self.generate())
def find_404(log):
    for r in (r for r in log if r['status'] == 404):
        print r['status'], r['datetime'], r['request']
def bytes_transferred(log):
    total = 0
    for r in log:
        total += r['bytes']
        print "Total bytes", total
c1 = ConsumerThread(find_404)
c1.start()
c2 = ConsumerThread(bytes_transferred)
c2.start()
lines = follow(open("access-log"))
log = apache_log(lines)
broadcast(log, [c1, c2])
#Multiple Sources
def concatenate(sources):
    for s in sources:
        for item in s:
            yield item
#or use parallel iteration
import itertools
z = itertools.izip(s1, s2, s3)    #Terminates when the first source is exhausted
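#A concrete izip sketch with two finite sources (the file names are assumptions)
for a, b in itertools.izip(open("log1"), open("log2")):
    print a.strip(), '|', b.strip()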
#Multiplexing generators
def gen_multiplex(genlist):
    item_q = Queue.Queue()
    def run_one(source):
        for item in source:
            item_q.put(item)
    def run_all():
        thrlist = []
        for source in genlist:
            t = threading.Thread(target=run_one, args=(source,))
            t.start()
            thrlist.append(t)
        for t in thrlist:
            t.join()
        item_q.put(StopIteration)
    threading.Thread(target=run_all).start()
    while True:
        item = item_q.get()
        if item is StopIteration:
            return
        yield item
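#A hedged usage sketch: interleave lines from two live logs into one stream
#(the directory names are assumptions)
log1 = follow(open("foo/access-log"))
log2 = follow(open("bar/access-log"))
for line in gen_multiplex([log1, log2]):
    print line,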
##Part 6
##Co-routines
#To get a co-routine to run properly, you have to prime it with a .next() call first
#That initial .next() can be handled automatically via a decorator
def consumer(func):
    def start(*args, **kwargs):
        c = func(*args, **kwargs)
        c.next()
        return c
    return start
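#Why priming matters, as a quick sketch: send() on a just-created generator
#raises TypeError until .next() has advanced it to the first (yield)
def echoer():
    while True:
        r = (yield)
        print "Got", r
e = echoer()
#e.send(42) here would raise TypeError
e.next()      #prime it
e.send(42)    #prints "Got 42"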
@consumer
def find_404():
    while True:
        r = (yield)
        if r['status'] == 404:
            print r['status'], r['datetime'], r['request']
@consumer
def bytes_transferred():
    total = 0
    while True:
        r = (yield)
        total += r['bytes']
        print "Total bytes", total
lines = follow(open('access-log'))
log = apache_log(lines)
broadcast(log, [find_404(), bytes_transferred()])
#Discussion: no threads were involved; co-routines give co-operative multitasking, i.e. concurrent programming without threads