

@yukim
Created May 29, 2013 18:22
CASSANDRA-5286 New Streaming API design doc

Apache Cassandra Streaming API (CASSANDRA-5286)

Design goal

  • Better control
    • One API for all (bootstrap, move, bulkload, repair...)
    • IN/OUT in one session
    • Event notification
  • Better performance
    • Pipelined stream
    • Persistent connection per host (CASSANDRA-4660)
  • Better reporting
    • Better logging/tracing
    • External tool support for progress tracking
    • More metrics

Highlights

StreamPlan

To send and receive data, you create a StreamPlan for each operation, such as repair (the streaming part), bootstrap, bulkload, or move. Files to receive and files to transfer are grouped into one stream session per destination, and those stream sessions are associated with the stream plan.

Stream Plan for repair, bootstrap, SSTableloader, etc.
 |- Stream session with Endpoint 1
      |- Stream receiving tasks
      |- Stream transfer tasks
 |
 |- Stream session with Endpoint 2
 .
 .
 .
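The hierarchy above can be modeled in a few lines of plain Java. This is a minimal sketch, not Cassandra's implementation: PlanSketch, SessionSketch, TaskSketch, and requestRanges are hypothetical stand-ins for StreamPlan, StreamSession, and the stream tasks, and endpoints are plain strings instead of InetAddress. It shows the key invariant: one session per destination, holding both the receiving and the transfer tasks for that peer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the plan -> session -> task hierarchy (not Cassandra classes).
final class PlanSketch
{
    enum Direction { RECEIVE, TRANSFER }

    // One task: a direction plus what is streamed (e.g. "Keyspace1/Standard1").
    static final class TaskSketch
    {
        final Direction direction;
        final String what;

        TaskSketch(Direction direction, String what)
        {
            this.direction = direction;
            this.what = what;
        }
    }

    // All tasks exchanged with one endpoint share a single session.
    static final class SessionSketch
    {
        final String endpoint;
        final List<TaskSketch> receiving = new ArrayList<>();
        final List<TaskSketch> transferring = new ArrayList<>();

        SessionSketch(String endpoint)
        {
            this.endpoint = endpoint;
        }
    }

    final String description;
    // Insertion-ordered, so sessions appear in the order they were first needed.
    private final Map<String, SessionSketch> sessions = new LinkedHashMap<>();

    PlanSketch(String description)
    {
        this.description = description;
    }

    // Returns the existing session for the endpoint, or creates it on first use.
    private SessionSketch sessionFor(String endpoint)
    {
        return sessions.computeIfAbsent(endpoint, SessionSketch::new);
    }

    PlanSketch requestRanges(String from, String what)
    {
        sessionFor(from).receiving.add(new TaskSketch(Direction.RECEIVE, what));
        return this;
    }

    PlanSketch transferFiles(String to, String what)
    {
        sessionFor(to).transferring.add(new TaskSketch(Direction.TRANSFER, what));
        return this;
    }

    Map<String, SessionSketch> sessions()
    {
        return sessions;
    }
}
```

Keying sessions by endpoint is what lets IN and OUT tasks for the same peer share one session, matching the "IN/OUT in one session" design goal.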

The output of nodetool netstats during a repair will look like this:

$ nodetool netstats

Repair #xxxx-xxxx-xxxxxxxx (32%)
    /10.1.10.10
      Receiving
        Keyspace1-Standard1-1 1230/1230 bytes - 100%
        Keyspace1-Standard1-2 123/1230 bytes - 10%
        Keyspace1-Standard1-3 0/1230 bytes - 0%
      Sending
        ...

    /10.1.10.11
      Sending
        ...
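Each per-file line in this mock-up is bytes done over total bytes, followed by a percentage. A minimal sketch of that formatting, assuming the percentage is truncated to an integer (the doc does not say how it is rounded); NetstatsLine is a hypothetical helper, not part of nodetool.

```java
import java.util.Locale;

// Hypothetical helper rendering one per-file progress line in the style of the
// netstats mock-up. Percentages are truncated toward zero (an assumption).
final class NetstatsLine
{
    static int percent(long done, long total)
    {
        // Treat an empty file as fully transferred to avoid dividing by zero.
        if (total <= 0)
            return 100;
        return (int) (done * 100 / total);
    }

    static String format(String file, long done, long total)
    {
        return String.format(Locale.ROOT, "%s %d/%d bytes - %d%%", file, done, total, percent(done, total));
    }

    public static void main(String[] args)
    {
        System.out.println(format("Keyspace1-Standard1-1", 1230, 1230));
        System.out.println(format("Keyspace1-Standard1-2", 123, 1230));
        System.out.println(format("Keyspace1-Standard1-3", 0, 1230));
    }
}
```

Running main reproduces the three Receiving lines shown above.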

Better throughput

Stream messages and file data are pipelined over the same persistent TCP connection.
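Pipelining means the sender writes the next message without waiting for a reply to the previous one, and the receiver reads messages back in order from the same byte stream. The sketch below illustrates the idea with a hypothetical length-prefixed frame (a one-byte type, a four-byte length, then the payload); this is not Cassandra's actual wire format, the type codes are invented, and in-memory byte streams stand in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class FramedChannel
{
    // Hypothetical type codes; not Cassandra's message types.
    static final byte PREPARE = 1;
    static final byte FILE_CHUNK = 2;
    static final byte COMPLETE = 3;

    static final class Frame
    {
        final byte type;
        final byte[] payload;

        Frame(byte type, byte[] payload)
        {
            this.type = type;
            this.payload = payload;
        }
    }

    // Frame layout: [type: 1 byte][length: 4 bytes][payload: length bytes].
    static void write(DataOutputStream out, byte type, byte[] payload) throws IOException
    {
        out.writeByte(type);
        out.writeInt(payload.length);
        out.write(payload);
    }

    // Reads frames in the order they were written until the stream ends.
    static List<Frame> readAll(DataInputStream in) throws IOException
    {
        List<Frame> frames = new ArrayList<>();
        int type;
        while ((type = in.read()) != -1) // -1: the other end closed the stream
        {
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            frames.add(new Frame((byte) type, payload));
        }
        return frames;
    }

    // Writes three frames back to back without waiting for replies, then reads them back.
    static List<Frame> roundTrip()
    {
        try
        {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(wire);
            write(out, PREPARE, "summary".getBytes(StandardCharsets.UTF_8));
            write(out, FILE_CHUNK, new byte[]{ 10, 20, 30 });
            write(out, COMPLETE, new byte[0]);
            out.flush();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
            return readAll(in);
        }
        catch (IOException e)
        {
            // In-memory streams do not fail in practice, but the API declares IOException.
            throw new UncheckedIOException(e);
        }
    }
}
```

Because control messages and file chunks share one ordered stream, no per-message connection setup or round trip is needed between them.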

Stream event support

Finer-grained event notification. With JMX notification support, even external clients can listen for events.

API

Public APIs

  • StreamPlan

    • Builder for a streaming plan (what to transfer and what to request). Internally it builds StreamSessions to interact with the other nodes and associates them with a StreamResultFuture, which asynchronously returns the final StreamState.
  • StreamResultFuture

    • Represents the future result of a StreamPlan execution. By attaching a StreamEventHandler to it, you can track the progress of the streaming plan.
  • StreamState

    • State of a streaming execution. You can get a snapshot of in-progress streaming from StreamResultFuture#getCurrentState, or the final state as the return value of StreamResultFuture#get.
  • StreamManager

    • Manages the progress of all streaming operations
    • Provides various metrics through JMX, including notifications
  • StreamEventHandler

    • Listens for stream events.
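The split between a live snapshot (StreamResultFuture#getCurrentState) and the final result (StreamResultFuture#get) can be illustrated with a future that also keeps an atomically replaced state. A minimal sketch under stated assumptions: it uses java.util.concurrent.CompletableFuture in place of Guava's ListenableFuture to stay dependency-free, and StateSketch and ResultFutureSketch are hypothetical; the state holds only byte counters, whereas StreamState carries per-session detail.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable snapshot: bytes done out of bytes expected.
final class StateSketch
{
    final long bytesDone;
    final long bytesTotal;

    StateSketch(long bytesDone, long bytesTotal)
    {
        this.bytesDone = bytesDone;
        this.bytesTotal = bytesTotal;
    }

    StateSketch plus(long bytes)
    {
        return new StateSketch(bytesDone + bytes, bytesTotal);
    }
}

// Hypothetical future: get()/join() yield the final state, getCurrentState() a live snapshot.
final class ResultFutureSketch extends CompletableFuture<StateSketch>
{
    private final AtomicReference<StateSketch> current;

    ResultFutureSketch(long bytesTotal)
    {
        current = new AtomicReference<>(new StateSketch(0, bytesTotal));
    }

    // Safe to call from any thread while streaming is running.
    StateSketch getCurrentState()
    {
        return current.get();
    }

    // Called as bytes arrive; completes the future once everything is in.
    void onBytes(long bytes)
    {
        StateSketch next = current.updateAndGet(s -> s.plus(bytes));
        if (next.bytesDone >= next.bytesTotal)
            complete(next);
    }

    // A failed session fails the whole plan.
    void onSessionFailed(Throwable cause)
    {
        completeExceptionally(cause);
    }
}
```

Keeping the snapshot immutable and swapping it atomically means a reader never sees a half-updated state, while waiters on the future only wake up once.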

Basic API usage is as follows:

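import java.net.InetAddress;
import java.util.concurrent.ExecutionException;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamState;

// remoteTargets, ranges and sstables are assumed to be supplied by the caller.
// Start building your streaming plan
StreamPlan bulkloadPlan = new StreamPlan("Bulkload");
// Add a file-transfer task for each destination
for (InetAddress remote : remoteTargets)
    bulkloadPlan.transferFiles(remote, ranges, sstables);
// Execute the plan; this returns immediately with a future
StreamResultFuture result = bulkloadPlan.execute();
try
{
    // ... and wait for streaming to complete
    StreamState finalState = result.get();
    // all streaming succeeded
}
catch (ExecutionException e)
{
    // at least one stream failed; e.getCause() describes why
}
catch (InterruptedException e)
{
    // restore the interrupt flag for the caller
    Thread.currentThread().interrupt();
}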

Alternatively, since StreamResultFuture implements Guava's ListenableFuture<StreamState>, you can use a FutureCallback<StreamState> to handle stream success and failure.

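import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;

// Recent Guava releases require a third Executor argument here, e.g. MoreExecutors.directExecutor()
Futures.addCallback(result, new FutureCallback<StreamState>()
{
    @Override
    public void onSuccess(StreamState state)
    {
        // every stream session completed successfully
    }

    @Override
    public void onFailure(Throwable t)
    {
        // at least one stream session failed; t describes why
    }
});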

You can also add an event listener to a StreamResultFuture to receive stream events:

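StreamResultFuture result = bulkloadPlan.execute();
result.addEventListener(new StreamEventHandler()
{
    @Override
    public void handleEvent(StreamEvent event)
    {
        // called for each stream event (session prepared, file progress,
        // session complete), not only when streaming has finished
    }
});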

Internal APIs

  • StreamSession

    • Group of stream tasks (INs and/or OUTs) per destination
  • StreamTask

    • Represents each IN/OUT stream task
    • Each task MUST belong to exactly one stream session
    • StreamReceiveTask
      • execute method sends a stream request to the destination and waits for the reply
    • StreamTransferTask
  • ConnectionHandler

    • Receives/sends streaming messages.

Message flow

When you execute a plan (StreamPlan#execute), a StreamSession is started for each endpoint to initiate streaming. Each stream session exchanges messages and files until it completes or fails.

  1. Establish stream connection (StreamInit)

    Each stream session sends a StreamInit message to the other end. The StreamInit message carries a Cassandra message header with the stream flag set, which tells the peer to start the streaming procedure. The body of the StreamInit message contains the stream plan ID and the plan description. When the peer receives the message, it creates the corresponding StreamPlan and StreamSession from it.

    Once the connection is established, all subsequent messages and file transfers go over this connection.

  2. Prepare for stream

    Once the initiator has sent StreamInit and established the connection, it sends a Prepare message containing stream requests and stream summaries.

    Stream requests are used when the initiator requests data for specific keyspace/column families and ranges.

    Stream summaries are used when the initiator transfers files. A stream summary describes, for a given keyspace/column family, how many files the initiator is sending and their total size.

    When the peer receives the Prepare message, its StreamSession prepares to transfer/receive files. If the initiator requested data, the peer sends back a Prepare message containing the stream summaries that correspond to the received stream requests.

  3. File transfer

    When both endpoints have finished preparing, they start streaming files. Each file transfer consists of a file header that describes the content, followed by the data itself. The file header contains the keyspace/column family name, the sections of the file to send, a sequence number, and, if the file is compressed, compression information. The sequence number is used for retries.

  4. Retry

    If something goes wrong during a file transfer, the receiver sends a retry message carrying the file's sequence number. When the sender receives the retry message, it queues that file to be resent immediately.

  5. Session complete

    When a session finishes transferring/receiving files, it sends a complete message to the other end. A stream session is considered complete once it has both sent and received a complete message.

  6. Session failure

    When an error occurs, the session tries to send a session failure message to its peer. A session that receives this message terminates and marks itself as failed.
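Steps 1 through 6 amount to a small per-session state machine. The sketch below is an illustration under assumptions, not StreamSession itself: SessionStateSketch, its state names, and its methods are hypothetical. It encodes three rules from the text: files flow only after prepare, a retry re-queues a file by sequence number ahead of everything else, and a session completes only after it has both sent and received the complete message.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one stream session's lifecycle.
final class SessionStateSketch
{
    enum State { INITIALIZED, PREPARING, STREAMING, COMPLETE, FAILED }

    private State state = State.INITIALIZED;
    private final Deque<Integer> sendQueue = new ArrayDeque<>();     // sequence numbers to send
    private final Map<Integer, String> filesBySeq = new HashMap<>(); // sequence number -> file
    private boolean completeSent;
    private boolean completeReceived;

    State state()
    {
        return state;
    }

    // Step 1: StreamInit has set up the connection; Prepare comes next.
    void onConnected()
    {
        require(State.INITIALIZED);
        state = State.PREPARING;
    }

    // Step 2: Prepare has been exchanged; assign sequence numbers to outgoing files.
    void onPrepared(String... files)
    {
        require(State.PREPARING);
        for (int seq = 0; seq < files.length; seq++)
        {
            filesBySeq.put(seq, files[seq]);
            sendQueue.addLast(seq);
        }
        state = State.STREAMING;
    }

    // Step 3: next file to send, or null once the queue is drained.
    String nextFileToSend()
    {
        require(State.STREAMING);
        Integer seq = sendQueue.pollFirst();
        return seq == null ? null : filesBySeq.get(seq);
    }

    // Step 4: the receiver asked for a retry; resend that file before anything else.
    void onRetry(int seq)
    {
        require(State.STREAMING);
        if (!filesBySeq.containsKey(seq))
            throw new IllegalArgumentException("unknown sequence number " + seq);
        sendQueue.addFirst(seq);
    }

    // Step 5: complete only after the complete message has gone both ways.
    void onCompleteSent()
    {
        completeSent = true;
        maybeComplete();
    }

    void onCompleteReceived()
    {
        completeReceived = true;
        maybeComplete();
    }

    // Step 6: a local error or a received session failure message is terminal.
    void onFailure()
    {
        state = State.FAILED;
    }

    private void maybeComplete()
    {
        if (state == State.STREAMING && completeSent && completeReceived)
            state = State.COMPLETE;
    }

    private void require(State expected)
    {
        if (state != expected)
            throw new IllegalStateException("expected " + expected + " but was " + state);
    }
}
```

Putting retried files at the head of the queue reflects "queues the file to be resent immediately"; a plain FIFO would delay the retry behind every remaining file.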

Events

StreamResultFuture emits a StreamEvent in the following cases:

  • Stream session prepared(SESSION_PREPARED)

    Fired when a stream session has finished preparing to receive/send files; tells the event handler the number of files and the total bytes to be received/sent.

  • Stream session complete(SESSION_COMPLETE)

    Fired when a session has completed.

  • Stream progress(FILE_PROGRESS)

    Fired as a file being received/sent makes progress.

To listen for StreamEvents, implement StreamEventHandler and register the handler with a StreamResultFuture.
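Registering a handler and receiving typed events can be sketched as follows. The event type names SESSION_PREPARED, SESSION_COMPLETE, and FILE_PROGRESS come from the list above; everything else (EventBusSketch, EventSketch, the payload fields, and the handler as a functional interface) is a hypothetical simplification, not the StreamEvent/StreamEventHandler API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins for StreamEvent and StreamEventHandler.
final class EventBusSketch
{
    enum Type { SESSION_PREPARED, SESSION_COMPLETE, FILE_PROGRESS }

    static final class EventSketch
    {
        final Type type;
        final String peer;  // endpoint the event concerns
        final long bytes;   // total bytes for SESSION_PREPARED, bytes so far for FILE_PROGRESS

        EventSketch(Type type, String peer, long bytes)
        {
            this.type = type;
            this.peer = peer;
            this.bytes = bytes;
        }
    }

    interface Handler
    {
        void handleEvent(EventSketch event);
    }

    // Copy-on-write, so handlers can be added while events are being delivered.
    private final List<Handler> handlers = new CopyOnWriteArrayList<>();

    void addEventListener(Handler handler)
    {
        handlers.add(handler);
    }

    // Delivers synchronously on the caller's thread, in registration order.
    void fire(EventSketch event)
    {
        for (Handler h : handlers)
            h.handleEvent(event);
    }
}
```

A handler typically switches on the event type, for example updating a progress bar on FILE_PROGRESS and logging on SESSION_COMPLETE.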

JMX support

JMX support for tracking streaming progress will be provided as in previous versions. In addition, the new streaming will support JMX notifications so that external clients can listen for file streaming progress (to be implemented using StreamEventHandler).
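On the JMX side, the standard javax.management notification mechanism is enough for a client to follow progress. The sketch below uses only JDK classes (NotificationBroadcasterSupport, Notification, NotificationListener); the class name ProgressNotifierSketch and the notification type string "streaming.progress" are hypothetical, since this doc does not specify the MBean name or notification format. A remote client would register the same kind of listener through MBeanServerConnection#addNotificationListener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;

// Hypothetical broadcaster turning progress updates into JMX notifications.
// A StreamEventHandler bridge would call reportProgress for each progress event.
final class ProgressNotifierSketch extends NotificationBroadcasterSupport
{
    static final String PROGRESS_TYPE = "streaming.progress"; // hypothetical type string

    private final AtomicLong sequence = new AtomicLong();

    void reportProgress(String file, long bytesDone, long bytesTotal)
    {
        Notification n = new Notification(PROGRESS_TYPE, this, sequence.incrementAndGet(),
                                          file + " " + bytesDone + "/" + bytesTotal);
        // With the no-arg constructor, listeners run on the thread calling sendNotification.
        sendNotification(n);
    }

    // Registers a listener that records each notification's message, for demonstration.
    static List<String> collectInto(ProgressNotifierSketch notifier)
    {
        List<String> messages = new ArrayList<>();
        NotificationListener listener = (notification, handback) -> messages.add(notification.getMessage());
        notifier.addNotificationListener(listener, null, null);
        return messages;
    }
}
```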

Configuration

Available configuration so far:

  • Streaming throttle

    Global; configured in cassandra.yaml and adjustable at runtime via JMX
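A global throttle that can be changed at runtime comes down to one shared rate and byte counter that every sender consults before writing. A minimal sketch, assuming a simple "bytes so far versus time elapsed" pacing rule; StreamThrottleSketch, reserve, and setBytesPerSecond are hypothetical names, and this doc does not describe the actual throttle implementation. Time is passed in explicitly so the logic is deterministic.

```java
// Hypothetical global stream throttle shared by all senders.
final class StreamThrottleSketch
{
    private volatile long bytesPerSecond; // <= 0 disables throttling
    private final long startNanos;
    private long bytesReserved;           // guarded by "this"; summed over all senders

    StreamThrottleSketch(long bytesPerSecond, long startNanos)
    {
        this.bytesPerSecond = bytesPerSecond;
        this.startNanos = startNanos;
    }

    // What a JMX-exposed setter would delegate to; takes effect on the next reserve().
    void setBytesPerSecond(long bytesPerSecond)
    {
        this.bytesPerSecond = bytesPerSecond;
    }

    // Called by any sender before writing `bytes`; returns how many nanoseconds that
    // sender should pause so the aggregate rate since startNanos stays under the limit.
    synchronized long reserve(long bytes, long nowNanos)
    {
        bytesReserved += bytes;
        long rate = bytesPerSecond; // read the volatile once
        if (rate <= 0)
            return 0;
        long dueNanos = startNanos + (long) (bytesReserved * 1_000_000_000.0 / rate);
        return Math.max(0, dueNanos - nowNanos);
    }
}
```

A sender would construct one shared instance with System.nanoTime() as the start, then call reserve(chunk.length, System.nanoTime()) and sleep for the returned nanoseconds before writing each chunk.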
