@jdaigle
Last active February 17, 2016 16:45
AMQP Message Broker Architecture

Sending To Azure (receiver = azure)

SEND (ch=0) attach(name:e1099e60-f8df-4624-9ffd-2d7079c9f075
        ,handle:0
        ,role:False
        ,source:source()
        ,target:target(address:event_queue)
        ,initial-delivery-count:0)
RECV (ch=0) attach(name:e1099e60-f8df-4624-9ffd-2d7079c9f075
        ,handle:0
        ,role:True
        ,source:source()
        ,target:target(address:event_queue)
        ,max-message-size:262144)
RECV (ch=0) flow(next-in-id:4294967293
        ,in-window:5000
        ,next-out-id:1
        ,out-window:2048
        ,handle:0
        ,delivery-count:0
        ,link-credit:1000
        ,available:0
        ,echo:False)
SEND (ch=0) transfer(handle:0
        ,delivery-id:0
        ,delivery-tag:00000000
        ,message-format:0
        ,settled:False
        ,batchable:True) payload 53
RECV (ch=0) disposition(role:True
        ,first:0
        ,settled:True
        ,state:accepted())

Notes:

  • Azure has a max message size of 256 KB
  • Azure has a link credit of 1000. It sends a flow every 100 messages (1/10 of link credit).
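A rough sketch of the credit behaviour noted above, from the receiving side. This is a hypothetical model inferred only from the trace (credit of 1000, a flow roughly every 100 messages), not Azure's implementation; `CreditWindow` and its fields are illustrative names.

```python
class CreditWindow:
    """Hypothetical receiver-side credit accounting (illustrative names)."""

    def __init__(self, link_credit=1000, refresh_fraction=10):
        self.link_credit = link_credit                        # credit granted per flow
        self.refresh_every = link_credit // refresh_fraction  # 1/10 of the credit
        self.delivery_count = 0                               # transfers seen so far

    def on_transfer(self):
        """Account for one transfer; return a flow frame (as a dict) when due."""
        self.delivery_count += 1
        if self.delivery_count % self.refresh_every == 0:
            # Top the sender back up to the full credit.
            return {"delivery-count": self.delivery_count,
                    "link-credit": self.link_credit}
        return None
```

With the defaults, `on_transfer()` returns a flow on the 100th, 200th, ... transfer and `None` otherwise.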

Receiving From Azure (sender = azure)

SEND (ch=0) attach(name:sessionful-receiver-link
        ,handle:0
        ,role:True
        ,source:source(address:event_queue)
        ,target:target())
SEND (ch=0) flow(next-in-id:0
        ,in-window:2048
        ,next-out-id:4294967293
        ,out-window:2048
        ,handle:0
        ,delivery-count:0
        ,link-credit:10
        ,drain:False)
RECV (ch=0) attach(name:sessionful-receiver-link
        ,handle:0
        ,role:False
        ,rcv-settle-mode:1
        ,source:source(address:event_queue)
        ,target:target()
        ,initial-delivery-count:0
        ,max-message-size:266240)
RECV (ch=0) transfer(handle:0
        ,delivery-id:0
        ,delivery-tag:7007EA896AF80F479377BF8207062B2D
        ,message-format:0
        ,more:False
        ,batchable:True) payload 177
(plus 9 more...)
SEND (ch=0) disposition(role:True
        ,first:0
        ,settled:True
        ,state:accepted())
(maybe a few more...)
SEND (ch=0) flow(next-in-id:11
        ,in-window:2048
        ,next-out-id:4294967293
        ,out-window:5000
        ,handle:0
        ,delivery-count:10
        ,link-credit:10
        ,drain:False)
(last of the dispositions...)
SEND (ch=0) flow(next-in-id:19
        ,in-window:2048
        ,next-out-id:4294967293
        ,out-window:5000
        ,handle:0
        ,delivery-count:18
        ,link-credit:10
        ,drain:False)

Notes:

  • Azure requests rcv-settle-mode:1 (second). The receiver settles only after it has sent its disposition and the sender has replied with a disposition indicating settlement.

Let's assume we have a solid TCP listener. We'll use SocketAsyncEventArgs, SslStream, and the like. I like this discussion: http://stackoverflow.com/questions/869744/how-to-write-a-scalable-tcp-ip-based-server/895249.

Let's also assume for now that we have a mechanism that handles AMQP connections, sessions, and attaching links. We're only going to concern ourselves with Transfer and Disposition frames. For reference, my AMQP digest: http://josephdaigle.me/2016/01/21/amqp.html

Once a socket is accepted, it should immediately begin receiving data, up to the max frame size. Each received frame should be enqueued in a lock-free data structure, like a message pump. Our server implementation will be single-threaded. Data sent back over the socket is simply buffered and sent.

TCP -> AMQP Frame Decoder -> Queue -> Handler -> AMQP Frame Encoder -> TCP
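The first two stages of that pipeline might look something like the sketch below. It relies on the AMQP 1.0 frame header layout (4-byte size, 1-byte data offset in 4-byte words, 1-byte type, 2-byte channel). Everything else is an assumption for illustration: `FrameDecoder`, `on_socket_data`, and `drain` are made-up names, a `deque` stands in for the lock-free queue, and the 8-byte protocol header exchange ("AMQP 3 1 0 0") is assumed to have been handled already.

```python
import struct
from collections import deque

HEADER = struct.Struct(">IBBH")  # size, doff, type, channel (8 bytes, big-endian)


class FrameDecoder:
    """Splits a TCP byte stream into (channel, type, body) frames."""

    def __init__(self, max_frame_size=65536):
        self.max_frame_size = max_frame_size
        self.buffer = bytearray()

    def feed(self, data):
        """Buffer incoming bytes and return every complete frame seen so far."""
        self.buffer.extend(data)
        frames = []
        while len(self.buffer) >= HEADER.size:
            size, doff, ftype, channel = HEADER.unpack_from(self.buffer)
            if doff < 2 or doff * 4 > size or size > self.max_frame_size:
                raise ValueError("malformed or oversized frame")
            if len(self.buffer) < size:
                break  # wait for the rest of this frame
            frames.append((channel, ftype, bytes(self.buffer[doff * 4:size])))
            del self.buffer[:size]
        return frames


inbound = deque()  # stand-in for the lock-free queue


def on_socket_data(decoder, data):
    """Called by the network layer whenever bytes arrive."""
    for frame in decoder.feed(data):
        inbound.append(frame)


def drain(handler):
    """Single-threaded pump: hand each queued frame to the handler."""
    while inbound:
        handler(*inbound.popleft())
```

The decoder keeps partial frames in its buffer, so the network layer can pass it whatever the socket read returned without worrying about frame boundaries.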

We'll need an AMQP state machine that handles connections, sessions, and links, as well as reconnects and retransmissions. This layer will translate AMQP frames into message broker commands.

Now, how do we implement our Message Broker queues?

Features to implement:

  1. TTL. A message expiration time will be computed based on the time of arrival at an intermediary. Messages that live longer than their expiration time will be discarded (or dead-lettered). When transmitted, the TTL SHOULD be recomputed.
  2. Delivery Count. The number of prior unsuccessful delivery attempts.
  3. Durable (boolean). If true, message should be persisted to disk before settling.
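A minimal sketch of how those three features could sit on a stored message. The record type and helper names (`StoredMessage`, `on_arrival`, `is_expired`, `ttl_for_transmit`) are hypothetical, and times are plain integer milliseconds supplied by the caller.

```python
from dataclasses import dataclass


@dataclass
class StoredMessage:
    body: bytes
    expires_at_ms: int        # absolute expiry, computed when the message arrives
    delivery_count: int = 0   # prior unsuccessful delivery attempts
    durable: bool = False     # if True, persist to disk before settling


def on_arrival(body, ttl_ms, now_ms, durable=False):
    """Compute the expiration time from the time of arrival."""
    return StoredMessage(body, now_ms + ttl_ms, 0, durable)


def is_expired(message, now_ms):
    """True once the message has outlived its expiration time."""
    return now_ms >= message.expires_at_ms


def ttl_for_transmit(message, now_ms):
    """Recompute the TTL to put on the wire when the message is sent on."""
    return max(0, message.expires_at_ms - now_ms)
```

Storing the absolute expiry rather than the original TTL means the recomputed TTL is just a subtraction at send time.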

Delivery State Machine

[AVAILABLE] --- TRANSFER ---> [ACQUIRED] --- ACCEPTED/REJECTED ---> [ARCHIVED]
    ^---RELEASED/MODIFIED --------|

A message stays ACQUIRED until the transfer is settled. Expiry of the TTL may move a message from AVAILABLE to ARCHIVED.

Delivery outcomes:

  • accepted: indicates successful processing at the receiver. Archive. Does not increment delivery-count.
  • rejected: indicates an invalid and unprocessable message. Includes error. Increments delivery-count. Archive (or dead-letter).
  • released: indicates that the message was not (and will not be) processed. Does not increment delivery-count. Enqueue. Can also occur when a time-bound lock expires.
  • modified: indicates that the message was modified, but not processed. Enqueue.
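The state machine and outcomes above can be written down as a transition table. This is only a sketch: the event names and `step` function are illustrative, and since the notes don't say whether modified increments delivery-count, it is assumed here that it does not.

```python
from enum import Enum


class State(Enum):
    AVAILABLE = "available"
    ACQUIRED = "acquired"
    ARCHIVED = "archived"


# (current state, event) -> next state
TRANSITIONS = {
    (State.AVAILABLE, "transfer"): State.ACQUIRED,
    (State.AVAILABLE, "expired"): State.ARCHIVED,   # TTL ran out while queued
    (State.ACQUIRED, "accepted"): State.ARCHIVED,
    (State.ACQUIRED, "rejected"): State.ARCHIVED,   # or dead-letter
    (State.ACQUIRED, "released"): State.AVAILABLE,
    (State.ACQUIRED, "modified"): State.AVAILABLE,
}

# Assumption: only "rejected" bumps the count; "modified" is left unspecified
# in the notes and is treated as non-incrementing here.
INCREMENTS_DELIVERY_COUNT = {"rejected"}


def step(state, delivery_count, event):
    """Apply one event; return the new (state, delivery_count)."""
    try:
        next_state = TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"{event!r} is not valid in state {state.name}") from None
    if event in INCREMENTS_DELIVERY_COUNT:
        delivery_count += 1
    return next_state, delivery_count
```

Keeping the table as data makes it easy to spot missing edges, such as the fact that nothing ever leaves ARCHIVED.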

We could implement queues as a linked list with head/tail references. This could be rough on allocations, unless we pool our message objects. New messages are inserted at the tail. Messages are pulled from the head. We need to maintain a separate map of ACQUIRED messages. If an ACQUIRED message is RELEASED (either by the client, or by a timeout) then it needs to be inserted at the head of the queue instead of the tail. If an ACQUIRED message is MODIFIED (should I even allow that?) then it needs to be inserted at the tail of the queue.
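A small sketch of that queue plus the ACQUIRED map. To keep it short it uses Python's `collections.deque` instead of a hand-rolled linked list, and integer tags from a counter instead of real delivery tags; `BrokerQueue` and its method names are made up for illustration.

```python
from collections import deque
import itertools


class BrokerQueue:
    """Available messages in a deque, acquired messages in a side map."""

    OUTCOMES = ("accepted", "rejected", "released", "modified")

    def __init__(self):
        self._available = deque()
        self._acquired = {}                # tag -> message
        self._tags = itertools.count()     # hypothetical delivery tags

    def enqueue(self, message):
        self._available.append(message)    # new messages go to the tail

    def acquire(self):
        """Pull from the head; return (tag, message) or None if empty."""
        if not self._available:
            return None
        message = self._available.popleft()
        tag = next(self._tags)
        self._acquired[tag] = message
        return tag, message

    def settle(self, tag, outcome):
        """Resolve an acquired message according to its outcome."""
        if outcome not in self.OUTCOMES:
            raise ValueError(f"unknown outcome {outcome!r}")
        message = self._acquired.pop(tag)
        if outcome == "released":
            self._available.appendleft(message)   # back to the head
        elif outcome == "modified":
            self._available.append(message)       # back to the tail
        # accepted / rejected: archived, so nothing is re-queued
        return message

    def pending(self):
        return list(self._available)
```

For example, after enqueuing a, b, c and acquiring a and b, releasing a and modifying b leaves the queue as a, c, b.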

I like the style of the AOF (append-only file) transaction log that Redis implements. Essentially, every successful command that alters the state is written to the end of the file. The file could be flushed immediately (ensures durability) or occasionally (faster, but less durable). To prevent the file from growing too large, Redis can rewrite the AOF from the in-memory representation, generating a file with the fewest commands necessary. To prevent "stopping the world", Redis uses a clever Linux trick of fork()ing the process, and relying on copy-on-write.
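To make the idea concrete, here is a toy append-only log for broker commands. It is a sketch in the spirit of the Redis AOF, not a copy of it: the JSON-lines format, the `enqueue`/`archive` command names, and the `AppendOnlyLog`, `replay`, and `rewrite` helpers are all assumptions. The rewrite here runs in-process and does not use the fork()/copy-on-write trick.

```python
import json
import os


class AppendOnlyLog:
    """Appends one JSON command per line to the end of a file."""

    def __init__(self, path, fsync_every_write=True):
        self.fsync_every_write = fsync_every_write
        self._file = open(path, "a", encoding="utf-8")

    def append(self, command):
        self._file.write(json.dumps(command) + "\n")
        if self.fsync_every_write:          # durable, but slower
            self._file.flush()
            os.fsync(self._file.fileno())

    def close(self):
        self._file.close()


def replay(path):
    """Rebuild the in-memory state (id -> body) from the log."""
    live = {}
    with open(path, encoding="utf-8") as f:
        for line in f:
            cmd = json.loads(line)
            if cmd["op"] == "enqueue":
                live[cmd["id"]] = cmd["body"]
            elif cmd["op"] == "archive":
                live.pop(cmd["id"], None)
    return live


def rewrite(path, live):
    """Replace the log with the fewest commands that recreate `live`."""
    tmp = path + ".rewrite"
    with open(tmp, "w", encoding="utf-8") as f:
        for msg_id, body in live.items():
            f.write(json.dumps({"op": "enqueue", "id": msg_id, "body": body}) + "\n")
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, path)   # swap in the compacted file
```

Any open `AppendOnlyLog` on the same path would need to be reopened after `rewrite`, since its file handle still points at the old file.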

But I need some sort of data structure that will allow me to create snapshots to read from (and write out to disk).

If I implement the queue as a singly linked list (head and tail node), then to "copy" the queue I simply need to copy the head/tail reference. I start at the head and work to the tail. The original queue can move the head/tail without affecting my read operations. This will require a "stop the world" type of operation, as I need to clone the state of all queues. But it should be fast enough not to be noticeable. The messages in the linked list are immutable, so it's safe to clone/read.
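A sketch of that snapshot trick, with illustrative names (`Node`, `LinkedQueue`, `read_snapshot`). It assumes the single-threaded server described earlier, so capturing the two references needs no locking, and it relies on `pop` never clearing a node's `next` pointer.

```python
class Node:
    __slots__ = ("message", "next")

    def __init__(self, message):
        self.message = message   # treated as immutable
        self.next = None


class LinkedQueue:
    def __init__(self):
        self.head = None
        self.tail = None

    def push(self, message):
        """Insert at the tail."""
        node = Node(message)
        if self.tail is None:
            self.head = self.tail = node
        else:
            self.tail.next = node
            self.tail = node

    def pop(self):
        """Remove from the head; the node's next pointer is left intact."""
        node = self.head
        if node is None:
            return None
        self.head = node.next
        if self.head is None:
            self.tail = None
        return node.message

    def snapshot(self):
        """Copying the queue is just copying two references."""
        return (self.head, self.tail)


def read_snapshot(snapshot):
    """Walk from the captured head to the captured tail, inclusive."""
    node, tail = snapshot
    messages = []
    while node is not None:
        messages.append(node.message)
        if node is tail:
            break            # ignore anything pushed after the snapshot
        node = node.next
    return messages
```

The live queue can keep popping and pushing while the snapshot is written out; the reader stops at the tail it captured.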

Unfortunately, linked lists are awful in terms of allocations. The alternative is a circular buffer. However, making a copy becomes less trivial, as I would need to copy the entire array including the values! Stopping the world to copy basically the entire memory footprint seems crazy. But most queues should be empty or nearly empty. Stopping the world and reading everything active in memory might not be that bad...
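For comparison, a circular-buffer sketch where the snapshot copies only the live slots rather than the whole array, which is what makes the "mostly empty queues" argument work. `RingQueue` is a hypothetical fixed-capacity queue; growing the buffer is left out.

```python
class RingQueue:
    """Fixed-capacity circular buffer (illustrative, no resizing)."""

    def __init__(self, capacity):
        self._slots = [None] * capacity
        self._head = 0      # index of the oldest message
        self._count = 0     # number of live messages

    def push(self, message):
        if self._count == len(self._slots):
            raise OverflowError("queue is full")
        self._slots[(self._head + self._count) % len(self._slots)] = message
        self._count += 1

    def pop(self):
        if self._count == 0:
            return None
        message = self._slots[self._head]
        self._slots[self._head] = None                  # drop the reference
        self._head = (self._head + 1) % len(self._slots)
        self._count -= 1
        return message

    def snapshot(self):
        """Copy only the live range, oldest first; cost is O(live messages)."""
        n = len(self._slots)
        return [self._slots[(self._head + i) % n] for i in range(self._count)]
```

The copy still has to happen while the world is stopped, but its cost tracks the number of queued messages, not the buffer capacity.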

Both have tradeoffs. But maybe there is a different way to create snapshots that doesn't require "stop the world". SQL Server uses checkpoints: a checkpoint causes all dirty pages to be flushed to disk along with tx log info. But I don't want to manage my own memory...

Server Architecture

Network Layer:  [ Sockets / IOCP / TLS ]
                    |          /\
                    |           |
                   \/           |
Protocol Layer: [ Connection / Session / Link ]
                    |          /\
                    |           |
                   \/           |
App Layer:      [ Handles Frames ]

Connecting To Azure (SASL handshake)

[11:00.004] SEND AMQP 3 1 0 0
[11:00.009] SEND sasl-init(mechanism:PLAIN,initial-response:XXXXXX,hostname:jdaigle-test-amqp.servicebus.windows.net)
[11:00.023] RECV AMQP 3 1 0 0
[11:00.077] RECV sasl-mechanisms(sasl-server-mechanisms:[MSSBCBS,PLAIN,ANONYMOUS,EXTERNAL])
[11:00.498] RECV sasl-outcome(code:0,additional-data:57656C636F6D6521)