Skip to content

Instantly share code, notes, and snippets.

@ask
Last active July 26, 2022 08:11
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ask/ea3e665f8a6f7568fa61323315fec39a to your computer and use it in GitHub Desktop.
Save ask/ea3e665f8a6f7568fa61323315fec39a to your computer and use it in GitHub Desktop.
from enum import Enum
import faust
from faust import cli
from faust import web
class State(Enum):
# State of a shipment.
ISSUED = 'ISSUED'
CREATED = 'CREATED'
STARTED = 'STARTED'
FINISHED = 'FINISHED'
ERROR = 'ERROR'
def is_final(self) -> bool:
# this is true when the shipment is completed,
# either due to being finished or due to an error.
return self in FINAL_STATES
#: Set of final states.
FINAL_STATES = frozenset({State.FINISHED, State.ERROR})
# This is how shipments are stored in our `shipments` table.
# It is also the model for creating a new shipment
# in our web view.
class Shipment(faust.Record):
id: str
name: str
state: State = State.ISSUED
def __eq__(self, other: 'Shipment') -> bool:
# If the state is used for identity, then a SetTable will contain
# duplicates for every state, so we define custom __eq__/__hash__
# methods.
return (self.id, self.name) == (other.id, other.name)
def __hash__(self) -> int:
return hash((self.id, self.name))
async def mark_as_active(self):
"""Add shipment to set of active shipments."""
await shipment_sets.manager.add(key='active', member=self)
async def mark_as_inactive(self):
"""Remove shipment from set of active shipments."""
await shipment_sets.manager.discard(key='active', member=self)
class ShipmentUpdate(faust.Record):
"""A shipment update is sent when we want to transition to a new state."""
shipment_id: str
new_state: State
app = faust.App('shipments', topic_partitions=4)
shipments_table = app.Table('shipment', value_type=Shipment)
shipment_sets = app.SetTable('shipments', start_manager=True)
views = web.Blueprint('shipments')
# HTTP endpoint to update shipment to new state.
# Go to http://localhost:6066/shipment/{id}/update/{status}/
#
# $ curl -X PUT http://localhost:6066/shipment/69174feb/update/FINISHED/
@views.route('/{shipment_id}/update/{status}/')
class ShipmentUpdateView(web.View):
async def put(self,
request: web.Request,
shipment_id: str,
status: str) -> web.Response:
try:
new_state = State(status)
except ValueError:
raise self.NotFound(f'Unknown state: {status!r}')
await update_shipment.send(
key=shipment_id,
value=ShipmentUpdate(
shipment_id=shipment_id,
new_state=new_state,
),
)
return self.json({'status': 'OK'})
# HTTP endpoint to show shipment status.
# Go to http://localhost:6066/shipment/{id}/
#
# $ curl -X GET http://localhost:6066/shipment/69174feb/
@views.route('/{shipment_id}/')
class ShipmentDetailView(web.View):
@app.table_route(table=shipments_table, match_info='shipment_id')
async def get(self,
request: web.Request,
shipment_id: str) -> web.Response:
return self.json(shipments_table[shipment_id])
@views.route('/')
class ShipmentListView(web.View):
"""HTTP endpoint to list active shipments.
Go to http://localhost:6066/shipment/
.. sourcecode:: console
$ curl -X GET http://localhost:6066/shipment/
Do a POST on this endpoint to create a new shipment:
.. sourcecode:: console
$ curl -X POST http://localhost:6066/shipment/ -d '{"name": "foo"}'
{"status": "success",
"shipment_id": "4391ce93-b1ea-403a-8e44-a511451f8722"}
"""
@app.table_route(table=shipment_sets, exact_key='active')
async def get(self, request: web.Request) -> web.Response:
return self.json(set(shipment_sets['active']))
async def post(self, request: web.Request) -> web.Response:
vars = await request.json()
name = vars.get('name')
id = faust.uuid()
await start_shipment.send(key=id, value=Shipment(id=id, name=name))
return self.json({'status': 'success', 'shipment_id': id})
app.web.blueprints.add('/shipment/', views)
@app.agent(value_type=Shipment)
async def start_shipment(shipments: faust.Stream[Shipment]) -> None:
async for shipment in shipments:
# now our shipment transitions to the STARTED state.
print(f'NEW SHIPMENT: {shipment!r}')
shipments_table[shipment.id] = shipment.derive(state=State.STARTED)
await shipment.mark_as_active()
@app.agent(value_type=ShipmentUpdate)
async def update_shipment(updates: faust.Stream[ShipmentUpdate]) -> None:
# transition shipment to new state.
async for update in updates:
shipment_id = update.shipment_id
new_state = State(update.new_state)
# Note how this operation is immutable!
# This is stored in a log so you will have a versioned
# history of shipment state.
shipment = shipments_table[shipment_id]
new_shipment = shipment.derive(state=new_state)
print(f'Shipment {shipment_id} {shipment.state} -> {new_state}')
# save the shipment in the table (K/V store) with the new state.
shipments_table[shipment.id] = new_shipment
if new_state.is_final():
await shipment.mark_as_inactive()
# Run the command:
# $ python shipments.py finish_shipment <shipment_id>
# to finish a sbipment from the command-line.
@app.command(
cli.option('--final-state', default=State.FINISHED.value),
cli.argument('shipment_id'))
async def finish_shipment(self: cli.AppCommand, shipment_id: str,
final_state: str) -> None:
assert id, 'please add --id for shipment id'
await update_shipment.send(
key=shipment_id,
value=ShipmentUpdate(
shipment_id=shipment_id,
new_state=State(final_state),
),
)
if __name__ == '__main__':
app.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment