Created
September 10, 2021 01:56
-
-
Save dmgolembiowski/8cf06280ebfe668517d541b9f77450db to your computer and use it in GitHub Desktop.
Full service P2P and file upload bot. Requires Python v3.7.4.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import aiofiles | |
import tarfile | |
import pathlib | |
import uuid | |
import os | |
import asyncio | |
import functools | |
import logging | |
import random | |
import signal | |
import string | |
import uuid | |
import attr | |
import aiologger | |
from transmission_rpc.error import TransmissionAuthError | |
from transmission_rpc.client import Client | |
from transmission_rpc.lib_types import File | |
import functools | |
import aioitertools | |
LOG_FMT_STR = "%(asctime)s,%(msecs)d %(levelname)s: %(message)s" | |
LOG_DATEFMT_STR = "%H:%M:%S" | |
aio_formatter = aiologger.formatters.base.Formatter( | |
fmt=LOG_FMT_STR, datefmt=LOG_DATEFMT_STR, | |
) | |
logger = aiologger.Logger.with_default_handlers(formatter=aio_formatter) | |
# for the non-coroutine functions | |
logging.basicConfig(format=LOG_FMT_STR, datefmt=LOG_DATEFMT_STR) | |
class RestartFailed(Exception): | |
pass | |
async def run(cmd): | |
proc = await asyncio.create_subprocess_shell( | |
cmd, | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE) | |
stdout, stderr = await proc.communicate() | |
if stdout: | |
stdout = stdout.decode() | |
await logger.debug(f'Executed `{cmd}`.') | |
return stdout | |
if stderr: | |
await logger.error(f'Failed to execute `{cmd}`.') | |
return "" | |
async def start_nordvpn(event): | |
res = await run("nordvpn connect P2P") | |
if not res: | |
raise ChildProcessError("Cannot safely use torrenting services." | |
"Nordvpn networking is unavailable.") | |
event.set() | |
async def stop_nordvpn(p2p_down, self_event): | |
while True: | |
finished = p2p_down.is_set() | |
if not finished: | |
await logger.debug("Waiting on P2P to stop") | |
await asyncio.sleep(1) | |
else: | |
# self_event.set() | |
await run("nordvpn disconnect") | |
self_event.set() | |
return | |
async def start_transmission_daemon(vpn_up, self_event): | |
while True: | |
if not vpn_up.is_set(): | |
await logger.debug("Waiting on VPN to come up...") | |
else: | |
await logger.info("VPN is established. Preparing to launch P2P daemon.") | |
break | |
await asyncio.sleep(1) | |
await run("service transmission-daemon start") | |
self_event.set() | |
async def stop_transmission_daemon(event): | |
await run("service transmission-daemon stop") | |
await asyncio.sleep(1) | |
event.set() | |
async def add_new_torrent(client, magnet_url, event, name=""): | |
if name: | |
await logger.info(f"Received '{name}' to download.") | |
else: | |
await logger.info("Received a new torrent to download.") | |
loop = asyncio.get_running_loop() | |
res_torrent = await loop.run_in_executor(None, functools.partial(client.add_torrent, magnet_url)) | |
await logger.info(f"Queueing '{res_torrent.name}' for download. Total size = {res_torrent.size}") | |
await loop.run_in_executor(None, functools.partial(client.start_torrent, res_torrent.id_)) | |
event.set() | |
await logger.info(f"Began leeching for '{res_torrent.name}'.") | |
async def send_file_or_directory(path, host, port, keep_artifacts=False): | |
def generate_sendfile(path): | |
def filtered_walk(source: pathlib.Path, exclusions=[]): | |
for root, dirs, files in os.walk(source, topdown=False): | |
try: | |
skip = set(exclusions) | |
for directory in dirs: | |
if directory in skip: | |
continue | |
tmp_dir = os.path.realpath(os.path.join(root, directory)) | |
yield tmp_dir | |
for elem in files: | |
if elem in skip: | |
continue | |
tmp_file = os.path.realpath(os.path.join(root, elem)) | |
yield tmp_file | |
except PermissionError as error_msg: | |
raise(error_msg) | |
except FileNotFoundError as error_msg: | |
raise(error_msg) | |
def tar_writer(target_dir: str, exclusions=[]) -> str: | |
path = pathlib.Path(f'{target_dir}').resolve() | |
name = str(path) + ".tar" | |
cwd = str(pathlib.Path(os.curdir).resolve()) | |
with tarfile.open(name, 'w') as tar_f: | |
for path in filtered_walk(path, exclusions): | |
tar_f.add(os.path.relpath(path, cwd), recursive=False) | |
return name | |
def validate(path): | |
src = str(pathlib.Path(f"{path}")) | |
if os.path.isdir(src): | |
to_send = tar_writer(src) | |
return to_send | |
elif os.path.isfile(src): | |
name = str(pathlib.Path(f'{src}').resolve()) | |
if os.path.exists(f"{name}.tar"): | |
postfixed = src + str(uuid.uuid4()) + ".tar" | |
name = str(pathlib.Path(f'{postfixed}').resolve()) | |
else: | |
name += ".tar" | |
with tarfile.open(name, 'w') as tar_f: | |
tar_f.add(path, recursive=False) | |
return name | |
else: | |
raise FileNotFoundError(f"Unable to send mysterious path: '{path}'") | |
return validate(path) | |
delete_ltr = asyncio.Event() | |
if keep_artifacts: | |
delete_ltr.set() | |
func = functools.partial(generate_sendfile, path) | |
loop = asyncio.get_running_loop() | |
to_send = await loop.run_in_executor(None, func) | |
(tx, _px) = await loop.create_connection(asyncio.Protocol, host=host, port=port) | |
async with aiofiles.open(to_send, "rb") as stream: | |
await loop.sendfile(tx, stream) | |
if not keep_artifacts: | |
delete_ltr.set() | |
if not keep_artifacts: | |
while True: | |
if not delete_ltr.is_set(): | |
await asyncio.sleep(0.0314) | |
else: | |
await loop.run_in_executor(None, os.remove, to_send) | |
break | |
async def pause_all_torrents(msg): | |
logger.info("Pausing all torrents") | |
event = asyncio.Event() | |
asyncio.create_task(extend(msg, event)) | |
asyncio.create_task(cleanup(msg, event)) | |
# Todo | |
get_all_torrent_ids = lambda: [] | |
tasks = [ | |
pause_torrent_with_id(id_, event) | |
for id_ in [1,23,4] | |
] | |
results = await asyncio.gather(*tasks, return_exceptions=True) | |
event.set() | |
async def publish(queue, coroutines=None): | |
await logger.debug("Upkeep check :: maintain any unfinished/damaged artifacts.") | |
if not coroutines: | |
coroutines = aioitertools.cycle([ | |
poll, | |
interactive_rpc_poll, | |
get_download_statuses, | |
house_keeping, | |
publish | |
]) | |
coroutines = coroutines.__aiter__() | |
queue, coroutines = await upkeep(queue, coroutines) | |
while True: | |
coro = await queue.get() | |
emission = await coro() | |
await emission(queue, coroutines) | |
async def upkeep(queue, coroutines): | |
await queue.put(coroutines.__anext__) | |
await queue.put(coroutines.__anext__) | |
await queue.put(coroutines.__anext__) | |
await queue.put(coroutines.__anext__) | |
await queue.put(coroutines.__anext__) | |
return queue, coroutines | |
async def poll(queue, coroutines): | |
await asyncio.sleep(1) | |
await logger.debug("Polling P2P addresses...") | |
async def interactive_rpc_poll(queue, _coroutines): | |
await asyncio.sleep(1) | |
await logger.debug("Checking if manual RPC instructions were dropped off.") | |
async def get_download_statuses(queue, _coroutines): | |
await asyncio.sleep(1) | |
await logger.debug("Querying if any torrents were recently completed.") | |
async def house_keeping(queue, _coroutines): | |
await asyncio.sleep(1) | |
await logger.debug("Housekeeping! Cleaning up any messes left behind.") | |
async def start_vpn_then_torrent_daemon(loop, queue): | |
await logger.info("Starting P2P services.") | |
vpn_up = asyncio.Event() | |
p2p_up = asyncio.Event() | |
vpn = asyncio.create_task(start_nordvpn(vpn_up)) | |
p2p = asyncio.create_task(start_transmission_daemon(vpn_up, p2p_up)) | |
asyncio.gather(vpn, p2p, return_exceptions=True) | |
while True: | |
if not vpn_up.is_set() and not p2p_up.is_set(): | |
await logger.debug(f"VPN: {'up' if vpn_up.is_set() else 'down'}") | |
await logger.debug(f"P2P: {'up' if p2p_up.is_set() else 'down'}") | |
await asyncio.sleep(1) | |
else: | |
break | |
await logger.info("Ready to file share!") | |
loop.create_task(publish(queue)) | |
await logger.info("Autonamous P2P agent is running...") | |
async def save(msg): | |
await asyncio.sleep(random.random()) | |
if random.randrange(1, 5) == 3: | |
raise Exception(f"Could not save {msg}") | |
msg.saved = True | |
await logger.info(f"Saved {msg} into database") | |
async def cleanup(msg, event): | |
await event.wait() | |
await asyncio.sleep(random.random()) | |
msg.acked = True | |
await logger.info(f"Done. Acked {msg}") | |
async def extend(msg, event): | |
while not event.is_set(): | |
msg.extended_cnt += 1 | |
await logger.info(f"Extended deadline by 3 seconds for {msg}") | |
await asyncio.sleep(2) | |
def handle_results(results, msg): | |
for result in results: | |
if isinstance(result, RestartFailed): | |
logging.error(f"Retrying for failure to restart: {msg.hostname}") | |
elif isinstance(result, Exception): | |
logging.error(f"Handling general error: {result}") | |
async def handle_message(msg): | |
event = asyncio.Event() | |
asyncio.create_task(extend(msg, event)) | |
asyncio.create_task(cleanup(msg, event)) | |
results = await asyncio.gather( | |
save(msg), restart_host(msg), return_exceptions=True | |
) | |
handle_results(results, msg) | |
event.set() | |
async def consume(queue): | |
while True: | |
msg = await queue.get() | |
await logger.info(f"Pulled {msg}") | |
asyncio.create_task(pause_all_torrents(msg)) | |
def handle_exception(loop, context): | |
msg = context.get("exception", context["message"]) | |
logging.error(f"Caught exception: {msg}") | |
logging.info("Shutting down...") | |
asyncio.create_task(shutdown(loop)) | |
async def shutdown(loop, signal=None): | |
if signal: | |
await logger.info(f"Received exit signal {signal.name}...") | |
await logger.info("Closing database connections") | |
await logger.info("Nacking outstanding messages") | |
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] | |
[task.cancel() for task in tasks] | |
await logger.info("Shutting down P2P services and VPN connectivity.") | |
vpn_down = asyncio.Event() | |
p2p_down = asyncio.Event() | |
p2p = asyncio.create_task(stop_transmission_daemon(p2p_down)) | |
vpn = asyncio.create_task(stop_nordvpn(p2p_down, vpn_down)) | |
asyncio.gather(vpn, p2p, return_exceptions=True) | |
while True: | |
vpn_clean = vpn_down.is_set() | |
p2p_clean = p2p_down.is_set() | |
if vpn_clean and p2p_clean: | |
break | |
else: | |
await logger.debug(f"VPN: {'down' if vpn_down.is_set() else 'up'}") | |
await logger.debug(f"P2P: {'down' if p2p_down.is_set() else 'up'}") | |
await asyncio.sleep(1) | |
await logger.info("Able to gracefully shutdown!") | |
await logger.info("Cancelling outstanding tasks") | |
await asyncio.gather(*tasks, return_exceptions=True) | |
await logger.shutdown() | |
loop.stop() | |
def main(): | |
loop = asyncio.get_event_loop() | |
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) | |
for s in signals: | |
loop.add_signal_handler( | |
s, lambda s=s: asyncio.create_task(shutdown(loop, signal=s)) | |
) | |
loop.set_exception_handler(handle_exception) | |
queue = asyncio.Queue() | |
try: | |
loop.create_task(start_vpn_then_torrent_daemon(loop, queue)) | |
loop.run_forever() | |
finally: | |
loop.close() | |
logging.info("Successfully shutdown the AutoP2P service.") | |
if __name__ == "__main__": | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment