Skip to content

Instantly share code, notes, and snippets.

@TransparentLC
Last active April 23, 2023 01:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TransparentLC/83fa438907708b476e29930f5c76ddda to your computer and use it in GitHub Desktop.
Save TransparentLC/83fa438907708b476e29930f5c76ddda to your computer and use it in GitHub Desktop.
在一些账号有更新时使用 Windows 的通知卡片发送提醒
import abc
import bs4
import csv
import dataclasses
import functools
import hashlib
import lxml.etree
import orjson
import os
import re
import requests
import rich.console
import rich.table
import secrets
import subprocess
import tempfile
import typing
import warnings
import time
import zstandard
from concurrent.futures import ThreadPoolExecutor
__import__('requests.models').Response.json = lambda self, **kwargs: orjson.loads(self.content)
warnings.filterwarnings('ignore', category=bs4.MarkupResemblesLocatorWarning)
WATCH_TIMEOUT = 5
DOWNLOAD_CHUNK = 262144
@functools.total_ordering
@dataclasses.dataclass
class WatchItem:
id: typing.Hashable
time: time.struct_time
text: str
url: str
image: str | None
info: dict[str] | None
extra: dict[str] | None
def __hash__(self) -> int:
return hash(self.id)
def __eq__(self, other: 'WatchItem') -> bool:
return self.time == other.time
def __lt__(self, other: 'WatchItem') -> bool:
return self.time < other.time
class Watcher(abc.ABC):
sublatestItem: WatchItem | None = None
latestItem: WatchItem | None = None
session: requests.Session
name: str
icon: str | None = None
url: str
def createSession(self):
self.session = requests.Session()
self.session.hooks['response'].append(lambda r, *args, **kwargs: r.raise_for_status())
def notify(self) -> None:
if self.icon:
iconPath = os.path.join(tempfile.gettempdir(), hashlib.sha256(self.icon.encode()).hexdigest())
if not os.path.exists(iconPath):
with (
open(iconPath, 'wb') as f,
self.session.get(self.icon, stream=True) as r,
):
for chunk in r.iter_content(DOWNLOAD_CHUNK):
f.write(chunk)
if self.latestItem.image:
imagePath = os.path.join(tempfile.gettempdir(), secrets.token_hex(32))
with (
open(imagePath, 'wb') as f,
self.session.get(self.latestItem.image, stream=True) as r,
):
for chunk in r.iter_content(DOWNLOAD_CHUNK):
f.write(chunk)
xml = lxml.etree.XML('''
<toast activationType="protocol">
<visual>
<binding template="ToastGeneric">
<text></text>
<text></text>
<image placement="appLogoOverride" hint-crop="circle" src="" />
<image placement="hero" src="" />
</binding>
</visual>
<actions>
<action content="查看" activationType="protocol" arguments="" />
</actions>
<audio src="ms-winsoundevent:Notification.Reminder" />
</toast>
''', parser=lxml.etree.XMLParser(remove_blank_text=True))
xml.find('.//text[1]').text = self.name
xml.find('.//text[2]').text = self.latestItem.text
xml.find('.//action').set('arguments', self.latestItem.url)
if self.icon:
xml.find('.//image[@placement="appLogoOverride"]').set('src', iconPath)
else:
xml.find('.//binding').remove(xml.find('.//image[@placement="appLogoOverride"]'))
if self.latestItem.image:
xml.find('.//image[@placement="hero"]').set('src', imagePath)
else:
xml.find('.//binding').remove(xml.find('.//image[@placement="hero"]'))
script = f'''
[Windows.UI.Notifications.ToastNotificationManager, Windows.UI.Notifications, ContentType = WindowsRuntime] | Out-Null
[Windows.UI.Notifications.ToastNotification, Windows.UI.Notifications, ContentType = WindowsRuntime] | Out-Null
[Windows.Data.Xml.Dom.XmlDocument, Windows.Data.Xml.Dom.XmlDocument, ContentType = WindowsRuntime] | Out-Null
$d = [Windows.Data.Xml.Dom.XmlDocument]::New()
$d.loadXml(@"
{lxml.etree.tostring(xml, pretty_print=False).decode()}
"@)
[Windows.UI.Notifications.ToastNotificationManager]::CreateToastNotifier("Multi Notifier").Show([Windows.UI.Notifications.ToastNotification]::New($d))
'''.strip()
subprocess.run(
(
'powershell', '-Command', script,
),
)
if imagePath:
os.remove(imagePath)
@abc.abstractmethod
def watch(self) -> WatchItem:
pass
class WeiboWatcher(Watcher):
containerid: int
def __init__(self, uid: int) -> None:
self.createSession()
self.session.headers['Referer'] = 'https://m.weibo.cn/'
self.url = f'https://m.weibo.cn/u/{uid}'
try:
r = self.session.get(
'https://m.weibo.cn/api/container/getIndex',
params={
'type': 'uid',
'value': uid,
},
)
d = r.json()['data']
self.name = d['userInfo']['screen_name']
self.icon = d['userInfo']['profile_image_url'].split('?')[0]
for tab in d['tabsInfo']['tabs']:
if tab['tab_type'] == 'weibo':
self.containerid = int(tab['containerid'])
break
except:
self.name = self.containerid = self.icon = None
def watch(self) -> WatchItem:
r = self.session.get(
'https://m.weibo.cn/api/container/getIndex',
params={
'containerid': self.containerid,
},
timeout=WATCH_TIMEOUT,
)
self.sublatestItem = self.latestItem
self.latestItem = max(
(
WatchItem(
id=int(x['mblog']['id']),
time=time.strptime(x['mblog']['created_at'], '%a %b %d %H:%M:%S %z %Y'),
text=bs4.BeautifulSoup(
re.sub(
r'<span class="url-icon"><img alt=(.+?) src=".+?" .*?/></span>',
'\g<1>',
x['mblog']['text'],
),
features='lxml',
).text.strip(),
url=f"https://m.weibo.cn/detail/{x['mblog']['id']}",
image=(
x['mblog']['pics'][0]['large']['url']
if 'pics' in x['mblog'] and isinstance(x['mblog']['pics'], list) else
(
x['mblog']['pics']['0']['large']['url']
if 'pics' in x['mblog'] and isinstance(x['mblog']['pics'], dict) else
None
)
),
info={
':repeat:': x['mblog']['reposts_count'],
':speech_balloon:': x['mblog']['comments_count'],
':thumbs_up:': x['mblog']['attitudes_count'],
},
extra={
'images': (
tuple(x['large']['url'] for x in x['mblog']['pics'])
if 'pics' in x['mblog'] and isinstance(x['mblog']['pics'], list) else
(
tuple(x['large']['url'] for i, x in x['mblog']['pics'].items() if i.isnumeric())
if 'pics' in x['mblog'] and isinstance(x['mblog']['pics'], dict) else
tuple()
)
),
},
)
for x in r.json()['data']['cards']
if x['card_type'] == 9
),
key=lambda e: e.time,
)
return self.latestItem
class ZhihuWatcher(Watcher):
def __init__(self, username: str) -> None:
self.createSession()
self.url = f'https://www.zhihu.com/people/{username}'
try:
r = self.session.get(self.url)
d = orjson.loads(re.search(rb'<script id="js-initialData" type="text/json">(.+?)</script>', r.content).group(1))['initialState']['entities']
self.name = d['users'][username]['name']
self.icon = d['users'][username]['avatarUrl'].split('?')[0]
except:
self.name = self.icon = None
def watch(self) -> WatchItem:
r = self.session.get(self.url, timeout=WATCH_TIMEOUT)
d = orjson.loads(re.search(rb'<script id="js-initialData" type="text/json">(.+?)</script>', r.content).group(1))['initialState']['entities']
a = max(d['activities'].values(), key=lambda e: e['createdTime'])
self.sublatestItem = self.latestItem
match a['target']['schema']:
case 'answer':
self.latestItem = WatchItem(
id=int(a['target']['id']),
time=time.localtime(a['createdTime']),
text=a['actionText'] + ':' + d['answers'][str(a['target']['id'])]['question']['title'] + '\n' + bs4.BeautifulSoup(d['answers'][str(a['target']['id'])]['excerpt'], features='lxml').text.strip(),
url=f"https://www.zhihu.com/answer/{a['target']['id']}",
image=None,
info={
':speech_balloon:': d['answers'][str(a['target']['id'])]['commentCount'],
':thumbs_up:': d['answers'][str(a['target']['id'])]['voteupCount'],
},
extra=None,
)
case 'question':
self.latestItem = WatchItem(
id=int(a['target']['id']),
time=time.localtime(a['createdTime']),
text=a['actionText'] + ':' + d['questions'][str(a['target']['id'])]['title'],
url=f"https://www.zhihu.com/question/{a['target']['id']}",
image=None,
info=None,
extra=None,
)
case 'pin':
self.latestItem = WatchItem(
id=int(a['target']['id']),
time=time.localtime(a['createdTime']),
text=a['actionText'] + ':' + d['pins'][str(a['target']['id'])]['content'][0]['content'],
url=f"https://www.zhihu.com/pin/{a['target']['id']}",
image=d['pins'][str(a['target']['id'])]['content'][1]['url'] if len(d['pins'][str(a['target']['id'])]['content']) > 1 else None,
info={
':speech_balloon:': d['pins'][str(a['target']['id'])]['commentCount'],
':thumbs_up:': d['pins'][str(a['target']['id'])]['reactionCount'],
},
extra=None,
)
case 'article':
self.latestItem = WatchItem(
id=int(a['target']['id']),
time=time.localtime(a['createdTime']),
text=a['actionText'] + ':' + d['articles'][str(a['target']['id'])]['title'] + '\n' + bs4.BeautifulSoup(d['articles'][str(a['target']['id'])]['excerpt'], features='lxml').text.strip(),
url=f"https://zhuanlan.zhihu.com/p/{a['target']['id']}",
image=d['articles'][str(a['target']['id'])]['imageUrl'].split('?')[0] or None,
info={
':speech_balloon:': d['articles'][str(a['target']['id'])]['commentCount'],
':thumbs_up:': d['articles'][str(a['target']['id'])]['voteupCount'],
},
extra=None,
)
return self.latestItem
if __name__ == '__main__':
console = rich.console.Console(highlight=False)
with (
ThreadPoolExecutor(os.cpu_count() * 4) as executor,
zstandard.open('multi-watcher.csv.zst', 'w', encoding='utf-8', cctx=zstandard.ZstdCompressor(22)) as f,
):
csvfile = csv.DictWriter(f, (
'id',
'author',
'watcher',
'url',
'time',
'text',
'image',
'extra',
))
csvfile.writeheader()
watchers: tuple[Watcher, ...] = tuple(x for x in (
*executor.map(lambda e: ZhihuWatcher(e), (
'wu-qing-qi-fen-pi', # 这里是安全的
'reseted1676580842000', # 这里是安全滴
'ncc21382', # ncc21382
)),
*executor.map(lambda e: WeiboWatcher(e), (
5514879288, # 江江bot
6339370357, # 江图社
7166086720, # 冮冮bot
)),
) if x.name)
def wrappedWatch(watcher: Watcher):
try:
return watcher.watch()
except:
return watcher.latestItem
try:
while True:
ts = time.perf_counter()
for _ in executor.map(wrappedWatch, watchers):
pass
te = time.perf_counter()
table = rich.table.Table(
'author/time', 'content/link',
header_style='bold red',
show_lines=True,
)
for watcher in watchers:
if watcher.latestItem:
if (
(watcher.sublatestItem is None and time.time() - time.mktime(watcher.latestItem.time) < 3600) or
(watcher.sublatestItem is not None and watcher.sublatestItem < watcher.latestItem)
):
executor.submit(watcher.notify)
csvfile.writerow({
'id': watcher.latestItem.id,
'author': watcher.name,
'watcher': watcher.__class__.__name__,
'url': watcher.latestItem.url,
'time': time.strftime('%Y-%m-%d %H:%M:%S', watcher.latestItem.time),
'text': watcher.latestItem.text,
'image': watcher.latestItem.image or '',
'extra': orjson.dumps(watcher.latestItem.extra).decode() if watcher.latestItem.extra else '',
})
table.add_row(
f'[link={watcher.url}]{watcher.name}[/link]\n{time.strftime("%Y-%m-%d %H:%M:%S", watcher.latestItem.time)}',
watcher.latestItem.text + '\n' + ' '.join((
*(f'{k}{v:>4}' for k, v in watcher.latestItem.info.items()),
f':link:{watcher.latestItem.url}',
)),
)
os.system('cls' if os.name == 'nt' else 'clear')
console.print(table)
console.print('Updated', len(watchers), 'watchers at', time.strftime('%Y-%m-%d %H:%M:%S'), 'in', f'{(te - ts) * 1000:.2f}ms', style='italic bright_black', end='')
time.sleep(20)
except KeyboardInterrupt:
exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment