Skip to content

Instantly share code, notes, and snippets.

@sebastianhoitz
Created March 10, 2011 14:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sebastianhoitz/864208 to your computer and use it in GitHub Desktop.
Save sebastianhoitz/864208 to your computer and use it in GitHub Desktop.
<?php
/**
 * Phirehose-based client for the Twitter streaming API (user stream by default) that can
 * authenticate either with HTTP Basic credentials or with OAuth.
 *
 * In OAuth mode the constructor's $username / $password are used as the OAuth access token
 * and token secret; the consumer key/secret come from the TWITTER_CONSUMER_KEY and
 * TWITTER_CONSUMER_SECRET constants.
 */
class Twitter_StreamingClient extends Twitter_Phirehose
{
    const URL_BASE = 'http://stream.twitter.com/1/statuses/';
    const METHOD_USER = 'user';
    const CONNECT_OAUTH = 'oauth';
    const CONNECT_BASIC = 'basic';

    /**
     * Numeric base for status-length prefixes, presumably consumed by the parent's stream parser.
     * NOTE(review): a previous comment claimed the user stream sends hexadecimal lengths, but the
     * value is 10 (decimal) — confirm which one the parent class actually expects.
     */
    protected $status_length_base = 10;

    /** @var string CONNECT_OAUTH or CONNECT_BASIC; selects the authentication used by connect(). */
    protected $auth_method;

    /** @var App_Model_Source_Twitter|null Account bound via setTwitterAccount(). */
    protected $twitterAccount = null;

    /**
     * Lazily creates the shared user-stream client and returns it.
     *
     * When both $oauth_token and $oauth_secret are given they replace the basic username/password
     * as the client's credentials (getOAuthHeader() reads the token/secret from those fields).
     * The instance uses the constructor's default auth method (CONNECT_OAUTH). Later calls return
     * the existing instance unchanged.
     *
     * NOTE(review): self::$instance is not declared in this class; it is assumed to be declared by
     * the parent class — confirm.
     *
     * @param string|null $basic_username Username (or access token) when no OAuth pair is given.
     * @param string|null $basic_password Password (or token secret) when no OAuth pair is given.
     * @param string|null $oauth_token    OAuth access token.
     * @param string|null $oauth_secret   OAuth access token secret.
     * @return Twitter_StreamingClient The shared instance.
     */
    public static function Initialize($basic_username = NULL,
        $basic_password = NULL, $oauth_token = NULL,
        $oauth_secret = NULL)
    {
        if (!self::$instance instanceof Twitter_StreamingClient)
        {
            $useOAuthCredentials = ($oauth_token !== NULL && $oauth_secret !== NULL);
            self::$instance = new Twitter_StreamingClient(
                $useOAuthCredentials ? $oauth_token : $basic_username,
                $useOAuthCredentials ? $oauth_secret : $basic_password,
                Twitter_StreamingClient::METHOD_USER
            );
        }
        return self::$instance;
    }

    /**
     * @param string $username    Basic-auth username, or the OAuth access token in OAuth mode.
     * @param string $password    Basic-auth password, or the OAuth token secret in OAuth mode.
     * @param string $method      Stream method appended to URL_BASE (default: user stream).
     * @param string $format      Response format appended to the URL (default: JSON).
     * @param string $auth_method CONNECT_OAUTH (default) or CONNECT_BASIC.
     */
    public function __construct($username, $password,
        $method = Twitter_StreamingClient::METHOD_USER,
        $format = self::FORMAT_JSON,
        $auth_method = Twitter_StreamingClient::CONNECT_OAUTH)
    {
        parent::__construct($username, $password, $method, $format);
        $this->auth_method = $auth_method;
    }

    /**
     * Binds the Twitter account model this stream belongs to.
     *
     * @param App_Model_Source_Twitter $twitterAccount
     */
    public function setTwitterAccount(App_Model_Source_Twitter $twitterAccount)
    {
        $this->twitterAccount = $twitterAccount;
    }

    /**
     * Connects using OAuth when auth_method is CONNECT_OAUTH, otherwise HTTP Basic auth.
     */
    protected function connect()
    {
        if ($this->auth_method === Twitter_StreamingClient::CONNECT_OAUTH)
        {
            $this->connect_oauth();
        }
        else
        {
            $this->connect_basic();
        }
    }

    /**
     * Connects to the stream URL using HTTP Basic authentication.
     *
     * @throws PhirehoseNetworkException     If the stream host cannot be resolved.
     * @throws PhirehoseConnectLimitExceeded If the connect failure limit is exceeded.
     */
    protected function connect_basic()
    {
        $this->connectStream(FALSE);
    }

    /**
     * Connects to the stream URL using an OAuth-signed request.
     *
     * @throws PhirehoseNetworkException     If the stream host cannot be resolved.
     * @throws PhirehoseConnectLimitExceeded If the connect failure limit is exceeded.
     */
    protected function connect_oauth()
    {
        $this->connectStream(TRUE);
    }

    /**
     * Opens the streaming connection. On TCP or HTTP failure it backs off exponentially and
     * retries until the server answers HTTP 200 or more than connectFailuresMax attempts fail.
     * On success the socket is switched to non-blocking mode and the read state is reset.
     *
     * @param bool $useOAuth TRUE to sign the request with OAuth, FALSE for HTTP Basic auth.
     * @throws PhirehoseNetworkException     If the stream host cannot be resolved.
     * @throws PhirehoseConnectLimitExceeded If the connect failure limit is exceeded.
     */
    protected function connectStream($useOAuth)
    {
        $connectFailures = 0;
        // Halved up front because every failure doubles the delay before sleeping.
        $tcpRetry = self::TCP_BACKOFF / 2;
        $httpRetry = self::HTTP_BACKOFF / 2;
        $httpCode = NULL;

        do
        {
            // Filter predicates may have changed since the last connect.
            if ($this->method == self::METHOD_FILTER)
            {
                $this->checkFilterPredicates();
            }

            $url = self::URL_BASE . $this->method . '.' . $this->format;
            $urlParts = parse_url($url);
            $requestParams = $this->buildRequestParams();

            $this->log('Connecting to twitter stream: ' . $url . ' with params: ' . str_replace("\n", '',
                var_export($requestParams, TRUE)));

            // A raw socket is used instead of the HTTP stream wrapper so HTTP error responses stay visible.
            $errNo = $errStr = NULL;
            $isHttps = ($urlParts['scheme'] == 'https');
            $transport = $isHttps ? 'ssl://' : 'tcp://';
            $port = $isHttps ? 443 : 80;

            $streamIP = $this->resolveStreamIP($urlParts['host']);
            $this->log('Connecting to ' . $streamIP);
            $this->conn = @fsockopen($transport . $streamIP, $port, $errNo, $errStr, $this->connectTimeout);

            // TCP failure: record the error, back off, retry.
            if (!$this->conn || !is_resource($this->conn))
            {
                $this->lastErrorMsg = $errStr;
                $this->lastErrorNo = $errNo;
                $connectFailures++;
                if ($connectFailures > $this->connectFailuresMax)
                {
                    $msg = 'TCP failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr;
                    $this->log($msg);
                    throw new PhirehoseConnectLimitExceeded($msg, (int) $errNo);
                }
                $tcpRetry = min($tcpRetry * 2, self::TCP_BACKOFF_MAX);
                $this->log('TCP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' .
                    $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.');
                sleep((int) $tcpRetry);
                continue;
            }

            $this->log('Connection established to ' . $streamIP);
            $this->lastErrorMsg = NULL;
            $this->lastErrorNo = NULL;

            // Blocking mode while the request is written and the status line/headers are read.
            stream_set_blocking($this->conn, 1);

            $postData = http_build_query($requestParams);
            $headers = $this->buildRequestHeaders($useOAuth, $url, $urlParts, $port, $requestParams, $postData);
            fwrite($this->conn, implode("\r\n", $headers) . "\r\n\r\n" . $postData . "\r\n\r\n");
            $this->logRequest($headers, $postData);

            list(, $httpCode, $httpMessage) = $this->readStatusLine();
            // Response headers are not used; consume them to reach the body.
            $this->readUntilBlankLine();

            // HTTP failure: read the error body, close the socket, back off, retry.
            if ($httpCode != 200)
            {
                $connectFailures++;
                // Twitter disconnects on error, but the body carries the useful error text.
                $respBody = $this->readUntilBlankLine();
                fclose($this->conn);

                $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')';
                $this->lastErrorMsg = $errStr;
                $this->lastErrorNo = $httpCode;

                if ($connectFailures > $this->connectFailuresMax)
                {
                    $msg = 'Connection failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr;
                    $this->log($msg);
                    throw new PhirehoseConnectLimitExceeded($msg, (int) $httpCode);
                }
                $httpRetry = min($httpRetry * 2, self::HTTP_BACKOFF_MAX);
                $this->log('HTTP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' .
                    $errStr . '. Sleeping for ' . $httpRetry . ' seconds.');
                sleep((int) $httpRetry);
                continue;
            }
        }
        while (!is_resource($this->conn) || $httpCode != 200);

        $this->lastErrorMsg = NULL;
        $this->lastErrorNo = NULL;
        // The stream is consumed in non-blocking mode.
        stream_set_blocking($this->conn, 0);
        // A (re)connect always picks up the current filter, so clear the changed flag.
        $this->filterChanged = FALSE;
        // Flush the stream buffer and (re)assign fdrPool for reconnects.
        $this->fdrPool = array($this->conn);
        $this->buff = '';
    }

    /**
     * Builds the POST parameters for the stream request. track/follow predicates are sent for
     * the filter and user methods only.
     *
     * @return array Parameter name => value.
     */
    protected function buildRequestParams()
    {
        $params = array('delimited' => 'length');
        if ($this->method == self::METHOD_FILTER || $this->method == self::METHOD_USER)
        {
            if (!empty($this->trackWords))
            {
                $params['track'] = implode(',', $this->trackWords);
            }
            if (!empty($this->followIds))
            {
                $params['follow'] = implode(',', $this->followIds);
            }
        }
        return $params;
    }

    /**
     * Resolves $host on every call and picks one address at random. Resolution is done manually
     * because the stream host's IPs rotate and PHP may cache the result in a long-running process
     * (as per Phirehose).
     *
     * @param string $host Host name to resolve.
     * @return string One IPv4 address for $host.
     * @throws PhirehoseNetworkException If the host cannot be resolved.
     */
    protected function resolveStreamIP($host)
    {
        $ips = gethostbynamel($host);
        if (empty($ips))
        {
            throw new PhirehoseNetworkException("Unable to resolve hostname: '" . $host . "'");
        }
        $this->log('Resolved host ' . $host . ' to ' . implode(', ', $ips));
        return $ips[rand(0, count($ips) - 1)];
    }

    /**
     * Builds the request line and headers (without line terminators) for the stream POST.
     *
     * @param bool   $useOAuth      TRUE for an OAuth Authorization header, FALSE for Basic.
     * @param string $url           Full stream URL (used for the OAuth signature).
     * @param array  $urlParts      parse_url() result for $url.
     * @param int    $port          Port; included in the Host header in OAuth mode.
     * @param array  $requestParams POST parameters (signed in OAuth mode).
     * @param string $postData      Encoded request body, used for Content-length.
     * @return array Header lines in send order.
     */
    protected function buildRequestHeaders($useOAuth, $url, array $urlParts, $port, array $requestParams, $postData)
    {
        if ($useOAuth)
        {
            $host = $urlParts['host'] . ':' . $port;
            $authorization = $this->getOAuthHeader('POST', $url, $requestParams);
        }
        else
        {
            $host = $urlParts['host'];
            $authorization = 'Authorization: Basic ' . base64_encode($this->username . ':' . $this->password);
        }

        return array(
            'POST ' . $urlParts['path'] . ' HTTP/1.0',
            'Host: ' . $host,
            'Content-type: application/x-www-form-urlencoded',
            'Content-length: ' . strlen($postData),
            'Accept: */*',
            $authorization,
            'User-Agent: ' . self::USER_AGENT,
        );
    }

    /**
     * Logs the outgoing request with the Authorization header redacted, so credentials and
     * signatures never reach the log.
     *
     * @param array  $headers  Header lines as sent.
     * @param string $postData Encoded request body.
     */
    protected function logRequest(array $headers, $postData)
    {
        foreach ($headers as $header)
        {
            if (stripos($header, 'Authorization:') === 0)
            {
                $header = 'Authorization: [redacted]';
            }
            $this->log($header);
        }
        $this->log('');
        $this->log($postData);
        $this->log('');
    }

    /**
     * Reads the HTTP status line from the connection.
     *
     * @return array Exactly three elements: version, status code, reason phrase. Missing parts are
     *               NULL (e.g. when the server closed the connection without replying).
     */
    protected function readStatusLine()
    {
        $line = fgets($this->conn, 1024);
        return array_pad(preg_split('/\s+/', trim((string) $line), 3), 3, NULL);
    }

    /**
     * Reads lines until a blank line or EOF.
     *
     * @return string The trimmed lines concatenated without separators.
     */
    protected function readUntilBlankLine()
    {
        $buffer = '';
        while (($line = fgets($this->conn, 4096)) !== FALSE)
        {
            $line = trim($line);
            if ($line === '')
            {
                break;
            }
            $buffer .= $line;
        }
        return $buffer;
    }

    /**
     * Builds and HMAC-SHA1-signs OAuth 1.0 parameters by hand.
     *
     * NOTE(review): nothing in this class calls this method (getOAuthHeader() uses Zend_Oauth);
     * it may be dead code.
     *
     * @param string|null $method HTTP method.
     * @param string|null $url    Request URL.
     * @param array|null  $params Request parameters; keys starting with '@' are renamed without the
     *                            '@' and, if present, exclude all request params from the signature.
     * @return array|false array('request' => params, 'oauth' => oauth params incl. signature),
     *                     or FALSE if $method or $url is empty.
     */
    protected function prepareParameters($method = null, $url = null,
        $params = null)
    {
        if (empty($method) || empty($url))
        {
            return false;
        }

        $oauth = array();
        $oauth['oauth_consumer_key'] = TWITTER_CONSUMER_KEY;
        $oauth['oauth_token'] = $this->username;
        $oauth['oauth_nonce'] = md5(uniqid(rand(), true));
        $oauth['oauth_timestamp'] = time();
        $oauth['oauth_signature_method'] = 'HMAC-SHA1';
        if (isset($params['oauth_verifier']))
        {
            $oauth['oauth_verifier'] = $params['oauth_verifier'];
            unset($params['oauth_verifier']);
        }
        $oauth['oauth_version'] = '1.0';

        foreach ($oauth as $k => $v)
        {
            $oauth[$k] = $this->encode_rfc3986($v);
        }

        // Encode non-'@' params (kept for the signature); rename '@key' to 'key'.
        $sigParams = array();
        $hasFile = false;
        if (is_array($params))
        {
            foreach ($params as $k => $v)
            {
                if (strncmp('@', $k, 1) !== 0)
                {
                    $sigParams[$k] = $this->encode_rfc3986($v);
                    $params[$k] = $this->encode_rfc3986($v);
                }
                else
                {
                    $params[substr($k, 1)] = $v;
                    unset($params[$k]);
                    $hasFile = true;
                }
            }
            if ($hasFile === true)
            {
                $sigParams = array();
            }
        }

        $sigParams = array_merge($oauth, (array) $sigParams);
        ksort($sigParams);

        $oauth['oauth_signature'] = $this->encode_rfc3986($this->generateSignature($method, $url, $sigParams));
        return array('request' => $params, 'oauth' => $oauth);
    }

    /**
     * Percent-encodes a value per RFC 3986. rawurlencode() never emits '+' (spaces become %20);
     * '%7E' is mapped back to '~' for PHP versions that still encode it.
     *
     * @param mixed $string Value to encode (cast to string).
     * @return string
     */
    protected function encode_rfc3986($string)
    {
        return str_replace('%7E', '~', rawurlencode((string) $string));
    }

    /**
     * Computes an OAuth 1.0 HMAC-SHA1 signature.
     *
     * @param string|null $method HTTP method (uppercased for the base string).
     * @param string|null $url    Request URL; normalized (lowercased scheme/host, default port dropped).
     * @param array|null  $params Already-encoded, sorted parameters.
     * @return string|false Base64 signature, or FALSE if $method or $url is empty.
     */
    protected function generateSignature($method = null, $url = null,
        $params = null)
    {
        if (empty($method) || empty($url))
        {
            return false;
        }

        $pairs = array();
        foreach ((array) $params as $key => $value)
        {
            $pairs[] = "{$key}={$value}";
        }
        $concatenatedParams = $this->encode_rfc3986(implode('&', $pairs));

        // Normalize the URL.
        $urlParts = parse_url($url);
        $scheme = strtolower($urlParts['scheme']);
        $host = strtolower($urlParts['host']);
        $port = isset($urlParts['port']) ? intval($urlParts['port']) : 0;
        $normalized = $scheme . '://' . $host;
        if (!empty($port) && (($scheme === 'http' && $port != 80) || ($scheme === 'https' && $port != 443)))
        {
            $normalized .= ":{$port}";
        }
        $normalized .= $urlParts['path'];
        if (!empty($urlParts['query']))
        {
            $normalized .= "?{$urlParts['query']}";
        }
        $normalizedUrl = $this->encode_rfc3986($normalized);

        // The signature base string requires the method in uppercase.
        $method = $this->encode_rfc3986(strtoupper($method));
        $signatureBaseString = "{$method}&{$normalizedUrl}&{$concatenatedParams}";

        $key = $this->encode_rfc3986(TWITTER_CONSUMER_SECRET) . '&' . $this->encode_rfc3986($this->password);
        return base64_encode(hash_hmac('sha1', $signatureBaseString, $key, true));
    }

    /**
     * Builds the OAuth "Authorization:" header line via Zend_Oauth. The user's access token and
     * secret are taken from $this->username / $this->password.
     *
     * @param string $method HTTP method the request will use.
     * @param string $url    Request URL.
     * @param array  $params Request parameters to include in the signature.
     * @return string Complete header line, e.g. "Authorization: OAuth ...".
     */
    protected function getOAuthHeader($method, $url, $params = array())
    {
        $token = new Zend_Oauth_Token_Access();
        $token->setToken($this->username)
              ->setTokenSecret($this->password);

        $config = new Zend_Oauth_Config(array(
            "consumerKey" => TWITTER_CONSUMER_KEY,
            "consumerSecret" => TWITTER_CONSUMER_SECRET,
        ));
        $config->setToken($token);
        // The request method is part of the signature base string.
        $config->setRequestMethod($method);

        return "Authorization: " . $token->toHeader($url, $config, $params);
    }

    /**
     * Handles one received status. Currently a placeholder with no logic.
     *
     * @param mixed $status Raw status payload as delivered by the parent stream reader.
     */
    public function enqueueStatus($status)
    {
        // Your enqueue logic here!
    }

    /**
     * Writes a log line to stdout and flushes it immediately.
     *
     * @param string $message Message to print.
     * @param string $level   Optional level, accepted so the signature stays compatible if the
     *                        parent's log() takes one. It is currently ignored.
     */
    protected function log($message, $level = 'notice')
    {
        echo $message . "\n";
        // ob_flush() raises a notice when no output buffer is active.
        if (ob_get_level() > 0)
        {
            ob_flush();
        }
        flush();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment