Skip to content

Instantly share code, notes, and snippets.

@crazywhalecc
Created May 20, 2023 10:25
Show Gist options
  • Save crazywhalecc/7b86584a659ea2622d87f281b6b22a4b to your computer and use it in GitHub Desktop.
Save crazywhalecc/7b86584a659ea2622d87f281b6b22a4b to your computer and use it in GitHub Desktop.
炸毛框架 3.0 正向 WS 适配插件
<?php
declare(strict_types=1);
namespace ZMPlugin\OneBotWS;
use Choir\WebSocket\CloseFrameInterface;
use Choir\WebSocket\FrameInterface;
use Choir\WebSocket\Opcode;
use MessagePack\MessagePack;
use OneBot\Driver\Coroutine\Adaptive;
use OneBot\Driver\Interfaces\WebSocketClientInterface;
use OneBot\V12\Exception\OneBotException;
use OneBot\V12\Object\ActionResponse;
use OneBot\V12\Object\MessageSegment;
use OneBot\V12\Object\OneBotEvent;
use ZM\Annotation\AnnotationHandler;
use ZM\Annotation\Closed;
use ZM\Annotation\Framework\Init;
use ZM\Annotation\OneBot\BotActionResponse;
use ZM\Annotation\OneBot\BotEvent;
use ZM\Container\ContainerHolder;
use ZM\Container\ContainerRegistrant;
use ZM\Context\BotContext;
use ZM\Exception\WaitTimeoutException;
use ZM\Plugin\OneBot\BotMap;
class Onebot12Ws
{
/**
* @var WebSocketClientInterface[] 客户端连接对象们
*/
private static array $clients = [];
/**
* @var array 客户端连接们额外的信息
*/
private static array $client_info = [];
/**
* 插件初始化函数,包含发起连接到目标的正向 ws 连接位置和释放配置文件
*/
#[Init]
public function init(): void
{
// 检测配置文件
if (config('onebot-12-ws') === null) {
logger()->warning('配置文件不存在,插件将会在插件目录释放默认配置(config/onebot-12-ws.php),请编辑目标连接地址后重启框架进行连接!');
$this->releaseConfig();
return;
}
// 关闭
if (!config('onebot-12-ws.enable', false)) {
return;
}
// 开启并遍历,如果flag为null,则跳过
$valid_list = [];
foreach (config('onebot-12-ws.clients', []) as $item) {
if (!isset($item['flag'])) {
logger()->error('正向连接必须设置 flag 参数');
continue;
}
$parse_url = parse_url($item['address']);
// 对地址格式做验证
if (!in_array($parse_url['scheme'] ?? '', ['ws', 'wss'])) {
logger()->error('正向连接 [' . $item['flag'] . '] 的地址必须以 ws:// 或 wss:// 开头!');
continue;
}
$valid_list[$item['flag']] = $item;
}
dump($valid_list);
// 开始连接
foreach ($valid_list as $flag => $item) {
// 检查 token
if (!empty($item['access_token'] ?? '')) {
$header = ['Authorization' => 'Bearer ' . $item['access_token']];
}
$client = zm_websocket_client($item['address'], $header ?? []);
$client->setMessageCallback(function (...$args) {
logger()->info('收到了消息!');
if (Adaptive::getCoroutine() !== null) {
Adaptive::getCoroutine()->create([$this, 'onClientMessage'], ...$args);
} else {
$this->onClientMessage(...$args);
}
});
$client->setCloseCallback(function (...$args) {
logger()->error('断开iale');
if (Adaptive::getCoroutine() !== null) {
Adaptive::getCoroutine()->create([$this, 'onClientClose'], ...$args);
} else {
$this->onClientClose(...$args);
}
});
self::$clients[$flag] = $client;
if ($client->connect()) {
logger()->info('OneBot 12 正向 WS 连接成功,flag:[' . $flag . ']');
dump($client->send(json_encode([
'action' => 'get_version',
'echo' => '123fwefew',
'params' => []
])));
} else {
$reconnect = config('onebot-12-ws.reconnect_interval', 3000);
logger()->error('正向 WS [' . $flag . '] 连接失败,' . $reconnect . ' 毫秒后重连!');
zm_timer_after($reconnect, fn () => $this->reconnect($flag));
}
}
}
public function reconnect(mixed $flag): void
{
if (self::$clients[$flag]->reconnect()) {
logger()->info('OneBot 12 正向 WS 连接成功,flag:[' . $flag . ']');
} else {
$reconnect = config('onebot-12-ws.reconnect_interval', 3000);
logger()->error('正向 WS [' . $flag . '] 连接失败,' . $reconnect . ' 毫秒后重连!');
zm_timer_after($reconnect, fn () => $this->reconnect($flag));
}
}
/**
* [CALLBACK] 处理服务端发来的 WebSocket Frame 消息
*
* @param FrameInterface $frame 消息帧
* @param WebSocketClientInterface $client ws client 处理对象
*/
public function onClientMessage(FrameInterface $frame, WebSocketClientInterface $client): void
{
if ($frame->getOpcode() === Opcode::TEXT) {
$body = $frame->getData();
$body = json_decode($body, true);
} elseif ($frame->getOpcode() === Opcode::BINARY) {
$body = $frame->getData();
$body = MessagePack::unpack($body);
} else {
logger()->debug('客户端收到了非法的 WS Frame Opcode');
return;
}
if ($body === null) {
logger()->warning('收到非标准格式的消息,已忽略');
return;
}
// 如果含有 type, detail_type,就是 event
if (isset($body['type'], $body['detail_type'])) {
logger()->info('客户端收到了onebot12消息');
try {
$obj = new OneBotEvent($body);
} catch (OneBotException) {
logger()->debug('收到非 OneBot 12 标准的消息,已忽略');
return;
}
// 绑定容器
ContainerRegistrant::registerOBEventServices($obj);
ContainerHolder::getEventContainer()->set(WebSocketClientInterface::class, $client);
if ($obj->getSelf() !== null) {
$bot_id = $obj->self['user_id'];
$platform = $obj->self['platform'];
if (BotMap::getBotFd($bot_id, $platform) === null) {
BotMap::registerBotWithWSClient($bot_id, $platform, true, $client);
logger()->notice("[{$platform}.{$bot_id}] 已接入,状态:" . 'OK');
}
container()->set(BotContext::class, bot($bot_id, $platform));
}
// 调用机器人注解
$handler = new AnnotationHandler(BotEvent::class);
$handler->setRuleCallback(function (BotEvent $event) use ($obj) {
return ($event->type === null || $event->type === $obj->type)
&& ($event->sub_type === null || $event->sub_type === $obj->sub_type)
&& ($event->detail_type === null || $event->detail_type === $obj->detail_type);
});
try {
$handler->handleAll();
} catch (WaitTimeoutException $e) {
// 这里是处理 prompt() 下超时的情况的
if ($e->getTimeoutPrompt() === null) {
return;
}
if (($e->getPromptOption() & ZM_PROMPT_TIMEOUT_MENTION_USER) === ZM_PROMPT_TIMEOUT_MENTION_USER && ($ev = $e->getUserEvent()) !== null) {
$prompt = [MessageSegment::mention($ev->getUserId()), ...$e->getTimeoutPrompt()];
}
if (($e->getPromptOption() & ZM_PROMPT_TIMEOUT_QUOTE_SELF) === ZM_PROMPT_TIMEOUT_QUOTE_SELF && ($rsp = $e->getPromptResponse()) !== null && ($ev = $e->getUserEvent()) !== null) {
$prompt = [MessageSegment::reply($rsp->data['message_id'], $ev->self['user_id']), ...$e->getTimeoutPrompt()];
} elseif (($e->getPromptOption() & ZM_PROMPT_TIMEOUT_QUOTE_USER) === ZM_PROMPT_TIMEOUT_QUOTE_USER && ($ev = $e->getUserEvent()) !== null) {
$prompt = [MessageSegment::reply($ev->getMessageId(), $ev->getUserId()), ...$e->getTimeoutPrompt()];
}
bot()->reply($prompt ?? $e->getTimeoutPrompt());
}
} elseif (isset($body['status'], $body['retcode'])) {
// 如果含有 status,retcode 字段,表明是 action 的 response
$resp = new ActionResponse();
$resp->retcode = $body['retcode'];
$resp->status = $body['status'];
$resp->message = $body['message'] ?? '';
$resp->data = $body['data'] ?? [];
$resp->echo = $body['echo'] ?? null;
ContainerRegistrant::registerOBActionResponseServices($resp);
// 调用 BotActionResponse 事件
$handler = new AnnotationHandler(BotActionResponse::class);
$handler->setRuleCallback(function (BotActionResponse $event) use ($resp) {
return ($event->retcode === null || $event->retcode === $resp->retcode)
&& ($event->status === null || $event->status === $resp->status);
});
container()->set(ActionResponse::class, $resp);
$handler->handleAll();
// 如果有协程,并且该 echo 记录在案的话,就恢复协程
BotContext::tryResume($resp);
}
}
/**
* [CALLBACK] 处理服务端主动断开时的事件
*
* @param CloseFrameInterface $frame 消息帧
* @param WebSocketClientInterface $client ws client 处理对象
*/
public function onClientClose(CloseFrameInterface $frame, WebSocketClientInterface $client): void
{
BotMap::unregisterBotByWSClient($client);
}
/**
* 释放本插件的配置文件
*/
private function releaseConfig(): void
{
$file = __DIR__ . '/../onebot-12-ws.php';
$dstdir = WORKING_DIR . '/config';
if (!is_dir(zm_dir($dstdir))) {
logger()->warning('配置文件目录 ' . $dstdir . ' 不存在,无法释放正向 ws 配置文件!');
return;
}
if (!copy(zm_dir($file), zm_dir($dstdir . '/onebot-12-ws.php'))) {
logger()->error('配置文件释放失败,请检查!');
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment