Skip to content

Instantly share code, notes, and snippets.

@lenvon2012
Forked from youfou/ad_urls.json
Created August 23, 2018 17:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lenvon2012/13a209488d9f15adf6d015952ec860e8 to your computer and use it in GitHub Desktop.
Save lenvon2012/13a209488d9f15adf6d015952ec860e8 to your computer and use it in GitHub Desktop.
响应好友请求 / 自动聊天 / 限制频率 / 邀请入群 / 远程群管理 / 新人欢迎消息 / 关键词问答 / 发心跳 / 远程命令 / 远程执行代码
{
"xiaohongshu.com": "小红书",
"vip.com": "唯品会",
"douguo.com": "豆果美食",
"youshu.cc": "有书",
"missfresh.cn": "每日优鲜",
"qnr.io": "去哪儿",
"kaola.com": "网易考拉",
"waimai.meituan.com": "美团外卖",
"qcs.meituan.com": "美团打车",
"ele.me": "饿了么",
"dianping.com": "大众点评",
"xiaojukeji.com": "滴滴出行",
"pagoda.com.cn": "百果园",
"ricebook.com": "ENJOY",
"ofo.so": "ofo",
"youzan.com": "有赞",
"jd.com": "京东",
"qr.alipay.com": "支付宝"
}
#!/usr/bin/env python3
# coding: utf-8
"""
wxpy 机器人正在使用的代码
** 这些代码无法独立运行,但可用于参考 **
需要安装新内核分支 new-core
pip3 install -U git+https://github.com/youfou/wxpy.git@new-core
"""
import datetime
import os
import re
import subprocess
import time
from collections import Counter
from functools import wraps
from io import BytesIO
from pprint import pformat
# 该模块可以用 pip 安装
import psutil
from requests import RequestException
import zbarlight
from PIL import Image
from wxpy import *
from wxpy.utils import ensure_list, start_new_thread
from is_ad import is_ad
# 以下两个模块可在 Gist 中获取 (自备梯子):
# http://gist.github.com/youfou/03c1e0204ac092f873730f51671ce0a8
from kick_votes import KickVotes
# 以下两个模块分别用于远程管理和发送短信提醒,但并未公开
from remote import run_flask_app
from sms import send_sms
from timed_list import TimedList
# ---------------- 配置开始 ----------------
# Bot 对象初始化时的 console_qr 参数值
console_qr = True
# 机器人昵称 (防止登错账号)
bot_name = 'wxpy 机器人'
# 入群口令
group_code = 'wxpy'
# 自动回答关键词
kw_replies = {
'wxpy 项目主页:\ngithub.com/youfou/wxpy': (
'项目', '主页', '官网', '网站', 'github', '地址', 'repo', '版本'
),
'wxpy 在线文档:\nwxpy.readthedocs.io': (
'请问', '文档', '帮助', '怎么', '如何', '请教', '安装', '说明', '运行'
),
'必看: 常见问题 FAQ:\nwxpy.readthedocs.io/faq.html': (
'faq', '常见', '问题', '问答', '什么'
),
(__file__, FILE): (
'源码', '代码'
)
}
# 新人入群的欢迎语
welcome_text = '''🎉 欢迎 @{} 的加入!
😃 请勿在本群使用机器人
📖 提问前请看 t.cn/R6VkJDy'''
help_info = '''😃 讨论主题
· 本群主题为 wxpy 与 Python
· 不限制其他话题,请区分优先级
· 支持分享对群员有价值的信息
⚠️ 注意事项
· 除群主外,勿在群内使用机器人
· 严禁灰产/黑产相关内容话题
· 请勿发布对群员无价值的广告
👮 投票移出
· 移出后将被拉黑 24 小时
· 请在了解事因后谨慎投票
· 命令格式: "移出 @人员"
🔧 实用链接
· 文档: url.cn/4660Oil
· 示例: url.cn/49t5O4x
· 项目: url.cn/463SJb8
'''
# ---------------- 配置结束 ----------------
logging.basicConfig(level=logging.DEBUG)
qr_path = 'static/qrcode.png'
sms_sent = False
def show_qrcode(core):
global sms_sent
Core.show_qrcode(core)
if not sms_sent:
# 发送短信
send_sms()
sms_sent = True
def logged_in(core):
core.remove_qrcode()
# noinspection PyUnusedLocal
def new_member(core, member):
# if member.group in groups:
# member.group.send(welcome_text.format(member.name))
pass
# noinspection PyUnusedLocal
def deleting_member(core, member):
admin_group.send('[member left]\nmember: {0.name}\ngroup: {1.name}'.format(member, member.group))
def _restart():
os.execv(sys.executable, [sys.executable] + sys.argv)
def logged_out(core, reason):
logger.critical('{} logged out:\n{}'.format(core, reason))
_restart()
process = psutil.Process()
def _status_text():
uptime = datetime.datetime.now() - datetime.datetime.fromtimestamp(process.create_time())
memory_usage = process.memory_info().rss
if globals().get('bot'):
messages = bot.messages
else:
messages = list()
return '[now] {now:%H:%M:%S}\n[uptime] {uptime}\n[memory] {memory}\n[messages] {messages}'.format(
now=datetime.datetime.now(),
uptime=str(uptime).split('.')[0],
memory='{:.2f} MB'.format(memory_usage / 1024 ** 2),
messages=len(messages)
)
start_new_thread(run_flask_app, (qr_path, _status_text))
bot = Bot('bot.pkl', console_qr=True, qr_path=qr_path, hooks=dict(
show_qrcode=show_qrcode,
logged_in=logged_in,
logged_out=logged_out,
new_member=new_member,
deleting_member=deleting_member,
))
if bot.name != bot_name:
logging.error('Wrong User!')
bot.logout()
_restart()
admins = bot.friends.get(remark_name='Youfou'), bot.self
admin_group = bot.groups.get('wxpy admins')
groups = list(map(lambda x: bot.groups.get(x), (
'wxpy 交流群 🐰',
'wxpy 交流群 🐱',
'wxpy 交流群 🐨',
'wxpy 交流群 🐹',
'wxpy 交流群 🐼',
)))
test_group = bot.groups.get('wxpy test')
preview_group = bot.groups.get('wxpy preview')
# 初始化图灵机器人
tuling = Tuling()
# 远程踢人命令: 移出 @<需要被移出的人>
rp_kick = re.compile(
# 普通踢人命令
r'^(?:移出|移除|踢出)\s*@(?P<name_to_kick>.+?)(?:\u2005?\s*$)|'
# 详细踢人命令:
'^👉 复制移出 \(#(?P<option_id>\d+)\)\n'
# 真实昵称
'真实昵称: (?P<nickname>.+?)\n'
# 群内昵称
'(?:群内昵称: (?P<display_name>.+?)\n)?'
# 省份 / 城市 / 性别
'(?P<province>.+?) / (?P<city>.+?) / (?P<sex>.+?)'
# 签名
'(?:\n签名: (?P<signature>.+))?$'
)
kick_votes = KickVotes(300)
votes_to_kick = 5
black_list = TimedList()
def from_admin(msg):
"""
判断 msg 的发送者是否为管理员
"""
from_user = msg.member if isinstance(msg.chat, Group) else msg.sender
return from_user in admins
def admin_auth(func):
"""
装饰器: 验证函数的第 1 个参数 msg 是否来自 admins
"""
@wraps(func)
def wrapped(*args, **kwargs):
msg = args[0]
if from_admin(msg):
return func(*args, **kwargs)
else:
raise ValueError('{} is not an admin!'.format(msg))
return wrapped
def send_iter(receiver, iterable):
"""
用迭代的方式发送多条消息
:param receiver: 接收者
:param iterable: 可迭代对象
"""
if isinstance(iterable, str):
raise TypeError
for msg in iterable:
receiver.send(msg)
def update_groups():
yield 'updating groups...'
for _group in groups:
_group.update()
yield '{}: {}'.format(_group.name, len(_group))
def status_text():
yield _status_text()
# 定时报告进程状态
def heartbeat():
while bot.alive:
time.sleep(3600)
# noinspection PyBroadException
try:
send_iter(admin_group, status_text())
except:
logger.exception('failed to report heartbeat:\n')
start_new_thread(heartbeat)
def remote_eval(source):
try:
ret = eval(source, globals())
except (SyntaxError, NameError):
raise ValueError('got SyntaxError or NameError in source')
logger.info('remote eval executed:\n{}'.format(source))
yield pformat(ret)
def remote_shell(command):
logger.info('executing remote shell cmd:\n{}'.format(command))
r = subprocess.run(
command, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True
)
if r.stdout:
yield r.stdout
else:
yield '[OK]'
def restart():
yield 'restarting bot...'
bot.core.dump()
_restart()
def latency():
yield '{:.2f}'.format(bot.messages[-1].latency)
# 远程命令 (单独发给机器人的消息)
remote_orders = {
'g': update_groups,
's': status_text,
'r': restart,
'l': latency,
}
@admin_auth
def server_mgmt(msg):
"""
服务器管理:
若消息文本为为远程命令,则执行对应函数
若消息文本以 ! 开头,则作为 shell 命令执行
若不满足以上,则尝试直接将 msg.text 作为 Python 代码执行
"""
order = remote_orders.get(msg.text.strip())
if order:
logger.info('executing remote order: {}'.format(order.__name__))
send_iter(msg.chat, order())
elif msg.text.startswith('!'):
command = msg.text[1:]
send_iter(msg.chat, remote_shell(command))
else:
send_iter(msg.chat, remote_eval(msg.text))
def reply_by_keyword(msg):
for reply, keywords in kw_replies.items():
for kw in keywords:
if kw in msg.text.lower():
msg.reply(*ensure_list(reply))
return True
# 验证入群口令
def valid(msg):
return group_code in str(msg.text).lower()
# 自动选择未满的群
def get_group():
groups.sort(key=len, reverse=True)
for _group in groups[2:]:
if len(_group) < 490:
return _group
else:
logger.warning('群都满啦!')
return groups[-1]
# 计算每个用户被邀请的次数
invite_counter = Counter()
# 邀请入群
def invite(user):
joined = list()
for group in groups:
if user in group:
joined.append(group)
if joined:
joined_group_names = '\n'.join(map(lambda x: x.name, joined))
logger.info('{} is already in\n{}'.format(user, joined_group_names))
user.send('你已加入了\n{}'.format(joined_group_names))
else:
if invite_counter.get(user, 0) < 2:
group = get_group()
user.send('验证通过 [嘿哈]')
group.add(user, use_invitation=True)
invite_counter.update([user])
else:
user.send('你的受邀次数已达最大限制 😷')
# 限制频率: 指定周期内超过消息条数,直接回复 "🙊"
def freq_limit(period_secs=15, limit_msgs=5):
def decorator(func):
@wraps(func)
def wrapped(msg):
if msg.chat in black_list:
return
now = datetime.datetime.now()
period = datetime.timedelta(seconds=period_secs)
recent_received = 0
for m in msg.bot.messages[::-1]:
if m.sender == msg.sender:
if now - m.create_time > period:
break
recent_received += 1
if recent_received > 8:
black_list.set(msg.chat, 2 * 3600)
return '你说得好快,我都累了,休息一下吧'
elif recent_received > limit_msgs:
if not isinstance(msg.chat, Group) or msg.is_at:
return '🙊'
return func(msg)
return wrapped
return decorator
@dont_raise_response_error
def try_send(chat, msg):
"""尝试发送消息给指定聊天对象"""
if chat.is_friend:
chat.send(msg)
def _kick(to_kick, limit_secs=0, msg=None):
if to_kick in admins:
raise ValueError('Admins cannot be kicked!')
if limit_secs:
# 加入计时黑名单
black_list.set(to_kick, limit_secs)
to_kick.remove()
start_new_thread(try_send, kwargs=dict(chat=to_kick, msg=msg))
ret = '@{} 已被成功移出! 😔'.format(to_kick.name)
if to_kick in kick_votes:
voters = kick_votes[to_kick][0]
voters = '\n'.join(map(lambda x: '@{}'.format(x.name), voters))
ret += '\n\n投票人:\n{}'.format(voters)
return ret
def gen_detailed_kicks(found):
yield '🤔 找到了 {} 个 "{}" \n👇 请精准选择,复制并发送'.format(len(found), found[0].name)
for option_index, member in enumerate(found):
option_text = '👉 复制移出 (#{})\n'.format(option_index + 1)
option_text += '真实昵称: {}\n'.format(member.nickname)
if member.display_name:
option_text += '群内昵称: {}\n'.format(member.display_name)
option_text += '{} / {} / {}'.format(
member.province or '未知',
member.city or '未知',
{MALE: '男', FEMALE: '女'}.get(member.sex, '未知'))
if member.signature:
option_text += '\n签名: {}'.format(member.signature)
yield option_text
def remote_kick(msg):
info_msg = '抱歉,你已被{}移出,接下来的 24 小时内,机器人将对你保持沉默 😷'
limit_secs = 3600 * 24
info = rp_kick.match(msg.text).groupdict()
if info['name_to_kick']:
# 简单命令
found = msg.chat.search(name=info['name_to_kick'])
if not found:
return '查无此人,突然改名了吗 🤔'
elif len(found) > 1:
send_iter(msg.chat, gen_detailed_kicks(found))
return
else:
member_to_kick = found[0]
elif info['nickname']:
# 详细命令
info['sex'] = {'男': MALE, '女': FEMALE}.get(info['sex'])
for attr in 'province', 'city':
if info[attr] == '未知':
info[attr] = None
attributions = dict()
for attr in 'nickname', 'display_name', 'province', 'city', 'sex', 'signature':
attributions[attr] = info[attr]
logger.info('detailed kick: {}'.format(attributions))
found = msg.chat.search(**attributions)
if not found:
return '查无此人,难道又改名了 🤔'
elif len(found) > 1:
return '然而还是有重复的,呼叫群主本体吧… [捂脸]'
else:
member_to_kick = found[0]
else:
return
if member_to_kick in admins:
logger.error('{} tried to kick {} whom was an admin'.format(
msg.member.name, member_to_kick.name))
return '无法移出管理员 @{} 😷️'.format(member_to_kick.name)
if from_admin(msg):
# 管理员: 直接踢出
return _kick(member_to_kick, limit_secs, info_msg.format('管理员'))
else:
# 其他群成员: 投票踢出
votes, secs_left = kick_votes.vote(voter=msg.member, to_kick=member_to_kick)
now = time.time()
voted = 0
for voters, start in kick_votes.votes.values():
if msg.member in voters and now - start < 600:
# 10 分钟内尝试投票移出 3 个群员,则认为是恶意用户
voted += 1
if voted >= 3:
_kick(
msg.member, limit_secs,
'抱歉,你因恶意投票而被移出。接下来的 24 小时内,机器人将对你保持沉默 [悠闲]'
)
return '移出了恶意投票者 @{} [闪电]'.format(msg.member.name)
if votes < votes_to_kick:
voting = '正在投票移出 @{name}{id}' \
'\n当前 {votes} / {votes_to_kick} 票 ({secs_left:.0f} 秒内有效)' \
'\n移出将拉黑 24 小时 😵' \
'\n请谨慎投票 🤔'
return voting.format(
name=member_to_kick.name,
id=' (#{})'.format(info['option_id']) if info['option_id'] else '',
votes=votes,
votes_to_kick=votes_to_kick,
secs_left=secs_left)
else:
return _kick(member_to_kick, limit_secs, info_msg.format('投票'))
# 判断消息是否为支持回复的消息类型
def supported_msg_type(msg, reply_unsupported=False):
supported = (TEXT,)
ignored = (UNKNOWN, NOTICE, NEW_FRIEND)
fallback_replies = {
VOICE: '🙉',
IMAGE: '🙈',
VIDEO: '🙈',
}
if msg.type in supported:
return True
elif reply_unsupported and (msg.type not in ignored):
msg.reply(fallback_replies.get(msg.type, '🐒'))
# 响应好友消息,限制频率
@bot.register(Friend)
@freq_limit()
def exist_friends(msg):
if msg.chat in black_list:
return
elif supported_msg_type(msg, reply_unsupported=True):
if msg.type == TEXT:
if valid(msg):
invite(msg.sender)
elif str(msg.text).strip().lower() == 'preview':
if msg.chat in preview_group:
return '你已经加入了 preview 群'
else:
msg.reply_file('wxpy-0.5-api-changes.xlsx')
msg.reply('''wxpy new-core 解除了先前对 itchat 模块的依赖,完全重写了协议部分。新版本在登陆稳定性、数据及时性、可扩展性、运行效率等方面都有一定的提升。
安装 wxpy new-core 分支:
pip3 install -U git+https://github.com/youfou/wxpy.git@new-core
⚠️ 重要说明
部分接口已调整,请见附件表格
⏳ 已知问题
1. 文档仍为滞后状态,会逐步跟进
2. Chat.puid 暂未实现
3. Message.forward() 暂未实现
💬 提交 issue
1. 说明问题的复现步骤 (另附代码效果更佳)
2. 贴出完整的 traceback 信息
3. 为 issue 选择 [new-core] 标签''')
preview_group.add(msg.chat, use_invitation=True)
else:
reply_by_keyword(msg)
# 取消图灵自动回复,减少被封概率
# elif not reply_by_keyword(msg):
# tuling.do_reply(msg)
# 响应好友请求
@bot.register(msg_types=NEW_FRIEND)
def new_friend(msg):
if msg.card in black_list:
return
user = msg.card.accept()
if valid(msg):
invite(user)
# 手动加为好友后自动发送消息
@bot.register(Friend, NOTICE)
def manually_added(msg):
if '现在可以开始聊天了' in msg.text:
# 对于好友验证信息为 wxpy 的,会等待邀请完成 (并计入 invite_counter)
# 对于好友验证信息不为 wxpy 的,延迟发送更容易引起注意
time.sleep(3)
for group in groups:
if msg.chat in group:
break
else:
if msg.chat not in invite_counter:
return '你好呀,{},还记得咱们的入群口令吗?回复口令即可获取入群邀请。'.format(msg.chat.name)
# # 在其他群中回复被 @ 的消息
# @bot.register(Group, TEXT)
# def reply_other_group(msg):
# if msg.chat not in groups and msg.is_at:
# if supported_msg_type(msg, reply_unsupported=True):
# tuling.do_reply(msg)
def read_qrcode(msg):
try:
image = Image.open(BytesIO(msg.get_file()))
except RequestException:
pass
else:
urls = zbarlight.scan_codes('qrcode', image)
if urls:
urls = list(map(lambda x: x.decode(), urls))
logger.debug('got urls from qrcode: {}'.format(urls))
return urls
def get_ad_type(msg):
if msg.type == TEXT:
if re.search(r'(?:[^\da-z]|\b)[\da-z]{10}(?:[^\da-z]|\b)', msg.text, re.I) and re.search(
r'alipay|支付[宝寶]|[支吱]口令|[吱支]{3}', re.sub(r'\s+', '', msg.text), re.I):
return '支付宝'
elif msg.type == IMAGE:
urls = read_qrcode(msg)
if urls:
return is_ad(urls[0])
elif msg.type == URL:
return is_ad(msg.url)
to_recall_ids = list()
recall_limit_secs = 60
def wait_for_recalling(msg):
time.sleep(recall_limit_secs)
if msg.id in to_recall_ids and msg.member:
_kick(msg.member, 3600 * 24, '抱歉,你因发送广告消息且未能及时撤回而被移出。😔')
msg.reply('@{} 因发送广告消息且未能及时撤回而被移出!😔'.format(msg.member.name))
# wxpy 群的消息处理
@bot.register([*groups, preview_group, test_group], (TEXT, IMAGE, URL))
def wxpy_group(msg):
if msg.type == TEXT:
if rp_kick.match(msg.text):
return remote_kick(msg)
elif msg.text.lower().strip() in ('帮助', '说明', '规则', 'help', 'rule', 'rules'):
return help_info
ad_type = get_ad_type(msg)
if ad_type:
to_recall_ids.append(msg.id)
msg.reply('@{}\n请在 {} 秒内「撤回你的 {} 广告」[奸笑]'.format(
msg.member.name, recall_limit_secs, ad_type))
start_new_thread(wait_for_recalling, (msg,), daemon=True)
if msg.chat == test_group and msg.type == IMAGE:
return read_qrcode(msg)
@bot.register([*groups, preview_group, test_group], RECALLED)
def append_recalled_msg_ids(msg):
try:
to_recall_ids.remove(msg.recalled_id)
except ValueError:
pass
else:
msg.reply('感谢配合 [机智] @{}'.format(msg.member.name))
# @bot.register(test_group)
# def forward_test_msg(msg):
# if msg.type in (TEXT, IMAGE):
# ret = wxpy_group(msg)
# if ret:
# return ret
# elif msg.text == 'text':
# return 'Hello!'
# elif msg.text == 'at':
# return 'Hello @{} !'.format(msg.member.name)
@bot.register((*admins, admin_group), msg_types=TEXT, except_self=False)
def reply_admins(msg):
"""
响应远程管理员
内容解析方式优先级:
1. 若为远程命令,则执行远程命令 (额外定义,一条命令对应一个函数)
2. 若消息文本以 ! 开头,则作为 shell 命令执行
3. 尝试作为 Python 代码执行 (可执行大部分 Python 代码)
4. 若以上不满足或尝试失败,则作为普通聊天内容回复
"""
try:
# 上述的 1. 2. 3.
server_mgmt(msg)
except ValueError:
# 上述的 4.
if isinstance(msg.chat, User):
return exist_friends(msg)
# 新人入群时激活群数据的更新 (有新消息时才会更新)
@bot.register(groups, NOTICE)
def group_notice(msg):
admin_group.send('{}:\n{}'.format(msg.chat, msg.text))
def get_logger(level=logging.DEBUG, file='bot.log', mode='a'):
log_formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
log_formatter_lite = logging.Formatter('%(name)s:%(levelname)s:%(message)s')
_logger = logging.getLogger()
for hdlr in _logger.handlers:
_logger.removeHandler(hdlr)
# 输出到文件
if file:
file_hdlr = logging.FileHandler(file, mode)
file_hdlr.setFormatter(log_formatter)
_logger.addHandler(file_hdlr)
# 输出到屏幕
console_hdlr = logging.StreamHandler()
console_hdlr.setLevel(logging.INFO)
console_hdlr.setFormatter(log_formatter)
_logger.addHandler(console_hdlr)
# 输出到远程管理员微信
wechat_hdlr = WeChatLoggingHandler(admins[0])
wechat_hdlr.setLevel(logging.WARNING)
wechat_hdlr.setFormatter(log_formatter_lite)
_logger.addHandler(wechat_hdlr)
# 将未捕捉异常也发送到日志中
def except_hook(*args):
logger.critical('UNCAUGHT EXCEPTION:', exc_info=args)
_restart()
sys.excepthook = except_hook
for m in 'requests', 'urllib3':
logging.getLogger(m).setLevel(logging.ERROR)
_logger.setLevel(level)
return _logger
logger = get_logger()
send_iter(admin_group, status_text())
# bot.join()
embed()
#!/usr/bin/env python3
# coding: utf-8
import json
import re
import urllib.parse
with open('ad_urls.json') as fp:
ad_urls = json.load(fp)
def is_ad(url):
ad = ad_urls.get(url)
if ad:
return ad
url = urllib.parse.urlsplit(url)
for domain, ad in ad_urls.items():
if re.search(r'(?:^|\.)' + re.escape(domain) + r'$', url.hostname, re.I):
return ad
if __name__ == '__main__':
print(is_ad('https://qnr.io/YiH68V'))
import threading
import time
class KickVotes(object):
def __init__(self, limit_secs=0):
"""
对于每个项目在限定时间内增加的数量
超出限定时间后再增加,则将该项目数量重新设为 1
:param limit_secs: 限定的秒数
"""
# 格式: {to_kick: ({voter, ...}, timestamp)}
self.votes = dict()
self.limit_secs = limit_secs
self._lock = threading.Lock()
def vote(self, voter, to_kick):
"""投票, 返回 最新票数,剩余秒数"""
with self._lock:
if self.secs_left(to_kick) < 0:
self.votes[to_kick] = {voter}, time.time()
else:
self.votes[to_kick][0].add(voter)
return len(self.votes[to_kick][0]), self.secs_left(to_kick)
def secs_left(self, to_kick):
"""剩余秒数,不存在时为 -1"""
if to_kick in self.votes:
return self.limit_secs - (time.time() - self.votes[to_kick][1])
else:
return -1
def get(self, to_kick, default=(None, None)):
return self.votes.get(to_kick, default=default)
def __contains__(self, to_kick):
return to_kick in self.votes
def __getitem__(self, to_kick):
return self.votes[to_kick]
def __delitem__(self, to_kick):
if to_kick in self.votes:
del self.votes[to_kick]
def __repr__(self):
return repr(self.votes)
import threading
import time
class TimedList(object):
def __init__(self):
"""
计时列表,每个项目都有时效性
"""
# 格式: {item: (timestamp, limit_secs)}
self.data = dict()
self._lock = threading.Lock()
def set(self, item, limit_secs):
with self._lock:
self.data[item] = time.time(), limit_secs
def secs_left(self, item):
if item in self.data:
timestamp, limit_secs = self.data[item]
if limit_secs > 0:
return limit_secs - (time.time() - timestamp)
return 999
return 0
def remove(self, item):
if item in self.data:
del self.data[item]
def __contains__(self, item):
return bool(self.secs_left(item) > 0)
if __name__ == '__main__':
tl = TimedList()
a = 1
assert a not in tl
tl.set(a, 2)
assert a in tl
time.sleep(1)
assert a in tl
assert round(tl.secs_left(a)) == 1
time.sleep(1)
assert a not in tl
tl.set(a, -1)
assert tl.secs_left(a) == 999
tl.remove(a)
assert tl.secs_left(a) == 0
assert a not in tl
print('all tests pass')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment