arch.md

Evolution at SoundCloud

This is the story of how we evolved our architecture over time to meet our growth and scaling needs.

Scaling is a luxury problem, and surprisingly it has more to do with organization than with implementation. At each epoch of scale, usually at each order of magnitude in the number of users we were supporting, starting in the thousands and now reaching the hundreds of millions, we identified our bottlenecks and addressed them as simply as possible by introducing conceptual integration points into our infrastructure to divide and conquer each problem individually.

By identifying and extracting points of scale into smaller problems, and by having well-defined integration points in place when the time arrived, we were able to grow organically.

Product conception

From day one, we had the simple need of getting each idea out of our heads and in front of eyeballs as quickly as possible. During this phase, we used a very simple setup:

Internet -> Web (Apache) -> App (Rails) -> Data (MySQL)

Apache was serving our image/style/behavior resources, and Rails backed by MySQL provided an environment where almost all of our product could be modeled, routed and rendered quickly. Most of our team understood this model and could work well together, delivering a product that is very similar to what we have today.

We consciously chose not to implement high availability at this stage, knowing what it would take when that time hopefully arrived. It was at this point that we left our private beta, revealing SoundCloud to the public.

Our primary cost optimization was for opportunity, and anything that got in the way of us developing the concepts behind SoundCloud was avoided. For example, when a new comment was posted, we blocked until all followers were notified, knowing that we could make that asynchronous later.

In the early stages we were conscious to ensure we were not only building a product, but also a platform. Our Public API was developed alongside our website from the very beginning. We're now driving the website with the same API we were offering to 3rd party integrations.

Growing out of Apache

Apache served us well, but we were running Rails app servers on multiple hosts, and the routing and virtual host configuration in Apache was cumbersome to keep in sync between development and production.

The Web tier's primary responsibility is to manage and dispatch incoming web requests, as well as to buffer outbound responses so as to free up an application server for the next request as quickly as possible. This meant that the better our connection pooling and content-based routing configuration, the stronger this tier would be.

At this point we replaced Apache with Nginx and reduced our web tier's configuration complexity, but our architecture didn't change.

Internet -> Web (nginx) -> App -> Data

Load distribution and a little queue theory

Nginx worked great, but as we were growing, we found that some workloads took significantly more time than others (on the order of hundreds of milliseconds).

When a fast request arrives while a slow request is being processed, the fast request has to wait until the slow one finishes: the "head-of-line blocking" problem. With multiple application servers, each with its own listen socket backlog, the situation is analogous to a grocery store, where you inevitably stand in line at one register and watch all the other registers move faster than your own.

Around 2008, when we first developed the architecture, concurrent request processing in Rails and ActiveRecord was fairly immature. Even though we felt confident that we could audit and prepare our code for concurrent request processing, we did not want to invest the time needed to audit our dependencies. So we stuck with the model of a single request at a time per application server process and ran multiple processes per host.

In Kendall's notation, once a request has been sent from the web server to the application server, the request processing can be modeled as an M/M/1 queue. The response time of such a queue depends on all prior requests, so if we drastically increase the average work time of one class of request, the average response time also drastically increases.
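For intuition, the textbook M/M/1 result makes that sensitivity explicit (this is standard queueing theory, not a measurement of our system). With arrival rate λ and service rate μ, the mean response time W and the mean time spent waiting in the queue W_q are:

    W = \frac{1}{\mu - \lambda}, \qquad W_q = \frac{\rho}{\mu - \lambda}, \qquad \rho = \frac{\lambda}{\mu}

As the utilization ρ approaches 1, or as a handful of slow requests pulls the service rate μ down, both terms blow up, which is exactly the behavior we were seeing.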

Of course, the right thing to do is to make sure our work times are consistently low for any web request, but we were still in the period of optimizing for opportunity, so we decided to continue with product development and solve this problem with better request dispatching.

We looked at the Phusion Passenger approach of using multiple child processes per host but felt that we could easily fill each child with long-running requests. This is like having many queues with a few workers on each queue, simulating concurrent request processing on a single listen socket.

This changed the queue model from M/M/1 to M/M/c, where c is the number of child processes available to handle dispatched requests. This is like the queuing system found in a post office, or a "take a number, the next available worker will help you" kind of queue. This model reduces the response time by a factor of c for any job that was waiting in the queue, which is better, but assuming we had 5 children, we would just be able to accept an average of 5 times as many slow requests. We were already anticipating a factor of 10 growth in the upcoming months, and had limited capacity per host, so adding only 5 to 10 workers was not enough to address the head-of-line blocking problem.

We wanted a system that never queued, but if it did queue, the wait time in the queue was minimal. Taking the M/M/c model to the extreme, we asked ourselves "how can we make c as large as possible?"

To do this, we needed to make sure that a single Rails application server never received more than one request at a time. This ruled out TCP load balancing, because TCP has no notion of an HTTP request/response. We also needed to make sure that if all application servers were busy, the request would be queued until the next application server became available. That meant maintaining complete statelessness between our servers. We had the latter, but didn't have the former.

We added HAProxy to our infrastructure, configured each backend with a maximum connection count of 1, and added our backend processes across all hosts. This gave us that wonderful M/M/c reduction in resident wait time by queuing each HTTP request until any backend on any host became available. HAProxy became our queuing load balancer, buffering any backpressure by queuing requests to the application or dependent backend services, which let us defer designing sophisticated queuing into other components of our request pipeline.
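As a rough illustration, not our production configuration (the backend names, addresses, and ports here are made up), the relevant HAProxy settings look something like this:

listen app 0.0.0.0:8080
  mode http
  balance roundrobin
  # maxconn 1: each backend process only ever sees one request at a time;
  # anything beyond that waits in HAProxy's own queue until a process frees up
  server app01-1 10.20.0.1:5000 maxconn 1 check
  server app01-2 10.20.0.1:5001 maxconn 1 check
  server app02-1 10.20.0.2:5000 maxconn 1 check
  # ...one entry per Rails process on every application host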

    Internet
       |
       v
      Web
       |
       v
    Queuing
  Load Balancer
   (HAProxy) ------> App -> Data

I heartily recommend Neil J. Gunther's work Analyzing Computer System Performance with Perl::PDQ to brush up on queue theory and strengthen your intuition on how to model and measure queuing systems from HTTP requests all the way down to your disk controllers.

Going asynchronous

One class of request that took a long time was the fan-out of notifications from social activity. For example, when you upload a sound to SoundCloud, everyone that follows you will be notified. For people with many followers, if we were to do this synchronously, the request times would exceed the tens of seconds. We needed to queue a job that would be handled later.

Around the same time we were considering how to manage our storage growth for sounds and images, and had chosen to offload storage to Amazon S3 keeping transcoding compute in Amazon EC2.

Coordinating these subsystems, we needed some middleware that would reliably queue, acknowledge, and re-deliver job tickets on failure. We went through a few systems, but in the end settled on AMQP, as implemented by RabbitMQ, because it gave us a programmable topology.

To keep the same domain logic that we had in the website, we loaded up the Rails environment and built a lightweight dispatcher class that worked against queues named by a namespace describing estimated work times. This created a priority system in our asynchronous workers without adding the complexity of message priorities to the broker. Most of our queues for asynchronous work performed by the application are namespaced with either "interactive" (under 250ms work time) or "batch" (any work time). Other namespaces were specific to each application.
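A stripped-down sketch of the idea (the class, the broker facade, and the exchange name are hypothetical, following the same broker.publish convention as the ModelBroadcast hooks shown later in this post):

# Hypothetical sketch: route asynchronous jobs onto queues named by expected work time.
class Dispatcher
  INTERACTIVE = "interactive" # jobs expected to finish in under 250ms
  BATCH       = "batch"       # jobs with no bound on work time

  def initialize(broker)
    @broker = broker
  end

  # Publish a job ticket with a namespaced routing key,
  # e.g. "interactive.notification" or "batch.reindex".
  def dispatch(namespace, job_name, payload)
    @broker.publish("app.jobs", "#{namespace}.#{job_name}", payload.to_json)
  end
end

Workers then subscribe only to the queues in their namespace, so a flood of batch work can never delay interactive jobs.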

Internet
   |
   v
  Web
   |
   v
  Load
 Balancer --> App -----> Data
               |
  Broker <-----+
(RabbitMQ)
    |
    +----> Transcode (EC2) <-> Storage (S3)
    |
    +----> App Workers

Caching

When we approached the mark of hundreds of thousands of users, we saw we were burning too much CPU in the application tier, mostly in the rendering engine and the Ruby runtime.

Instead of introducing Memcached to alleviate IO contention in the database, as most applications do, we aggressively cached partial DOM fragments and full pages. This turned into an invalidation problem, which we solved by also maintaining in memcached a reverse index of the cache keys that needed invalidating on model changes.
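A minimal sketch of the approach, using Rails.cache backed by memcached (the key scheme and helper names are illustrative, not our actual code):

# When rendering, remember which fragment keys were built from a given record.
def cache_fragment(record, fragment_key, &block)
  reverse_key = "deps/#{record.class.name}/#{record.id}"
  deps = Rails.cache.read(reverse_key) || []
  Rails.cache.write(reverse_key, deps | [fragment_key])
  Rails.cache.fetch(fragment_key, &block)
end

# When a model changes, look up the reverse index and drop every dependent fragment.
def invalidate_fragments_for(record)
  reverse_key = "deps/#{record.class.name}/#{record.id}"
  (Rails.cache.read(reverse_key) || []).each { |key| Rails.cache.delete(key) }
  Rails.cache.delete(reverse_key)
end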

Our highest-volume request was one specific endpoint that was delivering data for the widget. We created a special route for that endpoint in nginx and added proxy caching to that stack, but we wanted to generalize caching to the point where any endpoint could produce proper HTTP/1.1 cache control headers and would be treated well by an intermediary we control. Now our widget content is served entirely from our public API.
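In Rails, emitting those headers can be as simple as the standard controller helpers, sketched here for a hypothetical widget endpoint (the controller and Sound model are illustrative):

class WidgetsController < ApplicationController
  def show
    sound = Sound.find(params[:id])
    # Let any intermediary we control (nginx, later Varnish) cache the response briefly.
    expires_in 1.minute, public: true
    # Allow cheap revalidation via conditional GETs.
    fresh_when etag: sound, last_modified: sound.updated_at
  end
end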

We added Memcached, and much later Varnish, to our stack to handle backend caching of partially rendered templates and of mostly read-only API responses.

 Internet
    |
    v
HTTP Cache (Varnish)
    +
   Web
    |
    v
  Load
Balancer -> App ----> Data
             |
             +------> Object Cache (Memcached)
             |
Broker <-----+
   |
   +----> Transcode <-> Storage
   |
   +----> App Workers

Generalization

Our worker pools grew, handling more asynchronous tasks. The programming model was similar for all of them: take a domain model and schedule a continuation with that model state to be processed at a later stage.

Generalizing this pattern, we leveraged the after-save hooks in ActiveRecord models in a way we call ModelBroadcast. The principle is that when the business domain changes, events are dropped on the AMQP bus with that change for any asynchronous client that is interested in that class of change. This technique of decoupling the write path from the readers enables the next evolution of growth by accommodating integrations we hadn’t foreseen.

# On each lifecycle event, publish the change to the "models" exchange,
# with a routing key naming the event and the model class.
after_create do |r|
  broker.publish("models", "create.#{r.class.name}", r.attributes.to_json)
end

after_save do |r|
  broker.publish("models", "save.#{r.class.name}", r.changes.to_json)
end

after_destroy do |r|
  broker.publish("models", "destroy.#{r.class.name}", r.attributes.to_json)
end

This isn't perfect, but it added a much needed non-disruptive, generalized, out-of-app integration point in the course of a day.

Dashboard

Our most rapid data growth was the result of our Dashboard. The Dashboard is a personalized materialized index of activities inside of your social graph and the primary place to personalize your incoming sounds from the people you follow.

We have always had a storage and access problem with this component. Looking at the read and write paths separately, the read path needs to be optimized for sequential access per user over a time range. The write path needs to be optimized for random access where one event may affect millions of users’ indexes.

The solution required a system that could reorder writes from random to sequential and store them in a sequential format for reading, while also being able to grow across multiple hosts. Sorted string tables are a perfect fit for the persistence format, and with the promise of free partitioning and scaling thrown into the mix, we chose Cassandra as the storage system for the Dashboard index.

The intermediary processing starts with the model broadcast and uses RabbitMQ as a queue for staged processing, in three major steps, roughly sketched after the list below: fan-out, personalization, and serialization of foreign key references to our domain models.

  • Fan-out finds the areas of the social graph where an activity should propagate.

  • Personalization looks at the relationship between the originator and destination users as well as other signals to annotate or filter the index entry.

  • Serialization persists the index entry in Cassandra for later lookup and joining against our domain models for display or API representations.
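A rough sketch of those stages as chained consumers (the class names, helpers, and the dashboard_index client are invented for illustration; the real workers are more involved):

# Stage 1: fan-out -- find the parts of the social graph an activity should reach.
class FanOut
  def process(event)
    follower_ids_for(event["user_id"]).each_slice(1000) do |ids|
      broker.publish("dashboard", "personalize", event.merge("recipient_ids" => ids).to_json)
    end
  end
end

# Stage 2: personalization -- annotate or filter the entry per recipient.
class Personalize
  def process(event)
    event["recipient_ids"].each do |recipient_id|
      entry = annotate(event, recipient_id)
      broker.publish("dashboard", "serialize", entry.to_json) if entry
    end
  end
end

# Stage 3: serialization -- persist the index entry in Cassandra, keyed by recipient
# and time, storing only foreign keys back into our domain models.
class Serialize
  def process(entry)
    dashboard_index.insert(entry["recipient_id"],
                           timestamp: entry["created_at"],
                           ref: { type: entry["type"], id: entry["id"] })
  end
end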

 Internet
    |
    v
HTTP Cache
    +
   Web
    |
    v
  Load
Balancer -> App ----> Data
             |
             +------> Object Cache
             |
Broker <-----+
   |
   +----> Transcode <--> Storage
   |
   +----> App Workers
   |
   +----> Dashboard <--- Data
                  \      Replica (MySQL owned by App for graph/personalization)
                   \
                    +--> Index (Cassandra)

Search

Our search is conceptually a back-end service that exposes a subset of data store operations over an HTTP interface for queries. Updating the index is handled similarly to the Dashboard, via ModelBroadcast with some enrichment from database replicas, with the index storage managed by Elastic Search.

 Internet
    |
    v
HTTP Cache
    +
   Web
    |
    v
  Load
Balancer -> App ----> Data
             |
             +------> Object Cache
             |
             +------> Search Query --> Index (Elastic Search)
             |
Broker <-----+
   |
   +----> Transcode <--> Storage
   |
   +----> App Workers
   |
   +----> Dashboard <--- Data Replica
   |              \----> Index
   |
   +----> Search  <----- Data Replica (MySQL)
          Indexer -----> Index (Elastic Search)

Notifications and Stats

To make sure users are properly notified when their dashboard updates, whether over iOS/Android push notifications, email, or other social networks, we simply added another stage in the Dashboard workflow that receives messages when a dashboard index is updated. Agents can have that completion event routed to their own AMQP queues via the message bus to initiate their own logic. Reliable messaging at the completion of persistence is part of the eventual consistency we work with throughout our system.

Our statistics, offered to logged-in users at http://soundcloud.com/you/stats, also integrate via the broker, but instead of using ModelBroadcast, we emit special domain events that are queued up in a log and then rolled up into a separate database cluster for fast access across the various time ranges.

 Internet
    |
    v
HTTP Cache
    +
   Web
    |
    v
  Load
Balancer -> App ----> Data
             |
             +------> Object Cache
             |
             +------> Search Query <-- Index
             |
             +------> Stats <--------- Index (MySQL)
             |
Broker <-----+
   |
   +----> Transcode <--> Storage
   |
   +----> App Workers
   |
   +----> Dashboard <--- Data Replica
   |              \----> Index
   |
   +----> Search  <----- Data Replica
   |      Indexer -----> Index
   |
   +----> Notifications -------------> 3rd party (Apple/Android/Email/Facebook)
   |
   +----> Stats -------> Log (MySQL)
                          |
                        Stats
                         ETL
                          |
                          v
                        Index (MySQL)

What's next

We have established some clear integration points in the broker for asynchronous write paths and in the application for synchronous read and write paths to backend services.

Over time, the application server's codebase has collected both integration and functional responsibilities. As product development settles, we now have much more confidence to decouple the function from the integration, moving it into backend services that can be consumed à la carte not only by the application but also by other backend services, each with a private namespace in the persistence layer.

 Internet
    |
    v
HTTP Cache
    +
   Web
    |
    v
  Load
Balancer -> App ----> Data
             |
             +------> Object Cache
             |
             +------> Search Query <-- Index
             |
             +------> Stats <--------- Index
             |
             +------> (More Backend Services)
             |
Broker <-----+
   |
   +----> Transcode <--> Storage
   |
   +----> App Workers
   |
   +----> Dashboard <--- Data Replica
   |              \----> Index
   |
   +----> Search  <----- Data Replica
   |      Indexer -----> Index
   |
   +----> Notifications -------------> 3rd party
   |
   +----> Stats -------> Log
   |                      |
   |                     ETL
   |                      |
   |                    Index
   |
   +----> (More Asynchronous Features)

The way we develop SoundCloud is to identify the points of scale then isolate and optimize the read and write paths individually, in anticipation of the next magnitude of growth.

At the beginning of the product, our read and write scaling limitations were consumer eyeballs and developer hours. Today, we're engineering for the realities of limited IO, network and CPU. We have the integration points set up in our architecture, all ready for the continued evolution of SoundCloud!

broker.md

Makes you broker

They don't call an AMQP server a 'broker' without reason. It's such a critical part of our system that when this component is broke, it makes everything else more broker.

One of our single RabbitMQ servers has clocked in at well over 100,000 deliveries per second during bursty traffic, and in some workflows it hums along at around 25,000 deliveries per second. The brokered messaging model has enabled significant decoupling of components, as well as a degree of resilience to application component failure thanks to the at-least-once delivery guarantee.

But the AMQP model is simple only when you have a single server. This led us to create our own bottleneck, as growing beyond a single broker process introduces significant complexity. We have reached the stage where we can no longer use a single broker.

We specifically, and thankfully, designed our systems not to rely on the ordering guarantees that many brokers offer. Because we handle the ordering of our operations at significant boundaries, we are able to remove this bottleneck and failure mode from our brokers.

To remove the bottleneck, the producers are written to connect and publish normally, but their connection is directed via LVS to one of N brokers. The consumers then make a connection to each of N brokers.

The same AMQP exchanges, bindings, and queues exist on all brokers, declared in the same way by all the consumers and all the producers. But each broker only needs to handle 1/N of all publishes, and they can all be run active/active. If a broker is taken down, producers re-establish their connections and get balanced to a different broker, and consumers re-establish their connections once the broker is restored, without interrupting service.
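On the consumer side, a sketch using a recent version of the Bunny client (the broker addresses, queue name, and handle method are illustrative): each consumer opens a connection to every broker, declares the same queue, and sets a per-broker prefetch of one.

require "bunny"

BROKERS = %w[amqp://broker1 amqp://broker2 amqp://broker3] # the N brokers behind LVS

BROKERS.each do |url|
  conn = Bunny.new(url)
  conn.start
  channel = conn.create_channel
  channel.prefetch(1) # at most one unacknowledged message per broker, so the
                      # consumer's overall window is N messages
  queue = channel.queue("interactive.notifications", durable: true)

  queue.subscribe(manual_ack: true, block: false) do |delivery_info, _properties, payload|
    handle(payload) # hypothetical work method
    channel.ack(delivery_info.delivery_tag)
  end
end

sleep # keep the process alive so the consumer threads keep working

Producers need no such awareness; they publish as usual and LVS directs each connection to one of the N brokers.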

                  Broker 1 +----> Consumer 1
                            \ /
Producer --LVS    Broker 2 --+--> Consumer 2
             \              / \
              *-> Broker N +----> Consumer N

The trade-off is that since each consumer is handling messages from N brokers, the minimum window size for a consumer is N messages. To get fair queuing, it is typical to tell the broker to only send one message at a time until that message is acknowledged. We can still tell each broker to send one message at a time, but the consumer will still be multiplexing each broker's messages with the other brokers' messages for processing.

In practice this isn't a problem, as long as messages can be processed concurrently in the consumers, and the same system architecture holds up just fine.

mothership.md

The Mothership and Beyond

Looking at where we are today, you can see that almost everything passes through that 'Rails' box. So much so that we affectionately call this codebase the "mothership".

This codebase has the responsibilities of dispatching requests to application handlers, loading up domain models, rendering templates, rendering API representations, authenticating users, authenticating 3rd party applications, integrating with external services, handling state transitions for sound uploads, rendering HTML versions of newsletters, paginating and rendering search results and dashboards, aggregating and rendering the stats, managing preferences, delegating payments, recording subscription history, managing visibility permissions, compiling and serving our JavaScript and CSS, scaling, cropping and storing uploaded images, plus much more.

Rails provides a framework for integrating database-backed content into a coherent application, but it takes developer discipline to keep the concerns separate. As a result, we have coarse-grained boundaries between our functional slices of SoundCloud, but to fully isolate the functions so we can grow, we are currently working on decoupling the domain of "SoundCloud" from the integration tools and the conveniently flat (global) namespace that Rails provides.

To do this, we are preparing our infrastructure to support many more, smaller, platform-agnostic functional slices with simpler and well-defined integration points.

The two sides of integration that need separation are request routing and persistence.

Routing, integration, function, data

Simplifying the architecture, incoming requests get routed to an integration layer that combines functions specific to each domain with the data each domain depends upon.

This means we can optimize each layer specifically for one of four roles: routing, integration, function, or data. Back-end services are typically domain specific, like the Dashboard or the Search index. Integration services are fundamentally transformations of many formats into one. Routing needs well-defined keys and an easily programmed topology, while data needs to accommodate the read, write, and volume patterns of the functions.

We've already started on simplifying the routing on the way into the integration layer, and have some examples like Stats and Dashboard that have isolated domain data into specific schemas and clusters, moving to something that is less Rails and more SoundCloud, like:

Routing        Integration       Function     Data

Internet
   |
   |     +---> Developer API
   |     |---> Explore           Search
   |     |---> Upload            Media
   v     |---> Signup            Payments
Routing -+---> Dashboard         Notifications
         |---> Mobile            Metrics
         |---> Next              Social
         |---> Widget            People
         |---> Delivery          Auth
         +---> Stats

Hi Sean,

maybe we should split the blog post into two parts as it could become way too long? What do you think?

Alex

The broker stuff looks good, but I have the following suggestions (space permitting!):

  • Quote 'Publish One, Subscribe Many' as the name of the topological pattern
  • Outline that messages will be persistent, so in the case of a failed node, we have more or less complete durability .. at the expense of delayed delivery

To follow on, I think the size of the piece we discussed doesn't need to be cut .. I'll simply elaborate more if/when I get around to an overview/post.
