Created
July 22, 2021 22:36
-
-
Save ryantanaka/3e43c636958f972d19782172a1574306 to your computer and use it in GitHub Desktop.
possible implementation for pegasus-checkpoint
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
for i in {1..3} | |
do | |
sleep 1 | |
echo "$i .. doing some work ..., these are fake logs" | |
done |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio.subprocess | |
import datetime | |
import functools | |
import os | |
import struct | |
# enable debugging | |
os.environ["PYTHONASYNCIODEBUG"] = "1" | |
print("PID: {}".format(os.getpid())) | |
# interval in seconds at which we want to initiate saving/xfer of checkpoint | |
INTERVAL = 0 | |
# create a named pipe so we can be "pinged" on when to initiat a saving/xfer | |
# of checkpoint | |
PIPE = "named_pipe" | |
os.mkfifo(PIPE) | |
# open the pipe, don't block | |
fifo = os.open(PIPE, os.O_RDONLY | os.O_NONBLOCK) | |
# do a tranfer | |
async def initiate_transfer(): | |
print("--------> fake-pegasus-transfer starting {}".format(datetime.datetime.now())) | |
create = await asyncio.create_subprocess_exec( | |
"./fake-pegasus-transfer.sh", | |
stdout=asyncio.subprocess.PIPE | |
) | |
data = await create.communicate() | |
print(data[0].decode()) | |
print("-------> PID: {} --> fake transfer done...".format(create.pid)) | |
# task, which creates a transfer task, then reschedules itself to run in 'INTERVAL' seconds | |
# if transfer is in progress, should we start another one? or do nothing?? | |
def add_reoccuring_transfer_task(end_time, loop): | |
print("add_reoccuring_transfer_task called: {}".format(datetime.datetime.now())) | |
if (loop.time() + INTERVAL) < end_time: | |
# schedule execution of coroutine 'initiate_transfer' | |
loop.create_task(initiate_transfer()) | |
# set 'add_transfer_task' to be called again in 'INTERVAL' seconds | |
loop.call_later( | |
INTERVAL, | |
functools.partial(add_reoccuring_transfer_task, end_time, loop) | |
) | |
else: | |
loop.stop() | |
def handle_transfer_request(loop): | |
# data will be in pipe, read for msg | |
msg_size_bytes = os.read(fifo, 4) | |
msg_size = struct.unpack("<I", msg_size_bytes)[0] | |
msg = os.read(fifo, msg_size).decode("utf8") | |
print("got msg: {}, creating transfer task".format(msg)) | |
# add 'initiate_transfer' task | |
loop.create_task(initiate_transfer()) | |
# get a handle to the event loop so we can add to it/pass its reference | |
loop = asyncio.get_event_loop() | |
# add reader to monitor the fifo file descriptor for read availability | |
# 'handle_transfer_request' will be called whenever new data is added to the pipe | |
loop.add_reader( | |
fifo, | |
functools.partial(handle_transfer_request, loop) | |
) | |
# if interval is given, schedule saving/xfer of checkpoints at each interval | |
# and kill the event loop after some time (just for this example) | |
if INTERVAL > 0: | |
end_time = loop.time() + 25 | |
loop.call_soon(add_reoccuring_transfer_task, end_time, loop) | |
# start up event loop | |
loop.run_forever() | |
loop.close() | |
os.remove(PIPE) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import struct | |
if __name__=="__main__": | |
pipe = "named_pipe" | |
# need to make sure that file is open on the other side so that we | |
# don't block the calling process indefinitely | |
fifo = os.open(pipe, os.O_WRONLY) | |
# create payload, can be anything | |
msg = "hello world".encode("utf-8") | |
msg_len = len(msg) | |
payload = struct.pack("<I", msg_len) + msg | |
# write to the pipe | |
os.write(fifo, payload) | |
# close the pipe | |
os.close(fifo) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment