Skip to content

Instantly share code, notes, and snippets.

@DocGarbanzo
Last active February 6, 2023 00:50
Show Gist options
  • Save DocGarbanzo/f80c0ce9a8984d9f5f8e857ffd1cb244 to your computer and use it in GitHub Desktop.
Save DocGarbanzo/f80c0ce9a8984d9f5f8e857ffd1cb244 to your computer and use it in GitHub Desktop.
Mock up of an alternative parts and car setup for donkeycar.

Mock up of alternative donkeycar parts and car infrastructure.

Here we share a global data bus, which makes it possible to remove the explicit input and output arguments that parts currently need when they are added to the Vehicle. Each part runs in its own thread at its own frequency. Parts can write to and read from the data bus asynchronously, and there is no longer a global vehicle loop. There is some resemblance to ROS, where parts subscribe to topics. There is currently no control over type safety or allowed types in the data bus, but that could be added easily.

import math
import time
import collections
from threading import Thread
from threading import Lock
# Convenience record bundling a value with its declared type and the time it
# was written; not strictly required by the bus.
DataStruct = collections.namedtuple(
    'DataStruct', ['data_type', 'data', 'time_stamp'])


class DataBus:
    """ Central key/value store through which all parts of the car exchange
        data """

    def __init__(self):
        # maps data name -> most recent DataStruct written under that name
        self.data_store = {}

    def write(self, data_name, data_type, data):
        """ Store data under data_name, stamped with the current time.

            The exact type of data has to be data_type, otherwise the
            assertion fails and nothing is stored. """
        actual_type = type(data)
        # only the exact type is checked here; more could be verified, e.g.
        # that the type stored under a name never changes
        assert actual_type is data_type, \
            f'{actual_type.__name__} does not match {data_type.__name__}'
        # the previous entry is overwritten, no history is kept (the last n
        # entries tagged with a cycling counter would be an alternative)
        self.data_store[data_name] = DataStruct(data_type, data, time.time())

    def read(self, data_name):
        """ Give back the latest entry for data_name, or None if nothing has
            been written under that name; never raises for unknown names """
        return self.data_store.get(data_name, None)
class Part(object):
    """ Part base class, provides asynchronous threads with individual loop
        frequencies.

        Every part owns a daemon thread which repeatedly calls
        read_from_bus() and write_to_bus() and sleeps as needed so that one
        iteration takes approximately loop_time seconds. Subclasses override
        one or both of these hooks. The data bus has to be injected with
        set_data_bus() before the part is started. """

    def __init__(self, loop_time):
        """ Create the part and its worker thread (not started yet).

            :param loop_time: target duration of one loop iteration in seconds
        """
        self.data_bus = None
        self.loop_time = loop_time
        self.t = Thread(target=self.update, args=())
        self.t.daemon = True
        # this lock belongs to this part only, it is not shared with other
        # parts and hence does not serialise bus access between parts
        self.lock = Lock()
        self.last_time = time.time()
        self.loop_count = 0
        self.time_diff_total = 0.
        # cleared by stop() so that the loop in update() terminates
        self._running = True
        print('Created part', type(self).__name__, 'with loop time',
              self.loop_time)

    def update(self):
        """ Thread body: run the read/write hooks until stop() is called.

            Only needs to be implemented here, subclasses customise the
            hooks instead. """
        assert self.data_bus is not None, "Need to set data bus first"
        # measure from the start of the loop, otherwise the delay between
        # constructing and starting the part is counted as the first loop
        self.last_time = time.time()
        while self._running:
            with self.lock:
                self.read_from_bus()
            with self.lock:
                self.write_to_bus()
            now = time.time()
            time_diff = now - self.last_time
            # mechanically delay loop to match expected loop time - this is
            # not exact but approximate
            if time_diff < self.loop_time:
                time.sleep(self.loop_time - time_diff)
                now = time.time()
            self.time_diff_total += now - self.last_time
            self.last_time = now
            self.loop_count += 1

    def read_from_bus(self):
        """ Hook called once per loop, does nothing by default """
        pass

    def write_to_bus(self):
        """ Hook called once per loop after read_from_bus(), does nothing by
            default """
        pass

    def start(self):
        """ Start the worker thread """
        self.t.start()

    def set_data_bus(self, data_bus):
        """ Attach the data bus that the hooks read from and write to """
        self.data_bus = data_bus

    def stop(self):
        """ Ask the loop to end after its current iteration and report how
            exact the timing was. Does not wait for the thread to finish. """
        self._running = False
        name = type(self).__name__
        if self.loop_count:
            print('Stopped part', name, 'with avg loop time',
                  self.time_diff_total / self.loop_count)
        else:
            # no iteration finished (never started or failed in first loop),
            # so there is no average to report
            print('Stopped part', name, 'before completing a loop')
class Odom(Part):
    """ Mock odometer publishing a sinusoidal speed within [0,1] and a
        distance given by the internal tick value """

    def __init__(self):
        super().__init__(loop_time=0.05)
        # advances by 0.01 on every write and drives both published values
        self.tick = 0.

    def write_to_bus(self):
        """ Base class hook: publish the mocked 'speed' and 'distance' """
        bus = self.data_bus
        # shift the sine by a quarter period so that speed starts at zero
        phase = self.tick - math.pi/2
        bus.write('speed', float, 0.5 + 0.5 * math.sin(phase))
        bus.write('distance', float, self.tick)
        self.tick += 0.01
class SpeedGauge(Part):
    """ Mock part to read speed and distance from the bus and print them """

    def __init__(self):
        super().__init__(loop_time=0.2)

    def read_from_bus(self):
        """ Implementation of base class interface: display the current
            'speed' and 'distance' entries, if both are available """
        speed_entry = self.data_bus.read('speed')
        dist_entry = self.data_bus.read('distance')
        # The bus returns None for names nobody has written yet. As parts run
        # asynchronously this can happen in the first loop(s), so skip the
        # display instead of failing on a missing entry.
        if speed_entry is None or dist_entry is None:
            return
        # Gauge display
        print(f'Speed: {speed_entry.data:.4f} '
              f'distance: {dist_entry.data:.4f} ')
class Car:
    """ Owns the data bus and the parts, and starts and stops the parts """

    def __init__(self):
        self.parts = []
        # the car owns the data bus
        self.data_bus = DataBus()

    def add_part(self, part):
        """ Register a part with the car.

            The car's bus is set into the part here, so there is no need for
            the part classes to do that themselves. """
        part.set_data_bus(self.data_bus)
        self.parts.append(part)

    def start(self):
        """ Start all parts and block until interrupted (e.g. Ctrl-C), then
            stop all parts """
        for part in self.parts:
            part.start()
        try:
            while True:
                # Sleep instead of spinning: an empty busy loop keeps the
                # main thread at full CPU and competes with the part threads
                # for the interpreter.
                time.sleep(0.1)
        except KeyboardInterrupt:
            pass
        except Exception as e:
            print(e)
        finally:
            print('Stopped car.')
            self.stop()

    def stop(self):
        """ Call stop() on every registered part, in registration order """
        for part in self.parts:
            part.stop()
if __name__ == '__main__':
    # Demo: one car with a mock odometer (writer) and a gauge (reader).
    car = Car()
    # Writers are added before readers so that a reader is less likely to
    # find the bus empty during its first loop(s).
    for part in (Odom(), SpeedGauge()):
        car.add_part(part)
    car.start()
@tawnkramer
Copy link

neat!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment