Created
July 25, 2015 17:25
-
-
Save radzhome/7de73dd3c61718413da2 to your computer and use it in GitHub Desktop.
An elevator sim, next steps add multiple elevators, use rabbitmq?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""single elevator shaft simulation, so queues are all in 1 elevator """ | |
import time | |
import random | |
AVAILABLE_FLOORS = [-1, 0, 1, 2, 3, 4, 5, 6, 7] # 0 = basement 1 (B1), -1 = B2 | |
FIRST_FLOOR = AVAILABLE_FLOORS[0] | |
LAST_FLOOR = AVAILABLE_FLOORS[-1] | |
AVAILABLE_DIRECTIONS = ['up', 'down'] | |
DIR_UP = AVAILABLE_DIRECTIONS[0] | |
DIR_DOWN = AVAILABLE_DIRECTIONS[1] | |
class Elevator: | |
def __init__(self): | |
self.cur_floor = 1 | |
self.int_queues = {DIR_UP: [], DIR_DOWN: []} # stores requested floor stops | |
self.cur_dir = DIR_UP | |
@property | |
def not_cur_dir(self): | |
if self.cur_dir == DIR_UP: | |
return DIR_DOWN | |
return DIR_UP | |
@staticmethod | |
def _open_door(): | |
print " door is open" | |
@staticmethod | |
def _close_door(): | |
print " door now closed" | |
def _add_floor_request(self, request_floor): | |
if request_floor > self.cur_floor: | |
self.int_queues[DIR_UP].append(request_floor) | |
else: | |
self.int_queues[DIR_DOWN].append(request_floor) | |
self._prioritize_queues() | |
print " queue is now %s" % self.int_queues | |
@staticmethod | |
def _floor_request_prompt(): | |
f = raw_input("ENTER request floor, null when done: ") | |
if not f: | |
print " no input, no more requests curerntly." | |
return f | |
def new_floor_request(self): | |
request_floor = self._floor_request_prompt() | |
while request_floor: | |
try: | |
request_floor = int(request_floor) | |
except ValueError: | |
pass | |
if request_floor in AVAILABLE_FLOORS and request_floor != self.cur_floor: | |
print " adding floor %s" % request_floor | |
self._add_floor_request(request_floor) | |
else: | |
print "That was not a valid floor." | |
request_floor = self._floor_request_prompt() | |
def _toggle_direction(self): | |
self.cur_dir = self.not_cur_dir | |
def _move(self): | |
# go up/down a floor if possible | |
if self.cur_dir == DIR_UP: | |
if self.cur_floor != LAST_FLOOR: | |
self.cur_floor += 1 | |
elif self.cur_dir == DIR_DOWN: # down | |
if self.cur_floor != FIRST_FLOOR: | |
self.cur_floor -= 1 | |
time.sleep(1) # simulate time to move | |
print "CONTINUING in direction %s, cur floor is now %s" % (self.cur_dir, self.cur_floor) | |
def _determine_action(self): | |
# queue and direction control, checks next action or switches queues, if both empty null dir | |
if self.int_queues.get(self.cur_dir): | |
pass | |
else: | |
print "CHECKING if other direction queue (%s) has requests" % self.not_cur_dir | |
if self.int_queues.get(self.not_cur_dir): | |
print " toggling direction, other queue has floors %s" % self.int_queues | |
self._toggle_direction() | |
else: | |
self.cur_dir = None | |
def serve(self): | |
self.new_floor_request() # always check for new floors | |
self._determine_action() # check for command & set direction | |
self._move() # iterate through floors | |
# stop at a floor or idle | |
if self.cur_dir and self.cur_floor in self.int_queues[self.cur_dir]: | |
self._floor_stop() | |
elif not self.cur_dir: | |
print ">>IDLING on floor %s" % self.cur_floor | |
def _prioritize_queues(self): | |
self.int_queues[DIR_UP] = list(set(self.int_queues['up'])) # rem dups | |
self.int_queues[DIR_DOWN] = list(set(self.int_queues['down'])) | |
self.int_queues[DIR_UP].sort(reverse=True) # sort | |
self.int_queues[DIR_DOWN].sort(reverse=False) | |
def _floor_stop(self): | |
print ">>STOPPED, arrived at floor %s" % self.cur_floor | |
self._open_door() | |
self.int_queues[self.cur_dir].pop() # set new floor | |
self._close_door() | |
e = Elevator() | |
while True: | |
e.serve() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment