Skip to content

Instantly share code, notes, and snippets.

@radzhome
Created July 25, 2015 17:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save radzhome/7de73dd3c61718413da2 to your computer and use it in GitHub Desktop.
Save radzhome/7de73dd3c61718413da2 to your computer and use it in GitHub Desktop.
An elevator sim, next steps add multiple elevators, use rabbitmq?
"""single elevator shaft simulation, so queues are all in 1 elevator """
import time
import random
AVAILABLE_FLOORS = [-1, 0, 1, 2, 3, 4, 5, 6, 7] # 0 = basement 1 (B1), -1 = B2
FIRST_FLOOR = AVAILABLE_FLOORS[0]
LAST_FLOOR = AVAILABLE_FLOORS[-1]
AVAILABLE_DIRECTIONS = ['up', 'down']
DIR_UP = AVAILABLE_DIRECTIONS[0]
DIR_DOWN = AVAILABLE_DIRECTIONS[1]
class Elevator:
def __init__(self):
self.cur_floor = 1
self.int_queues = {DIR_UP: [], DIR_DOWN: []} # stores requested floor stops
self.cur_dir = DIR_UP
@property
def not_cur_dir(self):
if self.cur_dir == DIR_UP:
return DIR_DOWN
return DIR_UP
@staticmethod
def _open_door():
print " door is open"
@staticmethod
def _close_door():
print " door now closed"
def _add_floor_request(self, request_floor):
if request_floor > self.cur_floor:
self.int_queues[DIR_UP].append(request_floor)
else:
self.int_queues[DIR_DOWN].append(request_floor)
self._prioritize_queues()
print " queue is now %s" % self.int_queues
@staticmethod
def _floor_request_prompt():
f = raw_input("ENTER request floor, null when done: ")
if not f:
print " no input, no more requests curerntly."
return f
def new_floor_request(self):
request_floor = self._floor_request_prompt()
while request_floor:
try:
request_floor = int(request_floor)
except ValueError:
pass
if request_floor in AVAILABLE_FLOORS and request_floor != self.cur_floor:
print " adding floor %s" % request_floor
self._add_floor_request(request_floor)
else:
print "That was not a valid floor."
request_floor = self._floor_request_prompt()
def _toggle_direction(self):
self.cur_dir = self.not_cur_dir
def _move(self):
# go up/down a floor if possible
if self.cur_dir == DIR_UP:
if self.cur_floor != LAST_FLOOR:
self.cur_floor += 1
elif self.cur_dir == DIR_DOWN: # down
if self.cur_floor != FIRST_FLOOR:
self.cur_floor -= 1
time.sleep(1) # simulate time to move
print "CONTINUING in direction %s, cur floor is now %s" % (self.cur_dir, self.cur_floor)
def _determine_action(self):
# queue and direction control, checks next action or switches queues, if both empty null dir
if self.int_queues.get(self.cur_dir):
pass
else:
print "CHECKING if other direction queue (%s) has requests" % self.not_cur_dir
if self.int_queues.get(self.not_cur_dir):
print " toggling direction, other queue has floors %s" % self.int_queues
self._toggle_direction()
else:
self.cur_dir = None
def serve(self):
self.new_floor_request() # always check for new floors
self._determine_action() # check for command & set direction
self._move() # iterate through floors
# stop at a floor or idle
if self.cur_dir and self.cur_floor in self.int_queues[self.cur_dir]:
self._floor_stop()
elif not self.cur_dir:
print ">>IDLING on floor %s" % self.cur_floor
def _prioritize_queues(self):
self.int_queues[DIR_UP] = list(set(self.int_queues['up'])) # rem dups
self.int_queues[DIR_DOWN] = list(set(self.int_queues['down']))
self.int_queues[DIR_UP].sort(reverse=True) # sort
self.int_queues[DIR_DOWN].sort(reverse=False)
def _floor_stop(self):
print ">>STOPPED, arrived at floor %s" % self.cur_floor
self._open_door()
self.int_queues[self.cur_dir].pop() # set new floor
self._close_door()
e = Elevator()
while True:
e.serve()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment