Skip to content

Instantly share code, notes, and snippets.

@bijoythomas
Last active March 17, 2020 15:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bijoythomas/1d1544e01b1703c07709523e47fcd30b to your computer and use it in GitHub Desktop.
Save bijoythomas/1d1544e01b1703c07709523e47fcd30b to your computer and use it in GitHub Desktop.
Ramda-like functions for Python
import re
from datetime import datetime
from functools import cmp_to_key

from fn import F, recur
from funcy import mapcat, walk_values, merge as funcymerge, group_by
# Basic combinators.  Every combinator is an ``F`` instance and is curried:
# it takes exactly one argument per call.


@F
def I(x):
    # identity: I(x) -> x
    return x


@F
def K(x):
    # constant: K(x)(y) -> x, whatever y is
    return F(lambda y: x)


@F
def T(x):
    # thrush: T(x)(f) -> f(x)
    return F(lambda f: f(x))


@F
def A(f):
    # apply: A(f)(x) -> f(x)
    return F(lambda x: f(x))


@F
def S(f):
    # S combinator: S(f)(g)(x) -> f(x)(g(x))
    return F(lambda g: F(lambda x: f(x)(g(x))))
def _partition_(pred, seq):
    """Split ``seq`` into the elements that satisfy ``pred`` and the rest.

    ``pred`` is called exactly once per element, in iteration order.
    Returns a 2-tuple of lists: ``(satisfying, remaining)``.
    """
    matched, unmatched = [], []
    for item in seq:
        bucket = matched if pred(item) else unmatched
        bucket.append(item)
    return matched, unmatched
# append(seq)(elem): ``seq`` with ``elem`` added at the end (built via concat).
append = F(lambda seq: F(lambda elem: concat(seq)([elem])))

# partition(pred)(seq): (elements satisfying pred, remaining elements).
partition = F(lambda pred: F(_partition_, pred))

# complement(fn)(x): logical negation of fn(x).
complement = F(lambda fn: F(lambda x: False if fn(x) else True))

# is_empty(seq): True when len(seq) is zero.
is_empty = F(lambda seq: not len(seq))

# is_not_empty(seq): negation of is_empty.
is_not_empty = F(lambda x: not is_empty(x))
# head(seq): first element, or None when seq is empty.
head = F(lambda seq: seq[0] if len(seq) > 0 else None)

# tail(seq): everything but the first element.
tail = F(lambda seq: seq[1:])

# init(seq): everything but the last element.
init = F(lambda seq: seq[:-1])

# last(seq): final element, or None when seq is empty.  Guarded the same way
# as head so both ends of an empty sequence behave alike instead of last
# raising IndexError.
last = F(lambda seq: seq[-1] if len(seq) > 0 else None)
# keys(obj) / values(obj): delegate to the mapping's own methods.
@F
def keys(obj):
    return obj.keys()


@F
def values(obj):
    return obj.values()


# pick(ks)(obj): dict of the items of obj whose key is in ks.
pick = F(lambda ks: F(lambda obj: dict((k, v) for k, v in obj.items() if k in ks)))

# omit(ks)(obj): dict of the items of obj whose key is NOT in ks.
omit = F(lambda ks: F(lambda obj: dict((k, v) for k, v in obj.items() if not (k in ks))))

# get(key)(obj): obj[key]; propagates whatever obj raises for a missing key.
get = F(lambda key: F(lambda obj: obj[key]))


# get_or(default)(key)(obj): obj.get(key, default).
@F
def get_or(default):
    return F(lambda key: F(lambda obj: obj.get(key, default)))


# chain(fn): funcy's mapcat with fn pre-applied.
chain = F(lambda fn: F(mapcat, fn))
# map(fn)(seq): list of fn(v) for every v in seq, in iteration order.
# Intentionally shadows the builtin with a curried version.
# Built with a comprehension: the previous fold copied the whole accumulator
# for every element (quadratic time) to produce the very same list.
map = F(lambda fn: F(lambda seq: [fn(v) for v in seq]))
# filter(pred)(seq): curried wrapper around the builtin filter.  The builtin
# is captured as an argument of the immediately-invoked outer function, before
# the name is rebound here; the result is whatever the builtin returns.
filter = F(
    lambda native_filter: F(
        lambda pred: F(native_filter, pred)
    )
)(filter)

# reject(pred)(seq): filter with the predicate negated.
reject = F(lambda pred: filter(complement(pred)))

# map_values(fn)(coll): funcy's walk_values with fn pre-applied.
map_values = F(lambda fn: F(lambda coll: walk_values(fn, coll)))

# groupby(fn)(seq): funcy's group_by with fn pre-applied.
groupby = F(lambda fn: F(lambda seq: group_by(fn, seq)))

# merge(obj)(*others): funcy's merge with obj as the first collection.
merge = F(lambda obj: F(lambda *others: funcymerge(obj, *others)))

# merge_all(seq): left fold of merge over seq, starting from an empty dict.
merge_all = F(lambda seq: foldl(merge)(dict())(seq))
# inc(n): n + 1
@F
def inc(n):
    return n + 1


# dict_of(key)(value): single-entry dict {key: value}.
dict_of = F(lambda key: F(lambda value: dict([(key, value)])))


# is_odd(n): True when n is not divisible by 2.
@F
def is_odd(n):
    return n % 2 != 0


# to_lower(string) / to_upper(string): case conversion via the str methods.
@F
def to_lower(string):
    return string.lower()


@F
def to_upper(string):
    return string.upper()


# equals(a)(b): a == b
@F
def equals(a):
    return F(lambda b: a == b)


# equals_by(fn)(a)(b): compare a and b after mapping both through fn.
@F
def equals_by(fn):
    return F(lambda a: F(lambda b: fn(a) == fn(b)))


# zip(seq1)(seq2): curried wrapper around the builtin zip, captured as the
# argument of the immediately-invoked outer function before the name is
# rebound here.
zip = F(
    lambda native_zip: F(
        lambda seq1: F(native_zip, seq1)
    )
)(zip)


# split(sep)(s): s.split(sep)
@F
def split(sep):
    return F(lambda s: s.split(sep))


# trim(s): s.strip()
@F
def trim(s):
    return s.strip()
def __reduce_left__(fn, initial, seq):
    """Iteratively fold ``seq`` from the left with a curried ``fn``.

    ``fn`` takes the accumulated value and returns a function of the next
    element.  Returns ``initial`` unchanged when ``seq`` yields nothing.
    """
    acc = initial
    for item in seq:
        step = fn(acc)
        acc = step(item)
    return acc
def __fold_left__(fn, initial, seq):
    """Left fold of ``seq`` with the curried ``fn``, starting from ``initial``.

    Delegates to the iterative ``__reduce_left__``.  The recursive
    formulation (fold everything but the last element, then apply the
    result to the last element) is logically correct but can overflow the
    stack, which is why it is not used here.
    """
    folded = __reduce_left__(fn, initial, seq)
    return folded
def __fold_right__(fn, initial, seq):
    """Right fold: ``fn(x0)(fn(x1)(... fn(xn)(initial)))``.

    ``fn`` is curried: it takes an element and returns a function of the
    value accumulated from the elements to its right.  ``seq`` must be
    reversible (list, tuple, str, range, ...).  Returns ``initial`` when
    ``seq`` is empty.

    Implemented as a loop over the reversed sequence rather than by
    recursion on ``seq[1:]``: the recursive form exceeds the interpreter's
    recursion limit on long sequences and copies the remaining sequence at
    every step.  The result is the same; ``fn`` is simply invoked starting
    from the rightmost element.
    """
    acc = initial
    for elem in reversed(seq):
        acc = fn(elem)(acc)
    return acc
# f_reverse(seq): reversed copy of seq, via an extended slice.
f_reverse = F(lambda seq: seq[slice(None, None, -1)])

# foldl(fn)(initial)(seq) / foldr(fn)(initial)(seq): curried front-ends of the
# private fold helpers; fn itself must be curried.
foldl = F(lambda fn: F(lambda initial: F(__fold_left__, fn, initial)))
foldr = F(lambda fn: F(lambda initial: F(__fold_right__, fn, initial)))

# re_split(regex)(s): re.split(regex, s)
re_split = F(lambda regex: F(lambda s: re.split(regex, s)))


# concat(seq1)(seq2): seq1 + seq2
@F
def concat(seq1):
    return F(lambda seq2: seq1 + seq2)


# replace(old)(new)(seq): seq.replace(old, new)
@F
def replace(old):
    return F(lambda new: F(lambda seq: seq.replace(old, new)))


# length(seq): len(seq)
length = F(len)

# prop_eq(k)(v)(obj): False when k is not in obj, otherwise obj[k] == v.
prop_eq = F(lambda k: F(lambda v: F(lambda obj: (obj[k] == v) if k in obj else False)))
# ascend(fn)(a)(b): curried comparator, the difference fn(a) - fn(b).
@F
def ascend(fn):
    return F(lambda a: F(lambda b: fn(a) - fn(b)))


# descend(fn)(a)(b): the opposite order, the difference fn(b) - fn(a).
@F
def descend(fn):
    return F(lambda a: F(lambda b: fn(b) - fn(a)))
# pathEq(arr)(v)(obj): follow the keys/indexes listed in ``arr`` through the
# nested container ``obj`` and compare the value found at the end with ``v``.
# While more than one key is left, index into obj with the first key and
# recurse on the rest of the path.
pathEq = F(lambda arr: F(lambda v: F(lambda obj: (
    pathEq(tail(arr))(v)(obj[head(arr)])
    if len(arr) != 1
    else obj[arr[0]] == v
))))
# gets(ks)(obj): list of obj[k] for every k in ks, in order.
gets = F(lambda ks: F(lambda obj: [obj[k] for k in ks]))


# join(sep)(seq): sep.join(seq)
@F
def join(sep):
    return F(lambda seq: sep.join(seq))


# find(fn)(seq): first element for which fn is truthy, or None.
find = F(lambda fn: F(lambda seq: next((item for item in seq if fn(item)), None)))


# lens_prop(k)(fn)(obj): copy of obj in which the value under k is replaced by
# fn(obj[k]); the other entries are carried over unchanged.
@F
def lens_prop(k):
    def focus(fn):
        def update(obj):
            untouched = omit([k])(obj)
            return merge(untouched)({k: fn(obj[k])})
        return F(update)
    return F(focus)


# lens_index(idx)(f)(seq): copy of seq with the element at idx replaced by
# f(seq[idx]).  The middle part is a list, so seq must concatenate with lists.
@F
def lens_index(idx):
    return F(lambda f: F(lambda seq: seq[:idx] + [f(seq[idx])] + seq[idx + 1:]))


# over(lens)(fn)(obj): run fn through the lens on obj.
@F
def over(lens):
    return F(lambda fn: F(lambda obj: lens(fn)(obj)))


# zlfill(n)(s): s.zfill(n)
@F
def zlfill(n):
    return F(lambda s: s.zfill(n))


# tap(fn)(v): call fn(v) for its side effect, then hand v back.
@F
def tap(fn):
    return F(lambda v: (fn(v) or True) and v)


# ap is an alias of the S combinator.
ap = S
# sort(comparator)(seq): new sorted list.  ``comparator`` is called with two
# positional arguments and must return a negative, zero or positive number.
# It is adapted with functools.cmp_to_key because the ``cmp=`` keyword of
# sorted() exists only on Python 2; ``key=cmp_to_key(...)`` works on 2.7 and 3.
sort = F(lambda comparator: F(lambda seq: sorted(seq, key=cmp_to_key(comparator))))

# sort_by(ordinal_fn)(seq): new list sorted by the key ordinal_fn(elem).
sort_by = F(lambda ordinal_fn: F(lambda seq: sorted(seq, key=ordinal_fn)))
# flip(f)(b)(a): call the curried f with its first two arguments swapped.
@F
def flip(f):
    return F(lambda b: F(lambda a: f(a)(b)))


# fapply(fn)(seq): feed the elements of seq to the curried fn one at a time,
# i.e. fn(seq[0])(seq[1])...; this is a left fold with the apply combinator.
fapply = F(lambda fn: F(lambda seq: foldl(A)(fn)(seq)))
def __for_each__(fn, seq):
    """Call ``fn`` on every element of ``seq`` and return ``seq`` itself.

    The results of ``fn`` are discarded; only its side effects matter.
    """
    for item in seq:
        fn(item)
    return seq
# for_each(fn)(seq): run fn on every element for its side effects; returns seq.
for_each = F(lambda fn: F(__for_each__, fn))


# o(f)(g)(x): composition of two unary functions, f(g(x)).
@F
def o(f):
    return F(lambda g: F(lambda x: f(g(x))))
@recur.tco
def __aperture__(acc, seq, n):
    """One step of the sliding-window builder behind ``aperture``.

    Written for ``recur.tco``: every return is a ``(flag, payload)`` pair.
    ``(False, acc)`` ends the loop with ``acc`` as the result, while
    ``(True, (acc, seq, n))`` asks for another step with those arguments.

    ``acc`` is the list of windows built so far (mutated in place), ``seq``
    the elements not yet consumed and ``n`` the window size.
    """
    if is_empty(seq):
        # Nothing left to consume: finish with the windows collected so far.
        return False, acc
    if is_empty(acc):
        # First step: seed with the leading window and drop those elements.
        return True, ([seq[0:n]], seq[n:], n)
    # Slide by one: drop the oldest element of the previous window and add the
    # next input element.  ``[1:]`` equals the former ``[-(n-1):]`` for n >= 2
    # and also handles n == 1, where ``[-0:]`` kept the whole previous window
    # and made the windows grow instead of slide.
    rem = last(acc)[1:]
    acc.append(rem + [head(seq)])
    return True, (acc, tail(seq), n)
# aperture(n)(seq): list of the consecutive windows of seq built by
# __aperture__.  A fresh accumulator is created per call because the helper
# mutates it.
aperture = F(lambda n: F(lambda seq: __aperture__(list(), seq, n)))

# to_pairs(obj): [[key, value], ...] built from obj.items().
to_pairs = F(lambda obj: [[key, value] for key, value in obj.items()])

# from_pairs(seq): dict built from an iterable of 2-element pairs.
from_pairs = F(lambda seq: dict((key, value) for key, value in seq))


# multi_group_by(f)(g)(seq): group seq by f, then group every resulting
# bucket by g, giving a dict of dicts.
@F
def multi_group_by(f):
    def with_inner(g):
        def run(seq):
            # Re-group the value slot (index 1) of each [key, bucket] pair.
            regroup = over(lens_index(1))(groupby(g))
            outer = groupby(f)(seq)
            return from_pairs(map(regroup)(to_pairs(outer)))
        return F(run)
    return F(with_inner)


# contains(s)(seq): s in seq
@F
def contains(s):
    return F(lambda seq: s in seq)


# _or_(a)(b) / _and_(a)(b): curried forms of ``or`` / ``and``.  As with the
# operators, the result is one of the operands rather than a forced bool.
_or_ = F(lambda bool1: F(lambda bool2: bool1 if bool1 else bool2))
_and_ = F(lambda bool1: F(lambda bool2: bool2 if bool1 else bool1))

# f_any(pred)(seq) / f_all(pred)(seq): fold the mapped predicate results with
# _or_ (starting from False) or _and_ (starting from True).
f_any = F(lambda pred: F(lambda seq: foldl(_or_)(False)(map(pred)(seq))))
f_all = F(lambda pred: F(lambda seq: foldl(_and_)(True)(map(pred)(seq))))
# evolve(t)(obj): for every (key, transform) entry of ``t`` whose key is
# present in the object, replace the value under that key with
# transform(value).  Keys of ``t`` that are missing from the object are
# ignored.
@F
def evolve(t):
    def apply_transformations(obj):
        def step(acc):
            def with_pair(pair):
                key, transform = pair[0], pair[1]
                if key in acc:
                    patch = over(lens_prop(key))(transform)(acc)
                else:
                    patch = {}
                return merge(acc)(patch)
            return with_pair
        return foldl(step)(obj)(to_pairs(t))
    return F(apply_transformations)
# repeat(n)(s): list holding s n times.
repeat = F(lambda n: F(lambda s: [s] * n))

# intersperse(s)(seq): list of the elements of seq with s between neighbours.
# Every element is followed by s, then the trailing s is sliced off.
intersperse = F(lambda s: F(lambda seq: [x for e in seq for x in (e, s)][:-1]))

# intersperse_with(f)(seq): like intersperse, but the separator placed between
# two neighbours a and b is f(a)(b).  Each aperture(2) window contributes its
# first element plus the separator; the final element of seq is appended last.
intersperse_with = F(lambda f: F(lambda seq: (
    [x for pair in aperture(2)(seq) for x in (pair[0], f(pair[0])(pair[1]))]
    + [seq[-1]]
)))


# add / subtract / multiply: curried binary arithmetic, first argument on the left.
@F
def add(a):
    return F(lambda b: a + b)


@F
def subtract(a):
    return F(lambda b: a - b)


@F
def multiply(a):
    return F(lambda b: a * b)


# pluck(k)(seq): list of elem[k] for every elem in seq.
pluck = F(lambda k: map(get(k)))


# compose(*fns)(arg): right-to-left composition, fns[0](fns[1](...(arg))).
@F
def compose(*fns):
    return F(lambda arg: foldr(A)(arg)(list(fns)))
def __split_when__(pred, seq):
    """Split ``seq`` in two at the first element satisfying ``pred``.

    Returns ``[before, rest]`` where ``rest`` starts with the matching
    element, or ``[seq, []]`` when no element matches.  ``pred`` is not
    called on elements after the first match.
    """
    cut = next((pos for pos, item in enumerate(seq) if pred(item)), None)
    if cut is None:
        return [seq, []]
    return [seq[:cut], seq[cut:]]
# split_when(pred)(seq): [before, rest], split at the first element matching pred.
split_when = F(lambda pred: F(__split_when__, pred))

# between(start)(end)(val): inclusive range check.
between = F(lambda start: F(lambda end: F(lambda val: start <= val and val <= end)))


# any_pass(preds)(val): ``or`` of pred(val) over all preds, starting from
# False.  Once a truthy result is seen the remaining predicates are not called.
@F
def any_pass(preds):
    def check(val):
        outcome = False
        for pred in preds:
            outcome = outcome or pred(val)
        return outcome
    return F(check)


# all_pass(preds)(val): ``and`` of pred(val) over all preds, starting from
# True.  Once a falsy result is seen the remaining predicates are not called.
@F
def all_pass(preds):
    def check(val):
        outcome = True
        for pred in preds:
            outcome = outcome and pred(val)
        return outcome
    return F(check)


# if_else(cond)(on_true)(on_false)(v): on_true(v) when cond(v) is truthy,
# otherwise on_false(v).
@F
def if_else(cond):
    return F(lambda on_true: F(lambda on_false: F(
        lambda v: on_true(v) if cond(v) else on_false(v))))


# zip_with(f)(seq1)(seq2): list of f(a)(b) for the zipped pairs (a, b).
@F
def zip_with(f):
    def with_first(seq1):
        def with_second(seq2):
            combine = map(fapply(f))
            return combine(zip(seq1)(seq2))
        return F(with_second)
    return F(with_first)


# converge(f)(fns)(x): apply every function in fns to x, then feed the
# results, in order, to the curried f.
converge = F(lambda f: F(lambda fns: F(lambda x: fapply(f)([fn(x) for fn in fns]))))


# use_with(f)(fns)(*vals): transform each value with the function at the same
# position in fns, then feed the transformed values to the curried f.
@F
def use_with(f):
    def with_transformers(fns):
        def call(*vals):
            transformed = zip_with(A)(fns)(list(vals))
            return fapply(f)(transformed)
        return F(call)
    return F(with_transformers)
def elapsed(fn):
    """Decorator that prints how long each call to ``fn`` takes.

    The returned wrapper forwards its positional and keyword arguments to
    ``fn``, prints the difference between ``datetime.now()`` taken before and
    after the call, and returns the result of ``fn`` unchanged.
    """
    from functools import wraps  # local import keeps the block self-contained

    # Callables without ``__name__`` are reported by their repr instead of
    # raising AttributeError after the call has already run.
    name = getattr(fn, '__name__', None) or repr(fn)

    @wraps(fn)
    def __helper__(*args, **kwargs):
        start = datetime.now()
        # The ``apply`` builtin no longer exists on Python 3; argument
        # unpacking is the equivalent that works on both major versions.
        ret = fn(*args, **kwargs)
        end = datetime.now()
        print('Time taken for ' + name + ' => (hh:mm:ss.ms) {}'.format(end - start))
        return ret
    return __helper__
# sum(seq): left fold of add over seq starting from 0.  Intentionally shadows
# the builtin with an F-wrapped version.
@F
def sum(seq):
    return foldl(add)(0)(seq)


# unapply(fn)(*args): call fn with the positional arguments packed into one list.
@F
def unapply(fn):
    return F(lambda *args: fn(list(args)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment