Skip to content

Instantly share code, notes, and snippets.

@Virace
Created February 18, 2021 16:18
Show Gist options
  • Save Virace/2e3b2a37acb7cd75af04b2d610be28f8 to your computer and use it in GitHub Desktop.
Save Virace/2e3b2a37acb7cd75af04b2d610be28f8 to your computer and use it in GitHub Desktop.
监测斗鱼直播间直播状态并微信推送
# -*- coding: utf-8 -*-
# Author: Virace
# 斗鱼订阅推送
import time
import requests
import logging
from typing import Union
# Configure the root logger to emit INFO and above. `log` is the root logger
# used by every function in this script; at this level log.debug() output is
# suppressed.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()
# Replace with your own push+ token; see the push+ website
# (https://pushplus.hxtrip.com/) for how to obtain one.
PUSH_PLUS_TOKEN = ''
def notification_push(title: str, msg: Union[str, dict], topic='', template='json',
                      timeout: float = 10):
    """
    Send a notification through the push+ service (https://pushplus.hxtrip.com/).

    :param title: notification title
    :param msg: notification content (a string, or a dict when using the json template)
    :param topic: push+ group ID (used for one-to-many pushes)
    :param template: content template, 'json' or 'html'
    :param timeout: seconds to wait for the push+ server before giving up;
                    passed straight to ``requests.post``
    :return: None
    :raises requests.HTTPError: if the server answers with an error status
    :raises requests.RequestException: on connection failure or timeout
    """
    url = 'http://pushplus.hxtrip.com/send'
    payload = {
        'token': PUSH_PLUS_TOKEN,
        'title': title,
        'content': msg,
        'topic': topic,
        'template': template,
    }
    # Without a timeout a stalled server would block the caller forever.
    response = requests.post(
        url,
        json=payload,
        headers={'Content-Type': 'application/json'},
        timeout=timeout,
    )
    response.raise_for_status()

    # Only decode the body when it will actually be logged, and never let a
    # non-JSON body turn an already successful push into an exception.
    if log.isEnabledFor(logging.DEBUG):
        try:
            log.debug(response.json())
        except ValueError:
            log.debug(response.text)
def get_status(rid: str, timeout: float = 10) -> tuple:
    """
    Fetch a room's live status from the Douyu web search endpoint.

    :param rid: room ID; must be convertible with ``int()``
    :param timeout: seconds to wait for the Douyu server before giving up;
                    passed straight to ``requests.get``
    :return: tuple ``(is_live, item)`` where ``item`` is the matching entry of
             the response's ``roomResult`` list; ``(False, {})`` when the
             response contains no entry for this room
    :raises ValueError: if ``rid`` is not an integer
    :raises requests.HTTPError: if the server answers with an error status
    :raises requests.RequestException: on connection failure or timeout
    """
    # Validate and normalise once, up front, instead of on every loop pass.
    target = str(int(rid))

    response = requests.get(
        'https://www.douyu.com/japi/search/api/getSearchRec',
        params={'kw': target},
        timeout=timeout,
    )
    response.raise_for_status()

    # Tolerate a missing or null 'data' / 'roomResult' instead of raising.
    data = response.json().get('data') or {}
    log.debug(data)

    for item in data.get('roomResult') or []:
        # Compare as strings so the match works whether the API reports the
        # room id as a number or as text.
        if str(item.get('rid')) == target:
            return item.get('isLive') == 1, item

    # Always return a 2-tuple so callers can unpack the result safely.
    log.warning('room %s not found in search results', target)
    return False, {}
def monitor_and_notify(rid: str, push_id='', t=10):
    """
    Poll a room until it goes live, push one notification, then return.

    :param rid: room ID
    :param push_id: push+ group ID the notification is sent to
    :param t: polling interval in seconds
    :return: None
    """
    while True:
        try:
            result = get_status(rid)
        except requests.RequestException as err:
            # A transient network failure must not end a long-running monitor;
            # log it and try again on the next cycle.
            log.warning('status check for room %s failed: %s', rid, err)
            result = None

        # get_status may yield nothing when the room is absent from the
        # search results; treat that as "not live yet".
        if result is not None:
            status, data = result
            if status:
                # Tag the payload so the recipient knows where the fields come from.
                data['_'] = '以下内容为DouYu getSearchRec接口返回内容'
                notification_push(
                    f'{time.strftime("%Y年%m月%d日 %H点%M分%S秒", time.localtime())} - 您关注的主播 {data["nickName"]} 正在直播!',
                    data,
                    push_id
                )
                break

        time.sleep(t)
if __name__ == '__main__':
    # Script entry point: watch room '71415' and push to push+ group '71415',
    # using the default polling interval.
    monitor_and_notify(rid='71415', push_id='71415')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment