Skip to content

Instantly share code, notes, and snippets.

@flohlen
Last active February 7, 2019 12:51
Show Gist options
  • Save flohlen/c9024d1bc278f20c9722fadc4e43bd36 to your computer and use it in GitHub Desktop.
Save flohlen/c9024d1bc278f20c9722fadc4e43bd36 to your computer and use it in GitHub Desktop.
from jobinterface import Job, JobException, TryAgainLaterException
import random
#
# signalisiert einen Underflow des Scheduler - nicht veraendern
#
class SchedulerUnderflowException(Exception):
pass
class Scheduler(object):
def __init__(self): # nicht veraendern
self.schlange = []
self.log = []
def neuerjob(self, job): #zu implementieren
assert isinstance(job, Job) == True, "Job ist kein Job."
self.schlange.append(job)
return job
def laufe(self, wieviele): # uzu implementieren
ret_vals = []
try:
assert type(wieviele) == int and wieviele > 0, "Wie viele ist kein int oder nicht groesser als 0."
for x in range(wieviele):
if len(self.schlange) < 1:
raise SchedulerUnderflowException
jbb = self.schlange.pop(0)
try:
jbb_val = jbb.start()
assert type(jbb_val) == int and jbb_val > 1, "Der Rueckgabewert ist kein int oder nicht positiv."
ret_vals.append(jbb_val)
self.log.append("LOG: Job " + str(jbb.get_id()) + " wurde erfolgreich ausgefuehrt.")
except TryAgainLaterException:
self.schlange.append(jbb)
self.log.append("LOG: Job " + str(jbb.get_id()) + " konnte nicht ausgefuehrt werden und wurde erneut zur Schlange hinzugefuegt.")
except:
self.log.append("LOG: Job " + str(jbb.get_id()) + " konnte nicht ausgefuehrt werden und wurde entfernt.")
except SchedulerUnderflowException:
ret_vals = []
finally:
return ret_vals
if __name__ == '__main__':
#
# Loesen Sie Aufgabenteil c) hier:
#
class BadTestException(JobException):
pass
class JBE1(Job):
def __init__(self):
super().__init__()
self.run = False
def start(self):
if self.run:
return random.randint(1,10)
else:
self.run = True
raise TryAgainLaterException
class JBE2(Job):
def start(self):
return random.randint(1,10)
class JBE3(Job):
def start(self):
return BadTestException
class JBE4(Job):
def start(self):
return "beeb boob"
#
# Loesen Sie Aufgabenteil d) hier:
#
test = Scheduler()
job_1 = JBE1()
job_2 = JBE2()
job_3 = JBE3()
job_4 = JBE4()
test.neuerjob(job_1)
test.neuerjob(job_2)
test.neuerjob(job_3)
print(test.laufe(1))
print(test.log)
# Ihren globalen Testcode koennen Sie hier platzieren:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment