@arobson
Last active October 14, 2021 06:46
RabbitMQ + Node.js Notes

Abstraction Suggestions

Summary: use good/established messaging patterns like Enterprise Integration Patterns. Don't make up your own. Don't expose transport implementation details to your application.

Broker

As much as possible, I prefer to hide Rabbit's implementation details from my application. In .Net we have a Broker abstraction that can communicate through a lot of different transports (rabbit just happens to be our preferred one). The broker allows us to expose a very simple API which is basically:

  • publish
  • request
  • start/stop subscription

The broker abstraction doesn't attempt to manage configuration-specific or implementation-specific details; it's just how the application interacts with any number of transports in order to send and receive messages.
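
To make the shape of that API concrete, here is a minimal sketch in Node. Every name in it (Broker, InMemoryTransport, send, listen) is made up for illustration and is not our .Net API; request is left out to keep it short. The in-memory transport only exists so the sketch runs without a server.

```javascript
// Hypothetical names throughout: a sketch of the shape, not a real library.

// In-memory stand-in for a transport so the sketch runs without a server.
// A RabbitMQ-backed transport would expose the same two methods.
class InMemoryTransport {
  constructor() {
    this.handlers = new Map(); // source name -> Set of handler functions
  }
  send(channel, body) {
    for (const handler of this.handlers.get(channel) ?? []) handler(body);
  }
  listen(source, handler) {
    if (!this.handlers.has(source)) this.handlers.set(source, new Set());
    this.handlers.get(source).add(handler);
    return () => this.handlers.get(source).delete(handler); // "stop" function
  }
}

// What application code sees: no exchanges, queues, or connections.
class Broker {
  constructor(transport) {
    this.transport = transport;
    this.stops = new Map(); // source name -> stop function
  }
  publish(channel, body) {
    this.transport.send(channel, body);
  }
  startSubscription(source, handler) {
    if (this.stops.has(source)) return; // already running
    this.stops.set(source, this.transport.listen(source, handler));
  }
  stopSubscription(source) {
    const stop = this.stops.get(source);
    if (stop) stop();
    this.stops.delete(source);
  }
}
```

Swapping transports means passing a different object to the Broker constructor; the code calling publish and startSubscription stays the same.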

Channels

We use a channel abstraction to hide transport implementation details and expose a consistent interface to the Broker. A channel is used to publish, but not to receive. Our Rabbit implementation has a configuration API that allows us to define exchanges and create channels in the broker for each one, but consuming code doesn't know what transport the channel is actually using.

Subscriptions

Our abstraction also has the idea of 'subscriptions' which again are transport agnostic. With our Rabbit lib, once we've defined a queue, we can start a subscription to it through the Broker API. If we changed the transports out, the only code that would change would be the queue declaration.

Message Dispatch

In order to decouple our handlers from our subscriptions, we have a message dispatcher which scans our assemblies to determine what message handlers deal with what incoming messages. Once messages start flowing in, the dispatcher handles certain infrastructure/plumbing concerns (like message deserialization) and things like mutual exclusion (if two messages would impact state and create a race condition, we prevent them from being handled in parallel).

Message Handlers

A message handler is dumb and short-lived. Its only job is to handle 1 message and then clean up any dependencies that were instantiated for the purpose of handling the message. We never invoke callbacks on something else with a lifecycle we don't manage. This might be largely due to the fact that we're in .Net, but for lots of reasons, it's good design to handle messages in a stateless manner.

We do have different classes of handlers that behave in different ways and provide different guarantees. For example, we have a handler that expects to have state hydrated for it and loaded from an external store for the scope of the message. We have another that uses event sourcing to load not only state but also recent events, replaying them to get to a 'best known state' before processing the message. I'm over-simplifying, but the point is, you can make different kinds of handlers to process messages with different patterns.

Food for thought

Rabbit is awesome. AMQP is good stuff. That doesn't mean you'll never need the ability to interact across different protocols/transports. If that time comes, do you want to re-write portions of your app? It's very nice being able to just swap out channel and subscription configuration outside the "crux" of the app and find that your application can still work across a different protocol with different guarantees/performance profile.

A Note About Serialization

In .Net, serialization is a huge performance bottleneck. Protocol buffers seem to be about the fastest inter-platform way to send messages, but protobuf is a pain to manage in dynamic languages, so we avoid it. That leaves us with JSON and even the best serializer in .Net is still slow enough that > 30% of our CPU time is spent serializing and deserializing messages. This adds quite a bit of time to message round-trip. :sad:

Use CorrelationIds!

The correlation id can indicate what high-level entity a message relates to. It's invaluable. You can also make use of different metadata fields or add one-off headers to the message before sending it. Use these, but do so transparently in your application code where possible. We have an envelope abstraction with some common fields. Depending on the transport, the envelope data gets sent in different ways. Rabbit is by far the easiest transport for adding metadata to a publish.
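
As a rough sketch of the envelope idea, the function below maps a transport-agnostic envelope onto what an AMQP publish would carry. The envelope fields and the function name toAmqpPublish are hypothetical. The property names (correlationId, messageId, contentType, headers) mirror standard AMQP message properties in the camelCase spelling Node clients tend to use, but check your library's docs before relying on them.

```javascript
// Hypothetical envelope shape: { correlationId, messageId, headers, body }.
// Maps it to a payload plus AMQP-style message properties.
function toAmqpPublish(envelope) {
  return {
    // The body travels as JSON bytes.
    content: Buffer.from(JSON.stringify(envelope.body)),
    // Metadata travels beside the body, not inside it.
    properties: {
      correlationId: envelope.correlationId,
      messageId: envelope.messageId,
      contentType: "application/json",
      headers: { ...envelope.headers }, // one-off headers, copied defensively
    },
  };
}
```

A different transport would get its own mapping function, while application code keeps building the same envelope.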

Idempotence

You can't rule out double-delivery. Rabbit provides either "at least once" or "at most once" delivery. That said, message handlers need to behave such that if they got a message twice, they wouldn't do something stupid (e.g. in a bank app, if a deposit message is delivered twice, the deposit should only be applied once).
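
One common way to get there is to remember which message ids have already been applied. The sketch below assumes every message carries a unique id field, which is my assumption and not something Rabbit gives you for free. It keeps everything in memory; a real handler would have to persist the seen ids together with the balance.

```javascript
// Hypothetical handler: ignores a deposit it has already applied.
class DepositHandler {
  constructor() {
    this.balance = 0;
    this.seen = new Set(); // ids of messages already applied
  }
  // Returns true if the deposit was applied, false if it was a duplicate.
  handle(message) {
    if (this.seen.has(message.id)) return false; // double delivery: do nothing
    this.balance += message.amount;
    this.seen.add(message.id);
    return true;
  }
}
```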

Challenges In Distributed Systems

This is getting into distributed systems. If only ONE instance of everything in the system exists, it's probably safe to skip this bit. There's a good chance that one day, you may want to run multiple instances of everything to get resilience and more throughput. It would be nice if the system's architecture didn't suddenly become invalid when that happens.

Isolation of state mutation / Mutual Exclusion / Invariant Guarantees ARE HARD

Hard problems are hard. Imagine the same Node banking service has two running instances.

(This is just a well-understood example, lol. Don't write bank apps in Node, k? Promise? Sweet.)

Two withdrawal events happen back-to-back and, due to your architecture or some edge case (like a partition), each of your services gets one of the events to process. How do you prevent one of the following nightmare scenarios?

No Two Phase Commit / Global or Record Locks

Ok, these aren't nightmare scenarios in CA/CP systems because you'd just use two phase commit with a global lock or a read lock on the record. The downside to these systems is they're much more likely to become unavailable OR degrade significantly because every node in your system relies on them. So for these examples, I get to pretend this is not an option.

1. Data Loss

If both messages landed at about the same time, each handler would read the balance record and get the same balance. Call it $100. Each would subtract their withdrawal amounts ($15 and $20 respectively) and then persist the balance back to the database. Last-write-wins, baby. Our new balance is either $85 or $80 instead of the correct $65 (math is hard, but computering is more so).
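
The arithmetic is easy to reproduce. This little simulation (the function name and record shape are made up) interleaves the two handlers so that both read before either writes:

```javascript
// Simulates two handlers that both read the balance before either one writes.
function simulateLostUpdate(startingBalance, withdrawalA, withdrawalB) {
  const record = { balance: startingBalance };
  const readByA = record.balance; // handler A reads the balance
  const readByB = record.balance; // handler B reads the same balance
  record.balance = readByA - withdrawalA; // A persists its result
  record.balance = readByB - withdrawalB; // B overwrites A's write
  return {
    stored: record.balance, // what the database ends up with
    correct: startingBalance - withdrawalA - withdrawalB, // what it should be
  };
}

const result = simulateLostUpdate(100, 15, 20);
console.log(result); // { stored: 80, correct: 65 }
```

Swap the order of the two writes and the stored balance becomes $85 instead. Either way one withdrawal vanishes.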

2. Invariant Violations

What if the two withdrawals would result in an over-draft? What if the over-draft wasn't allowed? As above, if your two services are processing these commands in parallel, there's not a fool-proof way to prevent over-drafts.

Possible Solutions (read: trade-offs)

Routing

If you were using the account id as the correlation id and the consistent hash exchange, you could route messages for the same account to the same queue and then only allow one consumer per queue. This would prevent multiple services from receiving events for the same account. There are other challenges with this approach though :\ (there is no silver bullet)
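
The core idea is that the same account id always maps to the same queue. The sketch below shows that with a plain hash-modulo, which is NOT what the consistent hash exchange does internally (it uses a hash ring so that adding a queue moves fewer keys). The queue naming scheme and function names are hypothetical.

```javascript
// FNV-1a style 32-bit hash over the string's character codes.
function fnv1a(text) {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep it an unsigned 32-bit value
  }
  return hash;
}

// Picks one of `queueCount` queues for an account; same id, same queue.
function queueFor(accountId, queueCount) {
  return `accounts.${fnv1a(accountId) % queueCount}`;
}
```

With one consumer per queue, two withdrawals for the same account can no longer be processed in parallel. The trade-off is that one hot account can back up everything else that hashes to its queue.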

Event Sourcing

Instead of writing the account balance back immediately, you publish a different event; something like "AmountWithdrawn". When the balance record is read, you would also load events that had taken place for that record since its creation and 'replay' them against the record. You still have to snapshot the record every now and then and there are challenges with that. You can read about CRDTs and event sourcing and other related data structures/approaches to learn more.
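
A minimal sketch of the replay step, using the $100 account from earlier. The event and snapshot shapes here are hypothetical, and "AmountDeposited" is an event name I made up to sit next to "AmountWithdrawn". A real event store would also have to deal with ordering across nodes and with snapshotting.

```javascript
// Rebuilds the current balance from a snapshot plus the events recorded after it.
function currentBalance(snapshot, events) {
  return events
    .filter((event) => event.sequence > snapshot.sequence) // skip events already in the snapshot
    .sort((a, b) => a.sequence - b.sequence) // replay in order
    .reduce((balance, event) => {
      switch (event.type) {
        case "AmountDeposited":
          return balance + event.amount;
        case "AmountWithdrawn":
          return balance - event.amount;
        default:
          return balance; // unknown events are ignored in this sketch
      }
    }, snapshot.balance);
}

const balance = currentBalance(
  { balance: 100, sequence: 0 },
  [
    { type: "AmountWithdrawn", amount: 20, sequence: 2 },
    { type: "AmountWithdrawn", amount: 15, sequence: 1 },
  ]
);
console.log(balance); // 65
```

Because each service only appends its own event, neither withdrawal overwrites the other, no matter which one lands first.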

Compensating Actions

You could use something like event sourcing with compensating actions (i.e. detect the violation later and take necessary steps to correct the problem, lots and lots of systems that need availability do this). Consider that a lot of reservation systems allow over-booking and would rather max everything out and issue refunds/vouchers/apologies to folks who lose out rather than NEVER over-book but run a much less available system.

We don't take the same approach across the board with everything in our system. It's a case-by-case basis.

Rabbit + Node.JS Notes

Sorry if a lot of this is too basic. Feel free to ask more targeted questions and I can answer those or add to this. Hope it helps. If you're reading this and you see something awful/wrong/ignorant - please correct it, then everyone learns. Yay.

Disclaimer

It's been a bit since I worked with any of Node's amqp libs. I can't remember if every RabbitMQ feature is supported; check the docs for your lib of choice.

A Note About Performance

Node actually doesn't do so well with RabbitMQ relative to just about every other language I've used it with (C#/F#, Erlang, Python, Ruby).

All of Node's RabbitMQ drivers I knew of were written in pure JS rather than C, which means no true parallelism. All the communication marshalled across the different multiplexed channels ends up queued in the event loop, one piece behind another. Ruby & Python both have Rabbit libs in C so that the GIL doesn't keep the protocol from clipping along quite nicely.

Don't get me wrong, it's not the end of the world, but the kind of perf I talked about in the CodeMash talk is out of reach. It's just a trade-off to be aware of.

Connections

Use one connection per virtual host, and connect to a single broker even if there's a cluster. Build the broker abstraction to support multiple connection definitions so that it can fall back to another one should a connection fail.

Disconnects

For certain exchange/queue configurations, exchanges and queues can get lost during a disconnect. I suggest the abstraction capture all that config and replay it against the new connection, transparently to the app (this includes bindings and subscriptions that were previously made as well).
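
One way to capture and replay that config is to record every declaration as data. In this sketch the Topology class, its method names, and the declare callback are all hypothetical. The callback is where a real abstraction would issue the actual declaration against the new connection.

```javascript
// Records every declaration so it can be replayed on a fresh connection.
class Topology {
  constructor() {
    this.steps = []; // kept in declaration order
  }
  exchange(name, type) {
    this.steps.push({ kind: "exchange", name, type });
    return this;
  }
  queue(name) {
    this.steps.push({ kind: "queue", name });
    return this;
  }
  bind(queue, exchange, routingKey) {
    this.steps.push({ kind: "binding", queue, exchange, routingKey });
    return this;
  }
  // `declare` is a hypothetical callback that performs one declaration
  // against whatever connection is current.
  replay(declare) {
    for (const step of this.steps) declare(step);
  }
}
```

Replaying in the original order matters: a binding can only be re-created once its exchange and queue exist again.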

Blocked Connections

In the event Rabbit gets overwhelmed, it can try to send a blocked notification event. This is important, because if messages are sent ignoring this kind of event, Rabbit can kick the bucket, drop the connection or just dump messages into the void. See http://www.rabbitmq.com/connection-blocked.html for details.

I have yet to get one of these, but YMMV, right? :)

Channels

Create and manage a channel for every exchange and every queue used. There are notifications and metadata that are channel specific and using a single channel to communicate with more than one exchange or queue is going to cause untold grief.

Make sure to handle the various notifications Rabbit sends back via channels. I'll cover some of those specifically under the exchange and queue notes.

Exchanges

Publisher Confirms & Back-Pressure

I like not losing messages. Rabbit has some nice features that help achieve "at least once delivery" and one of those is publisher confirms. A confirm is Rabbit saying, "thanks, the message hit the exchange as requested". It doesn't guarantee the message landed in a queue (exchanges can lead nowhere, or their bindings might exclude a message).

In our app, I store the messages and metadata in a dictionary (ok, sorted set) keyed by sequence #. When the confirm comes through, I remove it. If I lose the connection, once everything is re-established, I re-publish those messages in the background (Rabbit assigns new sequence #s for each message). In the event we can't connect to any nodes, we write the unpublished messages out to a file so we can see what 'got lost' rather than wondering what made it and what didn't.

Use the rate of confirms to create back-pressure by not allowing more than X messages to be waiting for confirms. I do this in our .Net abstraction and it is configurable per exchange. Just keep in mind - there is a memory trade-off for the behavior I'm describing but for our needs, it's totally worth it.
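
Here is a rough Node sketch of that bookkeeping, not a port of our .Net code. ConfirmTracker and its method names are made up. It assumes sequence numbers start at 1 on a fresh channel and that a confirm can carry a "multiple" flag covering every sequence number up to and including the one given, which is how confirms behave in AMQP as far as I know.

```javascript
// Tracks messages that have been published but not yet confirmed.
class ConfirmTracker {
  constructor(maxUnconfirmed) {
    this.maxUnconfirmed = maxUnconfirmed; // back-pressure limit
    this.pending = new Map(); // sequence number -> message, in publish order
    this.nextSequence = 1;
  }
  // Back-pressure check: callers should wait while this is false.
  canPublish() {
    return this.pending.size < this.maxUnconfirmed;
  }
  // Call right before publishing; returns the sequence number to expect.
  track(message) {
    if (!this.canPublish()) throw new Error("too many unconfirmed messages");
    const sequence = this.nextSequence++;
    this.pending.set(sequence, message);
    return sequence;
  }
  // Call when a confirm arrives.
  confirm(sequence, multiple = false) {
    for (const key of [...this.pending.keys()]) {
      if (key === sequence || (multiple && key <= sequence)) this.pending.delete(key);
    }
  }
  // After a reconnect: returns everything unconfirmed and resets numbering.
  drainForRepublish() {
    const messages = [...this.pending.values()];
    this.pending.clear();
    this.nextSequence = 1;
    return messages;
  }
}
```

The memory trade-off is visible here: every unconfirmed message stays in the map until its confirm shows up, so the limit also caps how much you hold on to.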

Persistent Delivery

The publish is what determines if a message should hit disk or not by setting a persistent delivery flag. This causes all durable queues receiving the message to write the message to disk (a non-durable queue won't survive a node restart, persistent flag or not). If you don't want to lose a message even if the node dies, do this. If you need lower latency more than reliability, don't do this.

Food for thought

My understanding of Rabbit's implementation is that an exchange carries no real performance cost on its own, so having lots of them doesn't add overhead. Bindings are where the 'work' of handling a published message goes on (aside from queues), so be cautious about throwing a ton of bindings on a single exchange. Instead, use exchange-to-exchange bindings to narrow down the number of messages that need to go through any given binding.

Queues

Don't. Poll.

I've seen a lot of polling the queue - asking Rabbit for a message at a time. This is the worst. It will release Zalgo the Pony and he will drink from the blood of your slain app. If I had to imagine a use case for polling, it would be, "I want everything to be super slow, as though little snails were carrying messages between my computers".

Queue Depth Limits

Unbounded queues are very, very dangerous. In certain cases, for asynchronous workloads that don't have an SLA, you can take advantage of them, but they're still dangerous. You should consider your application's load and set reasonable limits. You'll need to handle dead-lettered messages that result from exceeding queue-depth and likely add this to your back-pressure strategies so that the message producers don't assume it's fine to endlessly pump a flood of messages to overflowing queues.
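
For reference, a depth limit with dead-lettering is expressed as optional arguments on the queue declaration. The argument keys below are RabbitMQ's; the limit and the exchange name are made-up values, and how you pass the arguments depends on your client lib.

```javascript
// Optional arguments for a queue declaration.
const queueArguments = {
  "x-max-length": 10000, // illustrative limit, tune to your load
  "x-overflow": "drop-head", // when full, drop the oldest message
  "x-dead-letter-exchange": "orders.dead-letter", // hypothetical exchange name
};
```

With drop-head, the messages pushed out of a full queue go to the dead-letter exchange, so something should be consuming from whatever queue is bound there.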

Ack / Nack

Another optional feature to look at for durability is explicitly acknowledging or rejecting every message the consumer receives. When receiving a message from Rabbit, each message gets a deliveryTag (only valid for the current connection and channel) that gets used to acknowledge or reject the message. When ack/nack-ing, there's an option to acknowledge every message since the last ack or nack, or just a single delivery tag. Pro-tip: it's a lot of work, but tracking these tags and acknowledging batches of them will significantly improve how quickly messages can be consumed / processed. Other pro-tip: this is harder than it sounds for lots of reasons :)
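
A sketch of the tag tracking, assuming delivery tags on a channel start at 1 and increase by 1 per delivery. AckBatcher and its methods are hypothetical names. The idea is to only ever ack a contiguous run of finished tags, so an "ack everything up to here" never covers a message that is still being processed.

```javascript
// Works out the highest delivery tag that is safe to ack with "multiple".
class AckBatcher {
  constructor() {
    this.lastAcked = 0; // highest tag already acknowledged
    this.done = new Set(); // tags processed but not yet acknowledged
  }
  // Mark one delivery as fully processed.
  markDone(tag) {
    this.done.add(tag);
  }
  // Returns the tag to ack with multiple=true, or null if the next tag
  // in sequence is still in flight.
  nextBatch() {
    let tag = this.lastAcked;
    while (this.done.has(tag + 1)) {
      tag += 1;
      this.done.delete(tag);
    }
    if (tag === this.lastAcked) return null;
    this.lastAcked = tag;
    return tag;
  }
}
```

Calling nextBatch on a timer or every N messages gives you the batching. The part that makes this harder than it sounds is everything around it: nacks, redelivery, and throwing the state away when the channel dies.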

QoS

When in Ack mode, consumers can limit Rabbit to only allow so many outstanding, un-ack'd messages. This is very important and I can't recommend it enough. Without it, Rabbit will throw everything it can at a consumer, and if that consumer falls over, ALL of those messages have to go back into the queue. Requeueing can be a good thing, since a failing service doesn't take a ton of unprocessed work with it. But it can also be bad if, say, there are now 5,000 messages getting requeued on an already busy queue.

Food For Thought

Each Rabbit Queue is a single Erlang process. An Erlang process is a bit like a Node application; it's an event loop with its own mailbox and it can do exactly one thing at a time. This means each queue, though it can have many consumers, can really only do so much work. If there are a LOT of messages and/or low round-trip latency is desirable (from publish to processed), stand up lots of queues and lots of consumers rather than having fewer queues and fewer consumers.

Also remember, if a queue depth is 0, Rabbit sends the message straight through to an available consumer. Keeping light work-loads per queue means much lower latency.

Super Geek Out

Erlang's scheduler is very advanced in that no single process (read: queue) can monopolize a CPU core. It ensures that every process gets an equal share of execution time. The scheduler is also quite adept at utilizing all the available cores to do this, so folks like putting Rabbit on fairly beefy machines because you can get great performance improvements. Just more reason to use more queues vs. fewer, especially if you have a decent server.

@arobson
Author

arobson commented Feb 1, 2014

One day, I should organize this crap into a blog or something.

@muraiki

muraiki commented Feb 6, 2015

Thank you for posting this!

@mikemoser

Thanks man, good notes. It's so easy to get started but if you really want reliable systems without losing messages it starts to bend the mind a bit.

@toddca

toddca commented Jul 24, 2015

Do you have the C# implementation alluded to above available anywhere?

@cecemel

cecemel commented Aug 5, 2016

thx!

@larister

Really appreciate you taking the time to write these notes up @arobson; they're very useful!
