Skip to content

Instantly share code, notes, and snippets.

@cyanolupus
Created March 25, 2022 13:56
Show Gist options
  • Save cyanolupus/245d77b6ca33c710fa5cb8155f6b1ff3 to your computer and use it in GitHub Desktop.
Save cyanolupus/245d77b6ca33c710fa5cb8155f6b1ff3 to your computer and use it in GitHub Desktop.
activate fc2-live-dl with gmail API... 治安良し
#!/usr/bin/env python3
from __future__ import print_function
import os.path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import base64
import time
import re
import asyncio
import sys
import logging
import subprocess
# NOTE: child processes write straight to this script's stdout/stderr; subprocess.PIPE is not used.
def execute(command_with_args):
    """Start ``command_with_args`` as a child process without waiting for it.

    The child's stdout and stderr are connected to this script's
    ``sys.stdout`` and ``sys.stderr``.

    Args:
        command_with_args: Sequence whose first item is the executable and
            whose remaining items are its arguments.

    Returns:
        The started ``subprocess.Popen`` object, or ``None`` when the process
        could not be started (the reason is printed).
    """
    try:
        print("starting... [" + command_with_args[0] + "]", flush=True)
        return subprocess.Popen(
            command_with_args,
            stdout=sys.stdout,
            stderr=sys.stderr)
    except FileNotFoundError as not_found_e:
        # The executable does not exist; report which one instead of hiding it.
        print(f"nf cmd: {not_found_e}", flush=True)
        return None
    except Exception as generic_e:
        # Deliberately best-effort: the caller treats None as "not started",
        # but the cause is printed so the failure can be diagnosed.
        print(f"except: {generic_e!r}", flush=True)
        return None
def check(proc_list):
    """Split ``proc_list`` into still-running processes and a finished count.

    Args:
        proc_list: Iterable of ``subprocess.Popen``-like objects (anything
            with ``poll()`` and ``args``). ``None`` entries are skipped.

    Returns:
        A tuple ``(active, vids)``: ``active`` is the list of processes whose
        ``poll()`` still returns ``None``; ``vids`` is the number of processes
        that have exited, whatever their exit code.
    """
    active = []
    vids = 0
    for proc in proc_list:
        if proc is None:
            continue
        # poll() returns None while the child is running; an exit code of 0
        # is a finished process, so compare by identity, not truthiness.
        if proc.poll() is None:
            active.append(proc)
        else:
            print("finished!!! [" + proc.args[0] + "]", flush=True)
            vids += 1
    return active, vids
# OAuth scopes requested for the Gmail API (read-only access).
# After changing this list, delete the cached token.json file.
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
class Creds:
    """Gmail OAuth credentials plus the mailbox query built on top of them."""

    # Overwritten per instance by __init__.
    creds = None
    # Gmail API client, built on first use and reused by later fetch_mails calls.
    _service = None

    def __init__(self, token_path, credential_path):
        """Load stored credentials, refreshing or re-authorizing when needed.

        Args:
            token_path: File holding the user's access and refresh tokens.
                It is read when it exists and rewritten whenever new or
                refreshed credentials are obtained.
            credential_path: OAuth client-secrets file used for the local
                server authorization flow when no refreshable token exists.
        """
        creds = None
        if os.path.exists(token_path):
            creds = Credentials.from_authorized_user_file(token_path, SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    credential_path, SCOPES)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run.
            self._save_token(creds, token_path)
        self.creds = creds

    @staticmethod
    def _save_token(creds, token_path):
        """Write ``creds`` as JSON to ``token_path``."""
        with open(token_path, 'w') as token:
            token.write(creds.to_json())

    def _get_service(self):
        """Return the Gmail API client, building it on first use."""
        if self._service is None:
            self._service = build('gmail', 'v1', credentials=self.creds)
        return self._service

    @classmethod
    def _find_body_data(cls, payload):
        """Return the first body ``data`` found depth-first in ``payload``.

        Looks at the payload's own body first, then at each entry of its
        ``parts`` recursively. Returns ``None`` when nothing carries data.
        """
        data = payload.get("body", {}).get("data")
        if data:
            return data
        for part in payload.get("parts", []):
            data = cls._find_body_data(part)
            if data:
                return data
        return None

    def fetch_mails(self, query):
        """Fetch the first INBOX message matching ``query``.

        Args:
            query: Gmail search string passed as ``q`` to the list call.

        Returns:
            A dict with ``"id"``, ``"body"`` (decoded text, ``""`` when no
            body data is found) and, when a Subject header exists,
            ``"title"``. An empty dict when nothing matches, or ``None``
            when the Gmail API reports an ``HttpError``.
        """
        try:
            messages_api = self._get_service().users().messages()
            # Only the first hit is used, so ask for a single result.
            listing = messages_api.list(
                userId="me", labelIds="INBOX", q=query, maxResults=1).execute()
            mail = {}
            found = listing.get("messages")
            if found:
                msg_id = found[0]["id"]
                mail["id"] = msg_id
                message = messages_api.get(userId="me", id=msg_id).execute()
                payload = message["payload"]
                for header in payload.get("headers", []):
                    if header["name"] == "Subject":
                        mail["title"] = header["value"]
                        break
                data = self._find_body_data(payload)
                if data is None:
                    mail["body"] = ""
                else:
                    # Restore any stripped base64 padding before decoding, and
                    # tolerate non-UTF-8 bytes instead of crashing the caller.
                    padded = data + "=" * (-len(data) % 4)
                    mail["body"] = base64.urlsafe_b64decode(padded).decode(
                        "UTF-8", errors="replace")
            return mail
        except HttpError as error:
            # TODO(developer) - Handle errors from gmail API.
            print(f'An error occurred: {error}')
            return None

    def reflesh(self, token_path):
        """Refresh the credentials when they are no longer valid.

        Does nothing when there are no credentials or they are still valid.
        Otherwise refreshes them, rewrites ``token_path`` and prints a notice.

        Args:
            token_path: File the refreshed token is written to.
        """
        creds = self.creds
        # Without credentials there is nothing to refresh; dereferencing None
        # here would only raise an unrelated AttributeError.
        if creds is None or creds.valid:
            return
        creds.refresh(Request())
        self._save_token(creds, token_path)
        print("creds renewed!", flush=True)
def main():
    """Poll Gmail for FC2 live notifications, record them, then upload.

    Runs forever. Every two seconds it:

    1. refreshes the Gmail credentials if needed and fetches the newest
       matching mail;
    2. for a mail not seen before whose body contains a live.fc2.com
       channel URL, starts ``fc2-live-dl`` for that channel;
    3. once no download is running and at least one has finished, starts
       ``rclone move`` from the download directory to the remote;
    4. resets the finished-download counter when an rclone run finishes.
    """
    # Dots are escaped and both http and https links are accepted.
    urlre = re.compile(r"https?://live\.fc2\.com/(\d+)")
    token_path = 'path/to/token.json'
    credential_path = 'path/to/credentials.json'
    creds = Creds(token_path, credential_path)
    rclone_cfg_path = 'path/to/rclone.conf'
    download_path = 'path/to/download'
    remote_path = 'remote:/rclone/path'
    lastid = ""
    fc2path = 'path/to/fc2-live-dl'
    numofvid = 0
    rc_list = []
    dl_list = []
    # Recordings must land inside download_path, because that directory is
    # what rclone moves afterwards.
    output_template = os.path.join(
        download_path, "%(date)s-%(title)s-(%(channel_name)s).%(ext)s")
    while True:
        creds.reflesh(token_path)
        mail = creds.fetch_mails("from:cyanolupus@gmail.com OR from:info@livechat.fc2.com")
        if mail and lastid != mail["id"]:
            print("received!", flush=True)
            match = urlre.search(mail.get("body", ""))
            if match:
                fc2_args = [fc2path, "https://live.fc2.com/" + match.group(1),
                            "-o", output_template,
                            "--latency", "high", "--log-level", "warn", "--no-remux", "--wait"]
                # execute() may return None on failure; check() skips those.
                dl_list.append(execute(fc2_args))
            # Remember the mail even without a URL so it is not reprocessed.
            lastid = mail["id"]
        if not rc_list and numofvid > 0 and not dl_list:
            rclone_args = ["rclone", "move", "--config", rclone_cfg_path, download_path, remote_path]
            rc_list.append(execute(rclone_args))
        dl_list, vids = check(dl_list)
        numofvid += vids
        rc_list, vids = check(rc_list)
        if vids > 0:
            # An upload finished, so the pending-recordings counter starts over.
            numofvid = 0
        time.sleep(2)
# Start the polling loop only when this file is run as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment