Skip to content

Instantly share code, notes, and snippets.

@alaiacano
Last active September 5, 2021 10:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alaiacano/35a63e7631b60641fc0342f31461b80d to your computer and use it in GitHub Desktop.
Save alaiacano/35a63e7631b60641fc0342f31461b80d to your computer and use it in GitHub Desktop.
from abc import ABC, abstractmethod
from ll import LinkedList
class Task(ABC):
def __init__(self, name: str, lst: LinkedList):
self._name = name
self._lst: LinkedList = lst
@abstractmethod
def execute(self, *args, **kwargs):
"""
This method must be implemented by any Task subclass to carry out it's job-specific logic, such as printing
or reversing.
"""
pass
@property
def name(self) -> str:
return self._name
@property
def linked_list(self) -> LinkedList:
return self._lst
# Modified from the solution provided at https://www.geeksforgeeks.org/reverse-a-linked-list/
from dataclasses import dataclass
from typing import Any, Optional, Type
@dataclass
class Node:
value: int
data: Optional[Type["Node"]] = None
class LinkedList:
def __init__(self):
self.head: Optional[Node] = None
self.next: Optional[Node] = None
def reverse(self):
"""
The business! Reverses the linked list.
"""
prev: Optional[Node] = None
current: Node = self.head
while current is not None:
next = current.next
current.next = prev
prev = current
current = next
self.head = prev
def push(self, v):
"""
Adds a new node to the head of the linked list.
1. Makes a new Node
2. Sets the current head to the tail of the new node
3. Sets self.head to the new node.
"""
new_node = Node(v)
new_node.next = self.head
self.head = new_node
def __repr__(self):
"""
String representation of the list, from head to tail.
"""
values = []
node = self.head
while node is not None:
values.append(str(node.value))
node = node.next
return ", ".join(values)
import logging
import yaml
from typing import List, Optional
from tasks import task_factory, Task
class Parser:
"""
Parses the pipeline yaml into a list of TaksDefinitions, and provides methods to describe and ultimately execute the pipeline.
"""
def __init__(self, path: str):
self.pipeline: dict = yaml.safe_load(open(path))
self.task_definitions: List[TaskDefinition] = []
def parse(self):
api_version = self.pipeline["apiVersion"]
if api_version != 1:
raise ValueError("I only know API v1")
if "tasks" not in self.pipeline.keys():
raise ValueError("malformed yaml - you need an array of tasks")
# parse the task definitions
self.task_definitions = [
TaskDefinition.from_dict(t) for t in self.pipeline["tasks"]
]
def describe(self):
if len(self.task_definitions) == 0:
print("No tasks parsed yet")
return
print("~~ Tasks that will execute ~~")
for t in self.task_definitions:
print(f"{t.action} - {t.name} - {t.params}")
def run(self):
prev_task: Optional[Task] = None
for task_def in self.task_definitions:
prev_ll: LinkedList = prev_task.linked_list if prev_task is not None else LinkedList()
task: Task = task_factory(task_def.action, task_def.name, prev_ll)
logging.info("~~~")
task.execute(**task_def.params)
prev_task = task
class Parser2:
def __init__(self, path):
self.pipeline: dict = yaml.safe_load(open(path))
self.task_definitions: Dict[int, TaskDefinition] = {}
self.graph: Dict[int, List[TaskDefinition]] = defaultdict(list)
def run_task(self, task_id: Optional[int], input_ll: LinkedList, visited: Set[int]):
visited.add(task_id)
# run the task
task_definition = self.task_definitions[task_id]
logging.info(f"Visiting task {task_definition.id}")
parent_task = task_factory(
task_definition.action, task_definition.name, input_ll
)
parent_task.execute(**task_definition.params)
# If the task has children that have not yet been executed, run those recursively.
for child_task in self.graph[task_id]:
if child_task.id not in visited:
# kinda gross here, but python only does pass-by-reference and we need to be able to mutate
# the LinkedList within each task, so we deepcopy.
ll = copy.deepcopy(parent_task.linked_list)
self.run_task(child_task.id, ll, visited)
def run(self):
first_task_id = self.graph[None][0].id
self.run_task(first_task_id, LinkedList(), set([]))
apiVersion: 1
tasks:
- name: "populate the list"
action: push_values
config:
elements:
- 1
- 2
- 3
- 4
- name: "print the list"
action: print_list
- name: "reverse the list"
action: reverse_list
- name: "print the reversed list"
action: print_list
apiVersion: 2
tasks:
- name: "populate the list"
id: 1
action: push_values
config:
elements:
- 1
- 2
- 3
- 4
demo_sleep_time: 1
- name: "print the list"
action: print_list
id: 2
parent: 1
config:
demo_sleep_time: 4
- name: "reverse the list"
action: reverse_list
id: 3
parent: 1
config:
demo_sleep_time: 2
- name: "reverse the list a second time"
action: reverse_list
id: 5
parent: 3
config:
demo_sleep_time: 2
- name: "print the double reversed list"
action: print_list
id: 6
parent: 5
config:
demo_sleep_time: 2
- name: "print the single reversed list"
action: print_list
id: 4
parent: 3
config:
demo_sleep_time: 1
from ll import LinkedList
if __name__ == "__main__":
ll = LinkedList()
ll.push(1)
ll.push(2)
ll.push(3)
ll.push(4)
print(f"The list is: {ll}")
# The list is: 4, 3, 2, 1
ll.reverse()
print(f"The reversed list is {ll}")
# Tthe list is: 1, 2, 3, 4
from dataclasses import dataclass, field
@dataclass
class TaskDefinition:
"""
Converts a task from the yaml config into a dataclass.
Required fields are `name` and `action`. All other key -> value pairs listed in the yaml file will be passed to a `params` dict.
"""
name: str
action: str
params: dict = field(default_factory=dict)
@staticmethod
def from_dict(d: dict):
if "name" not in d.keys():
raise ValueError("task is missing a required 'name' field")
if "action" not in d.keys():
raise ValueError("task is missing a required 'action' field")
if "config" in d.keys():
params = {k: v for (k, v) in d["config"].items()}
else:
params = {}
return TaskDefinition(name=d["name"], action=d["action"], params=params)
def task_factory(action: str, name: str, lst: LinkedList):
"""
For the given `action`, produces the appropriate subclass of `Task`.
"""
if action == "push_values":
return PushValuesTask(name, lst)
if action == "print_list":
return PrintListTask(name, lst)
if action == "reverse_list":
return ReverseListTask(name, lst)
raise ValueError(f"Unknown action: {name}")
import logging
class PushValuesTask(Task):
def execute(self, *args, **kwargs):
logging.info(f"Populating the list with: {kwargs['elements']}")
for v in kwargs["elements"]:
self._lst.push(v)
class ReverseListTask(Task):
def execute(self, *args, **kwargs):
logging.info("Reversing the list")
self._lst = ReverseListTask.reverse(self._lst)
@staticmethod
def reverse(lst: LinkedList):
prev: Optional[Node] = None
current: Node = lst.head
while current is not None:
next = current.next
current.next = prev
prev = current
current = next
lst.head = prev
return lst
class PrintListTask(Task):
def execute(self, *args, **kwargs):
print("the list is")
print(self._lst)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment