Last active
February 7, 2019 12:51
-
-
Save flohlen/c9024d1bc278f20c9722fadc4e43bd36 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from jobinterface import Job, JobException, TryAgainLaterException | |
import random | |
# | |
# signalisiert einen Underflow des Scheduler - nicht veraendern | |
# | |
class SchedulerUnderflowException(Exception): | |
pass | |
class Scheduler(object): | |
def __init__(self): # nicht veraendern | |
self.schlange = [] | |
self.log = [] | |
def neuerjob(self, job): #zu implementieren | |
assert isinstance(job, Job) == True, "Job ist kein Job." | |
self.schlange.append(job) | |
return job | |
def laufe(self, wieviele): # uzu implementieren | |
ret_vals = [] | |
try: | |
assert type(wieviele) == int and wieviele > 0, "Wie viele ist kein int oder nicht groesser als 0." | |
for x in range(wieviele): | |
if len(self.schlange) < 1: | |
raise SchedulerUnderflowException | |
jbb = self.schlange.pop(0) | |
try: | |
jbb_val = jbb.start() | |
assert type(jbb_val) == int and jbb_val > 1, "Der Rueckgabewert ist kein int oder nicht positiv." | |
ret_vals.append(jbb_val) | |
self.log.append("LOG: Job " + str(jbb.get_id()) + " wurde erfolgreich ausgefuehrt.") | |
except TryAgainLaterException: | |
self.schlange.append(jbb) | |
self.log.append("LOG: Job " + str(jbb.get_id()) + " konnte nicht ausgefuehrt werden und wurde erneut zur Schlange hinzugefuegt.") | |
except: | |
self.log.append("LOG: Job " + str(jbb.get_id()) + " konnte nicht ausgefuehrt werden und wurde entfernt.") | |
except SchedulerUnderflowException: | |
ret_vals = [] | |
finally: | |
return ret_vals | |
if __name__ == '__main__': | |
# | |
# Loesen Sie Aufgabenteil c) hier: | |
# | |
class BadTestException(JobException): | |
pass | |
class JBE1(Job): | |
def __init__(self): | |
super().__init__() | |
self.run = False | |
def start(self): | |
if self.run: | |
return random.randint(1,10) | |
else: | |
self.run = True | |
raise TryAgainLaterException | |
class JBE2(Job): | |
def start(self): | |
return random.randint(1,10) | |
class JBE3(Job): | |
def start(self): | |
return BadTestException | |
class JBE4(Job): | |
def start(self): | |
return "beeb boob" | |
# | |
# Loesen Sie Aufgabenteil d) hier: | |
# | |
test = Scheduler() | |
job_1 = JBE1() | |
job_2 = JBE2() | |
job_3 = JBE3() | |
job_4 = JBE4() | |
test.neuerjob(job_1) | |
test.neuerjob(job_2) | |
test.neuerjob(job_3) | |
print(test.laufe(1)) | |
print(test.log) | |
# Ihren globalen Testcode koennen Sie hier platzieren: |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment