Skip to content

Instantly share code, notes, and snippets.

@ryantanaka
Created July 22, 2021 22:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ryantanaka/3e43c636958f972d19782172a1574306 to your computer and use it in GitHub Desktop.
Save ryantanaka/3e43c636958f972d19782172a1574306 to your computer and use it in GitHub Desktop.
possible implementation for pegasus-checkpoint
#!/bin/bash
# Stand-in for a real transfer: emits three numbered fake log lines,
# pausing one second before each of them.
i=1
while [ "$i" -le 3 ]
do
    # sleep first so every log line is preceded by a one second delay
    sleep 1
    echo "$i .. doing some work ..., these are fake logs"
    i=$((i + 1))
done
#!/usr/bin/env python3
import asyncio.subprocess
import datetime
import functools
import os
import struct
# enable asyncio debug mode
os.environ["PYTHONASYNCIODEBUG"] = "1"
print("PID: {}".format(os.getpid()))

# interval in seconds at which we want to initiate saving/xfer of checkpoint;
# periodic transfers are only scheduled when this is greater than 0
INTERVAL = 0

# create a named pipe so we can be "pinged" on when to initiate a saving/xfer
# of checkpoint
PIPE = "named_pipe"
try:
    os.mkfifo(PIPE)
except FileExistsError:
    # the pipe is only removed at the very end of a clean run, so an
    # interrupted run leaves it behind; reuse it instead of crashing
    pass

# open the pipe, don't block.
# It is opened read-write on purpose: this process then always holds a write
# end itself, so the descriptor does not hit end-of-file (and stay permanently
# "readable") as soon as an external writer closes its end. Linux allows
# O_RDWR on a FIFO; POSIX leaves it unspecified.
fifo = os.open(PIPE, os.O_RDWR | os.O_NONBLOCK)
# do a transfer
async def initiate_transfer():
    """Run ``./fake-pegasus-transfer.sh`` once and echo what it printed.

    The script is started as a child process with its stdout captured.
    This coroutine waits for the child to finish, prints the captured
    output, and then prints a completion line containing the child's PID.
    """
    started_at = datetime.datetime.now()
    print("--------> fake-pegasus-transfer starting {}".format(started_at))

    proc = await asyncio.create_subprocess_exec(
        "./fake-pegasus-transfer.sh",
        stdout=asyncio.subprocess.PIPE,
    )

    # stderr is not redirected, so only the stdout half carries data
    captured_stdout, _ = await proc.communicate()
    print(captured_stdout.decode())

    print("-------> PID: {} --> fake transfer done...".format(proc.pid))
# periodic driver: starts a transfer task, then re-arms itself to run again in
# 'INTERVAL' seconds
# open question: if a transfer is still in progress, should we start another
# one, or do nothing??
def add_reoccuring_transfer_task(end_time, loop):
    """Start a transfer and schedule the next call, or stop the loop.

    Args:
        end_time: Deadline on the ``loop.time()`` clock.
        loop: Event loop on which transfers and the follow-up call are
            scheduled.

    While ``loop.time() + INTERVAL`` is still before ``end_time`` a new
    ``initiate_transfer`` task is created and this function is scheduled
    again ``INTERVAL`` seconds later. Otherwise the loop is stopped.
    """
    print("add_reoccuring_transfer_task called: {}".format(datetime.datetime.now()))

    next_run = loop.time() + INTERVAL
    if not next_run < end_time:
        # another full interval no longer fits before the deadline
        loop.stop()
        return

    # schedule execution of coroutine 'initiate_transfer'
    loop.create_task(initiate_transfer())

    # have this same function called again in 'INTERVAL' seconds
    loop.call_later(INTERVAL, add_reoccuring_transfer_task, end_time, loop)
def handle_transfer_request(loop):
    """Read one length-prefixed message from the fifo and start a transfer.

    The wire format is a 4-byte little-endian unsigned length followed by
    that many bytes of UTF-8 text.

    Args:
        loop: Event loop on which the ``initiate_transfer`` task is created.

    The fifo is non-blocking, so a read can find no data, hit end-of-file,
    or return fewer bytes than asked for. In all of those cases the request
    is dropped and no transfer is started.
    """
    try:
        header = os.read(fifo, 4)
    except BlockingIOError:
        # woken up although nothing is buffered yet
        return

    if len(header) < 4:
        # b"" means every writer closed its end (end-of-file); anything else
        # shorter than 4 bytes is not a complete length prefix
        return

    (msg_size,) = struct.unpack("<I", header)

    try:
        body = os.read(fifo, msg_size)
    except BlockingIOError:
        body = b""

    if len(body) < msg_size:
        # never act on a truncated payload
        print("incomplete msg: expected {} bytes, got {}; ignoring".format(
            msg_size, len(body)))
        return

    msg = body.decode("utf8")
    print("got msg: {}, creating transfer task".format(msg))

    # add 'initiate_transfer' task
    loop.create_task(initiate_transfer())
# create the event loop explicitly so we can add to it/pass its reference
# (asyncio.get_event_loop() is deprecated when no loop is running)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# add reader to monitor the fifo file descriptor for read availability
# 'handle_transfer_request' will be called whenever new data is added to the pipe
loop.add_reader(
    fifo,
    functools.partial(handle_transfer_request, loop)
)

# if interval is given, schedule saving/xfer of checkpoints at each interval
# and kill the event loop after some time (just for this example)
if INTERVAL > 0:
    end_time = loop.time() + 25
    loop.call_soon(add_reoccuring_transfer_task, end_time, loop)

# start up event loop
try:
    loop.run_forever()
finally:
    # always clean up, including when the loop is interrupted (e.g. Ctrl-C),
    # so the fifo descriptor is released and the named pipe does not stay
    # behind on disk
    loop.remove_reader(fifo)
    loop.close()
    os.close(fifo)
    os.remove(PIPE)
#!/usr/bin/env python3
import os
import struct
if __name__ == "__main__":
    pipe = "named_pipe"

    # NOTE: a blocking O_WRONLY open of a FIFO waits until some process has
    # the pipe open for reading, so the reading side must already be running
    # or this call blocks the calling process indefinitely
    fifo = os.open(pipe, os.O_WRONLY)
    try:
        # create payload, can be anything: a 4-byte little-endian length
        # prefix followed by the UTF-8 encoded message
        msg = "hello world".encode("utf-8")
        payload = struct.pack("<I", len(msg)) + msg

        # write to the pipe
        os.write(fifo, payload)
    finally:
        # close the pipe even if the write raised
        os.close(fifo)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment