Skip to content

Instantly share code, notes, and snippets.

@Rhilip
Created March 14, 2024 13:08
Show Gist options
  • Save Rhilip/136a157eaf5156e66875f17d5a9abf0e to your computer and use it in GitHub Desktop.
Save Rhilip/136a157eaf5156e66875f17d5a9abf0e to your computer and use it in GitHub Desktop.
某sysop写的PT Tracker(Announce部分)
<?php
/**
* Created by PhpStorm.
* User: pov
* Date: 05/04/2018
* Time: 09:45 AM
*/
namespace App\Http\Controllers\Traits\Tracker;
use App\Exceptions\Tracker\AnnounceException;
use App\Exceptions\Tracker\DatabaseException;
use Swoole\Coroutine\MySQL;
use Swoole\Coroutine\Redis;
use Swoole\Http\Request;
use App\Http\Controllers\Traits\ValidateIP;
use App\Http\Controllers\Traits\Bencoding;
trait Announce
{
use ValidateIP, FastCache, CoMySQLTransaction, Bencoding, ReadableField;
/**
 * Handles one tracker HTTP request (announce or scrape) end to end.
 *
 * The request is normalised by preProcess(), classified by getAnnounceType()
 * and then run through the matching validation pipeline. The returned string
 * is the HTTP response body: the bencoded result on success, or the bencoded
 * message of the AnnounceException that aborted processing.
 *
 * Only the announce path opens a MySQL transaction. The rollback/commit
 * helpers receive the "transaction started" flag, so they are presumably
 * no-ops when no transaction was opened (confirm in CoMySQLTransaction).
 * Exceptions other than AnnounceException are not caught here.
 *
 * @param Request $request
 * @param Redis $redisConn
 * @param MySQL $mysqlConn
 * @return string
 * @throws \App\Exceptions\Tracker\DatabaseException
 */
public function processAnnounce(Request $request, Redis $redisConn, MySQL $mysqlConn)
{
    // Body of the HTTP response.
    $body = '';

    // State shared by the steps of this request; most of it is filled in by reference.
    $clientIp = '';
    $agent = '';
    $uri = '';          // request URI without the query string
    $query = '';        // raw query string
    $txStarted = false; // whether this request has opened a transaction
    $passkey = '';
    $userId = null;
    $torrentId = null;
    $fields = [];       // flat key/value map, only used by the announce path

    try {
        // Step 0: environment only, no business logic. Resolves client IP,
        // request URI, query string and User-Agent (mandatory).
        $this->preProcess($request, $clientIp, $uri, $query, $agent);

        // Classifies the request and extracts the passkey from the URI.
        $action = $this->getAnnounceType($uri, $passkey);

        if ($action === 'scrape') {
            // 1. Parse the query string, which may repeat info_hash, then validate its shape.
            $scrapeFields = $this->parseDuplicateQueryString($query);
            $this->checkScrapeFields($scrapeFields);

            // 2. Scrape only validates the User-Agent (no peer_id available).
            $this->checkUserAgent($request, $fields, $agent, $mysqlConn, $redisConn, true);

            // 3. Resolve the passkey to a user.
            $this->checkPasskey($passkey, $userId, $mysqlConn, $redisConn);

            // 4. Resolve every info_hash; yields a list of ['info_hash' => bin, 'id' => torrent id].
            $registered = $this->checkInfoHash($scrapeFields, $torrentId, $mysqlConn, $redisConn, true);

            // 5. Build the response. Scrape performs no writes, so no transaction is needed.
            $body = $this->generateScrapeResponse($registered, $userId, $mysqlConn, $redisConn);
        } elseif ($action === 'announce') {
            // 1. Parse and validate the announce fields into $fields.
            $this->checkAnnounceFields($query, $fields, $clientIp);

            // 2. Validate both User-Agent and peer_id.
            $this->checkUserAgent($request, $fields, $agent, $mysqlConn, $redisConn);

            // 3. Resolve the passkey to a user.
            $this->checkPasskey($passkey, $userId, $mysqlConn, $redisConn);

            // 4. Resolve the single info_hash to a torrent id.
            $this->checkInfoHash($fields, $torrentId, $mysqlConn, $redisConn);

            // Everything below writes to the database: open a transaction first.
            self::mysqlStartTransaction($mysqlConn, $txStarted);

            // 5. Apply the announce.
            $this->processAnnounceRequest($fields, $agent, $userId, $torrentId, $mysqlConn, $redisConn);

            // 6. Build the response.
            $body = $this->generateAnnounceResponse($fields, $userId, $torrentId, $mysqlConn, $redisConn);
        } else {
            throw new AnnounceException('Action type unknown !');
        }
    } catch (AnnounceException $failure) {
        // Undo any work done inside the transaction, then report the failure to the client.
        self::mysqlRollbackTransaction($mysqlConn, $txStarted);

        return $failure->getMessageBencoded();
    }

    // Finished without error: commit.
    self::mysqlCommitTransaction($mysqlConn, $txStarted);

    return $body;
}
/**
 * Collects the request-scoped values the rest of the pipeline needs.
 *
 * Outputs are written through the by-reference parameters:
 *  - $userAgent    the User-Agent header (mandatory)
 *  - $remoteHttpIP the client IP, taken from X-Forwarded-For (first entry),
 *                  Client-IP or X-Real-IP; when none of these is a valid IP,
 *                  the X-Remote-IP header is used instead, or '' if absent
 *  - $requestUri   the request URI with leading/trailing slashes removed
 *  - $queryString  the raw query string, '' when absent
 *
 * @param $request
 * @param $remoteHttpIP
 * @param $requestUri
 * @param $queryString
 * @param $userAgent
 * @throws AnnounceException when the User-Agent header or the request URI is missing
 */
private function preProcess($request, &$remoteHttpIP, &$requestUri, &$queryString, &$userAgent)
{
    // A User-Agent header is required.
    $header = $request->header;
    if (!isset($header['user-agent'])) {
        throw new AnnounceException('Invalid user-agent !');
    }
    $userAgent = $header['user-agent'];

    // Candidate client IP from the proxy headers. Values are trimmed because
    // X-Forwarded-For entries are conventionally separated by ", ".
    if (isset($header['x-forwarded-for'])) {
        $forwardedIP = explode(',', $header['x-forwarded-for']);
        $remoteHttpIP = trim($forwardedIP[0]);
    } elseif (isset($header['client-ip'])) {
        $remoteHttpIP = trim($header['client-ip']);
    } elseif (isset($header['x-real-ip'])) {
        $remoteHttpIP = trim($header['x-real-ip']);
    }

    // The x-remote-ip header must be set by nginx when proxying, with the value
    // of its built-in $remote_addr. If it is missing, fall back to '' rather
    // than reading an undefined key (which would yield null plus a warning).
    if (!$this->isValidIP($remoteHttpIP)) {
        $remoteHttpIP = isset($header['x-remote-ip']) ? $header['x-remote-ip'] : '';
    }

    // $request->server is the counterpart of $_SERVER, but only carries:
    // query_string, request_method, request_uri, path_info, request_time,
    // request_time_float, server_port, remote_port, remote_addr, master_time,
    // server_protocol and server_software.
    $server = $request->server;
    if (!isset($server['request_uri'])) {
        throw new AnnounceException('Invalid request url !');
    }
    $requestUri = trim($server['request_uri'], '/');
    $queryString = isset($server['query_string']) ? $server['query_string'] : '';
}
/**
 * Splits the request URI into its three segments, validates the passkey and
 * reports which action was requested.
 *
 * An announce request looks like:
 *
 *   GET http://xxx.com/auth/passkey/announce?info_hash=XXX&uploaded=XXX&downloaded=XXX
 *
 * The passkey is the middle segment and must be 40 hexadecimal characters
 * (the length of a hex-encoded SHA-1).
 *
 * @param $requestUri URI without leading/trailing slashes and without query string
 * @param $passkey receives the trimmed, lower-cased passkey
 * @return string 'scrape', 'announce' or 'unknown'
 * @throws AnnounceException when the URI does not have three segments or the passkey is malformed
 */
private function getAnnounceType($requestUri, &$passkey)
{
    $segments = explode('/', $requestUri);
    if (count($segments) != 3) {
        throw new AnnounceException('Passkey is missing ! Re-download the torrent !');
    }

    $rawKey = $segments[1];
    if (strlen(trim($rawKey)) != 40) {
        throw new AnnounceException('Invalid passkey length !');
    }
    if (strspn(strtolower($rawKey), 'abcdef0123456789') != 40) {
        throw new AnnounceException('Invalid passkey format !');
    }
    $passkey = strtolower(trim($rawKey));

    // The last segment names the action; anything unrecognised is 'unknown'.
    $action = strtolower($segments[2]);

    return in_array($action, ['scrape', 'announce'], true) ? $action : 'unknown';
}
/**
 * Verifies the parsed scrape query: it must contain an `info_hash` entry that
 * is an array whose items are all exactly 20 bytes long.
 *
 * @param $scrapeQueries result of parseDuplicateQueryString()
 * @throws AnnounceException when info_hash is missing, not an array, or has an item of the wrong length
 */
private function checkScrapeFields($scrapeQueries)
{
    if (!isset($scrapeQueries['info_hash'])) {
        throw new AnnounceException('Scrape missing field info_hash');
    }

    $hashes = $scrapeQueries['info_hash'];
    if (!is_array($hashes)) {
        throw new AnnounceException('Bad scrape parsed field info_hash');
    }

    foreach ($hashes as $hash) {
        if (strlen($hash) != 20) {
            throw new AnnounceException('Bad scrape field info_hash');
        }
    }
}
/**
 * Parses a raw query string while keeping every occurrence of a repeated key
 * (parse_str() would keep only the last one unless the key ends in "[]").
 *
 * Each key maps to a list of its URL-decoded values, in order of appearance.
 * Only the first "=" of a pair separates key from value, a pair without "="
 * gets the value '', and empty pairs (e.g. from an empty query string or
 * "a=1&&b=2") are ignored.
 *
 * @param $queryString raw query string, without the leading "?"
 * @return array map of decoded key => list of decoded values
 */
private function parseDuplicateQueryString($queryString): array
{
    $result = [];
    foreach (explode('&', (string) $queryString) as $pair) {
        if ($pair === '') {
            continue;
        }

        // Limit 2: a value may itself contain "=".
        $parts = explode('=', $pair, 2);
        $key = urldecode($parts[0]);
        $value = isset($parts[1]) ? urldecode($parts[1]) : '';

        $result[$key][] = $value;
    }

    return $result;
}
/**
 * Validates the client against the whitelist of allowed client families.
 *
 * Checks, in order:
 *  1. the User-Agent must not look like a web browser;
 *  2. the request must not carry Accept-Language, Referer or Cookie headers;
 *  3. the User-Agent must not exceed 64 bytes (database column size);
 *  4. the User-Agent (and, unless $onlyCheckUA is set, the peer_id) must match
 *     an enabled row of `agent_allowed_family` and meet its minimum version;
 *  5. unless $onlyCheckUA is set, and if the matched family has exceptions
 *     enabled, the client must not match a row of `agent_allowed_exception`.
 *
 * Both tables are read through the cache; a cache miss loads them from MySQL.
 *
 * @param Request $request
 * @param $queries announce fields; only `peer_id` is read, and only when $onlyCheckUA is false
 * @param $userAgent
 * @param MySQL $mysql
 * @param Redis $redis
 * @param bool $onlyCheckUA true for scrape requests: validate the User-Agent only
 * @throws AnnounceException when the client is rejected or a stored pattern does not match its own reference value
 */
private function checkUserAgent(Request $request, $queries, $userAgent, MySQL $mysql, Redis $redis, $onlyCheckUA = false)
{
    // Block browser access.
    if (preg_match('/(^Mozilla|Browser|AppleWebKit|^Opera|^Links|^Lynx)/', $userAgent)) {
        throw new AnnounceException('Browser access is not allowed !');
    }

    // These headers are rejected as a sign of abnormal (non-client) access.
    $header = $request->header;
    if (isset($header['accept-language']) || isset($header['referer']) || isset($header['cookie'])) {
        throw new AnnounceException('Abnormal access blocked !');
    }

    // The database column for the client name is only 64 long.
    if (strlen($userAgent) > 64) {
        throw new AnnounceException('UserAgent too long !');
    }

    // Load the client whitelist.
    $allowedFamily = $this->cacheGet($redis, self::getAgentAllowedFamilyCacheKey());
    if ($allowedFamily === false) {
        $allowedFamily = [];
        $fetched = $mysql->query('SELECT * FROM `agent_allowed_family` WHERE `enabled` = \'yes\' ORDER BY `hits` DESC');
        // Only iterate and cache a real result set. Caching the empty list
        // left over from a failed query would reject every client until the
        // cache entry is replaced.
        if (is_array($fetched)) {
            foreach ($fetched as $item) {
                $allowedFamily[] = [
                    'id' => $item['id'],
                    'start_name' => $item['start_name'],
                    'peer_id_pattern' => $item['peer_id_pattern'],
                    'peer_id_match_num' => $item['peer_id_match_num'],
                    'peer_id_matchtype' => $item['peer_id_matchtype'],
                    'peer_id_start' => $item['peer_id_start'],
                    'agent_pattern' => $item['agent_pattern'],
                    'agent_match_num' => $item['agent_match_num'],
                    'agent_matchtype' => $item['agent_matchtype'],
                    'agent_start' => $item['agent_start'],
                    'exception' => $item['exception']
                ];
            }
            $this->cacheSet($redis, self::getAgentAllowedFamilyCacheKey(), $allowedFamily);
        }
    }

    // Load the whitelist exceptions.
    $allowedFamilyException = $this->cacheGet($redis, self::getAgentAllowedExceptionCacheKey());
    if ($allowedFamilyException === false) {
        $allowedFamilyException = [];
        $fetched = $mysql->query('SELECT * FROM `agent_allowed_exception`');
        // Same rule as above: never cache the outcome of a failed query.
        if (is_array($fetched)) {
            foreach ($fetched as $item) {
                $allowedFamilyException[] = [
                    'family_id' => $item['family_id'],
                    'name' => $item['name'],
                    'peer_id' => $item['peer_id'],
                    'agent' => $item['agent'],
                    'comment' => $item['comment']
                ];
            }
            $this->cacheSet($redis, self::getAgentAllowedExceptionCacheKey(), $allowedFamilyException);
        }
    }

    // Check the User-Agent first, then the peer_id, and finally the exceptions.
    $agentAccepted = null;
    $peerIdAccepted = null;
    $acceptedAgentFamilyId = null;
    $acceptedAgentFamilyException = null;
    foreach ($allowedFamily as $allowedItem) {
        // Reset the flags for every candidate family.
        $agentAccepted = false;
        $peerIdAccepted = false;
        $acceptedAgentFamilyId = 0;
        $acceptedAgentFamilyException = false;

        // --- User-Agent ---
        if ($allowedItem['agent_pattern'] != '') {
            // The pattern must match the family's own minimum-version sample;
            // its capture groups are the required version numbers.
            if (!preg_match($allowedItem['agent_pattern'], $allowedItem['agent_start'], $agentShould)) {
                throw new AnnounceException('Agent REGEX error for ' . $allowedItem['start_name']);
            }
            if (preg_match($allowedItem['agent_pattern'], $userAgent, $agentMatched)) {
                if ($allowedItem['agent_match_num'] > 0) {
                    for ($i = 0; $i < $allowedItem['agent_match_num']; $i++) {
                        if ($allowedItem['agent_matchtype'] == 'hex') {
                            $agentMatched[$i + 1] = hexdec($agentMatched[$i + 1]);
                            $agentShould[$i + 1] = hexdec($agentShould[$i + 1]);
                        } else {
                            $agentMatched[$i + 1] = intval($agentMatched[$i + 1]);
                            $agentShould[$i + 1] = intval($agentShould[$i + 1]);
                        }
                        // Compare version components from most to least significant.
                        // A higher component settles it: no need to look further.
                        if ($agentMatched[$i + 1] > $agentShould[$i + 1]) {
                            $agentAccepted = true;
                            break;
                        }
                        // A lower component means the client is too old.
                        if ($agentMatched[$i + 1] < $agentShould[$i + 1]) {
                            throw new AnnounceException('Your client is too old. Minimum required version is ' . $allowedItem['start_name']);
                        }
                        // Equal components: keep comparing, unless this was the
                        // last one, in which case the version equals the minimum.
                        if ($agentMatched[$i + 1] == $agentShould[$i + 1] && $i + 1 == $allowedItem['agent_match_num']) {
                            $agentAccepted = true;
                        }
                    }
                } else {
                    // No version comparison required.
                    $agentAccepted = true;
                }
            }
        } else {
            // No User-Agent pattern required.
            $agentAccepted = true;
        }

        // User-Agent-only mode: skip the peer_id and exception checks entirely.
        if ($onlyCheckUA) {
            if ($agentAccepted) {
                break;
            } else {
                continue;
            }
        }

        // peer_id as sent in the request.
        $peer_id = $queries['peer_id'];

        // --- peer_id ---
        if ($allowedItem['peer_id_pattern'] != '') {
            if (!preg_match($allowedItem['peer_id_pattern'], $allowedItem['peer_id_start'], $peerIdShould)) {
                throw new AnnounceException('Peer_id REGEX error for ' . $allowedItem['start_name']);
            }
            if (preg_match($allowedItem['peer_id_pattern'], $peer_id, $peerIdMatched)) {
                if ($allowedItem['peer_id_match_num'] > 0) {
                    for ($i = 0; $i < $allowedItem['peer_id_match_num']; $i++) {
                        if ($allowedItem['peer_id_matchtype'] == 'hex') {
                            $peerIdMatched[$i + 1] = hexdec($peerIdMatched[$i + 1]);
                            $peerIdShould[$i + 1] = hexdec($peerIdShould[$i + 1]);
                        } else {
                            $peerIdMatched[$i + 1] = intval($peerIdMatched[$i + 1]);
                            $peerIdShould[$i + 1] = intval($peerIdShould[$i + 1]);
                        }
                        // Compare version components from most to least significant.
                        if ($peerIdMatched[$i + 1] > $peerIdShould[$i + 1]) {
                            $peerIdAccepted = true;
                            break;
                        }
                        if ($peerIdMatched[$i + 1] < $peerIdShould[$i + 1]) {
                            throw new AnnounceException('Your client is too old. Minimum required version is ' . $allowedItem['start_name']);
                        }
                        // Equal on the last component: version equals the minimum.
                        // The bound is peer_id_match_num (this loop's own count);
                        // testing agent_match_num here left an exactly-minimum
                        // peer_id unaccepted whenever the two counts differed.
                        if ($peerIdMatched[$i + 1] == $peerIdShould[$i + 1] && $i + 1 == $allowedItem['peer_id_match_num']) {
                            $peerIdAccepted = true;
                        }
                    }
                } else {
                    // No peer_id version requirement.
                    $peerIdAccepted = true;
                }
            }
        } else {
            // No peer_id pattern required.
            $peerIdAccepted = true;
        }

        // The first family accepting both ends the search.
        if ($agentAccepted && $peerIdAccepted) {
            $acceptedAgentFamilyId = $allowedItem['id'];
            $acceptedAgentFamilyException = $allowedItem['exception'] == 'yes' ? true : false;
            break;
        }
    }

    // Evaluate the outcome. In User-Agent-only mode nothing else is checked.
    if ($onlyCheckUA) {
        if (!$agentAccepted) {
            throw new AnnounceException('Client ' . $userAgent . ' is not accepted for scrape !');
        }

        return;
    }

    // User-Agent and peer_id must both have been accepted.
    if ($agentAccepted && $peerIdAccepted) {
        if ($acceptedAgentFamilyException) {
            foreach ($allowedFamilyException as $exceptionItem) {
                // A matching exception row bans this specific client.
                if ($exceptionItem['family_id'] == $acceptedAgentFamilyId
                    && preg_match('/^' . $exceptionItem['peer_id'] . '/', $peer_id)
                    && ($userAgent == $exceptionItem['agent'] || !$exceptionItem['agent'])
                ) {
                    throw new AnnounceException('Client ' . $exceptionItem['name'] . ' banned due to: ' . $exceptionItem['comment']);
                }
            }
        }
        // Otherwise nothing to do: the client passed validation.
    } else {
        throw new AnnounceException('Client ' . $userAgent . ' is not acceptable !');
    }
}
/**
 * Parses the announce query string into $queries and validates every field.
 *
 * On return $queries holds lower-cased keys with:
 *  - the required fields info_hash, peer_id, port, uploaded, downloaded, left;
 *  - the optional fields event (lower-cased), no_peer_id, compact, numwant,
 *    corrupt, ip, ipv4, ipv6, filled with defaults when absent;
 *  - the derived fields ipv4_port, ipv6_port and ipv6_http, which are always
 *    computed here and never taken from the client.
 *
 * @param $queryString raw query string
 * @param $queries receives the validated fields
 * @param $remoteHttpIP client IP as seen on the HTTP connection
 * @throws AnnounceException when a required field is missing or any field is invalid
 */
private function checkAnnounceFields($queryString, &$queries, $remoteHttpIP)
{
    // Parse the query string and lower-case every key.
    $queriesOri = [];
    parse_str($queryString, $queriesOri);
    foreach ($queriesOri as $key => $val) {
        $queries[strtolower($key)] = $val;
    }

    // Required fields.
    $fieldsNeed = [
        'info_hash', 'peer_id', 'port',
        'uploaded', 'downloaded', 'left'
    ];
    foreach ($fieldsNeed as $field) {
        if (!array_key_exists($field, $queries)) {
            throw new AnnounceException('Announce missing field ' . $field);
        }
    }

    // Optional fields and their defaults.
    $fieldOpt = [
        'event' => '', 'no_peer_id' => 1, 'compact' => 0,
        'ip' => '', 'ipv6' => '', 'ipv4' => '',
        // commonly seen keys
        'numwant' => 80, 'corrupt' => 0,
        // convenience keys; they are not part of BitTorrent or its extensions
        'ipv6_port' => 0, 'ipv4_port' => 0, 'ipv6_http' => ''
    ];

    // parse_str() turns "name[]=x" into an array. Every field used below is
    // handled as a string, so reject array values up front instead of letting
    // them reach strlen()/strtolower().
    foreach (array_merge($fieldsNeed, array_keys($fieldOpt)) as $field) {
        if (isset($queries[$field]) && is_array($queries[$field])) {
            throw new AnnounceException('Bad announce field ' . $field);
        }
    }

    // info_hash and peer_id must be exactly 20 bytes.
    foreach (['info_hash', 'peer_id'] as $item) {
        if (strlen($queries[$item]) != 20) {
            throw new AnnounceException('Bad announce field ' . $item);
        }
    }
    foreach (['uploaded', 'downloaded', 'left'] as $item) {
        if (!is_numeric($queries[$item]) || $queries[$item] < 0) {
            throw new AnnounceException('Bad announce field ' . $item);
        }
    }

    // The port is normally 1-65535, but may be 0 in special cases (event=stopped).
    $portBlacklist = [22, 80, 443, 53, 3389, 8080, 65000, 65001, 65002, 65003, 65004, 65005, 65006, 65007, 65008, 65009, 65010];
    if (!is_numeric($queries['port']) || $queries['port'] < 0 || $queries['port'] > 0xffff || in_array($queries['port'], $portBlacklist)) {
        throw new AnnounceException('Illegal port ' . $queries['port'] . '. Port should between 6881-64999');
    }

    // Derived fields. They are always assigned here so a client cannot inject
    // its own values: ipv6_http in particular is later written into SQL.
    $queries['ipv4_port'] = $queries['ipv6_port'] = $queries['port'];
    $queries['ipv6_http'] = '';

    // Fill in defaults for optional fields that were not sent.
    foreach ($fieldOpt as $field => $value) {
        if (!isset($queries[$field])) {
            $queries[$field] = $value;
        }
    }

    // Numeric optional fields must be non-negative numbers.
    foreach (['numwant', 'corrupt', 'no_peer_id', 'compact'] as $item) {
        if (!is_numeric($queries[$item]) || $queries[$item] < 0) {
            throw new AnnounceException('Illegal ' . $item . ' ' . $queries[$item]);
        }
    }

    // Event type. The comparison is case-insensitive.
    $event = strtolower($queries['event']);
    if (!in_array($event, ['started', 'completed', 'stopped', 'paused', ''], true)) {
        throw new AnnounceException('Unsupported event type ' . $queries['event']);
    }

    // Port 0 is only acceptable together with the stopped event.
    if ($queries['port'] == 0 && $event != 'stopped') {
        throw new AnnounceException('Illegal port 0 under event=' . $queries['event']);
    }

    // Store the normalised form: later code compares the event against the
    // lower-case names with exact equality.
    $queries['event'] = $event;

    /*
     * ipv6 holds only the address reported through &ipv6=XXX and ends up
     * either empty or a native IPv6 address (no link-local, site-local,
     * loop-back, Teredo or 6to4).
     *
     * The announced value may also be an IPv4-mapped IPv6 address or an
     * IPv6 end-point of the form [address]:port.
     */
    if (!$client = $this->isEndPoint($queries['ipv6'])) {
        // Not an end-point: drop the value unless it is a native IPv6 address.
        if (!$this->isNativeIPv6($queries['ipv6'])) {
            $queries['ipv6'] = '';
        }
    } else {
        // End-point: keep address and port only when the address is native.
        if (!$this->isNativeIPv6($client['ip'])) {
            $queries['ipv6'] = '';
        } else {
            $queries['ipv6'] = $client['ip'];
            $queries['ipv6_port'] = $client['port'];
        }
    }
    // Finally, convert a mapped address (contains a dot) to a regular IPv6 address.
    if (strpos($queries['ipv6'], '.') !== false) {
        $queries['ipv6'] = $this->unMapIPv6($queries['ipv6']);
    }

    // ipv6_http records the HTTP connection's address when it is a native
    // IPv6 address different from the announced one.
    if ($this->isNativeIPv6($remoteHttpIP) && $remoteHttpIP != $queries['ipv6']) {
        $queries['ipv6_http'] = $remoteHttpIP;
    }

    /*
     * ip holds the IPv4 HTTP address. It must be either a DNS name (shorter
     * than 64 bytes, the database column limit) or a valid IPv4 address
     * outside the reserved ranges 0.0.0.0/8, 127.0.0.0/8, 240.0.0.0/8 and
     * 169.254.0.0/16; otherwise it is replaced by the HTTP connection's
     * address when that is a valid IPv4 address, or emptied.
     */
    if (!$this->isValidIPv4($queries['ip']) && !($this->isValidDNSPTR($queries['ip']) && strlen($queries['ip']) < 64)) {
        if ($this->isValidIPv4($remoteHttpIP)) {
            $queries['ip'] = $remoteHttpIP;
        } else {
            $queries['ip'] = '';
        }
    }

    // ipv4 may be a valid IPv4 address or an IPv4 end-point address:port.
    if (!$client = $this->isEndPoint($queries['ipv4'])) {
        // Not an end-point: empty it when invalid or identical to ip.
        if (!$this->isValidIPv4($queries['ipv4']) || $queries['ipv4'] == $queries['ip']) {
            $queries['ipv4'] = '';
        }
    } else {
        // End-point: empty it when the address part is invalid or identical to
        // ip. The address part is what must be compared; the raw
        // "address:port" string can never equal a bare address.
        if (!$this->isValidIPv4($client['ip']) || $client['ip'] == $queries['ip']) {
            $queries['ipv4'] = '';
        } else {
            $queries['ipv4'] = $client['ip'];
            $queries['ipv4_port'] = $client['port'];
        }
    }

    // Last resort: when neither ipv4 nor ip is a valid IPv4 address, record
    // the HTTP connection's address as ipv4.
    if (!$this->isValidIPv4($queries['ipv4']) && !$this->isValidIPv4($queries['ip']) && $this->isValidIPv4($remoteHttpIP)) {
        $queries['ipv4'] = $remoteHttpIP;
    }
}
/**
 * Resolves a passkey to a user id, using the cache before the database.
 *
 * The outcome of a database lookup is cached for 24 hours; an unknown passkey
 * is cached as user id 0 so repeated bad requests do not hit MySQL.
 *
 * @param $passkey
 * @param $userId receives the user id (0 when the passkey is unknown)
 * @param MySQL $mysql
 * @param Redis $redis
 * @throws AnnounceException when the passkey does not belong to any user
 */
private function checkPasskey($passkey, &$userId, MySQL $mysql, Redis $redis)
{
    $cacheKey = self::getPasskeyUserCacheKey($passkey);

    $userId = $this->cacheGet($redis, $cacheKey);
    if ($userId !== false) {
        // Cache hit: 0 is the marker for a passkey known to be invalid.
        if ($userId == 0) {
            throw new AnnounceException('Invalid passkey !');
        }

        return;
    }

    // Cache miss: ask the database and remember the answer either way.
    $lookup = $this->prepareSQLStatement($mysql, 'SELECT `id` FROM `users` WHERE `passkey` = ? LIMIT 1');
    $rows = $lookup->execute([$passkey]);
    $known = count($rows);

    $userId = $known ? $rows[0]['id'] : 0;
    $this->cacheSet($redis, $cacheKey, $userId, 3600 * 24);

    if (!$known) {
        throw new AnnounceException('Invalid passkey !');
    }
}
/**
 * Resolves the request's info_hash value(s) to registered torrents.
 *
 * Announce mode ($isScrape false): `$queries['info_hash']` is one 20-byte
 * binary hash; the torrent id is written to $torrentId and nothing is returned.
 *
 * Scrape mode ($isScrape true): `$queries['info_hash']` is a list of binary
 * hashes; unregistered ones are skipped and the rest are returned as a list of
 * ['info_hash' => binary hash, 'id' => torrent id]. $torrentId is not touched.
 *
 * @param $queries
 * @param $torrentId receives the torrent id in announce mode
 * @param MySQL $mysql
 * @param Redis $redis
 * @param bool $isScrape
 * @return array|null list of registered torrents in scrape mode, nothing otherwise
 * @throws AnnounceException when no requested torrent is registered
 */
private function checkInfoHash($queries, &$torrentId, MySQL $mysql, Redis $redis, $isScrape = false)
{
    if ($isScrape) {
        $registered = [];
        foreach ($queries['info_hash'] as $binaryHash) {
            $id = $this->getTorrentIdBySHA1InfoHash(self::binInfoHash2Hex($binaryHash), $mysql, $redis);
            // Id 0 means "not registered": leave it out of the result.
            if ($id == 0) {
                continue;
            }
            $registered[] = [
                'info_hash' => $binaryHash,
                'id' => $id
            ];
        }

        if (count($registered) === 0) {
            throw new AnnounceException('Scraped torrents not registered with this tracker !');
        }

        return $registered;
    }

    // Announce: exactly one info_hash (20 bytes, binary).
    $hexHash = self::binInfoHash2Hex($queries['info_hash']);
    $torrentId = $this->getTorrentIdBySHA1InfoHash($hexHash, $mysql, $redis);

    // Id 0 means the torrent is not registered.
    if ($torrentId == 0) {
        throw new AnnounceException('Torrent not registered with this tracker !');
    }
}
/**
 * Looks up the id of the torrent with the given hex info_hash.
 *
 * The cache is consulted first. On a miss the `torrents` table is queried and
 * the answer, including 0 for "no such torrent", is cached for 7 days.
 *
 * @param $sha1HexFormat info_hash in hexadecimal form
 * @param MySQL $mysql
 * @param Redis $redis
 * @return bool|int|mixed the torrent id, or 0 when the torrent is not registered
 * @throws AnnounceException
 */
private function getTorrentIdBySHA1InfoHash($sha1HexFormat, MySQL $mysql, Redis $redis)
{
    $cacheKey = self::getInfoHashTorrentCacheKey($sha1HexFormat);

    $cached = $this->cacheGet($redis, $cacheKey);
    if ($cached !== false) {
        return $cached;
    }

    $lookup = $this->prepareSQLStatement($mysql, 'SELECT `id` FROM `torrents` WHERE `info_hash` = ? LIMIT 1');
    $rows = $lookup->execute([$sha1HexFormat]);

    // 0 stands for "not registered" and is cached like any real id.
    $torrentId = count($rows) ? $rows[0]['id'] : 0;
    $this->cacheSet($redis, $cacheKey, $torrentId, 3600 * 24 * 7);

    return $torrentId;
}
/**
 * Builds the bencoded scrape response for the given torrents.
 *
 * The response is a dictionary with a single `files` dictionary, keyed by the
 * binary info_hash, whose entries carry `complete` (seeders), `downloaded`
 * (times completed) and `incomplete` (leechers). Torrents that cannot be
 * loaded, or whose `reviewed` flag is 'no' while their `user_id` differs from
 * $userId, are left out.
 *
 * @param $scrapedTorrents list of ['info_hash' => binary hash, 'id' => torrent id]
 * @param $userId id of the requesting user
 * @param MySQL $mysql
 * @param Redis $redis
 * @return string
 * @throws AnnounceException when no torrent remains to report
 */
private function generateScrapeResponse($scrapedTorrents, $userId, MySQL $mysql, Redis $redis)
{
    // Wraps a value in the node structure the bencoder expects for integers.
    $integerNode = function ($value) {
        return [
            'type' => 'integer',
            'value' => $value
        ];
    };

    $files = [];
    foreach ($scrapedTorrents as $entry) {
        $stats = $this->getTorrentById($entry['id'], $mysql, $redis);

        $visible = $stats !== false
            && ($stats['reviewed'] != 'no' || $stats['user_id'] == $userId);
        if (!$visible) {
            continue;
        }

        $files[$entry['info_hash']] = [
            'type' => 'dictionary',
            'value' => [
                'complete' => $integerNode($stats['seeders']),
                'downloaded' => $integerNode($stats['times_completed']),
                'incomplete' => $integerNode($stats['leechers'])
            ]
        ];
    }

    if (!count($files)) {
        throw new AnnounceException('Scraped torrents not registered with this tracker !');
    }

    return self::bencode([
        'type' => 'dictionary',
        'value' => [
            'files' => [
                'type' => 'dictionary',
                'value' => $files
            ]
        ]
    ]);
}
/**
 * Loads the statistics row of a torrent (seeders, times_completed, leechers,
 * user_id, reviewed), from the cache when possible.
 *
 * A row fetched from the database is cached for 3 hours. A missing row is not
 * cached.
 *
 * @param $id torrent id
 * @param MySQL $mysql
 * @param Redis $redis
 * @return bool|mixed the torrent row, or false when no such torrent exists
 * @throws AnnounceException
 */
private function getTorrentById($id, MySQL $mysql, Redis $redis)
{
    $cacheKey = self::getTorrentIdResCacheKey($id);

    $cached = $this->cacheGet($redis, $cacheKey);
    if ($cached !== false) {
        return $cached;
    }

    $lookup = $this->prepareSQLStatement($mysql, 'SELECT `seeders`, `times_completed`, `leechers`, `user_id`, `reviewed` FROM `torrents` WHERE `id` = ? LIMIT 1');
    $rows = $lookup->execute([$id]);
    if (!count($rows)) {
        return false;
    }

    $row = $rows[0];
    $this->cacheSet($redis, $cacheKey, $row, 3600 * 3);

    return $row;
}
/**
 * Stores the given torrent row in the cache under the torrent's id, with the
 * same 3-hour lifetime getTorrentById() uses.
 *
 * @param $id torrent id
 * @param $torrentRes torrent row to cache
 * @param Redis $redis
 */
private function updateTorrentCacheById($id, $torrentRes, Redis $redis)
{
    $cacheKey = self::getTorrentIdResCacheKey($id);
    $lifetime = 3600 * 3;

    $this->cacheSet($redis, $cacheKey, $torrentRes, $lifetime);
}
/**
* Applies one validated announce to the `peers`, `snatched`, `users` and
* `torrents` tables and refreshes the cached torrent row.
*
* Order of work: permission check on the torrent, load the user's totals,
* look up this peer's session row, enforce the session limits when no session
* exists (or compute the traffic delta when one does), load the history row,
* then branch on (session exists?, event) to delete / update / insert rows.
* Column changes for `users` and `torrents` are collected in
* $userUpdateSet / $torrentUpdateSet and written at the end.
*
* @param $queries announce fields as filled in by checkAnnounceFields()
* @param $userAgent client User-Agent, written to the `agent` columns
* @param $userId id of the announcing user
* @param $torrentId id of the announced torrent
* @param MySQL $mysql
* @param Redis $redis
* @throws AnnounceException when the torrent is held for review or a session limit is exceeded
*/
private function processAnnounceRequest($queries, $userAgent, $userId, $torrentId, MySQL $mysql, Redis $redis)
{
// A former `extract($queries, EXTR_OVERWRITE)` call is disabled; fields are read from $queries directly.
/*
* Torrent permission check: a torrent whose `reviewed` value is 'no' may only
* be announced by the user stored in the torrent row's `user_id`.
* NOTE(review): getTorrentById() returns false when the row is missing; that
* case is not handled before the array accesses below.
*/
$torrentRes = $this->getTorrentById($torrentId, $mysql, $redis);
if ($torrentRes['reviewed'] == 'no' && $torrentRes['user_id'] != $userId)
throw new AnnounceException('Torrent temporarily banned for review.');
/*
* User data (current uploaded / downloaded totals).
* NOTE(review): getUserById() returns false when the user row is missing.
*/
$userRes = $this->getUserById($userId, $mysql);
/*
* Decide from the `left` field whether the peer is a seeder.
*/
$seeder = $queries['left'] ? 'no' : 'yes';
// Partial-seed extension support: a 'paused' event is stored as 'partial'
// to tell a paused / incomplete download apart from yes/no.
$seederForDB = $queries['event'] == 'paused' ? 'partial' : $seeder;
/*
* Columns needed from this peer's own row in `peers`.
* `duration` is the number of seconds since the row's `updated_at`.
*/
$fieldSelf = ['`uploaded`', '`downloaded`', '`updated_at`',
'(UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(`updated_at`)) AS `duration`'];
$fieldSelfJoined = join(',', $fieldSelf);
/*
* Look for an existing session row in `peers` (same user, torrent and peer_id).
*/
$selfStmt = $this->prepareSQLStatement($mysql, "SELECT {$fieldSelfJoined} FROM `peers` WHERE `user_id` = ? AND `torrent_id` = ? AND `peer_id` = ? LIMIT 1");
$selfFetched = $selfStmt->execute([
0 => $userId,
1 => $torrentId,
2 => self::binPeerId2Hex($queries['peer_id'])
]);
// $self is only defined when a session row exists; isset($self) is the "session exists" test below.
if (count($selfFetched))
$self = $selfFetched[0];
else
unset($self);
$userUpdateSet = [];
/*
* No session row exists.
*/
if (!isset($self)) {
/*
* Check the account's limits before a new session is started.
* The count covers every session of this user on this torrent.
*/
$selfCountStmt = $this->prepareSQLStatement($mysql, 'SELECT COUNT(*) AS `count` FROM `peers` WHERE `user_id` = ? AND `torrent_id` = ?');
$selfCount = $selfCountStmt->execute([
0 => $userId,
1 => $torrentId
])[0]['count'];
// Downloading the same torrent from several locations at once is forbidden.
if ($selfCount >= 1 && $seeder == 'no')
throw new AnnounceException('You are already downloading the same torrent. You can only leech from one location at a time!');
// Seeding from more locations than the configured maximum is forbidden.
// NOTE(review): the message says "3" while the limit comes from config('tracker.user_max_seed').
if ($selfCount >= config('tracker.user_max_seed') && $seeder == 'yes')
throw new AnnounceException('You cannot seed the same torrent from more than 3 locations.');
} else {
/*
* $self exists: continue the existing session.
*/
// Traffic and time since the previous announce; negative deltas are clamped to 0.
// NOTE: $trueUploaded, $trueDownloaded and $duration are only defined on this path.
$trueUploaded = max(0, $queries['uploaded'] - $self['uploaded']);
$trueDownloaded = max(0, $queries['downloaded'] - $self['downloaded']);
$duration = max(0, $self['duration']);
// User columns to update.
// NOTE(review): these are absolute totals computed from the values read above, not SQL increments.
$userUpdateSet['uploaded'] = $userRes['uploaded'] + $trueUploaded;
$userUpdateSet['downloaded'] = $userRes['downloaded'] + $trueDownloaded;
}
/*
* Look up the history (snatched) record of this user for this torrent.
*/
$selfRecordStmt = $this->prepareSQLStatement($mysql, 'SELECT `uploaded`, `downloaded`, `seed_time`, `leech_time` FROM `snatched` WHERE `torrent_id` = ? AND `user_id` = ?');
$selfRecordFetched = $selfRecordStmt->execute([
0 => $torrentId,
1 => $userId
]);
// $selfRecord is only defined when a history row exists.
if (count($selfRecordFetched))
$selfRecord = $selfRecordFetched[0];
else
unset($selfRecord);
/*
* Handle the announce according to session state and event type.
*/
$torrentUpdateSet = [];
/*
* Original author's assumption: if a session exists, a history record exists too.
* NOTE(review): $selfRecord is used below without an isset() check on that basis.
*/
if (isset($self) && $queries['event'] == 'stopped') {
// Stop event on an existing session:
// delete the session row.
$delPeerStmt = $this->prepareSQLStatement($mysql, 'DELETE FROM `peers` WHERE `user_id` = ? AND `torrent_id` = ? AND `peer_id` = ?');
$delPeerStmt->execute([
0 => $userId,
1 => $torrentId,
2 => self::binPeerId2Hex($queries['peer_id'])
]);
if ($delPeerStmt->affected_rows) {
// A row was actually removed: decrement the matching torrent counter
// (never below 0) and pick the time column to credit in the history row.
if ($seeder == 'yes') {
$torrentUpdateSet['seeders'] = $torrentRes['seeders'] - ($torrentRes['seeders'] ? 1 : 0);
$torrentRes['seeders'] = $torrentUpdateSet['seeders'];
$timeKey = 'seed_time';
} else {
$torrentUpdateSet['leechers'] = $torrentRes['leechers'] - ($torrentRes['leechers'] ? 1 : 0);
$torrentRes['leechers'] = $torrentUpdateSet['leechers'];
$timeKey = 'leech_time';
}
$recordUpdateStmt = $this->prepareSQLStatement($mysql,
"UPDATE `snatched` SET `uploaded` = ?, `downloaded` = ?, `to_go` = ?,
`{$timeKey}` = ?, `peer_id` = ?, `agent` = ?, `updated_at` = NOW()
WHERE `torrent_id` = ? AND `user_id` = ?");
// Update the history record with the traffic delta and elapsed time.
$recordUpdateStmt->execute([
0 => $selfRecord['uploaded'] + $trueUploaded,
1 => $selfRecord['downloaded'] + $trueDownloaded,
2 => $queries['left'],
3 => $selfRecord[$timeKey] + $duration,
4 => self::binPeerId2Hex($queries['peer_id']),
5 => $userAgent,
6 => $torrentId,
7 => $userId
]);
}
} elseif (isset($self)) {
// Any non-stop event on an existing session.
/*
* Special case: download completed.
*/
$recordExtraUpdateField = '';
if ($queries['event'] == 'completed') {
// Bump the completion counter.
$torrentUpdateSet['times_completed'] = $torrentRes['times_completed'] + 1;
$torrentRes['times_completed'] = $torrentUpdateSet['times_completed'];
// One leecher less (never below 0).
$torrentUpdateSet['leechers'] = $torrentRes['leechers'] - ($torrentRes['leechers'] ? 1 : 0);
$torrentRes['leechers'] = $torrentUpdateSet['leechers'];
// One seeder more.
$torrentUpdateSet['seeders'] = $torrentRes['seeders'] + 1;
$torrentRes['seeders'] = $torrentUpdateSet['seeders'];
// Extra SQL fragment for the history update (note the trailing comma).
$recordExtraUpdateField = '`finished` = \'yes\', `finished_at` = NOW(),';
}
// SQL fragment assigning the ip / ipv6 / ipv6_http columns.
// NOTE(review): getIpField() returns '' when none of the three is set, which would leave "SET ," in the statements below.
$ipSet = $this->getIpField($queries);
/*
* Update the `peers` row. The previous totals and timestamp are kept in the prev_* columns.
*/
$peerUpdateStmt = $this->prepareSQLStatement($mysql,
"UPDATE `peers` SET {$ipSet},
`port` = ?, `ipv4_port` = ?, `ipv6_port` = ?, `agent` = ?,
`uploaded` = ?, `downloaded` = ?, `to_go` = ?, `seeder` = ?,
`prev_uploaded` = ?, `prev_downloaded` = ?, `corrupt` = ?,
`prev_action_at` = ?, `updated_at` = NOW()
WHERE `torrent_id` = ? AND `user_id` = ? AND `peer_id` = ?");
$peerUpdateStmt->execute([
0 => $queries['port'],
1 => $queries['ipv4_port'],
2 => $queries['ipv6_port'],
3 => $userAgent,
4 => $self['uploaded'] + $trueUploaded,
5 => $self['downloaded'] + $trueDownloaded,
6 => $queries['left'],
7 => $seederForDB,
8 => $self['uploaded'],
9 => $self['downloaded'],
10 => $queries['corrupt'],
11 => $self['updated_at'],
12 => $torrentId,
13 => $userId,
14 => self::binPeerId2Hex($queries['peer_id'])
]);
/*
* Update the `snatched` row.
*/
$timeKey = $seeder == 'yes' ? 'seed_time' : 'leech_time';
// $recordExtraUpdateField, when non-empty, already ends with a comma separator; do not add another.
$recordUpdateStmt = $this->prepareSQLStatement($mysql,
"UPDATE `snatched` SET {$ipSet}, {$recordExtraUpdateField}
`port` = ?, `uploaded` = ?, `downloaded` = ?, `to_go` = ?,
`{$timeKey}` = ?, `peer_id` = ?, `agent` = ?, `updated_at` = NOW()
WHERE `torrent_id` = ? AND `user_id` = ?");
$recordUpdateStmt->execute([
0 => $queries['port'],
1 => $selfRecord['uploaded'] + $trueUploaded,
2 => $selfRecord['downloaded'] + $trueDownloaded,
3 => $queries['left'],
4 => $selfRecord[$timeKey] + $duration,
5 => self::binPeerId2Hex($queries['peer_id']),
6 => $userAgent,
7 => $torrentId,
8 => $userId
]);
} elseif ($queries['event'] != 'stopped') {
// Any non-stop event without an existing session.
// (A stop event without a session matches no branch and changes no rows.)
// NOTE(review): same empty-$ipSet caveat as in the branch above.
$ipSet = $this->getIpField($queries);
/*
* A new session starts here:
* first insert a row into `peers`.
*/
$peerInsertStmt = $this->prepareSQLStatement($mysql,
"INSERT INTO `peers` SET {$ipSet},
`torrent_id` = ?, `user_id` = ?, `peer_id` = ?,
`port` = ?, `ipv4_port` = ?, `ipv6_port` = ?, `agent` = ?,
`uploaded` = ?, `downloaded` = ?, `to_go` = ?, `seeder` = ?,
`prev_uploaded` = ?, `prev_downloaded` = ?, `corrupt` = ?,
`prev_action_at` = NOW(), `created_at` = NOW(), `updated_at` = NOW()
ON DUPLICATE KEY UPDATE {$ipSet},
`port` = ?, `ipv4_port` = ?, `ipv6_port` = ?, `agent` = ?,
`uploaded` = ?, `downloaded` = ?, `to_go` = ?, `seeder` = ?,
`corrupt` = ?, `prev_action_at` = NOW(), `updated_at` = NOW()");
$peerInsertStmt->execute([
// key columns
0 => $torrentId,
1 => $userId,
2 => self::binPeerId2Hex($queries['peer_id']),
// INSERT part
3 => $queries['port'],
4 => $queries['ipv4_port'],
5 => $queries['ipv6_port'],
6 => $userAgent,
7 => $queries['uploaded'],
8 => $queries['downloaded'],
9 => $queries['left'],
10 => $seederForDB,
11 => $queries['uploaded'],
12 => $queries['downloaded'],
13 => $queries['corrupt'],
// UPDATE part
14 => $queries['port'],
15 => $queries['ipv4_port'],
16 => $queries['ipv6_port'],
17 => $userAgent,
18 => $queries['uploaded'],
19 => $queries['downloaded'],
20 => $queries['left'],
21 => $seederForDB,
22 => $queries['corrupt']
]);
$recordExtraField = '';
// Update the seeder / leecher counters when the statement affected a row.
if ($peerInsertStmt->affected_rows) {
if ($seeder == 'yes') {
$torrentUpdateSet['seeders'] = $torrentRes['seeders'] + 1;
$torrentRes['seeders'] = $torrentUpdateSet['seeders'];
} else {
$torrentUpdateSet['leechers'] = $torrentRes['leechers'] + 1;
$torrentRes['leechers'] = $torrentUpdateSet['leechers'];
}
if ($queries['event'] == 'completed') {
// Completion counter, for both the cache copy and the database.
$torrentUpdateSet['times_completed'] = $torrentRes['times_completed'] + 1;
$torrentRes['times_completed'] = $torrentUpdateSet['times_completed'];
// Extra SQL fragment (note the trailing comma).
$recordExtraField = '`finished` = \'yes\', `finished_at` = NOW(),';
}
}
/*
* Update the `snatched` record, or insert it when none exists yet.
*/
if (!isset($selfRecord)) {
// No history record: insert one.
$recordInsertStmt = $this->prepareSQLStatement($mysql,
"INSERT INTO `snatched` SET {$ipSet}, {$recordExtraField}
`torrent_id` = ?, `user_id` = ?,
`port` = ?, `uploaded` = ?, `downloaded` = ?, `to_go` = ?,
`peer_id` = ?, `agent` = ?,
`created_at` = NOW(), `updated_at` = NOW()
ON DUPLICATE KEY UPDATE {$ipSet}, {$recordExtraField}
`port` = ?, `uploaded` = ?, `downloaded` = ?, `to_go` = ?,
`peer_id` = ?, `agent` = ?,
`updated_at` = NOW()");
$recordInsertStmt->execute([
// key columns
0 => $torrentId,
1 => $userId,
// INSERT part
2 => $queries['port'],
3 => $queries['uploaded'],
4 => $queries['downloaded'],
5 => $queries['left'],
6 => self::binPeerId2Hex($queries['peer_id']),
7 => $userAgent,
// UPDATE part
8 => $queries['port'],
9 => $queries['uploaded'],
10 => $queries['downloaded'],
11 => $queries['left'],
12 => self::binPeerId2Hex($queries['peer_id']),
13 => $userAgent
]);
} else {
// History record exists: update it.
// NOTE(review): this adds the client's reported totals (not a delta) to the stored history values.
$recordUpdateStmt = $this->prepareSQLStatement($mysql,
"UPDATE `snatched` SET {$ipSet}, {$recordExtraField}
`port` = ?, `uploaded` = ?, `downloaded` = ?, `to_go` = ?,
`peer_id` = ?, `agent` = ?,
`updated_at` = NOW()
WHERE `torrent_id` = ? AND `user_id` = ?");
$recordUpdateStmt->execute([
0 => $queries['port'],
1 => $selfRecord['uploaded'] + $queries['uploaded'],
2 => $selfRecord['downloaded'] + $queries['downloaded'],
3 => $queries['left'],
4 => self::binPeerId2Hex($queries['peer_id']),
5 => $userAgent,
6 => $torrentId,
7 => $userId
]);
}
}
// Write the collected `users` column changes.
if (count($userUpdateSet)) {
$userUpdateField = $this->getUpdateField($userUpdateSet);
$mysql->query("UPDATE `users` SET {$userUpdateField} WHERE `id` = '{$userId}'");
}
// Write the collected `torrents` column changes.
if (count($torrentUpdateSet)) {
$torrentUpdateField = $this->getUpdateField($torrentUpdateSet);
$mysql->query("UPDATE `torrents` SET {$torrentUpdateField} WHERE `id` = '{$torrentId}'");
}
// Store the modified torrent row back into the cache.
$this->updateTorrentCacheById($torrentId, $torrentRes, $redis);
}
/**
 * Render a column => value map as the assignment list of a SQL `SET` clause.
 *
 * Every pair becomes "`column` = 'value'" and the pairs are joined with ", ".
 * An empty map yields an empty string. Values are interpolated verbatim, so
 * callers must only pass values that are safe inside a quoted SQL literal.
 *
 * @param $updateSet map of column name to new value
 * @return string
 */
private function getUpdateField($updateSet)
{
    $clause = '';
    foreach ($updateSet as $column => $value) {
        // Separator goes in front of every assignment except the first one.
        if ($clause !== '') {
            $clause .= ', ';
        }
        $clause .= '`' . $column . '` = \'' . $value . '\'';
    }
    return $clause;
}
/**
 * Fetch the `uploaded` / `downloaded` counters of a single user.
 *
 * Runs a prepared `SELECT ... FROM users WHERE id = ? LIMIT 1` and hands back
 * the first row of the result.
 *
 * @param $userId
 * @param MySQL $mysql
 * @return array|bool the first result row, or false when no row came back
 *                    (unknown user, or the statement failed to execute)
 * @throws DatabaseException when the statement cannot be prepared
 */
private function getUserById($userId, MySQL $mysql)
{
    $stmt = $this->prepareSQLStatement($mysql, 'SELECT `uploaded`, `downloaded` FROM `users` WHERE `id` = ? LIMIT 1');
    $rows = $stmt->execute([0 => $userId]);
    // Swoole's Statement::execute() is documented to return false on failure.
    // Passing that to count() warns on PHP 7.2+ and throws a TypeError on PHP 8,
    // so anything that is not a non-empty array is reported as "no user row".
    if (!is_array($rows) || count($rows) === 0) {
        return false;
    }
    return $rows[0];
}
/**
 * Build the IP part of a SQL `SET` clause from the announce query.
 *
 * Each of `ip`, `ipv6` and `ipv6_http` that is present and truthy in $queries
 * contributes one "`column` = 'value'" assignment; assignments are joined with
 * ", ". When none of the three is set the result is an empty string, so a
 * caller that splices the fragment in front of further assignments has to
 * cope with that case itself.
 *
 * Values are passed through addslashes() before being quoted, so a stray quote
 * or backslash cannot terminate the SQL literal. For well-formed addresses the
 * output is unchanged.
 * NOTE(review): this is defence in depth only; bound parameters would be the
 * robust fix, but that requires changing the callers that embed this fragment.
 *
 * @param $queries announce query key/value pairs
 * @return string
 */
private function getIpField($queries)
{
    $assignments = [];
    foreach (['ip', 'ipv6', 'ipv6_http'] as $column) {
        // empty() treats a missing key like an empty value instead of raising
        // an undefined-index notice.
        if (empty($queries[$column])) {
            continue;
        }
        $assignments[] = '`' . $column . '` = \'' . addslashes((string) $queries[$column]) . '\'';
    }
    return join(', ', $assignments);
}
/**
 * Build the bencoded body of an announce response.
 *
 * The response dictionary carries fixed `interval` (60) and `min interval` (30)
 * values, the torrent's `seeders` / `leechers` counters as `complete` /
 * `incomplete`, and a `peers` list built from every row of `peers` stored for
 * the torrent. A stored peer is left out when
 *   - the requester is seeding (`left` is falsy) and the stored peer is a seeder, or
 *   - its `peer_id` equals the requester's own hex-encoded peer id.
 * Each remaining peer contributes one entry per address it has: `ip` with
 * `ipv4_port`, `ipv6` with `ipv6_port`, `ipv6_http` with `ipv6_port`.
 *
 * @param $queries announce query pairs; `left` and `peer_id` are read here
 * @param $userId not used by this method
 * @param $torrentId
 * @param MySQL $mysql
 * @param Redis $redis
 * @return string
 * @throws AnnounceException declared by the original docblock; presumably raised by getTorrentById() - confirm
 * @throws DatabaseException when the peer query cannot be prepared
 */
private function generateAnnounceResponse($queries, $userId, $torrentId, MySQL $mysql, Redis $redis)
{
    $seeder = $queries['left'] ? 'no' : 'yes';
    $torrentRes = $this->getTorrentById($torrentId, $mysql, $redis);

    $fieldsPeer = ['`seeder`', '`user_id`', '`peer_id`', '`ip`', '`ipv6`', '`ipv6_http`', '`port`', '`ipv4_port`', '`ipv6_port`'];
    $fieldsPeerJoined = join(', ', $fieldsPeer);
    $peerStmt = $this->prepareSQLStatement($mysql, "SELECT {$fieldsPeerJoined} FROM `peers` WHERE `torrent_id` = ?");
    $peers = $peerStmt->execute([0 => $torrentId]);

    $peerList = [];
    // execute() is documented to return false on failure; count(false) warns on
    // PHP 7.2+ and throws on PHP 8, so a failed query is treated as "no peers".
    if (is_array($peers) && count($peers)) {
        // Loop-invariant: encode the requester's peer id once, not per row.
        $selfPeerIdHex = (string) self::binPeerId2Hex($queries['peer_id']);
        foreach ($peers as $peer) {
            // A seeder has nothing to fetch from other seeders.
            if ($seeder == 'yes' && $peer['seeder'] == 'yes') {
                continue;
            }
            // Strict string comparison: loose == would compare numeric-looking
            // hex strings (e.g. "0e...") as numbers.
            if ((string) $peer['peer_id'] === $selfPeerIdHex) {
                continue;
            }
            if ($peer['ip']) {
                $peerList[] = $this->getPeerListStructure($peer['ip'], $peer['ipv4_port']);
            }
            if ($peer['ipv6']) {
                $peerList[] = $this->getPeerListStructure($peer['ipv6'], $peer['ipv6_port']);
            }
            if ($peer['ipv6_http']) {
                $peerList[] = $this->getPeerListStructure($peer['ipv6_http'], $peer['ipv6_port']);
            }
        }
    }

    // The list is complete at this point, so it is embedded by value.
    $response = [
        'type' => 'dictionary',
        'value' => [
            'interval' => [
                'type' => 'integer',
                'value' => 60
            ],
            'min interval' => [
                'type' => 'integer',
                'value' => 30
            ],
            'complete' => [
                'type' => 'integer',
                'value' => $torrentRes['seeders']
            ],
            'incomplete' => [
                'type' => 'integer',
                'value' => $torrentRes['leechers']
            ],
            'peers' => [
                'type' => 'list',
                'value' => $peerList
            ]
        ]
    ];
    return self::bencode($response);
}
/**
 * Wrap one peer address in the typed-node layout used for the response tree.
 *
 * The result is a `dictionary` node whose value holds an `ip` node of type
 * `string` and a `port` node of type `integer`; both arguments are stored
 * exactly as given.
 *
 * @param $ip
 * @param $port
 * @return array
 */
private function getPeerListStructure($ip, $port)
{
    $addressNode = ['type' => 'string', 'value' => $ip];
    $portNode = ['type' => 'integer', 'value' => $port];

    $peerNode = ['type' => 'dictionary'];
    $peerNode['value'] = ['ip' => $addressNode, 'port' => $portNode];
    return $peerNode;
}
/**
 * Prepare a SQL statement on the given connection, or fail loudly.
 *
 * When `$mysql->prepare()` returns false, the connection's error text is read,
 * the connection is closed, and a DatabaseException carrying that text is thrown.
 *
 * @param MySQL $mysql
 * @param $sql
 * @return mixed whatever `$mysql->prepare()` returned when it was not false
 * @throws DatabaseException when preparing the statement fails
 */
private function prepareSQLStatement(MySQL $mysql, $sql)
{
    $statement = $mysql->prepare($sql);
    if ($statement !== false) {
        return $statement;
    }

    // Read the error text before the connection is closed.
    $reason = $mysql->error;
    $mysql->close();
    throw new DatabaseException('SQL prepare failed: ' . $reason);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment