Skip to content

Instantly share code, notes, and snippets.

@AnyISalIn
Last active July 18, 2017 07:25
Show Gist options
  • Save AnyISalIn/f1a7190f14e99002e9d8301ad07fceb3 to your computer and use it in GitHub Desktop.
Save AnyISalIn/f1a7190f14e99002e9d8301ad07fceb3 to your computer and use it in GitHub Desktop.
tail and grep
from functools import wraps
from time import sleep
# coroutine 先要用 next 语句调用一次
def coroutine(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
ret = fn(*args, **kwargs)
next(ret)
return ret
return wrapper
def follow(file, target):
'''
类似 Unix 的 tail -f, 但是接受一个 target, 有新的数据,交给 target 处理
'''
file.seek(0, 2) # 直接到文件的最后一行,类似 tail -f -n 0
while True:
line = file.readline() # 读取行
if not line: # 如果是空行,直接 sleep 0.5s,然后 continue
sleep(0.5)
continue
target.send(line) # 如果有新数据,则交给 target 处理
def action():
'''do something'''
pass
@coroutine
def grep(pattern):
'''
grep 是一个 coroutine,接受一个 pattern 参数,需要从外部 send 数据
如果接受的数据匹配 pattern 通过了,则调用 action
'''
while True:
line = yield
if pattern in line:
action()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment