Skip to content

Instantly share code, notes, and snippets.

@KyleAMathews
Last active May 13, 2022 00:49
Show Gist options
  • Star 48 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save KyleAMathews/a5c30ef8afef565b8d22 to your computer and use it in GitHub Desktop.
Save KyleAMathews/a5c30ef8afef565b8d22 to your computer and use it in GitHub Desktop.
Using Kafka and a Samza-like node.js architecture

Disclaimer

I'm still very new to Kafka, eventsourcing, stream processing, etc. I'm in the middle of building my first production system with this stuff and am writing this at the request of a few folks on Twitter. So if you do have experience, please do me and anyone else reading this a favor by pointing out things I get wrong :)

Inspirations

Problems I'm trying to solve.

I'm the technical co-founder at RelateRocket. We're a fairly early stage startup based in SF building an algorithmic social proof product. We hook into marketing automation and CRM tools to auto-produce custom personalized pages. And we're planning on doing this on a massive scale, e.g. a company would create and email out 100,000 pages with custom relatable content.

So the systems I'm building need to a) handle this sort of scale and b) slice and dice the data dozens of ways for internal anaytics, customer-facing analytics, alerting, and for pushing data back to various other tools that our customers are using.

Event sourcing

So while studying this problem, I discovered event sourcing. I'd read The Log blog post a few years ago when it came out and was very attracted to the idea but didn't really connect the dots about how to turn that into a working system until discovering event sourcing.

Event sourcing has two very attractive properties. A) You don't throw data away. You just record everything you can think of and then figure out how to make use of it. This is very attractive to a analytics-heavy product + in a startup when you're really not sure upfront what data is useful and what's not. B) Seperating reads and writes. In school and in the few backend systems I've written (I've mostly done frontend work), I've never enjoyed designing database schemas. They always felt hacky and ungainly. I can now see that most of the ugliness came from the unnatural coupling of the read and write schemas. Writing events feels very natural. You just declare what happened. A user was updated. By this person. And this is what was changed. And that's the end of it. You don't have to awkwardly mutate a user object and perhaps if you're feeling ambitious, write to an audit log table who made the change. And for reads, as I'll get to in a bit, you have unlimited freedom to mold the raw event data into whatever form makes sense for your application. Which is very easy and rather fun actually.

Kafka

Somewhere around when I discovered event sourcing, I also discovered Kafka. Which I won't write much about as there's tons of info on the internet but Kafka is a beautiful piece of software. Highly performant, durable, replayable pub-sub. The perfect tool for so many data tasks.

Stream processing

So event sourcing is super cool but how to do you turn your low-level raw events into usable, queryable objects? Stream processing is the normal answer (there's also batch processing with say Hadoop but that's so 3 years ago).

Basically as new events flow through your system, you "process" them into some sort of higher-level form. E.g. a userCreate event is the start of a new user. A userUpdate event flows by and that's grabbed to update an existing user. A userLoggedIn event happens and we increment the times_logged_in field on the user.

For React.js peeps reading out there, this should sound exactly like the Flux architecture.

There's a variety of stream processing tools out there e.g. Spark and Samza. I've choosen (for now anyways) to forgo using those and instead, do stream processing with node.js. Those tools both sound great and we'll probably use them someday but they don't seem necessary given that we're still small-data not big-data and as the sole developer, I really need to limit the number of tools I'm using to keep the complexity of the product within bounds. So as I'm already using Node.js extensively, it seems appropriate to keep on using it.

What does Samza-like node.js code look like?

Samza, at its heart, is actually very simple. You expose a function (ignoring that Java makes you wrap functions in ugly classes) that's subscribed to a Kafka topic that Samza calls whenever there's a new message. You do something to the message and then generally re-emit the processed message onto a new topic.

So to my earlier example, a userCreated event comes in and you process that into the user schema and then publish that new object to the user topic. Another system that's responsible for responding to user information queries would then listen to that topic and use changes there to update its store.

This is how it'd look in node.js.

  var HighLevelProducer, KeyedMessage, Immutable, List, Map, fromJS, client, emit, fromJS, kafka, producer, users;
  {fromJS, List, Map} = require('immutable');
  _ = require('underscore');
  
    // Setup our Kafka consumer.
  {HighLevelConsumer, KeyedMessage} = kafka = require('kafka-node');
  client = new kafka.Client();
  consumer = new HighLevelConsumer(client);
  
  // Setup our Kafka producer.
  {HighLevelProducer, KeyedMessage} = kafka = require('kafka-node');
  client = new kafka.Client();
  producer = new HighLevelProducer(client);

  // Create user topic.
  producer.on('ready', function() {
    return producer.createTopics(['user'], false, function(err, data) {
      if (err) {
        return console.log(err);
      }
    });
  });

  users = Map();

  emit = function(user) {
    var message;
    if (user) {
      message = new KeyedMessage(user.get('id'), JSON.stringify(user));
      return producer.send([
        {
          topic: 'user',
          messages: message
        }
      ], function(err, data) {
        if (err) {
          return console.log(err);
        }
      });
    }
  };

  // Listen to new events.
  consumer = new Consumer(client, [
    {
      topic: 'events'
    }
  ], {
    groupId: 'user-aggregator'
  });
  
  consumer.on('message', function(event) {
    var e, failedLogins, logins;
    switch (event.event_type) {
      case "userCreated":
        users = users.set(event.entity_id, fromJS({
          id: event.entity_id,
          name: event.event.name,
          email: event.event.email,
          roles: event.event.roles,
          organization_id: event.event.organization_id,
          created_at: event.timestamp,
          updated_at: event.timestamp,
          logins: [],
          logins_failed: []
        }));
        return emit(users.get(event.entity_id));
      case "userUpdated":
        users = users.mergeDeepIn([event.entity_id], fromJS({
          name: event.event.name,
          email: event.event.email,
          roles: event.event.roles,
          updated_at: event.timestamp
        }));
        return emit(users.get(event.entity_id));
      case "userLoggedIn":
        e = _.extend(event.event, {
          timestamp: event.timestamp
        });
        logins = users.getIn([event.actor_id, 'logins']).push(e);
        users = users.setIn([event.actor_id, 'logins'], logins);
        return emit(users.get(event.entity_id));
      case "userFailedLogin":
        e = _.extend(event.event, {
          timestamp: event.timestamp
        });
        failedLogins = users.getIn([event.entity_id, 'logins_failed']).push(e);
        users = users.setIn([event.entity_id, 'logins_failed'], failedLogins);
        return emit(users.get(event.entity_id));
    }
  };

Some additional planned/in-progress/finished use cases

  • aggregate objects from raw events
  • real-time updates — it's easy to connect socket.io or some other websocket/push tech to the stream of new objects and push them to the dashboard. We have a acivity stream page working this way and will be adding more soon.
  • campaigns — We let you create campaigns, arbitary groupings of pages you're sending out. I'll be writing soon a campaign aggregator that watches for new page analytics related to a campaign and group that together to drive campaign-specific analytic dashboards.
  • integration with other tools. Each marketing/sales tool has there own way of viewing the world. To drive integration, I'll write a stream processor for each one that translates our events into updates understandable by that tool.
  • Enrichments — we record the IP address for everyone that visits one of our custom pages. A natural thing to do is lookup geo information on that IP to "enrich" the event.
@deanmcpherson
Copy link

Hey Kyle, love this. I recently stumbled on Kafka / Samza and was just starting to think about building out something like this as a proof of concept. How has it been treating you over the past few months? Also, I was thinking about how you would integrate something like this into a React / Flux based front end.

The part I can't quite wrap my head around is pushing all events through kafka for processing would make it difficult to know how to notify the client UI of the results of their actions. Does that make sense?

I suppose you could subscribe to a success / failure topic, but that seems a little verbose for failure messaging, or returning mutated results.

@KyleAMathews
Copy link
Author

@dean, things have been great. Haven't gone too far beyond this but things have been really stable so far.

On notifying users of results of their actions, you Kafka isn't actually used for processing the initial action or command from the user. Writes or actions or commands are handled by something else. That thing (whatever you want to call it) processes the command and reports directly back to the user when they're finished. After that, they send to Kafka a report on what happened, the event.

So a simple example, a user updating their profile picture. Their client tells the backend, "upgrade_profile_picture". This gets processed and saved and the client is told the process was successful. Then the user processing agent emits an event to Kafka "user_updated" with the new user information.

@KyleAMathews
Copy link
Author

This article is a good overview of a lot of these concepts http://codebetter.com/gregyoung/2010/02/16/cqrs-task-based-uis-event-sourcing-agh/

@daveroberts
Copy link

Thanks for sharing! Can you follow up on this? It looks like a really solid foundation for an architecture. Just want to see if it has been working for you.

@mmmdreg
Copy link

mmmdreg commented Aug 12, 2016

@kyle:
re your September 9 comment:

Another way to think about it is like async action creators in redux.

  • The client hits an endpoint to request updating the profile picture.
  • Emit 'profile_update_requested'.
  • The profile updating system listens to the above, processes it and emits 'profile_updated' or 'profile_update_failed'.
  • The endpoint responds to these and responds to the client accordingly (e.g reject or resolve a promise on the same request, or directly notify via socket).

This would keep the events as the golden source and prevent inconsistent state. Updating the image first like your suggestion would make the events more like a post-event audit log.

@tony-kerz
Copy link

hi @KyleAMathews, i'm new to kafka, but also a node fan, wondering what your current outlook is on kafka plus node based on your recent experience? looks like you were originally using this library, are you still using it? if so, how did it work out? if not, what are you using now? thanks! tony...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment