Skip to content

Instantly share code, notes, and snippets.

@nikicat
Created July 8, 2014 09:09
Show Gist options
  • Save nikicat/be2bbc889d7e3555d7d4 to your computer and use it in GitHub Desktop.
Save nikicat/be2bbc889d7e3555d7d4 to your computer and use it in GitHub Desktop.
gns prototype
import storage # storage interface
class Job:
"""Represents Job - some code to run
Could be used from different processes
to control job execution and status
"""
def __init__(
self,
version: '0123456789abcdefefdcba9876543210',
module: 'somepkg.somemod',
func: 'megafunction',
args: '{"arg1": 123, "var2": "blah"}',
wakeupat=0,
):
self._version = version
self._module = module
self._func = func
self._args = args
self._status = NEW
self._id = None
self._exception = None
self._retcode = None
self._wakeupat = wakeupat
self._cont = None
def __getstate__(self):
"""returns serialized representation"""
pass
def __setstate__(self, state):
"""load from serialized representation"""
pass
def run(self):
"""runs job continulet until it finishes
dumps state after each step
"""
while True:
if self._status == NEW:
monkey_patch_sys()
monkey_patch_sleep()
moneky_patch_socket()
sys.path.append(get_path(self._version))
module = import_module(self._module)
function = getattr(module, function)
def trampoline():
retval = function(**self.kwargs)
self._cont = continulet(trampoline)
self._status = SLEEPING
elif self._status == RUNNING:
try:
self._status, = self._cont.switch(self._exception)
except Exception as e:
self._exception = e
self._status = EXCEPTION
elif self._status == SLEEPING:
# FIXME: here should be something more intelligent...
time.sleep(time.time() - self._wakeupat)
self._status = RUNNING
storage.savejob(self)
def wait(self, timeout):
with self.lock(timeout):
storage.loadjob(self)
def stop_job(jobid, timeout=5):
"""sets special flag"""
storage.send(jobid, STOP)
storage.wait(self, timeout)
def create_job(**kwargs):
job = Job(**kwargs)
jobid = storage.savejob(job)
return jobid
def process_jobs():
while True:
for jobid in storage.getjobs():
process_job(jobid)
def process_job(jobid):
with storage.lockjob(jobid, timeout=0):
job = storage.loadjob(jobid)
job.run()
storage.removejob(jobid) # removes job to garbage (optional)
def gc_thread(self):
while True:
for jobid in storage.getremovedjobs():
if job_is_too_old(job):
storage.erasejob(jobid)
def monkey_patch_sleep():
def sleep(timeout):
exc = continulet.switch((SLEEPING, time.time() + timeout))
if exc is not None:
raise exc
time.sleep = sleep
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment