Skip to content

Instantly share code, notes, and snippets.

@ekzhang
Last active January 29, 2024 22:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ekzhang/65310f0260c03d4ec340e0daf0ccb9d5 to your computer and use it in GitHub Desktop.
Save ekzhang/65310f0260c03d4ec340e0daf0ccb9d5 to your computer and use it in GitHub Desktop.
Python runtime library bindings for Maelstrom (https://github.com/jepsen-io/maelstrom), starter files used by Harvard Systems Reading Group — This library has been merged upstream

Gossip Glomers

Welcome! You can create your first file with the following command:

cp echo.py YOURNAME-echo.py

Then run the simulator environment with:

maelstrom test -w echo --bin YOURNAME-echo.py --node-count 1 --time-limit 10

You can now follow the rest of the instructions; just remember to replace the value of the --bin option with the path to your own Python file in any commands from the documentation.

This folder is shared, so please prefix all files you create with your name as a courtesy to other users. Thank you!

"""Library for writing programs that interact with the maelstrom protocol.
See <https://github.com/jepsen-io/maelstrom> for reference. This file follows
the protocol exactly as given there.
Please do not edit this file.
"""
import asyncio
import copy
import json
from dataclasses import dataclass
from enum import IntEnum
from functools import wraps
from typing import Any, Awaitable, Callable, Optional, TypeAlias
import aiofiles
import uvloop
# Type of a message payload: the JSON object stored under a message's "body" key.
Body: TypeAlias = dict[str, Any]
@dataclass
class Request:
    """A single message: who sent it, who it is addressed to, and its payload."""

    # Identifier of the node this message came from.
    src: str
    # Identifier of the node this message is addressed to.
    dest: str
    # The payload of the message (a JSON object).
    body: Body
class Error(IntEnum):
    """Numeric codes carried by Maelstrom messages whose type is "error".

    The protocol description, including the full table of error codes, lives at
    <https://github.com/jepsen-io/maelstrom/blob/main/doc/protocol.md>.
    """

    # General errors.
    TIMEOUT = 0
    NODE_NOT_FOUND = 1
    NOT_SUPPORTED = 10
    TEMPORARILY_UNAVAILABLE = 11
    MALFORMED_REQUEST = 12
    CRASH = 13
    ABORT = 14

    # Key-value errors.
    KEY_DOES_NOT_EXIST = 20
    KEY_ALREADY_EXISTS = 21
    PRECONDITION_FAILED = 22

    # Transaction errors.
    TXN_CONFLICT = 30

    def is_definite(self) -> bool:
        """Tell whether this error code is definite.

        A definite error guarantees that the requested operation did not
        happen and never will. An indefinite error leaves the outcome open:
        the operation may have happened, may never happen, or may happen
        later. Maelstrom relies on this distinction to interpret histories,
        so a definite error must never be returned under indefinite
        conditions.

        If unsure, indefinite is always the safe answer. Custom error codes
        are always indefinite.
        """
        # Only timeouts and crashes leave the outcome of the operation unknown.
        if self == Error.TIMEOUT or self == Error.CRASH:
            return False
        return True
class Node:
    """An application node within the distributed system.

    A node reads newline-delimited JSON messages from stdin, dispatches each
    request to the handler registered for its "type", and writes replies to
    stdout. Diagnostics go to stderr via :meth:`log`.
    """

    _handlers: dict[str, Callable[[Request], Awaitable[Body]]]  # RPC handlers
    _next_id: int  # Monotonically increasing identifier
    _stdout_lock: asyncio.Lock  # Lock on stdout
    _stderr_lock: asyncio.Lock  # Lock on stderr
    _tasks: set  # Background tasks
    _reply_handlers: dict[int, asyncio.Future]  # Pending RPCs, keyed by msg_id
    node_id: str  # Read-only, received from the init message.
    node_ids: list[str]  # Read-only, received from the init message.

    def __init__(self) -> None:
        self._handlers = {}
        self._next_id = 1
        self._stdout_lock = asyncio.Lock()
        self._stderr_lock = asyncio.Lock()
        self._tasks = set()
        self._reply_handlers = {}

    def handler(self, f: Callable[[Request], Awaitable[Body]]):
        """Decorator used to wrap a function that handles requests.

        The function is registered under its own ``__name__``, which must
        therefore equal the message "type" it handles. The function itself is
        returned unchanged.
        """
        self._handlers[f.__name__] = f
        return f

    async def rpc(self, dest: str, body: Body, *, timeout: float = 1.0) -> Body:
        """Make an RPC call, blocking until a response is received.

        Args:
            dest: Identifier of the node to send the request to.
            body: Request payload. It is not modified; the message id is
                added to a copy.
            timeout: Seconds to wait for the reply before giving up.

        Returns:
            The body of the reply, or a synthesized "error" body with code
            ``Error.TIMEOUT`` if no reply arrived in time.
        """
        msg_id = self._next_id
        self._next_id += 1
        # Work on a copy so a body dict that the caller reuses (for example
        # when sending the same payload to several nodes) is never mutated.
        body = {**body, "msg_id": msg_id}
        fut = asyncio.get_running_loop().create_future()
        self._reply_handlers[msg_id] = fut
        try:
            # Sending happens inside the try so the pending-reply entry is
            # removed even if the write fails.
            await self._send(Request(self.node_id, dest, body))
            try:
                async with asyncio.timeout(timeout):
                    return await fut
            except TimeoutError:
                await self.log(f"[node] request timed out: {json.dumps(body)}")
                return {
                    "type": "error",
                    "in_reply_to": msg_id,
                    "code": Error.TIMEOUT,
                    "text": "RPC request timed out",
                }
        finally:
            self._reply_handlers.pop(msg_id, None)

    async def log(self, *msg: Any) -> None:
        """Print a message to the standard error log.

        The arguments are converted with ``str`` and joined by single spaces.
        """
        async with self._stderr_lock:
            await aiofiles.stderr.write(" ".join(str(x) for x in msg) + "\n")
            await aiofiles.stderr.flush()

    def spawn(self, coro: Awaitable[None]) -> asyncio.Task:
        """Spawn a background task on the node's executor.

        The task is tracked until it finishes so that it can be cancelled
        when the node shuts down.
        """
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def _recv(self) -> Request:
        """Read the next message from stdin.

        Blank lines are skipped rather than mistaken for end of input.

        Raises:
            EOFError: When stdin is exhausted.
        """
        while True:
            raw = await aiofiles.stdin.readline()
            # readline is expected to return an empty string only at end of
            # input; a blank line still contains its newline character.
            if not raw:
                raise EOFError()
            line = raw.strip()
            if line:
                data = json.loads(line)
                return Request(data["src"], data["dest"], data["body"])

    async def _send(self, req: Request) -> None:
        """Write one message to stdout as a single line of JSON."""
        serialized = json.dumps({"src": req.src, "dest": req.dest, "body": req.body})
        async with self._stdout_lock:
            await aiofiles.stdout.write(serialized + "\n")
            await aiofiles.stdout.flush()

    async def _handle(self, req: Request) -> None:
        """Dispatch one incoming request to its handler and send the reply."""
        message_type = req.body.get("type")
        callback = self._handlers.get(message_type)
        if callback is None:
            resp_body = {
                "type": "error",
                "code": Error.NOT_SUPPORTED,
                "text": "RPC type is not supported",
            }
        else:
            try:
                # The handler gets its own copy so it may mutate it freely.
                resp_body = await callback(copy.deepcopy(req))
            except Exception as exc:
                # Report the failure instead of letting the background task
                # die silently and the client time out. CRASH is an
                # indefinite error, so this is safe even if the handler
                # already had side effects.
                await self.log(f"[node] handler for {message_type!r} raised: {exc!r}")
                resp_body = {
                    "type": "error",
                    "code": Error.CRASH,
                    "text": f"handler raised {type(exc).__name__}: {exc}",
                }
        msg_id = req.body.get("msg_id")
        if msg_id is None:
            # A message without an id cannot be replied to: a response
            # lacking "in_reply_to" would look like a new request to its
            # receiver.
            return
        # Build a fresh dict so the handler's return value is not mutated.
        reply = {**resp_body, "in_reply_to": msg_id}
        await self._send(Request(self.node_id, req.src, reply))

    async def _run(self, init: Optional[Callable[[], None]]) -> None:
        """Perform the init handshake, then process messages until EOF.

        Raises:
            RuntimeError: If the first message is not of type "init".
        """
        req = await self._recv()
        # An explicit check rather than `assert`, which is removed under -O.
        if req.body.get("type") != "init":
            raise RuntimeError(
                f"first message should be init, got {req.body.get('type')!r}"
            )
        self.node_id = req.body["node_id"]
        self.node_ids = req.body["node_ids"]
        resp_body = {"type": "init_ok", "in_reply_to": req.body["msg_id"]}
        await self._send(Request(self.node_id, req.src, resp_body))
        if init is not None:
            init()
        while True:
            try:
                req = await self._recv()
            except EOFError:
                await self.log("[node] finishing execution")
                for t in list(self._tasks):
                    t.cancel()
                return  # exiting node
            reply_id = req.body.get("in_reply_to")
            if reply_id is not None:
                fut = self._reply_handlers.pop(reply_id, None)
                # If there's no handler, this might be a duplicate message--
                # we'll quietly ignore it. The future may also already be
                # cancelled by an RPC timeout, in which case the late reply
                # is dropped.
                if fut is not None and not fut.done():
                    fut.set_result(req.body)
                continue
            self.spawn(self._handle(req))

    def run(self, init: Optional[Callable[[], None]] = None) -> None:
        """Run the node, optionally with a function on startup.

        ``init`` is called synchronously, inside the event loop, once the
        init handshake has completed and before any other message is read.
        """
        with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
            runner.run(self._run(init))
#!/usr/bin/python3
"""Echo workload: answers every "echo" request with the value it was sent."""
from _maelstrom import Node, Body, Request

node = Node()


@node.handler
async def echo(req: Request) -> Body:
    """Reply to an "echo" request with an "echo_ok" body carrying the same value."""
    return {"type": "echo_ok", "echo": req.body["echo"]}


if __name__ == "__main__":
    # Start the node only when executed as a script, so that importing this
    # module (for example from a test) does not block on stdin.
    node.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment