Skip to content

Instantly share code, notes, and snippets.

@jshensh
Created May 9, 2021 18:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jshensh/b7f05cc2e1ebf600a1eedbf4a30bc346 to your computer and use it in GitHub Desktop.
Save jshensh/b7f05cc2e1ebf600a1eedbf4a30bc346 to your computer and use it in GitHub Desktop.
curl_multi_exec test
root@debian:~/php-curl-class# php test.php
Start: 1620584188.6943
curl_multi_exec: 1620584188.6959
sleep: 1620584191.6963
test1 received data at: 1620584193.697
test3 received data at: 1620584193.6971
test4 received data at: 1620584193.6979
test2 received data at: 1620584193.6989
test5 received data at: 1620584198.6994
test6 received data at: 1620584198.6994
test7 received data at: 1620584198.7006
test8 received data at: 1620584198.7009
array(8) {
["test1"]=>
string(40) "strlen: 259 data: float(1620584193.6961)"
["test3"]=>
string(40) "strlen: 259 data: float(1620584193.6966)"
["test4"]=>
string(40) "strlen: 259 data: float(1620584193.6969)"
["test2"]=>
string(40) "strlen: 259 data: float(1620584193.6965)"
["test5"]=>
string(40) "strlen: 259 data: float(1620584198.6985)"
["test6"]=>
string(39) "strlen: 258 data: float(1620584198.699)"
["test7"]=>
string(40) "strlen: 259 data: float(1620584198.6994)"
["test8"]=>
string(37) "strlen: 256 data: float(1620584198.7)"
}
Done: 1620584198.7015
<?php
// composer require jshensh/php-curl-class
require('./vendor/autoload.php');
use CustomCurl\Client;
/**
* Custom Curl 多线程类
* @author jshensh <admin@imjs.work>
*/
class MultiTest
{
// $chDataArr = [['cookieJar' => $cookieJar, 'code' => $code, 'reRequest' => $reRequest]];
private $multiOptions = [
'concurrency' => null
],
$clientArr = [],
$chArr = [],
$chDataArr = [],
$chArrEndFlag = false;
/**
* 构造方法
* @access public
* @param array $clientArr CustomCurl\Client 的集合数组
* @param array $options 选项数组
* @return void
*/
public function __construct($clientArr, $options = [])
{
foreach ($clientArr as $client) {
if (!($client instanceof Client)) {
throw new \Exception('Argument 1 must be a collection of instances of CustomCurl\\Client');
}
}
$this->multiOptions = array_merge($this->multiOptions, $options);
$this->clientArr = $clientArr;
}
/**
* 获取 $clientArr 中的 Curl 句柄
* @access private
* @param int|string $clientIndex 索引
* @return resource
*/
private function getCh($clientIndex = null)
{
if ($clientIndex === null) {
if ($this->chArrEndFlag) {
return false;
}
$clientIndex = key($this->clientArr);
}
if (!isset($this->clientArr[$clientIndex])) {
return false;
}
$client = $this->clientArr[$clientIndex];
$this->chArr[$clientIndex] = $client->getHandle();
$cookieJar = &$client->getCookieJar();
if (!isset($this->chDataArr[$clientIndex])) {
$this->chDataArr[$clientIndex] = [
'cookieJar' => $cookieJar,
'code' => null,
'reRequest' => $client->get('reRequest')
];
}
if (next($this->clientArr) === false) {
$this->chArrEndFlag = true;
}
return $this->chArr[$clientIndex];
}
/**
* 执行多线程请求
* @access public
* @return array
*/
public function exec($callback = null)
{
$mh = curl_multi_init();
$active = null;
$result = [];
for ($i = 0; $i < ($this->multiOptions['concurrency'] !== null ? $this->multiOptions['concurrency'] : count($this->clientArr)); $i++) {
$ch = $this->getCh();
if (!$ch) {
break;
}
curl_multi_add_handle($mh, $ch);
}
$reRequestPool = [];
do {
$mrc = curl_multi_exec($mh, $active);
} while ($mrc == CURLM_CALL_MULTI_PERFORM);
echo "curl_multi_exec: " . microtime(true) . "\n";
sleep(3);
echo "sleep: " . microtime(true) . "\n";
while ($active && $mrc == CURLM_OK) {
if (curl_multi_select($mh) != -1) {
do {
$mrc = curl_multi_exec($mh, $active);
while ($info = curl_multi_info_read($mh)) {
$index = array_search($info['handle'], $this->chArr, true);
if ($index !== false) {
$this->chDataArr[$index]['reRequest']--;
$output = curl_multi_getcontent($info['handle']);
if (((int) $info['result'] || !$output) && $this->chDataArr[$index]['reRequest'] > 0) {
$reRequestPool[] = $index;
curl_multi_remove_handle($mh, $info['handle']);
curl_close($info['handle']);
}
$this->chDataArr[$index]['code'] = (int) $info['result'];
if ($output) {
if ($callback) {
$result[$index] = $callback($output, $index);
} else {
$result[$index] = new Statement(0, $ch, $output, $this->chDataArr[$i]['cookieJar']);
}
}
}
}
} while ($mrc == CURLM_CALL_MULTI_PERFORM);
}
if ($reRequestPool || ($this->multiOptions['concurrency'] !== null && $active < $this->multiOptions['concurrency'])) {
$nextCh = $this->getCh();
$nextCh = $nextCh ? $nextCh : $this->getCh(array_shift($reRequestPool));
if ($nextCh) {
curl_multi_add_handle($mh, $nextCh);
$mrc = curl_multi_exec($mh, $active);
}
}
}
foreach ($this->chArr as $i => $ch) {
$output = curl_multi_getcontent($ch);
$curlErrNo = $this->chDataArr[$i]['code'];
if ($curlErrNo === 0 && $output) {
// $result[$i] = new Statement(0, $ch, $output, $this->chDataArr[$i]['cookieJar']);
} else {
$result[$i] = new Statement($curlErrNo, $ch, $output);
}
curl_multi_remove_handle($mh, $ch);
}
return $result;
}
}
echo "Start: " . microtime(true) . "\n";
Client::setConf('timeout', 10);
$pool = new MultiTest([
'test1' => Client::init('http://127.0.0.1/testSleep.php?t=5'),
'test2' => Client::init('http://127.0.0.1/testSleep.php?t=5'),
'test3' => Client::init('http://127.0.0.1/testSleep.php?t=5'),
'test4' => Client::init('http://127.0.0.1/testSleep.php?t=5'),
'test5' => Client::init('http://127.0.0.1/testSleep.php?t=5'),
'test6' => Client::init('http://127.0.0.1/testSleep.php?t=5'),
'test7' => Client::init('http://127.0.0.1/testSleep.php?t=5'),
'test8' => Client::init('http://127.0.0.1/testSleep.php?t=5'),
], ['concurrency' => 4]);
$res = $pool->exec(function($v, $k) {
echo "{$k} received data at: " . microtime(true) . "\n";
return 'strlen: ' . strlen($v) . ' data: ' . substr($v, strrpos($v, "\r\n") + 2, -1);
});
var_dump($res);
echo "Done: " . microtime(true) . "\n";
<?php
sleep((int) $_GET['t']);
var_dump(microtime(true));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment