Skip to content

Instantly share code, notes, and snippets.

@DDoSolitary
Last active April 21, 2021 11:41
Show Gist options
  • Save DDoSolitary/0c79a61132e43c66a07e6570979d2883 to your computer and use it in GitHub Desktop.
Save DDoSolitary/0c79a61132e43c66a07e6570979d2883 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import asyncio
import copy
import random
import re
from abc import ABC
from argparse import ArgumentParser
from asyncio import AbstractEventLoop, StreamReader, StreamWriter
from asyncio.subprocess import Process
from collections.abc import Coroutine
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional
# Lowest and highest floor numbers (both inclusive) used when generating
# random requests and when validating ARRIVE responses.
MIN_FLOOR = 1
MAX_FLOOR = 20
# Exclusive upper bound for randomly drawn person and elevator ids.
MAX_ID = 10000
# Number of person requests produced for each generated test case.
PREQ_COUNT = 50
# Upper bound, in seconds, of the random gap between consecutive generated
# requests (also used as slack when timing generated elevator requests).
MAX_REQ_INTERVAL = 5
# Number of generated test cases run concurrently when no input file is given.
TEST_COUNT = 20
@dataclass
class TimedMessage(ABC):
    """Common base for every message exchanged with the program under test.

    Both timestamps are stored as ``Decimal`` values.
    """

    # Timestamp carried by the message itself: the scheduled send time for a
    # request, or the value inside the leading ``[...]`` of a response line.
    time: Decimal
    # Reading of the harness event-loop clock taken when the message was
    # actually sent or received.
    real_time: Decimal
@dataclass
class TimedRequest(TimedMessage, ABC):
    """Base class for messages the harness sends to the program under test.

    Subclasses must provide ``__str__`` returning the wire format of the
    request; ``__repr__`` prefixes that text with both timestamps.
    """

    def __repr__(self):
        # Layout: [harness clock][scheduled time]<wire text from __str__>.
        return f'[{self.real_time}][{self.time}]{self}'
# repr=False stops the dataclass machinery from generating its own __repr__,
# so the timestamped representation defined on TimedRequest is inherited.
@dataclass(repr=False)
class ModeRequest(TimedRequest):
    """Request announcing the arrival mode ('Random', 'Morning' or 'Night')."""

    # Mode name, sent verbatim as the request line.
    mode: str

    def __str__(self):
        return self.mode
# repr=False stops the dataclass machinery from generating its own __repr__,
# so the timestamped representation defined on TimedRequest is inherited.
@dataclass(repr=False)
class PersonRequest(TimedRequest):
    """Request for one passenger to travel between two floors."""

    # Passenger id.
    pid: int
    # Floor where the passenger is waiting.
    from_floor: int
    # Floor the passenger wants to reach.
    to_floor: int

    def __str__(self):
        # Wire format: <pid>-FROM-<from>-TO-<to>
        return f'{self.pid}-FROM-{self.from_floor}-TO-{self.to_floor}'
# repr=False stops the dataclass machinery from generating its own __repr__,
# so the timestamped representation defined on TimedRequest is inherited.
@dataclass(repr=False)
class ElevatorRequest(TimedRequest):
    """Request that adds a new elevator to the running system."""

    # Id of the elevator being added.
    eid: int
    # Elevator model letter ('A', 'B' or 'C').
    model: str

    def __str__(self):
        # Wire format: ADD-<eid>-<model>
        return f'ADD-{self.eid}-{self.model}'
@dataclass
class TimedResponse(TimedMessage, ABC):
    """Base class for lines emitted by the program under test.

    Subclasses must provide ``__str__`` returning the wire format of the
    response; ``__repr__`` prefixes that text with both timestamps.
    """

    # Id of the elevator the event refers to.
    eid: int
    # Operation name, e.g. 'ARRIVE', 'OPEN', 'CLOSE', 'IN' or 'OUT'.
    op: str
    # Floor at which the event happened.
    floor: int

    def __repr__(self):
        # Layout: [harness clock][reported time]<wire text from __str__>.
        return f'[{self.real_time}][{self.time}]{self}'
# repr=False stops the dataclass machinery from generating its own __repr__,
# so the timestamped representation defined on TimedResponse is inherited.
@dataclass(repr=False)
class ElevatorResponse(TimedResponse):
    """Response describing an elevator movement or door event."""

    def __str__(self):
        # Wire format: <op>-<floor>-<eid>
        return f'{self.op}-{self.floor}-{self.eid}'
# repr=False stops the dataclass machinery from generating its own __repr__,
# so the timestamped representation defined on TimedResponse is inherited.
@dataclass(repr=False)
class PersonResponse(TimedResponse):
    """Response describing a passenger entering or leaving an elevator."""

    # Id of the passenger the event refers to.
    pid: int

    def __str__(self):
        # Wire format: <op>-<pid>-<floor>-<eid>
        return f'{self.op}-{self.pid}-{self.floor}-{self.eid}'
class TestError(Exception):
    """Signals that the program under test produced invalid output.

    The message is a short human-readable description of the violated rule.
    """

    def __init__(self, msg: str):
        # A message is mandatory; it becomes the only entry in ``args``.
        Exception.__init__(self, msg)
class ElevatorState:
    """Model of one elevator, used to validate the events reported for it.

    Each ``check_*`` method verifies one reported event against the current
    state, raises ``TestError`` when a rule is violated, and otherwise
    updates the state to reflect the event.
    """

    _eid: int
    # Floors at which this elevator is allowed to open its door.
    _stops: list[int]
    # Minimum time that must pass before an ARRIVE event is accepted.
    _move_delay: Decimal
    # Maximum number of passengers in the cabin.
    _capacity: int
    _floor: int
    _door_open: bool
    # Timestamp printed by the program for the previous event of this
    # elevator; None until the first event has been checked.
    _last_time: Optional[Decimal]
    # Harness clock reading of the previous event (initially the creation
    # time of the elevator).
    _last_real_time: Decimal
    # Passengers currently inside, keyed by passenger id.
    _cabin: dict[int, PersonRequest]

    def __init__(self, eid: int, model: str, real_time: Decimal):
        """Create an elevator standing at floor 1 with its door closed.

        Args:
            eid: elevator id.
            model: 'A', 'B' or 'C'; selects stops, move delay and capacity.
            real_time: harness clock reading at which the elevator appeared.

        Raises:
            TestError: if ``model`` is not a known model letter.
        """
        self._eid = eid
        self._floor = 1
        self._door_open = False
        self._last_time = None
        self._last_real_time = real_time
        self._cabin = dict()
        if model == 'A':
            self._stops = list(range(1, 21))
            self._move_delay = Decimal('0.6')
            self._capacity = 8
        elif model == 'B':
            # Odd floors only: 1, 3, ..., 19.
            self._stops = list(i * 2 + 1 for i in range(10))
            self._move_delay = Decimal('0.4')
            self._capacity = 6
        elif model == 'C':
            self._stops = [1, 2, 3, 18, 19, 20]
            self._move_delay = Decimal('0.2')
            self._capacity = 4
        else:
            raise TestError("invalid elevator model")

    def check_delay(self, delay: Decimal, new_time: Decimal, new_real_time: Decimal) -> None:
        """Verify that at least ``delay`` passed since the previous event.

        For the first event of an elevator there is no previous output
        timestamp, so the harness clock is compared against the creation
        time instead. Later events are compared using the timestamps printed
        by the program. Both stored timestamps are updated on success.

        Raises:
            TestError: if the elapsed time is shorter than ``delay``.
        """
        if self._last_time is None:
            if new_real_time - self._last_real_time < delay:
                raise TestError('delay too short (real time)')
        elif new_time - self._last_time < delay:
            raise TestError('delay too short (output time)')
        self._last_time = new_time
        self._last_real_time = new_real_time

    def check_arrive(self, new_floor: int, time: Decimal, real_time: Decimal) -> None:
        """Validate an ARRIVE event and move the elevator to ``new_floor``.

        Raises:
            TestError: if the move was too fast, skipped a floor, or left
                the valid floor range.
        """
        self.check_delay(self._move_delay, time, real_time)
        if abs(new_floor - self._floor) != 1 or new_floor not in range(MIN_FLOOR, MAX_FLOOR + 1):
            raise TestError('invalid new floor')
        self._floor = new_floor

    def check_open(self, floor: int, time: Decimal, real_time: Decimal) -> None:
        """Validate an OPEN event and mark the door as open.

        Raises:
            TestError: if the elevator is on a different floor, may not stop
                at this floor, or its door is already open.
        """
        self.check_delay(Decimal(0), time, real_time)
        if floor != self._floor:
            raise TestError('not at that floor')
        if floor not in self._stops:
            raise TestError('not allowed to stop here')
        if self._door_open:
            raise TestError('door already open')
        self._door_open = True

    def check_close(self, floor: int, time: Decimal, real_time: Decimal) -> None:
        """Validate a CLOSE event and mark the door as closed.

        Raises:
            TestError: if less than 0.4 passed since the previous event, the
                floor does not match, or the door is not open.
        """
        self.check_delay(Decimal('0.4'), time, real_time)
        if floor != self._floor:
            raise TestError('invalid floor')
        if not self._door_open:
            raise TestError('door already closed')
        self._door_open = False

    def check_in(self, floor: int, req: PersonRequest) -> None:
        """Validate an IN event and put the passenger into the cabin.

        Raises:
            TestError: if the floor does not match the elevator or the
                request, the passenger is already inside, or the cabin is
                full.
        """
        if floor != self._floor:
            raise TestError('invalid floor')
        if req.from_floor != floor:
            raise TestError('request floor mismatch')
        if req.pid in self._cabin:
            raise TestError('request already in cabin')
        if len(self._cabin) == self._capacity:
            raise TestError('cabin full')
        self._cabin[req.pid] = req

    def check_out(self, floor: int, pid: int) -> PersonRequest:
        """Validate an OUT event and remove the passenger from the cabin.

        Returns:
            The request of the passenger that left.

        Raises:
            TestError: if the floor does not match or the passenger is not
                in this cabin.
        """
        if floor != self._floor:
            raise TestError('invalid floor')
        # A default is required here: without it an unknown pid raises
        # KeyError and the harness crashes instead of reporting the error.
        req: Optional[PersonRequest] = self._cabin.pop(pid, None)
        if req is None:
            raise TestError('request not in cabin')
        return req

    def check_final(self):
        """Verify the end state: cabin empty and door closed.

        Raises:
            TestError: if a passenger is still inside or the door is open.
        """
        if self._cabin:
            raise TestError('cabin not empty')
        if self._door_open:
            raise TestError('door still open')
class SystemState:
    """State of the whole elevator system as seen by the harness.

    Holds the requests that have been sent but not yet picked up, and one
    ``ElevatorState`` per known elevator.
    """

    # Requests waiting to be picked up, keyed by passenger id.
    _pending_requests: dict[int, PersonRequest]
    # Known elevators, keyed by elevator id.
    _elevators: dict[int, ElevatorState]

    def __init__(self, real_time: Decimal):
        """Create the initial system with elevators 1 (A), 2 (B) and 3 (C).

        Args:
            real_time: harness clock reading used as the creation time of
                the three initial elevators.
        """
        self._pending_requests = dict()
        self._elevators = dict()
        self.add_elevator(1, 'A', real_time)
        self.add_elevator(2, 'B', real_time)
        self.add_elevator(3, 'C', real_time)

    def add_pending(self, req: PersonRequest) -> None:
        """Register ``req`` as waiting to be picked up."""
        self._pending_requests[req.pid] = req

    def add_elevator(self, eid: int, model: str, real_time: Decimal) -> None:
        """Register a new elevator (replacing any existing one with ``eid``).

        Raises:
            TestError: if ``model`` is not a known elevator model.
        """
        self._elevators[eid] = ElevatorState(eid, model, real_time)

    def check_res(self, res: TimedResponse) -> None:
        """Validate one response and apply it to the system state.

        Raises:
            TestError: if the elevator or request is unknown, the operation
                name is invalid, or the elevator rejects the event.
        """
        elevator: Optional[ElevatorState] = self._elevators.get(res.eid)
        if elevator is None:
            raise TestError('elevator not found')
        if isinstance(res, ElevatorResponse):
            if res.op == 'ARRIVE':
                elevator.check_arrive(res.floor, res.time, res.real_time)
            elif res.op == 'OPEN':
                elevator.check_open(res.floor, res.time, res.real_time)
            elif res.op == 'CLOSE':
                elevator.check_close(res.floor, res.time, res.real_time)
            else:
                raise TestError('invalid op')
        elif isinstance(res, PersonResponse):
            if res.op == 'IN':
                # A default is required here: without it an unknown pid
                # raises KeyError and the harness crashes instead of
                # reporting the error.
                req: Optional[PersonRequest] = self._pending_requests.pop(res.pid, None)
                if req is None:
                    raise TestError('request not found')
                elevator.check_in(res.floor, req)
            elif res.op == 'OUT':
                req = elevator.check_out(res.floor, res.pid)
                if req.to_floor != res.floor:
                    # Passenger left before the destination: queue a copy of
                    # the request starting from the current floor so another
                    # ride can finish the trip.
                    req = copy.copy(req)
                    req.from_floor = res.floor
                    self.add_pending(req)
            else:
                raise TestError('invalid op')
        else:
            # Explicit raise rather than `assert False`, which would be
            # stripped when Python runs with -O.
            raise AssertionError('unexpected response type')

    def check_final(self):
        """Verify that every elevator is idle and no request is left over.

        Raises:
            TestError: if an elevator fails its final check or requests are
                still pending.
        """
        for elevator in self._elevators.values():
            elevator.check_final()
        if self._pending_requests:
            raise TestError('pending requests not processed')
@dataclass
class TestResult:
    """Outcome of checking the standard output of one test run."""

    # Description of the first failure, or None when every check passed.
    err_msg: Optional[str]
    # Responses that were parsed successfully, in output order.
    responses: list[TimedResponse]
    # Every output line as received (decoded and stripped).
    raw_responses: list[str]
# Event loop shared by the whole script; its clock is the harness time source.
# Calling asyncio.get_event_loop() when no loop is running is deprecated and
# raises RuntimeError on recent Python versions when no loop has been set, so
# the loop is created and registered explicitly instead.
loop: AbstractEventLoop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def gen_requests() -> list[TimedRequest]:
    """Generate one random test case.

    The case starts with a mode request at time 1, followed by PREQ_COUNT
    person requests whose floors depend on the mode, plus zero to two
    requests that add an elevator at a random time.

    Returns:
        All requests sorted by their scheduled time.
    """
    mode: str = random.choice(('Random', 'Morning', 'Night'))
    requests: list[TimedRequest] = [ModeRequest(time=Decimal(1), real_time=Decimal(0), mode=mode)]
    current_time: Decimal = Decimal(1)
    for pid in random.sample(range(MAX_ID), PREQ_COUNT):
        # In Night mode the clock never advances, so every person request
        # keeps the initial time.
        if mode != 'Night':
            max_interval: int = MAX_REQ_INTERVAL
            # Morning requests are spaced at most 2 seconds apart. The mode
            # name must match the spelling chosen above ('Morning'); the
            # previous comparison against 'MORNING' never matched.
            if mode == 'Morning' and max_interval > 2:
                max_interval = 2
            current_time += round(Decimal(random.uniform(0, max_interval)), 1)
        from_floor: int
        to_floor: int
        if mode == 'Random':
            from_floor = random.randrange(MIN_FLOOR, MAX_FLOOR + 1)
            # Draw from one value fewer and shift past from_floor so the
            # destination always differs from the origin.
            to_floor = random.randrange(MIN_FLOOR, MAX_FLOOR)
            if to_floor >= from_floor:
                to_floor += 1
        elif mode == 'Morning':
            from_floor = 1
            to_floor = random.randrange(MIN_FLOOR + 1, MAX_FLOOR + 1)
        elif mode == 'Night':
            from_floor = random.randrange(MIN_FLOOR + 1, MAX_FLOOR + 1)
            to_floor = 1
        else:
            # Unreachable; explicit raise so the guard survives python -O.
            raise AssertionError(f'unexpected mode {mode!r}')
        requests.append(PersonRequest(
            time=current_time,
            real_time=Decimal(0),
            pid=pid,
            from_floor=from_floor,
            to_floor=to_floor))
    max_time: Decimal = requests[-1].time
    ereq_count: int = random.randrange(3)
    # Ids 1-3 belong to the elevators that exist from the start.
    for eid in random.sample(range(4, MAX_ID), ereq_count):
        requests.append(ElevatorRequest(
            time=round(Decimal(random.uniform(1, float(max_time) + MAX_REQ_INTERVAL)), 1),
            real_time=Decimal(0),
            eid=eid,
            model=random.choice(('A', 'B', 'C'))))
    requests.sort(key=lambda x: x.time)
    return requests
def parse_requests(path: str) -> list[TimedRequest]:
    """Read a request file and return the requests in file order.

    Every non-blank line has the form ``[time]request`` where ``request`` is
    a mode name, ``ADD-<eid>-<model>`` or ``<pid>-FROM-<from>-TO-<to>``.

    Args:
        path: path of the request file.

    Returns:
        The parsed requests, each with ``real_time`` set to 0.

    Raises:
        ValueError: if a line does not match the expected format.
    """
    requests: list[TimedRequest] = []
    with open(path) as f:
        for lineno, line in enumerate(f, start=1):
            if not line.strip():
                continue
            match: Optional[re.Match] = re.match(r'\[(.*)](.*)\n?', line)
            if match is None:
                raise ValueError(f'{path}:{lineno}: malformed request line {line!r}')
            time: Decimal = Decimal(match.group(1))
            req_str: str = match.group(2)
            if req_str in ('Random', 'Morning', 'Night'):
                requests.append(ModeRequest(time=time, real_time=Decimal(0), mode=req_str))
                continue
            fields: list[str] = req_str.split('-')
            if req_str.startswith('ADD-'):
                if len(fields) < 3:
                    raise ValueError(f'{path}:{lineno}: malformed elevator request {req_str!r}')
                requests.append(ElevatorRequest(
                    time=time,
                    real_time=Decimal(0),
                    eid=int(fields[1]),
                    model=fields[2]))
            else:
                # Explicit check rather than `assert`, which would be
                # stripped when Python runs with -O.
                if len(fields) < 5 or fields[1] != 'FROM' or fields[3] != 'TO':
                    raise ValueError(f'{path}:{lineno}: malformed person request {req_str!r}')
                requests.append(PersonRequest(
                    time=time,
                    real_time=Decimal(0),
                    pid=int(fields[0]),
                    from_floor=int(fields[2]),
                    to_floor=int(fields[4])))
    return requests
def get_loop_time() -> Decimal:
    """Return the shared event loop's clock as a Decimal with 9 decimals."""
    now = Decimal(loop.time())
    return round(now, 9)
def parse_response(line: str) -> TimedResponse:
    """Parse one output line of the program under test.

    The line has the form ``[time]OP-...``. ``IN``/``OUT`` lines become a
    ``PersonResponse`` (``OP-pid-floor-eid``); every other line is parsed as
    an ``ElevatorResponse`` (``OP-floor-eid``). ``real_time`` is set to the
    current harness clock.

    Raises:
        TestError: if the line cannot be parsed; the underlying exception is
            attached as ``__cause__``.
    """
    try:
        match: re.Match = re.match(r'\[(.*)](.*)', line)
        fields: list[str] = match.group(2).split('-')
        if fields[0] == 'IN' or fields[0] == 'OUT':
            return PersonResponse(
                time=Decimal(match.group(1)),
                real_time=get_loop_time(),
                op=fields[0],
                pid=int(fields[1]),
                floor=int(fields[2]),
                eid=int(fields[3]))
        else:
            return ElevatorResponse(
                time=Decimal(match.group(1)),
                real_time=get_loop_time(),
                op=fields[0],
                floor=int(fields[1]),
                eid=int(fields[2]))
    except Exception as exc:
        # `except Exception` instead of a bare `except` so KeyboardInterrupt
        # and SystemExit are not turned into parse failures.
        raise TestError(f'failed to parse response {line!r}') from exc
async def write_requests(writer: StreamWriter, requests: list[TimedRequest], state: SystemState) -> None:
    """Send each request to the program's stdin at its scheduled time.

    Request times are offsets from the moment this coroutine starts. Just
    before a request is written its ``real_time`` is set to the harness
    clock and it is registered in ``state``, so responses to it can be
    validated. After the last request stdin is closed.

    A pipe closed by the child process ends the sending silently; the
    failure shows up through the other checks.
    """
    start_time: Decimal = get_loop_time()
    try:
        for req in requests:
            await asyncio.sleep(float(start_time + req.time - get_loop_time()))
            req.real_time = get_loop_time()
            if isinstance(req, PersonRequest):
                state.add_pending(req)
            elif isinstance(req, ElevatorRequest):
                state.add_elevator(req.eid, req.model, req.real_time)
            writer.write(f'{req}\n'.encode())
            await writer.drain()
        writer.write_eof()
        await writer.drain()
    except (BrokenPipeError, ConnectionResetError):
        # Writing to a stdin pipe whose reader has gone away can raise
        # either exception, so both are treated as "child stopped reading".
        pass
async def check_stdout(reader: StreamReader, state: SystemState) -> TestResult:
    """Read the program's stdout until EOF and validate each line.

    Lines are parsed and checked against ``state`` until the first failure.
    After that the remaining output is still read and stored in raw form,
    but no longer parsed. If no line failed, the final state is checked.

    Returns:
        A ``TestResult`` with the first error message (or None), the parsed
        responses and all raw lines.
    """
    failure: Optional[str] = None
    parsed: list[TimedResponse] = []
    raw_lines: list[str] = []
    line_no: int = 0
    # readline() returns b'' at EOF, which ends the loop.
    while chunk := await reader.readline():
        line_no += 1
        text: str = chunk.decode().strip()
        raw_lines.append(text)
        if failure is not None:
            # Keep draining output after the first failure.
            continue
        try:
            res: TimedResponse = parse_response(text)
            # Recorded before validation so the failing response is listed.
            parsed.append(res)
            state.check_res(res)
        except TestError as e:
            failure = f'invalid response at line {line_no}: {e}'
    if failure is None:
        try:
            state.check_final()
        except TestError as e:
            failure = f'final check failed: {e}'
    return TestResult(failure, parsed, raw_lines)
async def collect_stderr(reader: StreamReader) -> list[str]:
    """Read ``reader`` until EOF and return its decoded lines.

    Line endings are kept as received.
    """
    # Async iteration over a StreamReader yields one line at a time and
    # stops at EOF.
    return [chunk.decode() async for chunk in reader]
async def wait_proc(proc: Process, timeout: Decimal) -> None:
    """Wait for ``proc`` to exit, terminating it after ``timeout`` seconds.

    Args:
        proc: the process to wait for.
        timeout: maximum time in seconds to wait before terminating it.
    """
    try:
        await asyncio.wait_for(proc.wait(), float(timeout))
    except asyncio.TimeoutError:
        try:
            proc.terminate()
        except ProcessLookupError:
            # The process exited between the timeout and the signal.
            pass
        # Wait for the terminated process so that its return code is
        # available to the caller.
        await proc.wait()
async def do_test(idx: int, cmd: list[str], requests: list[TimedRequest], verbose: bool) -> None:
    """Run one test case against a fresh process and print a report.

    The process is started from ``cmd`` and given 10 seconds per person
    request. The requests are fed to it while its stdout is validated and
    its stderr is collected.

    Args:
        idx: test number shown in the report.
        cmd: program and arguments to execute.
        requests: requests to send, ordered by time.
        verbose: print requests and responses even when the test passes.
    """
    person_count: int = sum(isinstance(req, PersonRequest) for req in requests)
    timeout: int = person_count * 10
    proc: Process = await asyncio.create_subprocess_exec(
        cmd[0],
        *cmd[1:],
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    start_time: Decimal = get_loop_time()
    state: SystemState = SystemState(get_loop_time())
    outcome = await asyncio.gather(
        write_requests(proc.stdin, requests, state),
        check_stdout(proc.stdout, state),
        collect_stderr(proc.stderr),
        wait_proc(proc, Decimal(timeout)))
    test_res: TestResult = outcome[1]
    stderr: list[str] = outcome[2]
    elapsed_time: Decimal = get_loop_time() - start_time

    failed: bool = test_res.err_msg is not None
    print(f'---TEST {idx} RESULT BEGIN---')
    print(f'EXIT CODE: {proc.returncode}')
    print(f'ELAPSED TIME: {elapsed_time}')
    if stderr:
        print('STDERR:')
        for line in stderr:
            # Collected lines still carry their own line endings.
            print(line, end='')
    if failed:
        print('ERROR:')
        print(test_res.err_msg)
    # Full details are shown on request and for any kind of failure.
    if verbose or proc.returncode != 0 or failed:
        print('REQUESTS:')
        for req in requests:
            print(repr(req))
        print('RESPONSE:')
        for parsed in test_res.responses:
            print(repr(parsed))
        print('RAW RESPONSES:')
        for raw in test_res.raw_responses:
            print(raw)
    print(f'---TEST {idx} RESULT END---\n')
async def do_tests(cmd: list[str]) -> None:
    """Run TEST_COUNT randomly generated test cases concurrently.

    Args:
        cmd: program and arguments to execute for every test case.
    """
    # Each test gets its own freshly generated request list; results are
    # reported non-verbosely.
    test_tasks: list[Coroutine] = [
        do_test(i, cmd, gen_requests(), False)
        for i in range(TEST_COUNT)
    ]
    await asyncio.gather(*test_tasks)
def main() -> None:
    """Command-line entry point.

    Usage: ``[--input FILE] cmd [args ...]``. With ``--input`` the requests
    from FILE are run once in verbose mode; otherwise a batch of random test
    cases is generated and run.
    """
    parser: ArgumentParser = ArgumentParser()
    parser.add_argument('--input', '-i')
    parser.add_argument('cmd')
    parser.add_argument('args', nargs='*')
    args: argparse.Namespace = parser.parse_args()

    # The request file, if any, is read before the event loop is touched.
    requests: Optional[list[TimedRequest]] = None
    if args.input is not None:
        requests = parse_requests(args.input)
    cmd = [args.cmd, *args.args]

    if requests is None:
        job = do_tests(cmd)
    else:
        job = do_test(0, cmd, requests, True)
    try:
        loop.run_until_complete(job)
    finally:
        loop.close()


if __name__ == '__main__':
    main()
# vim: ts=4:sw=4:noet
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment