Skip to content

@nrk /MasterSlaveClusterInitializer.php
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
This is an example of a cluster (client-side sharding) of "virtual" nodes, each one configured in a master/slave replication setup.
<?php
// In this example we create a cluster where each node is actually
// a virtual one composed of a set of Redis instances replicated in
// a master / slave configuration. Key partitioning is based on the
// alias assigned to each "virtual" cluster node, so the actual
// configuration of the master and its slaves can be changed at
// will without affecting distribution and thus avoiding rebalancing.
// This example requires at least Predis v0.8.3-dev.
require __DIR__.'/../../autoload.php';
require __DIR__.'/MasterSlaveWrapperConnection.php';
require __DIR__.'/MasterSlaveClusterInitializer.php';
// Each top-level key names one "virtual" cluster node; its value lists the
// Redis instances (one aliased `master`, the rest slaves) that back it.
$replicaSets = array(
    'replica-01' => array(
        'tcp://127.0.0.1:6381?alias=master',
        'tcp://127.0.0.1:6382?alias=slave01',
    ),
    'replica-02' => array(
        'tcp://127.0.0.1:6391?alias=master',
        'tcp://127.0.0.1:6392?alias=slave01',
    ),
);

// The initializer is a callable handed to the client in place of plain
// connection parameters.
$client = new Predis\Client(new Nrk\Predis\MasterSlaveClusterInitializer($replicaSets));

// `foo` hashes to `replica-02`; the write (`SET`) moves that node to its
// master, so the following read is served from there too.
$client->set('foo', 'bar');
$client->get('foo');

// `hoge` hashes to `replica-01`; only a read (`GET`) is issued, so that
// node keeps using one of its slaves.
$client->get('hoge');

// Print the instance currently selected for each key.
// Expected output:
//   127.0.0.1:6391   (foo  -> replica-02 master)
//   127.0.0.1:6382   (hoge -> replica-01 slave)
$shards = $client->getConnection();
foreach (array('foo', 'hoge') as $key) {
    echo $shards->getConnectionByKey($key)->getCurrentConnection(), "\n";
}
<?php
namespace Nrk\Predis;
use Nrk\Predis\Connection\MasterSlaveWrapperConnection;
use Predis\Cluster\Distribution\HashRing;
use Predis\Connection\PredisCluster;
use Predis\Option\ClientOptions;
/**
 * Callable that assembles a client-side sharded cluster in which every node
 * is a master/slave replication group wrapped in a single connection object.
 *
 * Keys are distributed using the alias of each group (see
 * getConnectionHash()), not the addresses of the instances inside it.
 */
class MasterSlaveClusterInitializer
{
    /**
     * Replication groups indexed by group alias; each entry is a list of
     * connection parameters accepted by the connection factory.
     *
     * @var array
     */
    protected $parameters;

    /**
     * @param array $parameters Map of group alias => list of node parameters.
     */
    public function __construct(array $parameters)
    {
        $this->parameters = $parameters;
    }

    /**
     * Builds the empty cluster, backed by a hash ring that asks this object
     * for the hash seed of every node added to it.
     *
     * @return PredisCluster
     */
    protected function createCluster()
    {
        $hashCallback = array($this, 'getConnectionHash');
        $distributor = new HashRing(HashRing::DEFAULT_REPLICAS, $hashCallback);

        return new PredisCluster($distributor);
    }

    /**
     * Hash-ring callback: returns the value identifying a node on the ring.
     *
     * @param MasterSlaveWrapperConnection $connection Wrapped replication group.
     * @return string Alias of the group.
     */
    public function getConnectionHash(MasterSlaveWrapperConnection $connection)
    {
        return $connection->getAlias();
    }

    /**
     * Creates the cluster connection from the configured groups.
     *
     * @param ClientOptions $options Client options; supplies the connection
     *                               factory and the default replication connection.
     * @return PredisCluster Cluster holding one wrapped connection per group.
     */
    public function __invoke(ClientOptions $options)
    {
        $factory = $options->connections;
        $cluster = $this->createCluster();

        foreach ($this->parameters as $groupAlias => $members) {
            // One replication connection per group, taken from the default
            // of the `replication` client option.
            $replication = $options->getDefault('replication');

            foreach ($members as $memberParameters) {
                $replication->add($factory->create($memberParameters));
            }

            $cluster->add(new MasterSlaveWrapperConnection($replication, $groupAlias));
        }

        return $cluster;
    }
}
<?php
namespace Nrk\Predis\Connection;
use Predis\NotSupportedException;
use Predis\Connection\SingleConnectionInterface;
use Predis\Connection\ReplicationConnectionInterface;
use Predis\Command\CommandInterface;
/**
 * Presents a whole master/slave replication connection as if it were a
 * single connection, so that it can be used as one node of a cluster.
 *
 * Lifecycle and command methods are forwarded to the wrapped replication
 * connection; low-level single-connection operations are not supported.
 */
class MasterSlaveWrapperConnection implements SingleConnectionInterface
{
    /** @var string Alias identifying this wrapped group. */
    protected $alias;

    /** @var ReplicationConnectionInterface Wrapped replication connection. */
    protected $connection;

    /**
     * @param ReplicationConnectionInterface $connection Replication connection to wrap.
     * @param string                         $alias      Alias of the wrapped group.
     */
    public function __construct(ReplicationConnectionInterface $connection, $alias)
    {
        $this->connection = $connection;
        $this->alias = $alias;
    }

    /**
     * @return string Alias given at construction time.
     */
    public function getAlias()
    {
        return $this->alias;
    }

    /**
     * @return ReplicationConnectionInterface The wrapped replication connection.
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Forwards to the wrapped replication connection.
     */
    public function connect()
    {
        $this->connection->connect();
    }

    /**
     * Forwards to the wrapped replication connection.
     */
    public function disconnect()
    {
        $this->connection->disconnect();
    }

    /**
     * @return mixed Whatever the wrapped replication connection reports.
     */
    public function isConnected()
    {
        return $this->connection->isConnected();
    }

    /**
     * Forwards the command to the wrapped replication connection.
     */
    public function writeCommand(CommandInterface $command)
    {
        $this->connection->writeCommand($command);
    }

    /**
     * @return mixed Response read by the wrapped replication connection.
     */
    public function readResponse(CommandInterface $command)
    {
        return $this->connection->readResponse($command);
    }

    /**
     * @return mixed Result produced by the wrapped replication connection.
     */
    public function executeCommand(CommandInterface $command)
    {
        return $this->connection->executeCommand($command);
    }

    /**
     * Returns the node the replication connection is currently using,
     * falling back to the master when none is reported.
     *
     * @return mixed Result of getCurrent() when truthy, otherwise getMaster().
     */
    public function getCurrentConnection()
    {
        $current = $this->connection->getCurrent();
        if ($current) {
            return $current;
        }

        return $this->connection->getMaster();
    }

    /**
     * @return string String form of the node returned by getCurrentConnection().
     */
    public function __toString()
    {
        $node = $this->getCurrentConnection();

        return (string) $node;
    }

    /**
     * @throws NotSupportedException Always.
     */
    public function getResource()
    {
        throw new NotSupportedException(__METHOD__);
    }

    /**
     * Returns the parameters of the node returned by getCurrentConnection().
     *
     * NOTE(review): these are the inner node's parameters, so any alias they
     * carry is the node's own, not the alias of this wrapper; confirm
     * that callers do not key on it.
     */
    public function getParameters()
    {
        $node = $this->getCurrentConnection();

        return $node->getParameters();
    }

    /**
     * @throws NotSupportedException Always.
     */
    public function pushInitCommand(CommandInterface $command)
    {
        throw new NotSupportedException(__METHOD__);
    }

    /**
     * @throws NotSupportedException Always.
     */
    public function read()
    {
        throw new NotSupportedException(__METHOD__);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.