Skip to content

Instantly share code, notes, and snippets.

@agumonkey
Last active March 9, 2019 02:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save agumonkey/6cd72c08b2e2135fecea4190957f6507 to your computer and use it in GitHub Desktop.
Save agumonkey/6cd72c08b2e2135fecea4190957f6507 to your computer and use it in GitHub Desktop.
Pipes in Python
import urllib.request as rq
class ConfusedThreader(Exception):
    """Raised by ``Threader.__or__`` when the right-hand operand of ``|`` is not supported."""
class Fun:
    """Build method-calling functions from attribute access.

    ``Fun().name`` evaluates to a one-argument function that calls the
    zero-argument method ``name`` on its argument and returns the result.
    """

    def __getattr__(self, attribute):
        # __getattr__ only runs for names that normal lookup did not find,
        # so every otherwise-unknown attribute name becomes a method caller.
        def aux(target):
            bound = getattr(target, attribute)
            return bound()

        return aux
# Shared instance: ``fn.name`` is a one-argument function that calls
# ``.name()`` on whatever it is given.
fn = Fun()


def done(t):
    """Evaluate the pipeline ``t`` and return its result.

    ``Threader.__or__`` also recognises this function by identity as the
    marker that ends a pipeline.
    """
    return t.val()
class Threader:
    """Lazy pipeline: a seed value plus a list of one-argument functions.

    Stages are queued with ``|`` and only run when the pipeline is
    evaluated through ``val()``, ``| done`` or ``| None``.
    """

    def __init__(self, v):
        self.v = v     # seed value fed to the first stage
        self.fns = []  # queued stages, in application order

    def val(self):
        """Apply every queued stage in order to the seed and return the result.

        Nothing is cached: each call runs all stages again.
        """
        r = self.v
        for f in self.fns:
            r = f(r)
        return r

    def __or__(self, other):
        """Queue ``other`` as the next stage, or evaluate the pipeline.

        Returns the evaluated result when ``other`` is ``None`` or ``done``;
        otherwise appends the callable ``other`` to this pipeline (in place)
        and returns ``self`` so further stages can be chained.

        Raises:
            ConfusedThreader: ``other`` is neither ``None``, ``done`` nor
                callable.
        """
        if other is None:
            return self.val()
        # ``done`` is itself callable, so it has to be recognised before the
        # generic callable branch or it would be queued as an ordinary stage.
        if other is done:
            return done(self)
        if callable(other):
            self.fns.append(other)
            return self
        # Name the offending operand so the failure can be diagnosed.
        raise ConfusedThreader(
            "cannot pipe into {!r}: expected a callable, done or None".format(other)
        )
class DebuggingThreader(Threader):
    """Threader that records, and optionally prints, what each stage did.

    Evaluation is best-effort: a stage that raises is reported and skipped,
    and the next stage receives the value the failed stage was given.
    """

    def __init__(self, v, verbose=True):
        super().__init__(v)
        self.verbose = verbose  # print a '[debug]' line after each successful stage
        self.trace = {}         # stage -> (input, output) of its latest successful run

    def val(self):
        """Run all stages, tracing each one, and return the final value.

        Exceptions raised while running or tracing a stage are caught and
        printed (regardless of ``verbose``); they are never propagated.
        """
        r = self.v
        for f in self.fns:
            try:
                o = r
                r = f(r)
                # Keyed by the stage itself, so a stage that compares equal to
                # an earlier one overwrites the earlier entry.
                self.trace[f] = (o, r)
                if self.verbose:
                    print('[debug]', f, o, r)
            except Exception as e:
                # Report which stage failed and the exception type; str(e)
                # alone can be as uninformative as a bare key or an empty line.
                print('[debug]', f, 'raised', repr(e))
        return r
def Thread(o, debug=False):
    """Start a pipeline seeded with ``o``.

    A truthy ``debug`` selects ``DebuggingThreader``; otherwise a plain
    ``Threader`` is returned.
    """
    if debug:
        return DebuggingThreader(o)
    return Threader(o)
# Passed to Thread() by demo(); a truthy value selects the debugging pipeline.
DEBUG = True
def demo():
    """Fetch https://www.example.com through a pipeline and print the decoded body."""
    (
        Thread("https://www.example.com", DEBUG)
        | rq.urlopen    # open the URL
        | fn.readlines  # call .readlines() on the response
        | b''.join      # glue the lines back into one bytes object
        | fn.decode     # call .decode() with its default encoding
        | print
        | done          # evaluate the queued stages
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment