Skip to content

Instantly share code, notes, and snippets.

@LittleKey
Last active June 21, 2018 17:25
Show Gist options
  • Save LittleKey/cb5daabc73218679e9834db2417c0421 to your computer and use it in GitHub Desktop.
Save LittleKey/cb5daabc73218679e9834db2417c0421 to your computer and use it in GitHub Desktop.
qqbot的插件
# -*- coding: utf-8 -*-
import re
import os
import sys
import inspect
import logging
import datetime
import threading
from contextlib import contextmanager
from collections import OrderedDict
from functools import wraps
# Module-level logging setup: every record goes both to a per-day log file
# under `path` and to stdout.
now = datetime.datetime.now()
path = '/Users/littlekey/.qqbot-tmp'
# The date is taken once at import time, so the file name does not roll over
# while the process keeps running.
log_file_name = 'qq_bot_hook.command.{date}.log'.format(
    date=now.strftime('%Y-%m-%d'))
log_format = '[%(asctime)s] [%(levelname)s] %(message)s'
# FileHandler opens the file immediately and raises if the directory is
# missing, which would abort the import of this module; create it up front.
os.makedirs(path, exist_ok=True)
# clear old handlers
# NOTE(review): presumably guards against duplicated output when the module
# is imported/reloaded more than once -- confirm against the host application.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
for handler in logger.handlers[:]:
    logger.removeHandler(handler)
# formatter
formatter = logging.Formatter(log_format)
# file handler
# Explicit UTF-8 so non-ASCII message text is written correctly regardless of
# the locale the process was started with.
file_handler = logging.FileHandler(
    os.path.join(path, log_file_name), 'a', encoding='utf-8')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# stdout handler
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(formatter)
logger.addHandler(stdout_handler)
# Registry mapping a command name to the function that implements it.
COMMANDS = {}
# Programs the `shell` command may run for any accepted contact.
ALLOW_PROGRAMS = ['echo', 'ok', 'say']
# Extended list used when the requesting contact's ctype is 'buddy'.
BUDDY_ALLOW_PROGRAMS = [*ALLOW_PROGRAMS, 'ls', 'cat', 'tail', 'git', 'cd']


def register_command(command_name):
    """Return a decorator that stores the decorated function in COMMANDS.

    Args:
        command_name: Key under which the function is registered. A later
            registration with the same name replaces the earlier one.

    Returns:
        A decorator that records the function and hands it back unchanged.
    """
    def _register(handler_func):
        COMMANDS.update({command_name: handler_func})
        return handler_func

    return _register
# Callbacks follow a single-threaded blocking model: all callbacks run
# sequentially on the same thread.
# A Queue is maintained, and MainThread consumes the tasks in that Queue.
# Several other threads keep adding tasks to the Queue.
main_thread_local = None


def get_main_thread_local():
    """Return the single shared ``threading.local`` for the main thread.

    The object is created lazily on the first call made from the thread
    named 'MainThread' and reused afterwards.

    Returns:
        The shared ``threading.local`` instance, or ``None`` when the calling
        thread is not named 'MainThread'.
    """
    global main_thread_local
    if threading.current_thread().name == 'MainThread':
        # Single instance: keep exactly one local for the whole main thread.
        if not main_thread_local:
            main_thread_local = threading.local()
        return main_thread_local
    return None
def set_current_ctx(**kwargs):
    """Store the given key/value pairs in the active call context.

    Does nothing when there is no main-thread local (see
    ``get_main_thread_local``) or when no context is currently installed
    on it.

    Args:
        **kwargs: Entries to write into the context mapping.
    """
    local = get_main_thread_local()
    if local and getattr(local, 'ctx', None) is not None:
        local.ctx.update(kwargs)
def get_current_ctx(key):
    """Look up *key* in the active call context.

    Args:
        key: Name of the context entry to read.

    Returns:
        The stored value, or ``None`` when the key is absent, the context is
        missing or empty, or no main-thread local is available.
    """
    local = get_main_thread_local()
    ctx = getattr(local, 'ctx', None) if local else None
    return ctx.get(key, None) if ctx else None
@contextmanager
def make_context():
    """Install a fresh context for the duration of the ``with`` body.

    A new ``OrderedDict`` is attached to the main-thread local as ``ctx`` on
    entry (when such a local is available) and is cleared and removed on
    exit. Any ``Exception`` raised by the body is logged and suppressed
    rather than propagated.
    """
    # Create an independent ctx per call and release it when the call ends.
    local = get_main_thread_local()
    if local:
        local.ctx = OrderedDict()
    try:
        yield
    except Exception as exc:
        logger.exception(exc)
    finally:
        # `local` may be None here; hasattr() is simply False in that case.
        if hasattr(local, 'ctx'):
            current = local.ctx
            if isinstance(current, dict):
                current.clear()
            del local.ctx
def context_deco(func):
    """Decorate *func* so that each call runs inside its own context.

    The wrapped call is executed within ``make_context()``; its result is
    also published in the context under the key ``'func_ret'``.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper returning whatever *func* returns, or ``None`` when
        ``make_context()`` suppressed an exception raised by *func*.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        with make_context():
            ret = func(*args, **kwargs)
            # Publish the result for context readers; this is a no-op when
            # no context is installed.
            set_current_ctx(func_ret=ret)
            # Return the value directly instead of reading it back from the
            # context: the read-back yields None whenever the context is
            # unavailable (call made off the main thread, or a nested
            # decorated call already removed it), silently dropping the
            # result.
            return ret
        # Reached only when make_context() swallowed an exception.
        return None
    return wrapper
def log_func_ctx(func):
    """Decorate *func* so that every call is written to the module logger.

    One INFO record is emitted per call, containing the function name, the
    bound arguments rendered as ``name=repr(value)`` and the ``repr`` of the
    return value. The record is emitted even when *func* raises; in that
    case the logged result is ``None`` and the exception still propagates.

    Args:
        func: The callable to wrap.

    Returns:
        The logging wrapper around *func*.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        func_name = func.__name__
        # Bind the call to the signature so positional arguments are logged
        # with their parameter names.
        bound = inspect.signature(func).bind(*args, **kwargs)
        rendered = ', '.join(
            '{}={}'.format(name, repr(value))
            for name, value in bound.arguments.items()
        )
        result = None
        try:
            result = func(*args, **kwargs)
            return result
        finally:
            logger.info('call {func_name}({arguments}): {ret}'.format(
                func_name=func_name, arguments=rendered, ret=repr(result)
            ))
    return wrapper
@context_deco
@log_func_ctx
def onQQMessage(bot, contact, member, content):
    """Handle an incoming message and run the ``#command`` it contains.

    NOTE(review): presumably invoked by qqbot as its message hook -- confirm
    against the qqbot plugin documentation.

    A message is treated as a command when it comes from a contact whose
    ``ctype`` is 'buddy' or whose ``uin`` is the hard-coded '3611296617',
    and its stripped text starts with '#'. The word after '#' selects the
    handler from COMMANDS; everything after the first space is stored in the
    context as ``arguments`` for the handler to read.

    Args:
        bot: Object used to reply; must provide ``SendTo(contact, text)``.
        contact: Sender/conversation; ``ctype`` and ``uin`` are read.
        member: Stored in the context only.
        content: Raw message text.

    Returns:
        The handler's return value, or ``None`` when the message is not an
        accepted command or no handler is registered for it.
    """
    set_current_ctx(bot=bot, contact=contact, member=member, content=content)
    # Hard-coded contact uin (3611296617).
    # Original author's note: the sender is a QQ friend, or the bot was @-ed.
    if contact.ctype != 'buddy' and contact.uin != '3611296617':
        return None

    text = content.strip()
    if not text.startswith('#'):
        # Content is not a command.
        return None

    name, _, arguments = text[1:].partition(' ')
    # Only the command name is case-insensitive. The arguments keep their
    # original case: lower-casing the whole message corrupted case-sensitive
    # values such as file paths or commit messages passed to `#shell`.
    command = name.lower()
    logger.info("received command: {} {}".format(command, arguments))
    set_current_ctx(command=command, arguments=arguments)

    handler = COMMANDS.get(command, None)
    if not handler:
        return None
    ret = handler()
    # The reply is capped at 1024 characters.
    bot.SendTo(contact, str(ret)[:1024])
    return ret
@register_command('lock')
def lock_screen():
    """Run the CGSession helper with ``-suspend`` (the 'lock' command).

    Returns:
        Always ``True``; the helper's exit status is not inspected.
    """
    import subprocess

    logger.info("lock screen")
    subprocess.call([
        r'/System/Library/CoreServices/Menu Extras/User.menu/Contents/Resources/CGSession',  # noqa
        '-suspend',
    ])
    return True
@register_command('shutdown')
def shutdown_mac():
    """Ask "System Events" to shut the machine down via ``osascript``.

    Returns:
        Always ``True``; the exit status of ``osascript`` is not inspected.
    """
    import subprocess

    shutdown_script = 'tell application "System Events" to shut down'
    subprocess.call(["osascript", "-e", shutdown_script])
    logger.warning("shutdown")
    return True
@register_command('killall')
def killall_application():
    """Tell every visible application process except Finder to quit.

    The work is done by an AppleScript snippet executed through
    ``osascript``.

    Returns:
        Always ``True``; the exit status of ``osascript`` is not inspected.
    """
    import subprocess

    # Collect the names of all visible processes other than Finder, then
    # ask each of them to quit.
    apple_script_shutdown = r"""
tell application "System Events" to set _runAppNameList to name of every application process whose visible is true and (name is not "Finder")
repeat with _runAppName in _runAppNameList
tell application _runAppName to quit
end repeat
"""  # noqa
    subprocess.call(['osascript', '-e', apple_script_shutdown])
    return True
@register_command('shell')
def execute_shell():
    """Run the shell command line stored in the context as ``arguments``.

    The command line is split into segments on ``;``, ``|``, ``&`` and line
    breaks. The first word of every segment must be in the allow-list:
    BUDDY_ALLOW_PROGRAMS when the context's contact has ``ctype == 'buddy'``,
    ALLOW_PROGRAMS otherwise. Command and process substitution syntax is
    refused outright.

    Returns:
        The command's combined output as returned by
        ``subprocess.getoutput``, a refusal message when the command line is
        rejected, or ``None`` when there are no arguments.
    """
    arguments = get_current_ctx('arguments')
    if not arguments:
        return None

    from subprocess import getoutput

    # The contact does not change between segments; resolve the list once.
    contact = get_current_ctx('contact')
    if contact and contact.ctype == 'buddy':
        allow_programs = BUDDY_ALLOW_PROGRAMS
    else:
        allow_programs = ALLOW_PROGRAMS

    refusal = 'Illegal shell command: {}'.format(arguments)

    # SECURITY: `arguments` comes from a chat message and is handed to a
    # shell as-is. Command/process substitution starts programs that never
    # appear as the first word of a segment, so the allow-list check below
    # cannot see them; refuse that syntax entirely.
    for token in ('`', '$(', '<(', '>('):
        if token in arguments:
            logger.warning('Refuse shell syntax: {}'.format(token))
            return refusal

    # Line breaks separate commands just like ';' does, so they must start a
    # new segment too.
    for segment in re.split(r'[;|&\r\n]', arguments):
        segment = segment.strip()
        if not segment:
            continue
        # The shell separates words on any blank, not only a single space.
        program = segment.split(None, 1)[0]
        if program not in allow_programs:
            logger.warning('Refuse shell program: {}'.format(program))
            return refusal

    # NOTE(review): plain redirections ('>' / '<') and the options of the
    # allowed programs are still unrestricted; the allow-list only limits
    # which programs start each segment.
    return getoutput(arguments)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment