Skip to content

Instantly share code, notes, and snippets.

@lagagain
Last active December 31, 2019 00:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lagagain/1f981ca98093a7a111f60b7a7923e298 to your computer and use it in GitHub Desktop.
Save lagagain/1f981ca98093a7a111f60b7a7923e298 to your computer and use it in GitHub Desktop.
Implement a pipe function in Python 3
class CallableWrapper:
    """Hold a value and pipe it through functions by calling the holder.

    ``CallableWrapper(x)()`` returns ``x`` itself.
    ``CallableWrapper(x)(f, *args, **kargs)`` computes ``f(x, *args, **kargs)``
    and returns that result in callable form (see :meth:`new`), so further
    calls can be chained onto it.
    """

    def __init__(self, wrap):
        self.wrap = wrap

    def __call__(self, f=None, *args, **kargs):
        """Apply ``f`` to the wrapped value, or return the value if ``f`` is None.

        Args:
            f: Callable invoked as ``f(wrapped, *args, **kargs)``; ``None``
                ends the chain and yields the wrapped value.
            *args, **kargs: Extra arguments passed to ``f`` only.

        Returns:
            The wrapped value when ``f`` is None, otherwise the callable form
            of ``f``'s result.
        """
        if f is None:
            return self.wrap
        # The extra arguments belong to ``f`` alone.  Forwarding them to the
        # result's constructor as well made ``wrapper(sum, 10)`` fail with a
        # TypeError raised by ``int(16, 10)``.
        return CallableWrapper.new(f(self.wrap, *args, **kargs))

    @staticmethod
    def new(wrap, *args, **kargs):
        """Return ``wrap`` rebuilt as an instance that is also callable.

        A throwaway subclass of ``type(wrap)`` and ``Callable`` is created and
        instantiated with ``wrap`` plus any extra constructor arguments, so
        the result still behaves like the original value (an ``int`` stays an
        ``int``) while supporting the pipe-style call.

        When ``type(wrap)`` cannot be combined with ``Callable`` (for example
        ``None``, ``bool`` or generator objects), a plain ``CallableWrapper``
        around ``wrap`` is returned instead; the extra arguments are ignored
        in that case.
        """
        try:
            class _Piped(type(wrap), Callable):
                pass
        except TypeError:
            # Only class creation is guarded: constructor errors caused by
            # bad extra arguments must still reach the caller.
            return CallableWrapper(wrap)
        return _Piped(wrap, *args, **kargs)


class _PipeCallMixin:
    """Supply the pipe-style ``__call__`` to instances built by CallableMeta."""

    def __call__(self, f=None, *args, **kargs):
        """Return ``self`` when ``f`` is None, else the callable form of
        ``f(self, *args, **kargs)``."""
        if f is None:
            return self
        return CallableWrapper.new(f(self, *args, **kargs))


class CallableMeta(type):
    """Metaclass whose classes produce callable, chainable instances.

    Instantiating a class that uses this metaclass actually instantiates a
    derived class that adds ``_PipeCallMixin`` after the class's own bases, so
    a ``__call__`` defined by the class itself still takes precedence.
    """

    def __call__(defineclz, *args, **kargs):
        """Create an instance of ``defineclz`` that supports pipe-style calls.

        The derived class is built once per class and stored on that class,
        so every instance of one class shares the same type instead of
        getting a freshly created class on each instantiation.
        """
        if issubclass(defineclz, _PipeCallMixin):
            # Already a derived class (e.g. reached through ``type(obj)``).
            impl = defineclz
        else:
            # Look in the class's own namespace only: a subclass must not
            # pick up the derived class that belongs to its parent.
            impl = defineclz.__dict__.get("_pipe_impl")
            if impl is None:
                class impl(defineclz, _PipeCallMixin):
                    pass
                defineclz._pipe_impl = impl
        # Regular construction (``__new__`` followed by ``__init__``).
        return super(CallableMeta, impl).__call__(*args, **kargs)


class Callable(metaclass=CallableMeta):
    """Base class to mix into a concrete type to get callable instances."""
#''' Example
class CallableList(list, Callable):
    """Example type: a list whose instances pick up Callable's call support."""

    def hello(self):
        """Print a fixed greeting to standard output."""
        greeting = "Hello World"
        print(greeting)
# Build a sample instance; the commented lines show what it prints when enabled.
a = CallableList([1, 2, 3, 4, 5, 6])
# print(callable(a))   # => True
# print(a)             # => [1, 2, 3, 4, 5, 6]
# print(a())           # => [1, 2, 3, 4, 5, 6]
# print(a(sum)())      # => 21
#'''
# Chain: sum with a start value of 10, convert to float, then an empty call
# to get the final value back.
print(CallableList([1, 2, 3, 4, 5, 6])(sum, 10)(float)())
# A bare CallableWrapper instance is itself callable.
print(callable(CallableWrapper([1, 2, 3, 4, 5])))
# Short alias used by the unit-conversion example below.
_ = CallableWrapper.new
def year(y: int):
    """Return the number of seconds in ``y`` years of 365.25 days each."""
    days = y * (365.25)
    return days * 24 * 60 * 60
def month(m: int):
    """Return the number of seconds in ``m`` months of 30 days each."""
    days = m * 30
    hours = days * 24
    return hours * 60 * 60
def day(d: int):
    """Return the number of seconds in ``d`` days."""
    hours = d * 24
    return hours * 60 * 60
def hour(h: int):
    """Return the number of seconds in ``h`` hours."""
    minutes = h * 60
    return minutes * 60
def minute(m: int):
    """Return the number of seconds in ``m`` minutes."""
    seconds = m * 60
    return seconds
def second(s: int):
    """Return ``s`` unchanged; seconds are the base unit of these helpers."""
    return s
# Pipe each number through its unit function (year, month, day) and add the
# three results. The sum is only evaluated here; it is neither stored nor printed.
_(2019)(year) + _(12)(month) + _(31)(day) # => 63748576800.0
from itertools import groupby
from functools import reduce
# Sample input for the pipe examples: the integers 1 through 9.
a = list(range(1, 10))
def pipe(data, *process):
    """Thread ``data`` through every callable in ``process``, in order.

    Each callable receives the return value of the one before it; with no
    callables, ``data`` is returned unchanged.
    """
    value = data
    for step in process:
        value = step(value)
    return value
''' Example:
p = pipe(a, lambda x: map(lambda y:y**2, x),
lambda x: sorted(x, key=lambda y: y%2),
lambda x: groupby(x, key=lambda y: y%2),)
for g, d in p:
l = list(d)
s = sum(l)
print("{g}: {d}, sum:{s}".format(g=g, d=l, s=s))
'''
from functools import reduce
def pipe(d, *funs):
    """Fold ``funs`` over ``d``: each function is applied to the running value.

    Returns ``d`` unchanged when no functions are given.
    """
    def _apply(acc, fn):
        return fn(acc)

    return reduce(_apply, funs, d)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment