Skip to content

Instantly share code, notes, and snippets.

@mengyyy
Last active February 14, 2020 12:40
Show Gist options
  • Save mengyyy/39e30a8318d4b3ae9c07d13edb4b5724 to your computer and use it in GitHub Desktop.
Save mengyyy/39e30a8318d4b3ae9c07d13edb4b5724 to your computer and use it in GitHub Desktop.
aria2 rpc with aiohttp
import asyncio
import base64
import os
import aiohttp
import stackprinter
from loguru import logger
DEFAULT_KEYS = (
# "bitfield",
"bittorrent",
"completedLength",
# "connections",
"dir",
# "downloadSpeed",
"files",
"following",
"gid",
"infoHash",
# "numPieces",
# "numSeeders",
# "pieceLength",
"seeder",
"status",
"totalLength",
# "uploadLength",
# "uploadSpeed",
)
HOWS = (
"POS_SET", # If how is POS_SET, it moves the download to a position relative to the beginning of the queue.
"POS_CUR", # If how is POS_CUR, it moves the download to a position relative to the current position.
"POS_END", # If how is POS_END, it moves the download to a position relative to the end of the queue.
)
class MengyAria2AsyncAPI:
def __init__(self, token,):
self.token = f"token:{token}"
def _handle_fun(self, data):
logger.debug(f"get data | {data}")
return data
def _do(self, method, params=tuple(), multi=False):
if multi is True:
# token added in param in params
data = {"jsonrpc": "2.0", "method": method, "id": "mengy", "params": [params]}
else:
data = {"jsonrpc": "2.0", "method": method, "id": "mengy", "params": [self.token, *params]}
return self._handle_fun(data)
def addUri(self, uris, options=dict(), position=0):
params = [uris, options, position]
return self._do("aria2.addUri", params)
def addTorrent(self, torrent_file_path, uris=tuple(), options=dict(), position=0):
if not os.path.isfile(torrent_file_path):
logger.error(f"torrent file not found {torrent_file_path}")
return None
torrent = base64.b64encode(open(torrent_file_path, "rb").read()).decode()
params = [torrent, uris, options, position]
return self._do("aria2.addTorrent", params)
def addMetalink(self, metalink_file_path, options=dict(), position=0):
if not os.path.isfile(metalink_file_path):
logger.error(f"metalink file not found {metalink_file_path}")
return None
metalink = base64.b64encode(open(metalink_file_path, "rb").read()).decode()
params = [metalink, options, position]
return self._do("aria2.addMetalink", params)
def remove(self, gid):
return self._do("aria2.remove", params=[gid])
def forceRemove(self, gid):
return self._do("aria2.forceRemove", params=[gid])
def pause(self, gid):
return self._do("aria2.pause", params=[gid])
def pauseAll(self):
return self._do("aria2.pauseAll")
def forcePause(self, gid):
return self._do("aria2.forcePause", params=[gid])
def forcePauseAll(self):
return self._do("aria2.forcePauseAll")
def unpause(self, gid):
return self._do("aria2.unpause", params=[gid])
def unpauseAll(self):
return self._do("aria2.unpauseAll")
def tellStatus(self, gid, keys=DEFAULT_KEYS):
return self._do("aria2.tellStatus", params=[gid, keys])
def getUris(self, gid):
return self._do("aria2.getUris", params=[gid])
def getFiles(self, gid):
return self._do("aria2.getFiles", params=[gid])
def getPeers(self, gid):
return self._do("aria2.getPeers", params=[gid])
def getServers(self, gid):
return self._do("aria2.getServers", params=[gid])
def tellActive(self, keys=DEFAULT_KEYS):
return self._do("aria2.tellActive", params=[keys])
def tellWaiting(self, offset=0, num=300, keys=DEFAULT_KEYS):
return self._do("aria2.tellWaiting", params=[offset, num, keys])
def tellStopped(self, offset=0, num=300, keys=DEFAULT_KEYS):
return self._do("aria2.tellStopped", params=[offset, num, keys])
def changePosition(self, gid, pos=0, how="POS_END"):
if how not in HOWS:
logger.error(f"param how is not incalid | {how}")
return None
return self._do("aria2.changePosition", params=[gid, pos, how])
def changeUri(self, gid, fileIndex=1, delUris=tuple(), addUris=tuple(), position=0):
return self._do("aria2.changeUri", params=[gid, fileIndex, delUris, addUris, position])
def getOption(self, gid):
return self._do("aria2.getOption", params=[gid])
def changeOption(self, gid, options=dict()):
return self._do("aria2.changeOption", params=[gid, options])
def getGlobalOption(self):
return self._do("aria2.getGlobalOption")
def changeGlobalOption(self, options=dict()):
return self._do("aria2.changeGlobalOption", params=[options])
def getGlobalStat(self):
return self._do("aria2.getGlobalStat")
def purgeDownloadResult(self):
return self._do("aria2.purgeDownloadResult")
def removeDownloadResult(self, gid):
return self._do("aria2.removeDownloadResult", params=[gid])
def getVersion(self):
return self._do("aria2.getVersion")
def getSessionInfo(self):
return self._do("aria2.getSessionInfo")
def shutdown(self):
return self._do("arai2.shutdown")
def forceShutdown(self):
return self._do("arai2.forceShutdown")
def saveSession(self):
return self._do("arai2.saveSession")
def sys_multicall(self, method_params_pairs):
params = []
for m, p in method_params_pairs:
params.append(dict(methodName=m, params=[self.token, *p]))
return self._do("system.multicall", params=params, multi=True)
def sys_listMethods(self):
return self._do("system.listMethods")
def sys_listNotifications(self):
return self._do("system.listNotifications")
class MengyAria2AsyncHttpPost(MengyAria2AsyncAPI):
def __init__(self, server_url, token, timeout=180):
MengyAria2AsyncAPI.__init__(self, token)
self.server_url = server_url
self.timeout = timeout
async def _handle_fun(self, data):
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.post(self.server_url, json=data, timeout=self.timeout) as resp:
resp_dict = await resp.json()
status_code = resp.status
if status_code != 200:
logger.error(f"_request failed | status code {status_code} | {resp_dict}")
return resp_dict.get("result")
except Exception:
logger.error(f"reaquest failed | {data}\n{stackprinter.fromat()}")
return None
if __name__ == '__main__':
self = MengyAria2AsyncHttpPost("https://example.com:6800/jsonrpc", "my_token")
async def test():
tellActive_result = await self.tellActive()
logger.debug(f"tellActive_result | {tellActive_result}")
addUri_result = await self.addUri(["http://releases.ubuntu.com/16.04/ubuntu-16.04.6-server-i386.iso.torrent"])
logger.debug(f"addUri_result | {addUri_result}")
addTorrent_result = await self.addTorrent("test.torrent")
logger.debug(f"addTorrent_result | {addTorrent_result}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment