Last active
June 21, 2018 17:25
-
-
Save LittleKey/cb5daabc73218679e9834db2417c0421 to your computer and use it in GitHub Desktop.
qqbot的插件
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import re | |
import os | |
import sys | |
import inspect | |
import logging | |
import datetime | |
import threading | |
from contextlib import contextmanager | |
from collections import OrderedDict | |
from functools import wraps | |
now = datetime.datetime.now() | |
path = '/Users/littlekey/.qqbot-tmp' | |
log_file_name = 'qq_bot_hook.command.{date}.log'.format( | |
date=now.strftime('%Y-%m-%d')) | |
log_format = '[%(asctime)s] [%(levelname)s] %(message)s' | |
# clear old handlers | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.DEBUG) | |
for handler in logger.handlers[:]: | |
logger.removeHandler(handler) | |
# formatter | |
formatter = logging.Formatter(log_format) | |
# file handler | |
file_handler = logging.FileHandler(os.path.join(path, log_file_name), 'a') | |
file_handler.setFormatter(formatter) | |
logger.addHandler(file_handler) | |
# stdout handler | |
stdout_handler = logging.StreamHandler(sys.stdout) | |
stdout_handler.setFormatter(formatter) | |
logger.addHandler(stdout_handler) | |
COMMANDS = {} | |
ALLOW_PROGRAMS = ['echo', 'ok', 'say'] | |
BUDDY_ALLOW_PROGRAMS = ALLOW_PROGRAMS + ['ls', 'cat', 'tail', 'git', 'cd'] | |
def register_command(command_name): | |
def command_deco(func): | |
COMMANDS[command_name] = func | |
return func | |
return command_deco | |
# 回调是单线程阻塞模型, 所有回调都在同一线程顺序执行 | |
# 维护了一个Queue, 由MainThread取消费Queue中的task | |
# 另有多个线程负责持续向Queue中持续添加Task | |
main_thread_local = None | |
def get_main_thread_local(): | |
# single instance | |
# 整个主线程保持唯一一个local | |
global main_thread_local | |
if threading.current_thread().name != 'MainThread': | |
return | |
if not main_thread_local: | |
main_thread_local = threading.local() | |
return main_thread_local | |
def set_current_ctx(**kwargs): | |
local = get_main_thread_local() | |
if not local: | |
return | |
if getattr(local, 'ctx', None) is None: | |
return | |
for k, v in kwargs.items(): | |
local.ctx[k] = v | |
def get_current_ctx(key): | |
local = get_main_thread_local() | |
if not local: | |
return | |
ctx = getattr(local, 'ctx', None) | |
if not ctx: | |
return | |
return ctx.get(key, None) | |
@contextmanager | |
def make_context(): | |
# 为每次调用生成一个独立的ctx, 并且在调用结束时释放该ctx | |
local = get_main_thread_local() | |
if local: | |
local.ctx = OrderedDict() | |
try: | |
yield | |
except Exception as exc: | |
logger.exception(exc) | |
finally: | |
if hasattr(local, 'ctx'): | |
if isinstance(local.ctx, dict): | |
local.ctx.clear() | |
del local.ctx | |
def context_deco(func): | |
@wraps(func) | |
def wrapper(*args, **kwargs): | |
with make_context(): | |
ret = func(*args, **kwargs) | |
set_current_ctx(func_ret=ret) | |
return get_current_ctx('func_ret') | |
return wrapper | |
def log_func_ctx(func): | |
@wraps(func) | |
def wrapper(*args, **kwargs): | |
func_name = func.__name__ | |
signature = inspect.signature(func) | |
bind_args = signature.bind(*args, **kwargs) | |
arguments = ', '.join( | |
'{}={}'.format(k, repr(v)) for k, v in bind_args.arguments.items() | |
) | |
ret = None | |
try: | |
ret = func(*args, **kwargs) | |
finally: | |
logger.info('call {func_name}({arguments}): {ret}'.format( | |
func_name=func_name, arguments=arguments, ret=repr(ret) | |
)) | |
return ret | |
return wrapper | |
@context_deco | |
@log_func_ctx | |
def onQQMessage(bot, contact, member, content): | |
set_current_ctx(bot=bot, contact=contact, member=member, content=content) | |
# hard code contact uni(3611296617) | |
if contact.ctype == 'buddy' or contact.uin == '3611296617': | |
# is QQ friend or @me | |
valid_content = content.lower().strip() | |
if not valid_content.startswith('#'): | |
# content s not command | |
return | |
command = valid_content[1:] | |
arguments = '' | |
if ' ' in command: | |
command, arguments = command.split(' ', 1) | |
logger.info("received command: {} {}".format(command, arguments)) | |
set_current_ctx(command=command) | |
set_current_ctx(arguments=arguments) | |
handler = COMMANDS.get(command, None) | |
if handler: | |
ret = handler() | |
bot.SendTo(contact, str(ret)[:1024]) | |
return ret | |
@register_command('lock') | |
def lock_screen(): | |
from subprocess import call | |
cgsession_path = r'/System/Library/CoreServices/Menu Extras/User.menu/Contents/Resources/CGSession' # noqa | |
logger.info("lock screen") | |
call([cgsession_path, '-suspend']) | |
return True | |
@register_command('shutdown') | |
def shutdown_mac(): | |
from subprocess import call | |
call(["osascript", "-e", 'tell application "System Events" to shut down']) | |
logger.warning("shutdown") | |
return True | |
@register_command('killall') | |
def killall_application(): | |
from subprocess import call | |
apple_script_shutdown = r""" | |
tell application "System Events" to set _runAppNameList to name of every application process whose visible is true and (name is not "Finder") | |
repeat with _runAppName in _runAppNameList | |
tell application _runAppName to quit | |
end repeat | |
""" # noqa | |
call(['osascript', '-e', apple_script_shutdown]) | |
return True | |
@register_command('shell') | |
def execute_shell(): | |
arguments = get_current_ctx('arguments') | |
if not arguments: | |
return | |
from subprocess import getoutput | |
commands = map(lambda c: c.strip(), re.split(r'[;|&]', arguments)) | |
for command in commands: | |
if not command: | |
continue | |
if ' ' in command: | |
program, _ = command.split(' ', 1) | |
else: | |
program = command | |
allow_programs = ALLOW_PROGRAMS | |
contact = get_current_ctx('contact') | |
if contact and contact.ctype == 'buddy': | |
allow_programs = BUDDY_ALLOW_PROGRAMS | |
if program not in allow_programs: | |
logger.warning('Refuse shell program: {}'.format(program)) | |
return 'Illegal shell command: {}'.format(arguments) | |
return getoutput(arguments) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment