Skip to content

Instantly share code, notes, and snippets.

@void4
Created April 18, 2018 21:00
Show Gist options
  • Save void4/c548e1b51a1b220ee32faf9a8ae72cdb to your computer and use it in GitHub Desktop.
Save void4/c548e1b51a1b220ee32faf9a8ae72cdb to your computer and use it in GitHub Desktop.
import types
import time
import datetime
# Defines a list of allowed/forwarded requests. Exists once per (child) process. Can be changed dynamically.
class Policy:
    """Whitelist of request names that a process lets through.

    The whitelist is supplied as one ``;``-separated string and stored as a
    list in ``self.white``, so it can still be modified after construction.
    """

    def __init__(self, white=""):
        """Build the whitelist from a ``;``-separated string of request names.

        Args:
            white: Request names separated by ``;`` (e.g. ``"/time;/date"``).
                An empty string produces an empty whitelist.
        """
        # "".split(";") returns [""], which would whitelist the empty request
        # name; drop empty entries so that Policy("") really allows nothing.
        self.white = [name for name in white.split(";") if name]

    def filter(self, request):
        """Return True if the request's name (``request[0]``) is whitelisted.

        Args:
            request: Indexable message whose first element is the request
                name that is looked up in ``self.white``.
        """
        return request[0] in self.white
# Processes can be nested recursively. Typically, only the outermost process acts as a provider of external information
class Process:
    """A node in a hierarchy of nested, generator-based processes.

    ``run()`` drives the first child's ``run()`` generator and handles each
    message the child yields: it either answers the request itself (when
    this process is a provider) or forwards it to its own caller by yielding
    it, provided the policy whitelists the request.
    """

    # Class-level fallback so subclasses that define their own __init__
    # without calling this one still have a delay value.
    delay = 0.1

    def __init__(self, children=None, policy=None, isprovider=False, *, delay=0.1):
        """Store the process configuration.

        Args:
            children: List of child processes. Only the first one is driven
                by ``run()``. Defaults to a fresh empty list per instance.
            policy: Object with a ``filter(request)`` method returning a
                truthy value for allowed requests. With no policy every
                request is treated as blacklisted.
            isprovider: If true, this process answers whitelisted requests
                itself instead of forwarding them upwards.
            delay: Seconds to sleep before handling each child message.
        """
        # A literal [] default would be shared by every instance created
        # without an explicit children argument.
        self.children = [] if children is None else children
        self.policy = policy
        self.isprovider = isprovider
        self.delay = delay

    @staticmethod
    def _resume(co, value=None):
        """Advance child generator ``co``; return None once it is exhausted.

        Sending None to a fresh generator is equivalent to ``next(co)``.
        Catching StopIteration here keeps it from leaking into the calling
        generator, where it would be converted to RuntimeError (PEP 479).
        """
        try:
            return co.send(value)
        except StopIteration:
            return None

    def run(self):
        """Generator that drives the first child and routes its requests.

        Yields:
            Each whitelisted request that this (non-provider) process
            forwards upwards; the value sent back in is passed on to the
            child. A final bare ``None`` is always yielded at the end.
        """
        print(id(self))
        if self.children:
            # NOTE: only the first child is ever run; other children are ignored.
            co = self.children[0].run()
            request = self._resume(co)
            while True:
                time.sleep(self.delay)
                if request is None:
                    # The child finished (or yielded its final bare None).
                    break
                elif request[0] == "result":
                    print("Result: ", request)
                    break
                elif self.policy and self.policy.filter(request):
                    if self.isprovider:
                        if request[0] == "/time":
                            # Answer with the current day of the year.
                            day_of_year = datetime.datetime.now().timetuple().tm_yday
                            request = self._resume(co, day_of_year)
                        else:
                            print("Unknown request: ", request)
                            break
                    else:
                        # Forward the request upwards and hand the reply
                        # we receive back down to the child.
                        request = self._resume(co, (yield request))
                else:
                    print("Blacklisted: ", request)
                    break
        # Keeps run() a generator even without children and signals the end
        # to the parent, which treats None as "finished".
        yield
# This process calculates the day of the year plus one
# To accomplish this, it needs to know what the current day is
# which is external information. The request travels up the hierarchy
class DayPlusOne(Process):
    """Leaf process that reports the current day of the year plus one."""

    def run(self, policy=None):
        """Endlessly alternate between asking for the day and reporting it.

        Yields ``["/time"]`` and expects the day number to be sent back in,
        then yields ``["result", day + 1]``. The ``policy`` argument is
        accepted but not used.
        """
        while True:
            today = yield ["/time"]
            # Whatever is sent in reply to the result message is discarded.
            yield ["result", today + 1]
# Scenario 1: every process in the chain whitelists "/time", so the request
# issued by DayPlusOne is answered and its result message is produced.
p = Process(
    children=[Process(children=[DayPlusOne()], policy=Policy("/time"))],
    policy=Policy("/time"),
    isprovider=True,
)
next(p.run())

# Scenario 2: the outermost process does not whitelist "/time", so the
# request is rejected there and fails.
p = Process(
    children=[Process(children=[DayPlusOne()], policy=Policy("/time"))],
    policy=Policy(""),
    isprovider=True,
)
next(p.run())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment