Skip to content

Instantly share code, notes, and snippets.

@dmgolembiowski
Created September 10, 2021 01:56
Show Gist options
  • Save dmgolembiowski/8cf06280ebfe668517d541b9f77450db to your computer and use it in GitHub Desktop.
Save dmgolembiowski/8cf06280ebfe668517d541b9f77450db to your computer and use it in GitHub Desktop.
Full service P2P and file upload bot. Requires Python v3.7.4.
#!/usr/bin/env python
import aiofiles
import tarfile
import pathlib
import uuid
import os
import asyncio
import functools
import logging
import random
import signal
import string
import uuid
import attr
import aiologger
from transmission_rpc.error import TransmissionAuthError
from transmission_rpc.client import Client
from transmission_rpc.lib_types import File
import functools
import aioitertools
LOG_FMT_STR = "%(asctime)s,%(msecs)d %(levelname)s: %(message)s"
LOG_DATEFMT_STR = "%H:%M:%S"
aio_formatter = aiologger.formatters.base.Formatter(
fmt=LOG_FMT_STR, datefmt=LOG_DATEFMT_STR,
)
logger = aiologger.Logger.with_default_handlers(formatter=aio_formatter)
# for the non-coroutine functions
logging.basicConfig(format=LOG_FMT_STR, datefmt=LOG_DATEFMT_STR)
class RestartFailed(Exception):
pass
async def run(cmd):
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
if stdout:
stdout = stdout.decode()
await logger.debug(f'Executed `{cmd}`.')
return stdout
if stderr:
await logger.error(f'Failed to execute `{cmd}`.')
return ""
async def start_nordvpn(event):
res = await run("nordvpn connect P2P")
if not res:
raise ChildProcessError("Cannot safely use torrenting services."
"Nordvpn networking is unavailable.")
event.set()
async def stop_nordvpn(p2p_down, self_event):
while True:
finished = p2p_down.is_set()
if not finished:
await logger.debug("Waiting on P2P to stop")
await asyncio.sleep(1)
else:
# self_event.set()
await run("nordvpn disconnect")
self_event.set()
return
async def start_transmission_daemon(vpn_up, self_event):
while True:
if not vpn_up.is_set():
await logger.debug("Waiting on VPN to come up...")
else:
await logger.info("VPN is established. Preparing to launch P2P daemon.")
break
await asyncio.sleep(1)
await run("service transmission-daemon start")
self_event.set()
async def stop_transmission_daemon(event):
await run("service transmission-daemon stop")
await asyncio.sleep(1)
event.set()
async def add_new_torrent(client, magnet_url, event, name=""):
if name:
await logger.info(f"Received '{name}' to download.")
else:
await logger.info("Received a new torrent to download.")
loop = asyncio.get_running_loop()
res_torrent = await loop.run_in_executor(None, functools.partial(client.add_torrent, magnet_url))
await logger.info(f"Queueing '{res_torrent.name}' for download. Total size = {res_torrent.size}")
await loop.run_in_executor(None, functools.partial(client.start_torrent, res_torrent.id_))
event.set()
await logger.info(f"Began leeching for '{res_torrent.name}'.")
async def send_file_or_directory(path, host, port, keep_artifacts=False):
def generate_sendfile(path):
def filtered_walk(source: pathlib.Path, exclusions=[]):
for root, dirs, files in os.walk(source, topdown=False):
try:
skip = set(exclusions)
for directory in dirs:
if directory in skip:
continue
tmp_dir = os.path.realpath(os.path.join(root, directory))
yield tmp_dir
for elem in files:
if elem in skip:
continue
tmp_file = os.path.realpath(os.path.join(root, elem))
yield tmp_file
except PermissionError as error_msg:
raise(error_msg)
except FileNotFoundError as error_msg:
raise(error_msg)
def tar_writer(target_dir: str, exclusions=[]) -> str:
path = pathlib.Path(f'{target_dir}').resolve()
name = str(path) + ".tar"
cwd = str(pathlib.Path(os.curdir).resolve())
with tarfile.open(name, 'w') as tar_f:
for path in filtered_walk(path, exclusions):
tar_f.add(os.path.relpath(path, cwd), recursive=False)
return name
def validate(path):
src = str(pathlib.Path(f"{path}"))
if os.path.isdir(src):
to_send = tar_writer(src)
return to_send
elif os.path.isfile(src):
name = str(pathlib.Path(f'{src}').resolve())
if os.path.exists(f"{name}.tar"):
postfixed = src + str(uuid.uuid4()) + ".tar"
name = str(pathlib.Path(f'{postfixed}').resolve())
else:
name += ".tar"
with tarfile.open(name, 'w') as tar_f:
tar_f.add(path, recursive=False)
return name
else:
raise FileNotFoundError(f"Unable to send mysterious path: '{path}'")
return validate(path)
delete_ltr = asyncio.Event()
if keep_artifacts:
delete_ltr.set()
func = functools.partial(generate_sendfile, path)
loop = asyncio.get_running_loop()
to_send = await loop.run_in_executor(None, func)
(tx, _px) = await loop.create_connection(asyncio.Protocol, host=host, port=port)
async with aiofiles.open(to_send, "rb") as stream:
await loop.sendfile(tx, stream)
if not keep_artifacts:
delete_ltr.set()
if not keep_artifacts:
while True:
if not delete_ltr.is_set():
await asyncio.sleep(0.0314)
else:
await loop.run_in_executor(None, os.remove, to_send)
break
async def pause_all_torrents(msg):
logger.info("Pausing all torrents")
event = asyncio.Event()
asyncio.create_task(extend(msg, event))
asyncio.create_task(cleanup(msg, event))
# Todo
get_all_torrent_ids = lambda: []
tasks = [
pause_torrent_with_id(id_, event)
for id_ in [1,23,4]
]
results = await asyncio.gather(*tasks, return_exceptions=True)
event.set()
async def publish(queue, coroutines=None):
await logger.debug("Upkeep check :: maintain any unfinished/damaged artifacts.")
if not coroutines:
coroutines = aioitertools.cycle([
poll,
interactive_rpc_poll,
get_download_statuses,
house_keeping,
publish
])
coroutines = coroutines.__aiter__()
queue, coroutines = await upkeep(queue, coroutines)
while True:
coro = await queue.get()
emission = await coro()
await emission(queue, coroutines)
async def upkeep(queue, coroutines):
await queue.put(coroutines.__anext__)
await queue.put(coroutines.__anext__)
await queue.put(coroutines.__anext__)
await queue.put(coroutines.__anext__)
await queue.put(coroutines.__anext__)
return queue, coroutines
async def poll(queue, coroutines):
await asyncio.sleep(1)
await logger.debug("Polling P2P addresses...")
async def interactive_rpc_poll(queue, _coroutines):
await asyncio.sleep(1)
await logger.debug("Checking if manual RPC instructions were dropped off.")
async def get_download_statuses(queue, _coroutines):
await asyncio.sleep(1)
await logger.debug("Querying if any torrents were recently completed.")
async def house_keeping(queue, _coroutines):
await asyncio.sleep(1)
await logger.debug("Housekeeping! Cleaning up any messes left behind.")
async def start_vpn_then_torrent_daemon(loop, queue):
await logger.info("Starting P2P services.")
vpn_up = asyncio.Event()
p2p_up = asyncio.Event()
vpn = asyncio.create_task(start_nordvpn(vpn_up))
p2p = asyncio.create_task(start_transmission_daemon(vpn_up, p2p_up))
asyncio.gather(vpn, p2p, return_exceptions=True)
while True:
if not vpn_up.is_set() and not p2p_up.is_set():
await logger.debug(f"VPN: {'up' if vpn_up.is_set() else 'down'}")
await logger.debug(f"P2P: {'up' if p2p_up.is_set() else 'down'}")
await asyncio.sleep(1)
else:
break
await logger.info("Ready to file share!")
loop.create_task(publish(queue))
await logger.info("Autonamous P2P agent is running...")
async def save(msg):
await asyncio.sleep(random.random())
if random.randrange(1, 5) == 3:
raise Exception(f"Could not save {msg}")
msg.saved = True
await logger.info(f"Saved {msg} into database")
async def cleanup(msg, event):
await event.wait()
await asyncio.sleep(random.random())
msg.acked = True
await logger.info(f"Done. Acked {msg}")
async def extend(msg, event):
while not event.is_set():
msg.extended_cnt += 1
await logger.info(f"Extended deadline by 3 seconds for {msg}")
await asyncio.sleep(2)
def handle_results(results, msg):
for result in results:
if isinstance(result, RestartFailed):
logging.error(f"Retrying for failure to restart: {msg.hostname}")
elif isinstance(result, Exception):
logging.error(f"Handling general error: {result}")
async def handle_message(msg):
event = asyncio.Event()
asyncio.create_task(extend(msg, event))
asyncio.create_task(cleanup(msg, event))
results = await asyncio.gather(
save(msg), restart_host(msg), return_exceptions=True
)
handle_results(results, msg)
event.set()
async def consume(queue):
while True:
msg = await queue.get()
await logger.info(f"Pulled {msg}")
asyncio.create_task(pause_all_torrents(msg))
def handle_exception(loop, context):
msg = context.get("exception", context["message"])
logging.error(f"Caught exception: {msg}")
logging.info("Shutting down...")
asyncio.create_task(shutdown(loop))
async def shutdown(loop, signal=None):
if signal:
await logger.info(f"Received exit signal {signal.name}...")
await logger.info("Closing database connections")
await logger.info("Nacking outstanding messages")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
[task.cancel() for task in tasks]
await logger.info("Shutting down P2P services and VPN connectivity.")
vpn_down = asyncio.Event()
p2p_down = asyncio.Event()
p2p = asyncio.create_task(stop_transmission_daemon(p2p_down))
vpn = asyncio.create_task(stop_nordvpn(p2p_down, vpn_down))
asyncio.gather(vpn, p2p, return_exceptions=True)
while True:
vpn_clean = vpn_down.is_set()
p2p_clean = p2p_down.is_set()
if vpn_clean and p2p_clean:
break
else:
await logger.debug(f"VPN: {'down' if vpn_down.is_set() else 'up'}")
await logger.debug(f"P2P: {'down' if p2p_down.is_set() else 'up'}")
await asyncio.sleep(1)
await logger.info("Able to gracefully shutdown!")
await logger.info("Cancelling outstanding tasks")
await asyncio.gather(*tasks, return_exceptions=True)
await logger.shutdown()
loop.stop()
def main():
loop = asyncio.get_event_loop()
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for s in signals:
loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(shutdown(loop, signal=s))
)
loop.set_exception_handler(handle_exception)
queue = asyncio.Queue()
try:
loop.create_task(start_vpn_then_torrent_daemon(loop, queue))
loop.run_forever()
finally:
loop.close()
logging.info("Successfully shutdown the AutoP2P service.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment