Skip to content

Instantly share code, notes, and snippets.

@edhiley
Created March 17, 2013 22:00
Show Gist options
  • Save edhiley/5183834 to your computer and use it in GitHub Desktop.
Save edhiley/5183834 to your computer and use it in GitHub Desktop.
screening test python (first time)
Feature: scheduler screening test
Scenario: adding list tasks
Given we have added tasks to list
when the scheduler is run
then it will have valid tasks
Scenario: screening test
Given there are no tasks
when the scheduler is run
then it deals with no tasks
Given there are two tasks with no dependencies
when the scheduler is run
then it retains ordering
Given two tasks with dependencies
when the scheduler is run
then it orders two tasks by dependencies
Given four tasks with dependencies
when the scheduler is run
then it orders four tasks by dependencies
Given three tasks with dependencies
when the scheduler is run
then it orders three tasks by dependencies
Given tasks with cyclical dependencies
then it finds a cyclical dependency
from behave import *
from scheduler import Scheduler
@given('we have added tasks to list')
def step(context):
    """Put a scheduler for tasks 'a' and 'b', with no dependencies, on the context."""
    task_names = ['a', 'b']
    no_dependencies = {}
    context.scheduler = Scheduler(task_names, no_dependencies)
@when('the scheduler is run')
def step(context):
    """Run the scheduler held on the context and keep what it returns as context.tasks."""
    scheduler = context.scheduler
    context.tasks = scheduler.run()
@then('it will have valid tasks')
def step(context):
    """Check the scheduler holds two tasks and that the first scheduled task is 'a'."""
    task_total = len(context.scheduler.tasks)
    assert task_total == 2, "expected two tasks, got %r" % task_total
    first_task = context.tasks[0]
    first_name = first_task.name
    assert first_name == "a", "expected task named a, got %r" % first_name
@given('there are no tasks')
def step(context):
    """Put a scheduler with an empty task list and no dependencies on the context."""
    task_names = []
    no_dependencies = {}
    context.scheduler = Scheduler(task_names, no_dependencies)
@then('it deals with no tasks')
def step(context):
    """Check that running a scheduler without tasks produced no scheduled tasks."""
    count = len(context.tasks)
    # Report the actual count so a failure is diagnosable from the behave output.
    assert count == 0, "expected no tasks, got %r" % count
@given('there are two tasks with no dependencies')
def step(context):
    """Put a scheduler for tasks 'b' then 'a', with no dependencies, on the context."""
    task_names = ['b', 'a']
    no_dependencies = {}
    context.scheduler = Scheduler(task_names, no_dependencies)
def join_tasks(tasks):
    """Return the ``name`` of every task in ``tasks``, in order, joined by commas."""
    names = (entry.name for entry in tasks)
    return ','.join(names)
@then('it retains ordering')
def step(context):
    """Check that tasks without dependencies come back in their original order."""
    actual = join_tasks(context.tasks)
    # Include the actual order so a failure is diagnosable from the behave output.
    assert actual == 'b,a', "expected order b,a, got %r" % actual
@given('two tasks with dependencies')
def step(context):
    """Put a scheduler for tasks 'a' and 'b', where 'a' maps to 'b', on the context."""
    task_names = ['a', 'b']
    dependencies = {'a': 'b'}
    context.scheduler = Scheduler(task_names, dependencies)
@then('it orders two tasks by dependencies')
def step(context):
    """Check that 'b' is scheduled ahead of 'a', which depends on it."""
    actual = join_tasks(context.tasks)
    # Include the actual order so a failure is diagnosable from the behave output.
    assert actual == 'b,a', "expected order b,a, got %r" % actual
@given('four tasks with dependencies')
def step(context):
    """Put a scheduler for 'a'..'d', where 'a' maps to 'b' and 'c' maps to 'd', on the context."""
    task_names = ['a', 'b', 'c', 'd']
    dependencies = {'a': 'b', 'c': 'd'}
    context.scheduler = Scheduler(task_names, dependencies)
@then('it orders four tasks by dependencies')
def step(context):
    """Check that each dependency is scheduled directly ahead of its dependent task."""
    actual = join_tasks(context.tasks)
    # Include the actual order so a failure is diagnosable from the behave output.
    assert actual == 'b,a,d,c', "expected order b,a,d,c, got %r" % actual
@given('three tasks with dependencies')
def step(context):
    """Put a scheduler for 'a', 'b', 'c', chained as 'a' -> 'b' -> 'c', on the context."""
    task_names = ['a', 'b', 'c']
    dependencies = {'a': 'b', 'b': 'c'}
    context.scheduler = Scheduler(task_names, dependencies)
@then('it orders three tasks by dependencies')
def step(context):
    """Check that a chain of dependencies is scheduled from the deepest task outward."""
    actual = join_tasks(context.tasks)
    # Include the actual order so a failure is diagnosable from the behave output.
    assert actual == 'c,b,a', "expected order c,b,a, got %r" % actual
@given('tasks with cyclical dependencies')
def step(context):
    """Put a scheduler on the context whose dependencies loop 'a' -> 'b' -> 'c' -> 'a'."""
    task_names = ['a', 'b', 'c', 'd']
    cyclic_dependencies = {'a': 'b', 'b': 'c', 'c': 'a'}
    context.scheduler = Scheduler(task_names, cyclic_dependencies)
@then('it finds a cyclical dependency')
def step(context):
    """Run the scheduler and require that it rejects the cyclical dependencies.

    The scheduler is run inside this step, not in a separate ``when`` step,
    because the run itself is what is expected to raise.
    """
    try:
        context.tasks = context.scheduler.run()
    except Exception as error:
        # Scheduler raises a plain Exception, so the message is the only way to
        # tell a detected cycle apart from an unrelated failure inside run().
        assert 'cycle' in str(error), "expected a cycle error, got %r" % error
    else:
        assert False, "expected exception not thrown"

Scheduler

  1. Clone the repo to a local directory

  2. Install the required modules, e.g. behave

  3. Run the BDD tool:

    $ behave
    
  4. That is all

from task import Task
class Scheduler(object):
    """Order named tasks so each one comes after the task it depends on.

    ``tasks`` is a list of task names in their preferred order. ``depends``
    maps a task name to the single task name it depends on.
    """

    def __init__(self, tasks, depends):
        self.tasks = tasks
        self.depends = depends

    def create_tasks(self):
        """Return a list with one ``Task`` per name in ``self.tasks``, in order.

        Each task's ``depends_on`` set holds the name found for it in
        ``self.depends``, or is empty when the task has no entry there.
        """
        created = []
        for name in self.tasks:
            depends_on = set()
            if name in self.depends:
                depends_on.add(self.depends[name])
            created.append(Task(name, depends_on))
        return created

    def enqueue(self, task):
        """Append ``task`` to ``self.queue`` after the tasks it depends on.

        Does nothing when a task with the same name is already queued.
        Dependencies are looked up by name among the tasks still waiting in
        ``self.runables``; a name with no waiting task is skipped.
        """
        if self.is_enqueued(task.name):
            return
        for dependency_name in task.depends_on:
            dependency = next(
                (candidate for candidate in self.runables
                 if candidate.name == dependency_name),
                None
            )
            if dependency is not None:
                # Queue the dependency first; when run() pops it later,
                # is_enqueued() turns that call into a no-op.
                self.enqueue(dependency)
        self.queue.append(task)

    def is_enqueued(self, name):
        """Return True when a task called ``name`` is already in ``self.queue``."""
        return any(queued.name == name for queued in self.queue)

    def run(self):
        """Validate the dependencies and return the tasks in execution order.

        Raises:
            Exception: when the dependencies contain a cycle.
        """
        self.validate_dependencies()
        self.runables = self.create_tasks()
        self.queue = []
        while self.runables:
            self.enqueue(self.runables.pop(0))
        return self.queue

    def validate_dependencies(self):
        """Raise ``Exception`` when any dependency chain leads into a cycle."""
        for name, dependency in self.depends.items():
            # Only a dependency that has a dependency of its own can be part
            # of a chain worth following.
            if dependency in self.depends:
                self.validate_dependency(name, dependency)

    def validate_dependency(self, parent_depends, current_depends):
        """Follow the chain starting at ``current_depends`` and reject cycles.

        A cycle is reported when the chain leads back to ``parent_depends``
        or revisits any name already seen on the way. The second check ends
        chains that enter a cycle which does not include the parent.

        Raises:
            Exception: when a cycle is found.
        """
        seen = set()
        node = current_depends
        while node in self.depends:
            seen.add(node)
            target = self.depends[node]
            if target == parent_depends or target in seen:
                raise Exception("The dependency of task {0} to {1} results in a cycle of doom".format(node, target))
            node = target
class Task(object):
    """A task name paired with the collection of names it depends on."""

    def __init__(self, name, depends_on):
        # Both values are stored exactly as given, without copying or validation.
        self.name, self.depends_on = name, depends_on
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment