-
-
Save sae13/d9167bd58b77561eccb2079338d3ea39 to your computer and use it in GitHub Desktop.
Send the latest versions of ss APKs to Telegram.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DEBUG=10
TG_API_ID=61369
TG_API_HASH=a2a6455ff461sdfsdf532ce03795278fb92f56
TG_BOT_TOKEN=6281007953:AAHE4sdsdfsdfl9h3XsdfsdfJK6-yWhaTdxkCE8pxizJbXgqQ
TG_CHANNEL_ID=-1000000
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pyrogram
tgcrypto
requests
tqdm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import shelve | |
from pathlib import Path | |
from typing import Callable, Dict, List | |
from pyrogram import Client | |
from requests import get | |
import logging | |
import os | |
from tqdm import tqdm | |
# Log to a file in the user's home directory. Path.home() replaces
# os.environ['HOME'] so start-up does not die with KeyError when HOME is
# unset. The level is read from the DEBUG environment variable as a numeric
# logging level; the default of 20 is logging.INFO.
logging.basicConfig(
    filename=str(Path.home() / "shadow_files_pyrogram.log"),
    level=int(os.getenv('DEBUG', '20')),
    format="%(levelname)s - %(asctime)s - %(funcName)s - %(message)s",
)

# Persistent key/value store; the relative name means it is created in the
# current working directory of the process.
db: shelve.Shelf = shelve.open("github.db")
def download_file(url, path):
    """Stream ``url`` into the local file ``path`` while showing a progress bar.

    The data is first written to ``<path>.part`` and only moved to ``path``
    once the transfer loop has finished, so an interrupted download never
    leaves a truncated file under the final name.

    Args:
        url: HTTP(S) address of the file to fetch.
        path: Destination path; an existing file is replaced.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status, so
            an error page is never stored as the downloaded file.
    """
    block_size = 1024  # 1 Kibibyte
    partial_path = f"{path}.part"
    # The context managers release the connection, close the file and close
    # the progress bar even when the transfer fails midway.
    with get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        total_size_in_bytes = int(response.headers.get('content-length', 0))
        with open(partial_path, 'wb') as file, \
                tqdm(total=total_size_in_bytes, unit='iB', unit_scale=True) as progress_bar:
            for data in response.iter_content(block_size):
                progress_bar.update(len(data))
                file.write(data)
            received = progress_bar.n
    os.replace(partial_path, path)
    # A size mismatch is reported but not fatal, as before.
    if total_size_in_bytes != 0 and received != total_size_in_bytes:
        logging.error("download of %s looks incomplete: received %s of %s bytes",
                      url, received, total_size_in_bytes)
class Github:
    """A release asset of a GitHub repository, resolved and downloaded on creation.

    The constructor stores its arguments and immediately calls
    :meth:`download_latest`, which fills in the remaining attributes:

    * ``version``   - ``tag_name`` of the selected release.
    * ``release``   - the selected release object from the GitHub API.
    * ``file_name`` - asset name produced by the ``get_name`` callback.
    * ``dl_url``    - ``browser_download_url`` of the asset with that name.
    * ``file_path`` - local path of the asset (``/tmp/<file_name>``).
    """

    version: str = None
    get_name: Callable[['Github'], str] = None
    release: Dict[str, object] = None
    file_path: str = None
    file_name: str = None
    dl_url: str = None

    def __init__(self, url, title, description,
                 get_name: Callable[['Github'], str],
                 find_download_link_by_file_title=False,
                 ):
        """Store the configuration and download the latest matching asset.

        Args:
            url: Repository in ``owner/name`` form, as used in the API URL.
            title: Short name used in log lines, and as the release-name
                prefix when ``find_download_link_by_file_title`` is true.
            description: Free text kept on the instance for callers.
            get_name: Callback that receives this instance (with ``version``
                and ``release`` already set) and returns the asset file name.
            find_download_link_by_file_title: When true, pick the first
                release whose name starts with ``title`` instead of the
                first release in the list.
        """
        self.title = title
        self.url = url
        self.description = description
        self.get_name = get_name
        self.find_download_link_by_file_title = find_download_link_by_file_title
        self.download_latest()

    @staticmethod
    def _select_release(releases, title, by_title):
        """Return the first release, or the first one whose name starts with ``title``.

        Raises:
            IndexError: If no release qualifies.
        """
        if by_title:
            candidates = (r for r in releases if str(r['name']).startswith(title))
        else:
            candidates = iter(releases)
        chosen = next(candidates, None)
        if chosen is None:
            raise IndexError(f"no suitable release found for {title}")
        return chosen

    @staticmethod
    def _select_asset_url(release, file_name):
        """Return the download URL of the asset called ``file_name``.

        Raises:
            IndexError: If the release has no asset with that name; the
                message lists the asset names that are available.
        """
        for asset in release['assets']:
            if asset['name'] == file_name:
                return asset['browser_download_url']
        available = ', '.join(str(asset['name']) for asset in release['assets'])
        raise IndexError(f"no asset named {file_name} in release "
                         f"{release.get('tag_name')}; available: {available}")

    def download_latest(self):
        """Resolve the release asset and download it unless it is already on disk.

        Returns:
            The local file path of the asset.

        Raises:
            requests.HTTPError: If the GitHub API answers with an error status.
            IndexError: If no release or no asset matches.
        """
        logging.debug(f"download_latest {self.title} ")
        response = get(f"https://api.github.com/repos/{self.url}/releases", timeout=30)
        # Fail with the HTTP status instead of an obscure lookup error on
        # the error payload.
        response.raise_for_status()
        release = self._select_release(response.json(), self.title,
                                       self.find_download_link_by_file_title)
        # version and release must be set before get_name runs: the
        # callbacks read them from the instance.
        self.version = release['tag_name']
        self.release = release
        self.file_name = self.get_name(self)
        logging.debug(f"download_latest expected asset name {self.file_name}")
        self.dl_url = self._select_asset_url(release, self.file_name)
        self.file_path = f'/tmp/{self.file_name}'
        if Path(self.file_path).exists():
            logging.info(f"download_latest file already exists {self.title} ")
            return self.file_path
        logging.debug(f"download_latest start downloading {self.title} from {self.dl_url} to {self.file_path}")
        download_file(self.dl_url, self.file_path)
        logging.info(f"download_latest finished downloading {self.title} from {self.dl_url} to {self.file_path}")
        return self.file_path
def _get_name_ss(x: Github):
    """Return ``<title>-<version>.apk`` with the version's first character dropped."""
    trimmed_version = str(x.version)[1:]
    return f"{x.title}-{trimmed_version}.apk"
def _get_name_matsuri(x: Github):
    """Return the asset name ``<title>-<version>-arm64-v8a.apk``."""
    version_text = str(x.version)
    return f"{x.title}-{version_text}-arm64-v8a.apk"
def _get_name_matsuri_xray(x: Github):
    """Return the name of the first asset listed in the selected release."""
    first_asset = x.release['assets'][0]
    return first_asset['name']
def _get_name_xray(x: Github):
    """Return the file name of the release's first asset."""
    assets = x.release['assets']
    leading_asset = assets[0]
    return leading_asset['name']
def _get_name_singbox(x: Github):
    """Return the universal sing-box APK name ``SFA-<version>-universal.apk``.

    The first character of the release tag is dropped (presumably a leading
    "v" - verify against the repository's tag names).
    """
    # The stray debug print of the object repr was removed; it only added
    # noise to stdout.
    return f"SFA-{str(x.version)[1:]}-universal.apk"
def get_apks():
    """Build the list of Github entries this script publishes.

    The entries are created in list order; creating a Github object
    resolves and downloads its release asset.
    """
    nekobox = Github(
        'MatsuriDayo/NekoBoxForAndroid',
        'NB4A',
        'nekobox for android',
        get_name=_get_name_matsuri,
    )
    # Persian caption with the iOS download link and a config link.
    sing_box_description = """سینگ باکس . دانلود ایفون:
https://69b.ir/sf4i
کانفیگ yebekhe :
https://t.me/ss_apk/87
"""
    sing_box = Github(
        'SagerNet/sing-box',
        'sing-box',
        sing_box_description,
        get_name=_get_name_singbox,
    )
    return [nekobox, sing_box]
# Resolve (and download) every tracked APK at start-up and store the
# resulting Github objects in the shelve database under the 'apks' key.
db['apks'] = apks = get_apks()
async def main():
    """Upload every APK not yet posted to the channel and retire older posts.

    For each entry in ``apks`` a database key is derived from the channel
    id, the entry title and the file name. If that key is absent, the file
    is sent to the channel and the returned message is stored under the
    key. Messages stored earlier for the same title are then deleted from
    the channel and moved to a ``TG_ARCHIVE__`` key. A failed deletion is
    logged and leaves the old key in place.
    """
    async with Client('shadow_files_pyrogram',
                      bot_token=os.getenv('TG_BOT_TOKEN'),
                      api_id=os.getenv('TG_API_ID'),
                      api_hash=os.getenv('TG_API_HASH'),
                      # proxy={
                      #     "scheme": "socks5",  # "socks4", "socks5" and "http" are supported
                      #     "hostname": "127.0.0.1",
                      #     "port": 2080,
                      #     # "username": "username",
                      #     # "password": "password"
                      # }
                      ) as telegram:
        # The channel id does not change between entries; read it once.
        channel_id = os.getenv('TG_CHANNEL_ID')
        apk: Github
        for apk in apks:
            main_key = f"tg_{channel_id}_{apk.title}"
            db_key = f"{main_key}_{apk.file_name}"
            if db.get(db_key) is not None:
                # This exact file has already been posted.
                continue
            resp = await telegram.send_document(channel_id, apk.file_path,
                                                caption=f"{apk.description}\n{apk.dl_url}\n{apk.file_name}")
            db[db_key] = resp
            # Snapshot the matching keys first: the shelf is modified in
            # the loop below, and only matching values need to be loaded.
            # The trailing "_" keeps a title from matching another title
            # that merely starts with the same characters.
            stale_prefix = f"{main_key}_"
            stale_keys = [k for k in db if k.startswith(stale_prefix) and k != db_key]
            for k in stale_keys:
                logging.info(f"deleting {k} db_key={db_key} main_key={main_key}")
                try:
                    old_message = db[k]  # the message object stored when it was sent
                    await telegram.delete_messages(channel_id, old_message.id)
                    db[f"TG_ARCHIVE__{k}"] = old_message
                    del db[k]
                except Exception:
                    # Best effort: keep going with the remaining keys.
                    logging.exception("delete old apk file")
# Run the upload job, then close the shelve database explicitly so its
# contents are written out even when the job raised.
try:
    asyncio.run(main())
finally:
    db.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
# Single quotes stop the calling shell from expanding $(xargs < .env) before
# the cd has happened; && aborts the chain when a step fails.
sh -c 'cd /home/saeb/shadow_files_pyrogram/ && export $(xargs < .env) && python3 main.py'