Skip to content

Instantly share code, notes, and snippets.

@sundevilyang
Forked from youfou/ad_urls.json
Created April 18, 2017 03:00
Show Gist options
  • Save sundevilyang/139f158f529a21e4a24844e892ebbb8c to your computer and use it in GitHub Desktop.
Save sundevilyang/139f158f529a21e4a24844e892ebbb8c to your computer and use it in GitHub Desktop.
响应好友请求 / 自动聊天 / 限制频率 / 邀请入群 / 远程群管理 / 新人欢迎消息 / 关键词问答 / 发心跳 / 远程命令 / 远程执行代码
#!/usr/bin/env python3
# coding: utf-8
import datetime
import logging
import os
import re
import subprocess
import time
from functools import wraps
from pprint import pformat
from threading import Thread
"""
psutil 模块用于监控进程状态,例如内存占用情况,请自行安装:
pip3 install -U psutil
"""
import psutil
from wxpy import *
from wxpy.utils.misc import get_text_without_at_bot
logging.basicConfig(level=logging.INFO)

# Adjust the console_qr argument below to suit your environment.
bot = Bot('bot.pkl', console_qr=-2)

# Guard against logging in with the wrong account.
if 'wxpy' not in bot.self.name:
    raise ValueError('Wrong User!')

# Remote administrator (used for remote management); matching on the remark
# name is safer.
remote_admin = ensure_one(bot.friends().search(remark_name='youfou'))

# Locate the managed WeChat groups by wxid.
# NOTE(review): the positional True presumably forces a refresh of the group
# list before searching -- confirm against the wxpy API.
bot.groups(True)

group_ids = (
    # wxpy discussion group 🐰
    '6411313640@chatroom',
    # wxpy discussion group 🐱
    '6788356306@chatroom',
    # wxpy discussion group 🐨
    '6737430866@chatroom',
)

# Each wxid must resolve to exactly one group. ensure_one() (already used for
# remote_admin above) rejects an empty or ambiguous result instead of failing
# with a bare IndexError or silently taking the first match.
wxpy_groups = list()
for wxid in group_ids:
    g = ensure_one(bot.groups().search(wxid=wxid))
    wxpy_groups.append(g)

# Initialise the chat bot used for automatic replies.
tuling = Tuling()
# Keyword auto-replies: each key is the reply text, each value is the tuple of
# keywords that trigger that reply.
kw_replies = {
'wxpy 项目主页:\ngithub.com/youfou/wxpy': (
'项目', '主页', '官网', '网站', 'github', '地址', 'repo', '版本'
),
'wxpy 在线文档:\nwxpy.readthedocs.io': (
'请问', '文档', '帮助', '怎么', '如何', '请教', '安装', '说明'
),
'必看: 常见问题 FAQ:\nwxpy.readthedocs.io/faq.html': (
'faq', '常见', '问题', '问答', '什么'
)
}
# Welcome message for new group members; the "{}" placeholder is filled in
# with str.format().
welcome_text = '''🎉 欢迎 @{} 的加入!
😃 请勿在本群使用机器人
📖 提问前请看 t.cn/R6VkJDy'''
# Regexes matching the "new member joined" notifications; capture group 1 is
# the quoted name.
rp_new_member_name = (
re.compile(r'^"(.+)"通过'),
re.compile(r'邀请"(.+)"加入'),
)
# Remote kick command: @<bot> 移出 @<member to remove>
# Capture group 1 is the name after the second "@"; an optional trailing
# U+2005 and trailing whitespace are excluded from it.
rp_kick = re.compile(r'^@.+移出\s*@(.+?)(?:\u2005?\s*$)')
def update_groups():
    """Refresh every managed group and report its size to the remote admin.

    Sends a notice first, then one "<name>: <len(group)>" message per group
    in wxpy_groups. Always returns True.
    """
    remote_admin.send('updating groups...')
    for managed in wxpy_groups:
        managed.update_group()
        report = '{}: {}'.format(managed.name, len(managed))
        remote_admin.send(report)
    return True
# Handle on the current process, used to read its start time and memory usage.
process = psutil.Process()


def status_text():
    """Return a "<uptime>, <memory> MB" summary of the current process.

    Uptime is the time elapsed since the process was created, without the
    fractional seconds; memory is the resident set size in MB with two
    decimals.
    """
    now = datetime.datetime.now()
    started_at = datetime.datetime.fromtimestamp(process.create_time())
    elapsed = now - started_at
    rss_bytes = process.memory_info().rss

    # str(timedelta) may end in ".<microseconds>"; keep only what precedes it.
    uptime_part = str(elapsed).split('.')[0]
    memory_part = '{:.2f} MB'.format(rss_bytes / 1024 ** 2)
    return '{uptime}, {memory}'.format(uptime=uptime_part, memory=memory_part)
def send_status_text():
    """Send the current status summary to the remote admin.

    Returns whatever remote_admin.send() returns.
    """
    summary = status_text()
    return remote_admin.send(summary)
# Periodically report the process status.
def heartbeat():
    """Send the status text to the remote admin every 600 seconds.

    Runs for as long as bot.alive is true. A failed report is logged and the
    loop carries on with the next cycle.
    """
    while bot.alive:
        time.sleep(600)
        # noinspection PyBroadException
        try:
            send_status_text()
        except Exception:
            # Deliberately broad so one failed report does not end the loop,
            # but no longer a bare except, which would also swallow
            # SystemExit and KeyboardInterrupt.
            logger.exception('failed to report heartbeat:')


# Daemon thread, so it does not keep the interpreter alive on exit.
heartbeat_thread = Thread(target=heartbeat, daemon=True, name='heartbeat')
heartbeat_thread.start()
def remote_eval(source):
    """Evaluate *source* as a Python expression and send the result to the admin.

    Args:
        source: text to evaluate with eval() against this module's globals.

    Returns:
        True when the expression was evaluated; the pretty-printed value, or
        the exception it raised, is sent to remote_admin. None when *source*
        raises SyntaxError or NameError, so the caller can fall back to
        treating the text as ordinary chat.

    SECURITY: this runs eval() on chat message text with full access to the
    module globals. It is only as safe as the caller's check that the sender
    really is the remote admin; never call it with text from anyone else.
    """
    try:
        ret = eval(source, globals())
    except (SyntaxError, NameError):
        # Not an expression (or it names something unknown): not handled.
        return None
    except Exception as e:
        # Any other failure is reported back as the result.
        ret = e

    logger.info('remote eval executed:\n{}'.format(source))
    remote_admin.send(pformat(ret))
    return True
def remote_shell(cmd):
    """Run *cmd* as a shell command if it starts with "!" and report the output.

    Args:
        cmd: message text; only text starting with "!" is treated as a shell
            command (the "!" itself is stripped).

    Returns:
        True when a command was run; its combined stdout/stderr, or "[OK]"
        when there was no output, is sent to remote_admin. None when *cmd*
        does not start with "!".

    SECURITY: the command is passed to the shell verbatim (shell=True). That
    is the purpose of this feature, but it means the caller's check that the
    sender is the remote admin is the only protection.
    """
    if not cmd.startswith('!'):
        return None

    cmd = cmd[1:]
    logger.info('executing remote shell cmd:\n{}'.format(cmd))
    # stderr is merged into stdout so the admin sees a single text output.
    r = subprocess.run(
        cmd, shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True
    )
    remote_admin.send(r.stdout if r.stdout else '[OK]')
    return True
def restart():
    """Notify the admin, save the login status, and re-execute this script.

    os.execv() replaces the running process with a fresh interpreter started
    with the same arguments, so this function does not return on success.
    """
    # No explicit `import sys` is visible at file level, so import it here
    # rather than relying on a wildcard import to provide the name.
    import sys

    remote_admin.send('restarting bot...')
    bot.dump_login_status()
    os.execv(sys.executable, [sys.executable] + sys.argv)
# Remote commands (messages sent privately to the bot).
remote_orders = {
    'groups': update_groups,
    'status': send_status_text,
    'restart': restart
}


# If the message text is a remote command, run the matching function.
# If the message text starts with "!", run it as a shell command.
# Otherwise, try to evaluate msg.text directly as Python code.
# Returns True if any of the above was carried out.
def exec_remote_order(msg):
    """Try to handle *msg* as a remote admin instruction.

    Only messages whose sender equals remote_admin are considered. The text
    (lower-cased and stripped) is looked up in remote_orders first; if that
    fails, remote_shell() and then remote_eval() are tried.

    Returns True when one of them handled the message, None otherwise.
    """
    if msg.sender == remote_admin:
        text = msg.text
        handler = remote_orders.get(text.lower().strip())
        if handler:
            logger.info('executing remote order: {}'.format(handler.__name__))
            handler()
            return True
        if remote_shell(text) or remote_eval(text):
            return True
    return None
def reply_by_keyword(msg):
    """Reply with the first canned answer whose keyword occurs in the message.

    Matching is a case-insensitive substring test of each keyword in
    kw_replies against the message text.

    Returns:
        The reply text that was sent, or None when no keyword matched.
    """
    # Lower-case once instead of once per keyword; treat a missing text as
    # empty so it simply matches nothing.
    text = (msg.text or '').lower()
    for reply, keywords in kw_replies.items():
        if any(kw in text for kw in keywords):
            logger.info('reply by keyword: \n{}: "{}"\nreplied: "{}"'.format(
                (msg.member or msg.chat).name, msg.text, reply))
            msg.reply(reply)
            return reply
    return None
# Validate the group-join passphrase.
def valid(msg):
    """Return True when the message text contains "wxpy" (case-insensitive).

    A message whose text is None or empty is treated as not containing the
    passphrase instead of raising AttributeError.
    """
    return 'wxpy' in (msg.text or '').lower()
# Automatically pick a group that is not full yet.
def get_group():
    """Return the largest managed group whose len() is still below 495.

    wxpy_groups is sorted in place, largest first. When no group is below
    495 a warning is logged and the last group of the sorted list (the
    smallest one) is returned.
    """
    wxpy_groups.sort(key=len, reverse=True)
    available = next((grp for grp in wxpy_groups if len(grp) < 495), None)
    if available is not None:
        return available
    logger.warning('群都满啦!')
    return wxpy_groups[-1]
# Invite a user into a group.
def invite(user):
    """Invite *user* into a managed group unless they are already in one.

    If the user is found in any group of wxpy_groups, the nick names of those
    groups are logged and sent to the user. Otherwise a confirmation is sent
    and the user is added, by invitation, to the group chosen by get_group().
    """
    already_in = [grp for grp in wxpy_groups if user in grp]
    if not already_in:
        target = get_group()
        user.send('验证通过 [嘿哈]')
        target.add_members(user, use_invitation=True)
        return

    joined_nick_names = '\n'.join(grp.nick_name for grp in already_in)
    logger.info('{} is already in\n{}'.format(user, joined_nick_names))
    user.send('你已加入了\n{}'.format(joined_nick_names))
# Rate limiting: when a sender exceeds the allowed number of messages within
# the period, reply "🙊" directly instead of calling the handler.
def freq_limit(period_secs=10, limit_msgs=3):
    """Build a decorator that throttles a message handler per sender.

    Args:
        period_secs: length of the look-back window in seconds.
        limit_msgs: number of messages from the same sender tolerated within
            the window.

    Returns:
        A decorator. The wrapped handler counts the sender's messages in
        msg.bot.messages that fall inside the window; if the count exceeds
        *limit_msgs* it returns '🙊' without calling the handler, except for
        group messages that do not @-mention the bot, which always pass
        through.
    """
    # The window length is fixed per decorator, so build it once here rather
    # than on every message.
    period = datetime.timedelta(seconds=period_secs)

    def decorator(func):
        @wraps(func)
        def wrapped(msg):
            now = datetime.datetime.now()
            recent_received = 0
            # Walk a snapshot of the history from newest to oldest.
            for m in msg.bot.messages[::-1]:
                if m.sender == msg.sender:
                    if now - m.create_time > period:
                        break
                    recent_received += 1
                    if recent_received > limit_msgs:
                        # Already over the limit; further counting cannot
                        # change the outcome.
                        break

            if recent_received > limit_msgs:
                if not isinstance(msg.chat, Group) or msg.is_at:
                    return '🙊'
            return func(msg)

        return wrapped

    return decorator
def get_new_member_name(msg):
    """Extract the new member's name from a group notification message.

    Returns capture group 1 of the first pattern in rp_new_member_name that
    matches msg.raw['Text'], or None when no pattern matches.
    """
    # itchat 1.2.32 does not format Note messages in groups, so format the
    # raw text here before matching.
    from itchat.utils import msg_formatter
    msg_formatter(msg.raw, 'Text')

    note_text = msg.raw['Text']
    attempts = (pattern.search(note_text) for pattern in rp_new_member_name)
    found = next((hit for hit in attempts if hit), None)
    if found:
        return found.group(1)
    return None
def remote_kick(msg):
    """Handle the "@<bot> 移出 @<member>" kick command sent in a group.

    Returns:
        True after the member was removed and the group was notified; None
        when the message is not a kick command (not an @-mention, not a text
        message, or the text does not match rp_kick).

    Raises:
        ValueError: if the command was not sent by remote_admin, or if the
            member to remove is the bot itself or remote_admin.
        ensure_one() also raises when the name does not identify exactly one
        member of the group.
    """
    # Compare the message type by value; `is` only holds when both sides
    # happen to be the very same object.
    if not (msg.is_at and msg.type == TEXT):
        return None

    match = rp_kick.search(msg.text)
    if not match:
        return None

    if msg.member != remote_admin:
        raise ValueError('Wrong admin: {}'.format(msg.member))

    name_to_kick = match.group(1)
    member_to_kick = ensure_one(
        [member for member in msg.chat if member.name == name_to_kick])

    # Never remove the bot itself or the administrator.
    if member_to_kick in (bot.self, remote_admin):
        raise ValueError('Wrong member to kick: {}'.format(member_to_kick))

    member_to_kick.remove()
    msg.chat.send('已移出 {}'.format(name_to_kick))
    return True
def semi_sync(msg, groups):
    """Forward a message that @-mentions the bot to *groups*.

    The message text is first replaced by get_text_without_at_bot(msg); if
    anything is left, the message is passed to sync_message_in_groups() with
    a fixed suffix. Messages that do not @-mention the bot are ignored.
    """
    if not msg.is_at:
        return
    msg.text = get_text_without_at_bot(msg)
    if not msg.text:
        return
    sync_message_in_groups(
        msg, groups, suffix='↑隔壁消息↑回复请@机器人')
# Check whether the message is of a type that can be replied to.
def supported_msg_type(msg, reply_unsupported=False):
    """Tell whether *msg* is a text message, optionally answering other types.

    Args:
        msg: the incoming message.
        reply_unsupported: when true, a message that is neither supported nor
            ignored (SYSTEM, NOTE, FRIENDS) is answered with an emoji chosen
            by its type.

    Returns:
        True for TEXT messages, None for everything else.
    """
    if msg.type in (TEXT,):
        return True
    if msg.type in (SYSTEM, NOTE, FRIENDS) or not reply_unsupported:
        return None

    emoji_by_type = {
        RECORDING: '🙉',
        PICTURE: '🙈',
        VIDEO: '🙈',
    }
    msg.reply(emoji_by_type.get(msg.type, '🐒'))
    return None
# Respond to friend requests.
@bot.register(msg_types=FRIENDS)
def new_friends(msg):
    """Accept a friend request, then invite the user or ask for the passphrase.

    The request is always accepted. If valid(msg) holds, the new friend is
    invited via invite(); otherwise they are sent a reminder that the
    passphrase was missing.
    """
    user = msg.card.accept()
    if not valid(msg):
        user.send('Hello {},你忘了填写加群口令,快回去找找口令吧'.format(user.name))
        return
    invite(user)
# Respond to messages from friends, with rate limiting.
@bot.register(Friend)
@freq_limit()
def exist_friends(msg):
    """Handle a message from an existing friend.

    Unsupported message types are dealt with by supported_msg_type(). For
    supported ones, in order: a valid passphrase triggers an invitation; a
    keyword match sends the canned reply; anything else is answered by the
    Tuling chat bot.
    """
    if not supported_msg_type(msg, reply_unsupported=True):
        return
    if valid(msg):
        invite(msg.sender)
    elif not reply_by_keyword(msg):
        tuling.do_reply(msg)
@bot.register(remote_admin, msg_types=TEXT)
def reply_remote_admin(msg):
    """
    Respond to the remote administrator.

    The text is interpreted in this order of priority:
    1. If it is a remote command, run that command (commands are defined
       separately, one function per command).
    2. If it starts with "!", run it as a shell command.
    3. Try to evaluate it as Python code.
    4. If none of the above applies or the attempt fails, reply as to an
       ordinary chat message.
    """
    # Steps 1-3 above are all handled by exec_remote_order().
    handled = exec_remote_order(msg)
    if not handled:
        # Step 4: fall back to the normal friend-message handling.
        return exist_friends(msg)
    return None
# In other groups, reply to messages that @-mention the bot.
@bot.register(Group)
def reply_other_group(msg):
    """Answer @-mentions in groups that are not in wxpy_groups.

    Supported message types are answered by the Tuling chat bot; unsupported
    ones are handled by supported_msg_type().
    """
    if msg.chat in wxpy_groups or not msg.is_at:
        return
    if supported_msg_type(msg, reply_unsupported=True):
        tuling.do_reply(msg)
# Message handling for the wxpy groups.
@bot.register(wxpy_groups)
def wxpy_group(msg):
    """Handle a message in a managed group.

    An @-mention that remote_kick() handles ends processing; every other
    message is passed to semi_sync() together with wxpy_groups.
    """
    kicked = msg.is_at and remote_kick(msg)
    if not kicked:
        semi_sync(msg, wxpy_groups)
# Welcome message for new members.
@bot.register(wxpy_groups, NOTE)
def welcome(msg):
    """Return the welcome text for a newly joined member.

    Returns welcome_text formatted with the name found by
    get_new_member_name(), or None when the notification contains no name.
    """
    new_member = get_new_member_name(msg)
    return welcome_text.format(new_member) if new_member else None
def get_logger(level=logging.DEBUG, file='bot.log', mode='w'):
    """Reconfigure the root logger and return it.

    Existing root handlers are removed, then these are installed:
    a file handler (only when *file* is truthy), a console handler at
    WARNING level, and a WeChat handler at WARNING level that reports to
    remote_admin. sys.excepthook is replaced so uncaught exceptions are
    logged as CRITICAL, and the "requests" and "urllib3" loggers are raised
    to WARNING.

    Args:
        level: level set on the root logger.
        file: path of the log file; a falsy value disables file logging.
        mode: mode used to open the log file.

    Returns:
        The configured root logger.
    """
    # No explicit `import sys` is visible at file level, so import it here
    # rather than relying on a wildcard import to provide the name.
    import sys

    log_formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
    log_formatter_lite = logging.Formatter('%(name)s:%(levelname)s:%(message)s')

    _logger = logging.getLogger()

    # Iterate over a copy: removing handlers while iterating the live list
    # skips every other handler.
    for hdlr in list(_logger.handlers):
        _logger.removeHandler(hdlr)

    # Log to a file.
    if file:
        file_hdlr = logging.FileHandler(file, mode)
        file_hdlr.setFormatter(log_formatter)
        _logger.addHandler(file_hdlr)

    # Log to the console.
    console_hdlr = logging.StreamHandler()
    console_hdlr.setLevel(logging.WARNING)
    console_hdlr.setFormatter(log_formatter)
    _logger.addHandler(console_hdlr)

    # Log to the remote administrator's WeChat.
    wechat_hdlr = WeChatLoggingHandler(remote_admin)
    wechat_hdlr.setLevel(logging.WARNING)
    wechat_hdlr.setFormatter(log_formatter_lite)
    _logger.addHandler(wechat_hdlr)

    # Send uncaught exceptions to the log as well. The hook is bound to
    # _logger so it does not depend on the caller having assigned the result
    # to a module-level name.
    sys.excepthook = lambda *args: _logger.critical(
        'UNCAUGHT EXCEPTION:', exc_info=args)

    # Keep the HTTP libraries quiet below WARNING.
    for m in 'requests', 'urllib3':
        logging.getLogger(m).setLevel(logging.WARNING)

    _logger.setLevel(level)
    return _logger
# Install the logging configuration; `logger` is the module-level name the
# functions above log through.
logger = get_logger()
# Send an initial status report to the remote administrator.
send_status_text()
# Save the login status.
bot.dump_login_status()
# NOTE(review): presumably blocks the main thread until the bot stops
# listening -- confirm against the wxpy API.
bot.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment