Skip to content

Instantly share code, notes, and snippets.

@dmgolembiowski
Last active September 10, 2021 03:58
Show Gist options
  • Save dmgolembiowski/f75ee84a196110bc673ecda05d74a3d1 to your computer and use it in GitHub Desktop.
Save dmgolembiowski/f75ee84a196110bc673ecda05d74a3d1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# Using Python v3.7.4
from imap_tools import (AND, MailBox)
import aiofiles
import tarfile
import pathlib
import uuid
import os
import asyncio
import functools
import logging
import random
import signal
import string
import uuid
import attr
import aiologger
from transmission_rpc.error import TransmissionAuthError
from transmission_rpc.client import Client
from transmission_rpc.lib_types import File
import functools
import aioitertools
LOG_FMT_STR = "%(asctime)s,%(msecs)d %(levelname)s: %(message)s"
LOG_DATEFMT_STR = "%H:%M:%S"
aio_formatter = aiologger.formatters.base.Formatter(
fmt=LOG_FMT_STR, datefmt=LOG_DATEFMT_STR,
)
logger = aiologger.Logger.with_default_handlers(formatter=aio_formatter)
# for the non-coroutine functions
logging.basicConfig(format=LOG_FMT_STR, datefmt=LOG_DATEFMT_STR)
class RestartFailed(Exception):
pass
async def run(cmd):
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
if stdout:
stdout = stdout.decode()
await logger.debug(f'Executed `{cmd}`.')
return stdout
if stderr:
await logger.error(f'Failed to execute `{cmd}`.')
return ""
async def start_nordvpn(event):
res = await run("nordvpn connect P2P")
if not res:
raise ChildProcessError("Cannot safely use torrenting services."
"Nordvpn networking is unavailable.")
event.set()
async def stop_nordvpn(p2p_down, self_event):
while True:
finished = p2p_down.is_set()
if not finished:
await logger.debug("Waiting on P2P to stop")
await asyncio.sleep(1)
else:
await run("nordvpn disconnect")
self_event.set()
return
async def start_transmission_daemon(vpn_up, self_event):
while True:
if not vpn_up.is_set():
await logger.debug("Waiting on VPN to come up...")
else:
await logger.info("VPN is established. Preparing to launch P2P daemon.")
break
await asyncio.sleep(1)
await run("service transmission-daemon start")
self_event.set()
async def stop_transmission_daemon(event):
await run("service transmission-daemon stop")
await asyncio.sleep(1)
event.set()
async def add_new_torrent(client, magnet_url, name=""):
if name:
await logger.info(f"Received '{name}' to download.")
else:
await logger.info("Received a new torrent to download.")
loop = asyncio.get_running_loop()
res_torrent = await loop.run_in_executor(None, functools.partial(client.add_torrent, magnet_url))
await logger.info(f"Queueing '{res_torrent.name}' for download. Total size = {res_torrent.size}")
await loop.run_in_executor(None, functools.partial(client.start_torrent, res_torrent.id_))
await logger.info(f"Began leeching for '{res_torrent.name}'.")
async def send_file_or_directory(path, host, port, keep_artifacts=False):
def generate_sendfile(path):
def filtered_walk(source: pathlib.Path, exclusions=[]):
for root, dirs, files in os.walk(source, topdown=False):
try:
skip = set(exclusions)
for directory in dirs:
if directory in skip:
continue
tmp_dir = os.path.realpath(os.path.join(root, directory))
yield tmp_dir
for elem in files:
if elem in skip:
continue
tmp_file = os.path.realpath(os.path.join(root, elem))
yield tmp_file
except PermissionError as error_msg:
raise(error_msg)
except FileNotFoundError as error_msg:
raise(error_msg)
def tar_writer(target_dir: str, exclusions=[]) -> str:
path = pathlib.Path(f'{target_dir}').resolve()
name = str(path) + ".tar"
cwd = str(pathlib.Path(os.curdir).resolve())
with tarfile.open(name, 'w') as tar_f:
for path in filtered_walk(path, exclusions):
tar_f.add(os.path.relpath(path, cwd), recursive=False)
return name
def validate(path):
src = str(pathlib.Path(f"{path}"))
if os.path.isdir(src):
to_send = tar_writer(src)
return to_send
elif os.path.isfile(src):
name = str(pathlib.Path(f'{src}').resolve())
if os.path.exists(f"{name}.tar"):
postfixed = src + str(uuid.uuid4()) + ".tar"
name = str(pathlib.Path(f'{postfixed}').resolve())
else:
name += ".tar"
with tarfile.open(name, 'w') as tar_f:
tar_f.add(path, recursive=False)
return name
else:
raise FileNotFoundError(f"Unable to send mysterious path: '{path}'")
return validate(path)
delete_ltr = asyncio.Event()
if keep_artifacts:
delete_ltr.set()
func = functools.partial(generate_sendfile, path)
loop = asyncio.get_running_loop()
to_send = await loop.run_in_executor(None, func)
(tx, _px) = await loop.create_connection(asyncio.Protocol, host=host, port=port)
async with aiofiles.open(to_send, "rb") as stream:
await loop.sendfile(tx, stream)
if not keep_artifacts:
delete_ltr.set()
if not keep_artifacts:
while True:
if not delete_ltr.is_set():
await asyncio.sleep(0.0314)
else:
await loop.run_in_executor(None, os.remove, to_send)
break
async def pause_all_torrents(msg):
logger.info("Pausing all torrents")
event = asyncio.Event()
asyncio.create_task(extend(msg, event))
asyncio.create_task(cleanup(msg, event))
# Todo
get_all_torrent_ids = lambda: []
tasks = [
pause_torrent_with_id(id_, event)
for id_ in [1,23,4]
]
results = await asyncio.gather(*tasks, return_exceptions=True)
event.set()
async def publish(queue, coroutines=None):
await logger.debug("Upkeep check :: maintain any unfinished/damaged artifacts.")
if not coroutines:
coroutines = aioitertools.cycle([
poll,
interactive_rpc_poll,
get_download_statuses,
house_keeping,
publish
])
coroutines = coroutines.__aiter__()
queue, coroutines = await upkeep(queue, coroutines)
while True:
coro = await queue.get()
emission = await coro()
await emission(queue, coroutines)
async def upkeep(queue, coroutines):
await queue.put(coroutines.__anext__)
await queue.put(coroutines.__anext__)
await queue.put(coroutines.__anext__)
await queue.put(coroutines.__anext__)
await queue.put(coroutines.__anext__)
return queue, coroutines
def mbox():
return (
MailBox(host="imap.gmail.com", port=993)
.login( "user@gmail.com",
"app_password",
"INBOX"))
async def poll(queue, coroutines):
@functools.lru_cache(maxsize=1)
def client():
u = "***"
p = "***"
c = Client(username=u, password=p, host="0.0.0.0", timeout=120)
return c
def _poll():
c = client
with mbox() as mailbox:
return (c, [m.text for m in mailbox.fetch(AND(subject="download"))])
await logger.debug("Polling email for P2P addresses.")
(cli, dl) = await asyncio.get_running_loop().run_in_executor(None, _poll)
async for magnet in aioitertools.iter(dl):
await delete_mail(magnet)
await add_torrent(cli, magnet)
async def add_torrent(client: Client, magnet: str):
await logger.info("Allocating executor for the torrent download.")
await asyncio.get_running_loop().run_in_executor(None, client().add_torrent, magnet)
async def delete_mail(magnet: str):
await logger.info(f"Deleting magnet: {magnet}")
def del_magnet_email():
with mbox() as mailbox:
criteria = 'ALL'
found_nums = mailbox.numbers(criteria)
page_len = 3
pages = int(len(found_nums) // page_len) + 1 \
if len(found_nums) % page_len \
else int(len(found_nums) // page_len)
for page in range(pages):
page_limit = slice(page * page_len,
page * page_len + page_len)
for msg in mailbox.fetch(criteria,
bulk=True, limit=page_limit):
if msg.text == magnet:
mailbox.delete([str(msg.uid)])
await asyncio.get_running_loop().run_in_executor(None, del_magnet_email)
async def interactive_rpc_poll(queue, _coroutines):
await asyncio.sleep(1)
await logger.debug("Checking if manual RPC instructions were dropped off.")
async def get_download_statuses(queue, _coroutines):
await asyncio.sleep(1)
await logger.debug("Querying if any torrents were recently completed.")
async def house_keeping(queue, _coroutines):
await asyncio.sleep(1)
await logger.debug("Housekeeping! Cleaning up any messes left behind.")
async def start_vpn_then_torrent_daemon(loop, queue):
await logger.info("Starting P2P services.")
vpn_up = asyncio.Event()
p2p_up = asyncio.Event()
vpn = asyncio.create_task(start_nordvpn(vpn_up))
p2p = asyncio.create_task(start_transmission_daemon(vpn_up, p2p_up))
asyncio.gather(vpn, p2p, return_exceptions=True)
while True:
if not vpn_up.is_set() and not p2p_up.is_set():
await logger.debug(f"VPN: {'up' if vpn_up.is_set() else 'down'}")
await logger.debug(f"P2P: {'up' if p2p_up.is_set() else 'down'}")
await asyncio.sleep(1)
else:
break
await logger.info("Ready to file share!")
loop.create_task(publish(queue))
await logger.info("Autonamous P2P agent is running...")
async def save(msg):
await asyncio.sleep(random.random())
if random.randrange(1, 5) == 3:
raise Exception(f"Could not save {msg}")
msg.saved = True
await logger.info(f"Saved {msg} into database")
async def cleanup(msg, event):
await event.wait()
await asyncio.sleep(random.random())
msg.acked = True
await logger.info(f"Done. Acked {msg}")
async def extend(msg, event):
while not event.is_set():
msg.extended_cnt += 1
await logger.info(f"Extended deadline by 3 seconds for {msg}")
await asyncio.sleep(2)
def handle_results(results, msg=None):
for result in results:
if isinstance(result, RestartFailed):
logging.error(f"Encountered failure: {msg}")
elif isinstance(result, Exception):
logging.error(f"Handling general error: {result}")
async def handle_message(msg):
event = asyncio.Event()
asyncio.create_task(extend(msg, event))
asyncio.create_task(cleanup(msg, event))
results = await asyncio.gather(
save(msg), restart_host(msg), return_exceptions=True
)
handle_results(results, msg)
event.set()
async def consume(queue):
while True:
msg = await queue.get()
await logger.info(f"Pulled {msg}")
asyncio.create_task(pause_all_torrents(msg))
def handle_exception(loop, context):
msg = context.get("exception", context["message"])
logging.error(f"Caught exception: {msg}")
logging.info("Shutting down...")
asyncio.create_task(shutdown(loop))
async def shutdown(loop, signal=None):
if signal:
await logger.info(f"Received exit signal {signal.name}...")
await logger.info("Closing database connections")
await logger.info("Nacking outstanding messages")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
[task.cancel() for task in tasks]
await logger.info("Shutting down P2P services and VPN connectivity.")
vpn_down = asyncio.Event()
p2p_down = asyncio.Event()
p2p = asyncio.create_task(stop_transmission_daemon(p2p_down))
vpn = asyncio.create_task(stop_nordvpn(p2p_down, vpn_down))
asyncio.gather(vpn, p2p, return_exceptions=True)
while True:
vpn_clean = vpn_down.is_set()
p2p_clean = p2p_down.is_set()
if vpn_clean and p2p_clean:
break
else:
await logger.debug(f"VPN: {'down' if vpn_down.is_set() else 'up'}")
await logger.debug(f"P2P: {'down' if p2p_down.is_set() else 'up'}")
await asyncio.sleep(1)
await logger.info("Able to gracefully shutdown!")
await logger.info("Cancelling outstanding tasks")
await asyncio.gather(*tasks, return_exceptions=True)
await logger.shutdown()
loop.stop()
def main():
loop = asyncio.get_event_loop()
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for s in signals:
loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(shutdown(loop, signal=s))
)
loop.set_exception_handler(handle_exception)
queue = asyncio.Queue()
try:
loop.create_task(start_vpn_then_torrent_daemon(loop, queue))
loop.run_forever()
finally:
loop.close()
logging.info("Successfully shutdown the AutoP2P service.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment