Using Kafka and a Samza-like node.js architecture

Disclaimer

I'm still very new to Kafka, event sourcing, stream processing, etc. I'm in the middle of building my first production system with this stuff and am writing this at the request of a few folks on Twitter. So if you do have experience, please do me and anyone else reading this a favor by pointing out things I get wrong :)

Inspirations

Problems I'm trying to solve.

I'm the technical co-founder at RelateRocket. We're a fairly early stage startup based in SF building an algorithmic social proof product. We hook into marketing automation and CRM tools to auto-produce custom personalized pages. And we're planning on doing this on a massive scale, e.g. a company would create and email out 100,000 pages with custom relatable content.

So the systems I'm building need to a) handle this sort of scale and b) slice and dice the data dozens of ways for internal analytics, customer-facing analytics, alerting, and for pushing data back to various other tools that our customers are using.

Event sourcing

So while studying this problem, I discovered event sourcing. I'd read The Log blog post a few years ago when it came out and was very attracted to the idea but didn't really connect the dots about how to turn that into a working system until discovering event sourcing.

Event sourcing has two very attractive properties. A) You don't throw data away. You just record everything you can think of and then figure out how to make use of it. This is very attractive for an analytics-heavy product and in a startup where you're really not sure upfront what data is useful and what's not. B) Separating reads and writes. In school and in the few backend systems I've written (I've mostly done frontend work), I've never enjoyed designing database schemas. They always felt hacky and ungainly. I can now see that most of the ugliness came from the unnatural coupling of the read and write schemas. Writing events feels very natural. You just declare what happened. A user was updated. By this person. And this is what was changed. And that's the end of it. You don't have to awkwardly mutate a user object and perhaps, if you're feeling ambitious, record who made the change in an audit log table. And for reads, as I'll get to in a bit, you have unlimited freedom to mold the raw event data into whatever form makes sense for your application. Which is very easy and rather fun actually.

Kafka

Somewhere around when I discovered event sourcing, I also discovered Kafka. Which I won't write much about as there's tons of info on the internet but Kafka is a beautiful piece of software. Highly performant, durable, replayable pub-sub. The perfect tool for so many data tasks.

Stream processing

So event sourcing is super cool but how do you turn your low-level raw events into usable, queryable objects? Stream processing is the normal answer (there's also batch processing with say Hadoop but that's so 3 years ago).

Basically as new events flow through your system, you "process" them into some sort of higher-level form. E.g. a userCreate event is the start of a new user. A userUpdate event flows by and that's grabbed to update an existing user. A userLoggedIn event happens and we increment the times_logged_in field on the user.
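
A minimal sketch of that fold in plain JavaScript, with no Kafka involved. The event shape (`event_type`, `entity_id`, `event`) and the `applyEvent` name are assumptions made for illustration, not a fixed format.

```javascript
// Fold one event into the current user object and return a new object.
// Assumed (hypothetical) event shape: { event_type, entity_id, event }.
function applyEvent(user, event) {
  switch (event.event_type) {
    case 'userCreate':
      // A userCreate event is the start of a new user.
      return Object.assign({ id: event.entity_id, times_logged_in: 0 }, event.event);
    case 'userUpdate':
      // Copy the changed fields over the existing user.
      return Object.assign({}, user, event.event);
    case 'userLoggedIn':
      return Object.assign({}, user, { times_logged_in: user.times_logged_in + 1 });
    default:
      // Unknown events leave the user untouched.
      return user;
  }
}

const events = [
  { event_type: 'userCreate', entity_id: 'u1', event: { name: 'Ada' } },
  { event_type: 'userUpdate', entity_id: 'u1', event: { name: 'Ada L.' } },
  { event_type: 'userLoggedIn', entity_id: 'u1', event: {} }
];

// Replaying the whole log from scratch rebuilds the current user.
const user = events.reduce(applyEvent, null);
console.log(user);
```

Because `applyEvent` never mutates its input, replaying the same events always produces the same user, which is what makes reprocessing the log safe.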

For React.js peeps reading out there, this should sound exactly like the Flux architecture.

There's a variety of stream processing tools out there, e.g. Spark and Samza. I've chosen (for now anyways) to forgo using those and instead do stream processing with node.js. Those tools both sound great and we'll probably use them someday but they don't seem necessary given that we're still small-data not big-data and, as the sole developer, I really need to limit the number of tools I'm using to keep the complexity of the product within bounds. So as I'm already using Node.js extensively, it seems appropriate to keep on using it.

What does Samza-like node.js code look like?

Samza, at its heart, is actually very simple. You expose a function (ignoring that Java makes you wrap functions in ugly classes) that's subscribed to a Kafka topic, and Samza calls it whenever there's a new message. You do something to the message and then generally re-emit the processed message onto a new topic.

So to my earlier example, a userCreated event comes in and you process that into the user schema and then publish that new object to the user topic. Another system that's responsible for responding to user information queries would then listen to that topic and use changes there to update its store.

This is how it'd look in node.js.

  var _ = require('underscore');
  var Immutable = require('immutable');
  var kafka = require('kafka-node');

  var fromJS = Immutable.fromJS;
  var Map = Immutable.Map;
  var HighLevelConsumer = kafka.HighLevelConsumer;
  var HighLevelProducer = kafka.HighLevelProducer;
  var KeyedMessage = kafka.KeyedMessage;

  // Set up our Kafka producer.
  var producer = new HighLevelProducer(new kafka.Client());

  // Create the user topic.
  producer.on('ready', function() {
    producer.createTopics(['user'], false, function(err, data) {
      if (err) {
        console.log(err);
      }
    });
  });

  // In-memory state: user id -> aggregated user object.
  var users = Map();

  // Publish the latest version of a user, keyed by its id, to the user topic.
  var emit = function(user) {
    var message;
    if (user) {
      message = new KeyedMessage(user.get('id'), JSON.stringify(user));
      producer.send([
        {
          topic: 'user',
          messages: message
        }
      ], function(err, data) {
        if (err) {
          console.log(err);
        }
      });
    }
  };

  // Listen to new events.
  var consumer = new HighLevelConsumer(new kafka.Client(), [
    {
      topic: 'events'
    }
  ], {
    groupId: 'user-aggregator'
  });

  consumer.on('message', function(message) {
    // kafka-node delivers the raw payload in message.value.
    // This assumes events are published as JSON strings.
    var event = JSON.parse(message.value);
    var id = event.entity_id;
    var e, failedLogins, logins;
    switch (event.event_type) {
      case "userCreated":
        users = users.set(id, fromJS({
          id: id,
          name: event.event.name,
          email: event.event.email,
          roles: event.event.roles,
          organization_id: event.event.organization_id,
          created_at: event.timestamp,
          updated_at: event.timestamp,
          logins: [],
          logins_failed: []
        }));
        break;
      case "userUpdated":
        // Shallow-merge only the fields the event actually carries, so that
        // missing fields are left alone and lists such as roles are replaced.
        users = users.mergeIn([id], fromJS(_.extend(
          _.pick(event.event, 'name', 'email', 'roles'),
          { updated_at: event.timestamp }
        )));
        break;
      case "userLoggedIn":
        // Copy the event body rather than mutating it.
        e = fromJS(_.extend({}, event.event, {
          timestamp: event.timestamp
        }));
        logins = users.getIn([id, 'logins']).push(e);
        users = users.setIn([id, 'logins'], logins);
        break;
      case "userFailedLogin":
        e = fromJS(_.extend({}, event.event, {
          timestamp: event.timestamp
        }));
        failedLogins = users.getIn([id, 'logins_failed']).push(e);
        users = users.setIn([id, 'logins_failed'], failedLogins);
        break;
      default:
        // Not a user event; nothing to emit.
        return;
    }
    emit(users.get(id));
  });

Some additional planned/in-progress/finished use cases

  • aggregate objects from raw events
  • real-time updates: it's easy to connect socket.io or some other websocket/push tech to the stream of new objects and push them to the dashboard. We have an activity stream page working this way and will be adding more soon.
  • campaigns: we let you create campaigns, arbitrary groupings of pages you're sending out. I'll soon be writing a campaign aggregator that watches for new page analytics related to a campaign and groups them together to drive campaign-specific analytics dashboards.
  • integration with other tools: each marketing/sales tool has its own way of viewing the world. To drive integration, I'll write a stream processor for each one that translates our events into updates understandable by that tool.
  • enrichments: we record the IP address for everyone that visits one of our custom pages. A natural thing to do is look up geo information on that IP to "enrich" the event.
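
As a rough illustration of the enrichment case, the sketch below attaches geo data to a page-view event. `lookupGeo`, the `GEO_BY_IP` table and the `ip`/`geo` field names are all hypothetical; a real processor would query a geo-IP database or service instead.

```javascript
// Hypothetical stand-in for a real geo-IP database or service.
const GEO_BY_IP = {
  '203.0.113.7': { country: 'US', city: 'San Francisco' }
};

function lookupGeo(ip) {
  return GEO_BY_IP[ip] || null;
}

// Return a copy of the event with a `geo` field added to its body.
// The original event is left untouched so the raw log stays as recorded.
function enrich(event, lookup) {
  const body = Object.assign({}, event.event, { geo: lookup(event.event.ip) });
  return Object.assign({}, event, { event: body });
}

const pageView = {
  event_type: 'pageViewed',
  entity_id: 'page-1',
  event: { ip: '203.0.113.7' }
};

const enriched = enrich(pageView, lookupGeo);
console.log(enriched.event.geo);
```

The enriched copy would be published to its own topic, leaving the raw event stream unchanged for other consumers.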

kevinrobinson commented Jul 14, 2015

@KyleAMatthews Nice writeup! It's awesome you're writing about your work and sharing it out. :)

This is interesting stuff, and I think your points about getting up and running fast are really good framing. I had a few thoughts for you which might not be relevant for what you're trying to do, but I figured they're worth sharing and you can take them for what they're worth.

In my experience, there are a lot of tough problems around failure modes. Those might not be super important for what you're doing here, and console.log(err) might do the trick just fine. But by identifying this you can maybe more explicitly identify where you're taking on risk as you build it out, and can circle back to it if those seem like worthwhile improvements later.

Another area like that is state in the users map that's in-memory. If the process crashes, recovering that state quickly might be important rather than recomputing it or losing it (which is one of the key properties of Samza, using Kafka to store snapshots of the local state). Also, another benefit of established stream-processing frameworks is the levers they give you for tuning performance, with things like how you increase parallelism as the volume of data grows. It's not clear to me how well JS/node would support that (say, if the single process couldn't keep up), and distributing computation across a cluster is one of the main values of frameworks like Samza or Storm.

These points I'm bringing up might not apply to what you're building now, I just thought it would be helpful to share another perspective. All that being said, I think this is awesome writing and work, much respect for sharing and I'm excited to hear more. These kinds of analytics problems are fun stuff, and writing about these topics and making it easier for everyone to experiment with them is the best way for us all to make more awesome products. :)

Author

KyleAMathews commented Jul 14, 2015

Thanks Kevin!

I agree handling failures will become more and more of a problem as we grow. Which might be what pushes us eventually to use a "real" stream processing framework as they've done the nitty-gritty work there. But for now, moving fast by using familiar tools and avoiding the operational/learning overhead of adopting one of those matters far more.

On recovering from crashes—I'm actually doing the same thing as Samza—on startup I read in everything from the user topic to quickly rebuild the Users state. Also my data is small enough still (< 100k events) that I can reprocess all events in a matter of seconds :) Though obviously that won't last.
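
A minimal sketch of that startup replay, assuming each message on the user topic is keyed by user id and carries the full user as JSON. `rebuildUsers` is a hypothetical name, and the messages are passed in as a plain array rather than read from Kafka.

```javascript
// Rebuild the in-memory users state from the messages on the user topic.
// Messages are replayed in order, so a later snapshot of the same user
// overwrites an earlier one.
function rebuildUsers(messages) {
  const users = new Map();
  messages.forEach(function (message) {
    users.set(message.key, JSON.parse(message.value));
  });
  return users;
}

const snapshots = [
  { key: 'u1', value: JSON.stringify({ id: 'u1', name: 'Ada' }) },
  { key: 'u2', value: JSON.stringify({ id: 'u2', name: 'Grace' }) },
  { key: 'u1', value: JSON.stringify({ id: 'u1', name: 'Ada L.' }) }
];

const restored = rebuildUsers(snapshots);
console.log(restored.get('u1').name);
```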

On scaling/parallelizing. It's actually pretty straightforward. You just bump up the number of partitions on a topic and create a new node.js process for each partition. E.g. we could create an individual partition for each of our larger customers and clump the smaller ones together.
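
One way that routing could look, as a sketch only. The customer ids, the partition numbers and the `partitionFor` helper are made up for illustration; the returned number would be used as the partition when producing each event.

```javascript
// Hypothetical layout: large customers get a partition of their own,
// everyone else is hashed across the shared partitions.
const DEDICATED = { 'org-big-1': 0, 'org-big-2': 1 };
const SHARED_PARTITIONS = [2, 3];

// Small deterministic string hash (kept within unsigned 32-bit range).
function hashString(s) {
  let h = 0;
  for (let i = 0; i < s.length; i++) {
    h = (h * 31 + s.charCodeAt(i)) >>> 0;
  }
  return h;
}

function partitionFor(orgId) {
  if (Object.prototype.hasOwnProperty.call(DEDICATED, orgId)) {
    return DEDICATED[orgId];
  }
  return SHARED_PARTITIONS[hashString(orgId) % SHARED_PARTITIONS.length];
}

console.log(partitionFor('org-big-1'), partitionFor('org-small-42'));
```

Since the mapping is deterministic, all events for one customer land on the same partition and are therefore seen, in order, by the same node.js process.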

And yeah, this stuff is super fun. Kafka gives me the same sort of feel that react.js does. Makes really complex / near-impossible feeling problems feel solvable or even straightforward.

Thanks for your perspective—I learn a lot from these sorts of discussions!


kevinrobinson commented Jul 14, 2015

Awesome, thanks for the reply too! This is super great stuff to be sharing, and I totally have the same feeling about working with Kafka. :)

It'll be awesome to see folks do more of this kind of work on node too, and make these kinds of approaches and tools more widely available for folks working that stack. I'm excited to see what you build with this, and I'm sure others will be too!


deanmcpherson commented Sep 6, 2015

Hey Kyle, love this. I recently stumbled on Kafka / Samza and was just starting to think about building out something like this as a proof of concept. How has it been treating you over the past few months? Also, I was thinking about how you would integrate something like this into a React / Flux based front end.

The part I can't quite wrap my head around is that pushing all events through kafka for processing would make it difficult to know how to notify the client UI of the results of their actions. Does that make sense?

I suppose you could subscribe to a success / failure topic, but that seems a little verbose for failure messaging, or returning mutated results.

Author

KyleAMathews commented Sep 8, 2015

@dean, things have been great. Haven't gone too far beyond this but things have been really stable so far.

On notifying users of results of their actions, Kafka isn't actually used for processing the initial action or command from the user. Writes or actions or commands are handled by something else. That thing (whatever you want to call it) processes the command and reports directly back to the user when it's finished. After that, it sends Kafka a report on what happened: the event.

So a simple example, a user updating their profile picture. Their client tells the backend, "upgrade_profile_picture". This gets processed and saved and the client is told the process was successful. Then the user processing agent emits an event to Kafka "user_updated" with the new user information.
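
A rough sketch of that flow, with in-memory stand-ins for the write-side store and the Kafka producer. The handler name, the command fields and the `profiles`/`emitted` variables are hypothetical.

```javascript
const profiles = {}; // stand-in for the write-side store
const emitted = [];  // stand-in for events sent to Kafka

// Process the command, then report what happened as an event.
// The return value is what goes straight back to the client.
function handleUpdateProfilePicture(command, emit) {
  if (!command.url) {
    // Rejected commands are answered directly; no event is emitted.
    return { ok: false, error: 'url is required' };
  }
  profiles[command.user_id] = { picture: command.url };
  emit({
    event_type: 'user_updated',
    entity_id: command.user_id,
    event: { picture: command.url }
  });
  return { ok: true };
}

const reply = handleUpdateProfilePicture(
  { user_id: 'u1', url: 'https://example.com/me.png' },
  function (event) { emitted.push(event); }
);
console.log(reply, emitted.length);
```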

Author

KyleAMathews commented Sep 8, 2015

This article is a good overview of a lot of these concepts http://codebetter.com/gregyoung/2010/02/16/cqrs-task-based-uis-event-sourcing-agh/


daveroberts commented Oct 6, 2015

Thanks for sharing! Can you follow up on this? It looks like a really solid foundation for an architecture. Just want to see if it has been working for you.


mmmdreg commented Aug 12, 2016

@kyle:
re your September 9 comment:

Another way to think about it is like async action creators in redux.

  • The client hits an endpoint to request updating the profile picture.
  • Emit 'profile_update_requested'.
  • The profile updating system listens to the above, processes it and emits 'profile_updated' or 'profile_update_failed'.
  • The endpoint listens for these events and responds to the client accordingly (e.g. rejects or resolves a promise on the same request, or notifies the client directly via socket).

This would keep the events as the golden source and prevent inconsistent state. Updating the image first like your suggestion would make the events more like a post-event audit log.
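
The steps above can be sketched with an in-process `EventEmitter` standing in for the Kafka topics. The `request_id` correlation field, the `pending` table and the function names are assumptions for illustration; with real Kafka the result events would arrive asynchronously on a consumer.

```javascript
const EventEmitter = require('events');

const bus = new EventEmitter(); // stand-in for Kafka topics
const pending = {};             // request_id -> { resolve, reject }
let nextId = 0;

// Endpoint side: emit the request event and wait for the matching result.
function requestProfileUpdate(userId, picture) {
  return new Promise(function (resolve, reject) {
    const requestId = String(++nextId);
    pending[requestId] = { resolve: resolve, reject: reject };
    bus.emit('profile_update_requested', {
      request_id: requestId,
      user_id: userId,
      picture: picture
    });
  });
}

// Settle whichever request a result event belongs to.
function settle(kind) {
  return function (event) {
    const waiter = pending[event.request_id];
    if (!waiter) { return; }
    delete pending[event.request_id];
    waiter[kind](event);
  };
}
bus.on('profile_updated', settle('resolve'));
bus.on('profile_update_failed', settle('reject'));

// Profile updating system: consumes requests and emits the outcome.
bus.on('profile_update_requested', function (event) {
  const outcome = event.picture ? 'profile_updated' : 'profile_update_failed';
  bus.emit(outcome, event);
});
```

A caller would use it as `requestProfileUpdate('u1', 'me.png').then(onSuccess, onFailure)`. A real endpoint would also want a timeout so that a lost result event does not leave the request hanging.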


tony-kerz commented Jan 16, 2017

hi @KyleAMathews, i'm new to kafka, but also a node fan, wondering what your current outlook is on kafka plus node based on your recent experience? looks like you were originally using this library, are you still using it? if so, how did it work out? if not, what are you using now? thanks! tony...
