Skip to content

Instantly share code, notes, and snippets.

@m4p1e
Last active February 20, 2024 04:42
Show Gist options
  • Save m4p1e/3e951e3e6e709625d16e832c60efb315 to your computer and use it in GitHub Desktop.
Save m4p1e/3e951e3e6e709625d16e832c60efb315 to your computer and use it in GitHub Desktop.
A kaiheila bot for giveaway
import json
import khl
from khl import api
from khl.card import CardMessage, Card, Module, Element, Types, Struct
from time import sleep
import time
import logging
import datetime
import random
from dateutil import tz
import asyncio
import threading
import traceback
logging.basicConfig(level='INFO')
khlbot = khl.Bot(token='')
bot_id = ''
error_collect_channel = '8376027126572919'
#allow_channel_id = []
runtime_content = None
runtime_content_lock = threading.Lock()
def convert(date):
pos = ["s", "m", "h", "d"]
time_dic = {"s": 1, "m": 60, "h": 3600, "d": 3600 *24}
i = {"s": "秒", "m": "分钟", "h": "小时", "d": "天"}
unit = date[-1]
val = int(date[:-1])
return val * time_dic[unit], date[:-1] + i[unit]
async def stop_giveaway(gmsg_id: str, data):
global runtime_content
channel_id = data['channel_id']
is_error = False
error_info = ''
try:
channel = await khlbot.fetch_public_channel(channel_id)
giveaway_msg_res = await khlbot.client.gate.exec_req(api.Message.view(gmsg_id))
if len(giveaway_msg_res['reactions']) > 0:
if giveaway_msg_res['reactions'][0]['emoji']['id'] != '[#127881;]':
is_error = True
error_info = 'Miss right reaction emoji'
else:
is_error = True
error_info = 'Miss right reaction emoji'
if is_error == False:
reaction_count = giveaway_msg_res['reactions'][0]['count'] - 1
if reaction_count < data["winners"]:
winners_number = reaction_count
else:
winners_number = data["winners"]
if winners_number == 0:
winners = []
else:
winners = random.sample(range(2, reaction_count + 2), winners_number)
winner_sorted = sorted(winners)
current_interval = 0
current_page = 0
winners_seq =''
giveaway_msg_reaction = []
winner_ids = []
for winner in winner_sorted:
if winner > current_interval:
current_page = winner / 50 + 1
current_interval = current_interval * 50
giveaway_msg_reaction = await khlbot.client.gate.exec_req(api.Message.reactionList(gmsg_id, '🎉', current_page))
winner_ids.append(giveaway_msg_reaction[(winner-1) % 50]['id'])
else:
winner_ids.append(giveaway_msg_reaction[(winner-1) % 50]['id'])
random.shuffle(winner_ids)
for winner_id in winner_ids:
winners_seq = winners_seq + ' (met){0}(met) '.format(winner_id)
if is_error == True:
givewaya_end_msg = Card(
Module.Header('🎉 ' + data['prize'] + ' 🎉'),
Module.Section(Element.Text('非常遗憾本次抽奖发生了意外的错误! 错误信息: {0}'.format(error_info))),
Module.Context('本次抽奖结束')
)
elif winners_seq == '':
givewaya_end_msg = Card(
Module.Header('🎉 ' + data['prize'] + ' 🎉'),
Module.Section(Element.Text('非常遗憾由于此次抽奖人数不足,没有获奖的小伙伴!')),
Module.Context('本次抽奖结束')
)
else:
givewaya_end_msg = Card(
Module.Header('🎉 ' + data['prize'] + ' 🎉'),
Module.Section(Element.Text('祝贺 {0} 获得了本次抽奖的礼物!'.format(winners_seq), Types.Text.KMD)),
Module.Context('本次抽奖结束')
)
giveaway_msg_updated_card = CardMessage(
Card(
Module.Header(data['prize']),
Module.Section(Element.Text('发起人: (met){0}(met)\n参与人数: **{1}**\n获奖者: {2}'.format(data["host"], reaction_count, winners_seq), Types.Text.KMD)),
Module.Context('{0}个获奖名额 | 结束于目标时间 {1}'.format(data["winners"], datetime.datetime.strftime(datetime.datetime.fromtimestamp(data["end_time"]).astimezone(tz.gettz('Asia/Shanghai')), "%Y-%m-%d %H:%M:%S")))
)
)
giveaway_msg_updated_str = json.dumps(giveaway_msg_updated_card)
await khlbot.client.gate.exec_req(api.Message.update(data['msg_id'], giveaway_msg_updated_str))
await khlbot.send(channel, CardMessage(givewaya_end_msg))
info_msg = '[INFO] 频道ID: {0} 抽奖结束, 总共有 `{1}` 人参与, 其中有`{2}`获奖, 分布为[{3}].'.format(channel_id, reaction_count, winners_number, ','.join(map(str,winner_sorted)) if winners_number > 0 else "")
await khlbot.client.gate.exec_req(api.Message.create(khl.MessageTypes.KMD, error_collect_channel, info_msg))
except Exception as e:
error_msg = '[ERROR] 频道ID: {0}, 错误细节: {1}, 相关用户: (met){2}(met)\n```\n{3}\n```'.format(channel_id, str(e), data['host'], traceback.format_exc())
await khlbot.client.gate.exec_req(api.Message.create(khl.MessageTypes.KMD, error_collect_channel, error_msg))
@khlbot.task.add_interval(hours = 1)
async def check_statue():
await khlbot.client.gate.exec_req(api.Message.create(khl.MessageTypes.KMD, error_collect_channel, "[INFO] 稳定运行中!"))
@khlbot.task.add_interval(seconds = 10)
async def check_giveway():
global runtime_content
global runtime_content_lock
if len(runtime_content) == 0:
return
gmsg_ids_need_deleted = []
runtime_content_lock.acquire()
for giveaway in runtime_content:
data = runtime_content[giveaway]
if int(time.time()) > data["end_time"]:
try:
await stop_giveaway(giveaway, data)
except:
...
gmsg_ids_need_deleted.append(giveaway)
for gmsg_id in gmsg_ids_need_deleted:
del runtime_content[gmsg_id]
runtime_content_lock.release()
json.dump(runtime_content, open("giveaways.json", "w"), indent=4)
@khlbot.command(name='roll_giveaway', regex=r"^\!groll\s+(.*)")
async def end_giveway(msg: khl.Message, msg_id: str):
channel_id = msg.target_id
is_error = False
error_info = ''
try:
giveaway_msg_res = await khlbot.client.gate.exec_req(api.Message.view(msg_id))
if giveaway_msg_res['author']['id'] != bot_id:
is_error = True
error_info = 'Miss right giveaway card message'
if giveaway_msg_res['content'].find("发起人: (met){0}(met)".format(msg.author_id)) == -1:
return
if len(giveaway_msg_res['reactions']) > 0:
if giveaway_msg_res['reactions'][0]['emoji']['id'] != '[#127881;]':
is_error = True
error_info = 'Miss right reaction emoji'
else:
is_error = True
error_info = 'Miss right reaction emoji'
reaction_count = giveaway_msg_res['reactions'][0]['count']
if reaction_count == 1:
is_error = True
error_info = 'Miss enough players'
channel = await khlbot.fetch_public_channel(channel_id)
if is_error == False:
winner = random.randint(2, reaction_count)
current_page = winner / 50 + 1
giveaway_msg_reaction = await khlbot.client.gate.exec_req(api.Message.reactionList(msg_id, '🎉', current_page))
winner_id = giveaway_msg_reaction[(winner-1) % 50]['id']
givewaya_end_msg = Card(
Module.Section(Element.Text('祝贺 (met){0}(met) 获得了本次抽奖的礼物!'.format(winner_id), Types.Text.KMD)),
)
else:
givewaya_end_msg = Card(
Module.Section(Element.Text('非常遗憾本次顺次发生了意外的错误! 错误信息: {0}'.format(error_info))),
)
await khlbot.send(channel, CardMessage(givewaya_end_msg))
except Exception as e:
error_msg = '[ERROR] 频道ID: {0}, 错误细节: {1}, 相关用户: (met){2}(met)\n```\n{3}\n```'.format(channel_id, str(e), msg.author_id, traceback.format_exc())
await khlbot.client.gate.exec_req(api.Message.create(khl.MessageTypes.KMD, error_collect_channel, error_msg))
@khlbot.command(name='end_giveaway', regex = r"^\!gend\s+(.*)")
async def end_giveway(msg: khl.Message, msg_id: str):
#if not (msg.target_id in allow_channel_id):
# return
if msg.author.bot == True:
return
global runtime_content
global runtime_content_lock
if len(runtime_content) == 0:
return
runtime_content_lock.acquire()
if msg_id in runtime_content.keys():
if msg.author_id == runtime_content[msg_id]['host']:
try:
await stop_giveaway(msg_id, runtime_content[msg_id])
except:
...
del runtime_content[msg_id]
json.dump(runtime_content, open("giveaways.json", "w"), indent=4)
runtime_content_lock.release()
@khlbot.command(name='start_giveaway', regex = r"^\!gstart\s+(\d+[s|m|h|d])\s+(\d+)w\s+(.*)")
async def start_giveway(msg: khl.Message, date: str, winner_num: str, gift_content: str):
global runtime_content
global runtime_content_lock
#if not (msg.target_id in allow_channel_id):
# return
if msg.author.bot == True:
return
if int(winner_num) > 20:
await msg.reply(content = '[Error] 无法设置多于`20`个获奖人数的抽奖活动,请重新设置!', use_quote = True, type = khl.MessageTypes.KMD)
return
elif int(winner_num) == 0:
await msg.reply(content = '[Error] 无法设置`0`个获奖人数的抽奖活动,请重新设置!', use_quote = True, type = khl.MessageTypes.KMD)
return
converted_time = convert(date)
if converted_time[0] == 0:
await msg.reply(content = '[Error] 无法设置时长为`0`的抽奖活动,请重新选择时长!', use_quote = True, type = khl.MessageTypes.KMD)
return
if converted_time[0] > 432000:
await msg.reply(content = '[Error] 无法设置时长超过`5d`的抽奖活动,请重新选择时长!', use_quote = True, type = khl.MessageTypes.KMD)
return
gift_timestamp = datetime.datetime.now() + datetime.timedelta(seconds=converted_time[0])
gift_msg_card = Card(
Module.Header(gift_content),
Module.Section(Element.Text('点击下方第一个Reaction表情 :tada: 参与!\n持续时长: {0}\n发起人: (met){1}(met)'.format(converted_time[1],msg.author_id), Types.Text.KMD)),
Module.Context('{0}个获奖名额 | 结束于目标时间 {1}'.format(winner_num, datetime.datetime.strftime(gift_timestamp.astimezone(tz.gettz('Asia/Shanghai')), "%Y-%m-%d %H:%M:%S"))),
Module.Countdown(gift_timestamp)
)
try:
await msg.reply('🎉🎉🎉 **抽奖开始**! 🎉🎉🎉', use_quote = False, type = khl.MessageTypes.KMD)
gift_msg_res = await msg.reply(content = CardMessage(gift_msg_card), use_quote = False)
await khlbot.client.gate.exec_req(api.Message.addReaction(gift_msg_res['msg_id'], '🎉'))
info_msg = '[INFO] 频道ID: {0}, 开始了一个时长 `{1}`的抽奖.'.format(msg.target_id, date)
await khlbot.client.gate.exec_req(api.Message.create(khl.MessageTypes.KMD, error_collect_channel, info_msg))
except Exception as e:
error_msg = '[ERROR] 频道ID: {0}, 错误细节: {1}, 相关用户: (met){2}(met)'.format(msg.target_id, str(e), msg.author_id)
await khlbot.client.gate.exec_req(api.Message.create(khl.MessageTypes.KMD, error_collect_channel, error_msg))
return
now = int(time.time())
data = {
"prize": gift_content,
"host": msg.author_id,
"winners": int(winner_num),
"end_time": now + converted_time[0],
"channel_id": msg.target_id,
"msg_id": gift_msg_res['msg_id']
}
runtime_content_lock.acquire()
runtime_content[gift_msg_res['msg_id']] = data
runtime_content_lock.release()
json.dump(runtime_content, open("giveaways.json", "w"), indent=4)
async def load_giveaways():
global runtime_content
runtime_content = json.load(open("giveaways.json", "r"))
await khlbot.client.gate.exec_req(api.Message.create(khl.MessageTypes.KMD, error_collect_channel, "[INFO] Bot重启成功~"))
loop = asyncio.get_event_loop()
loop.run_until_complete(load_giveaways())
loop.run_until_complete(asyncio.gather(khlbot.start()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment