@litch
Last active August 29, 2015 14:19

Reading of EventStore events should be idempotent.

When a handler reads an event, it will do two things. It will manipulate its current view of application state (change balance, whatever), and then it will take actions.

As an example, we want a registration service that reads the “wallet-*” category event stream and, whenever something creates a wallet (producing a 'WalletCreated' event), begins an onboarding process. The service keeps a list of wallets in its current state snapshot, which it bases its actions on, and it also issues commands to send a welcome text message to the customer. Here is an example code snippet for the event handler:

def !
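  # Mutate internal state
  # (id and phone_number are assumed to come from the event being handled)
  this_event = {type: 'WalletCreated'}
  wallet = {id: id, phone_number: phone_number, type: 'Wallet', events: [this_event]}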
  Registration::Repository::Wallet.save(wallet)

  # Do things with side effects
  Registration::Commands::SendWelcomeTextMessage.!(wallet)
end

While we want the registration service to re-read the wallet-* stream at will (on restart, say), we do not want it to issue the command to send a welcome text message on every restart.

One way to make sure that reading the same event multiple times is safe would be to record every action that is taken in some other store, say a relational DB for familiarity.

The flow would be:

Receive Created Event -> Update internal state -> Check relational DB for sent text message -> Conditionally send text message

def !
  # Mutate internal state
  this_event = {type: 'WalletCreated'}
  wallet = {
    id: id,
    phone_number: phone_number,
    type: 'Wallet',
    events: [this_event]
  }
  Registration::Repository::Wallet.save(wallet)

  # Conditionally do things with side effects
  action_names = Registration::DataAccess::Postgresql::GetActionNamesForWallet.!("wallets-#{id}")
  unless action_names.include?('WelcomeTextMessageSent')
    Registration::Commands::SendWelcomeTextMessage.!(wallet)
    Registration::DataAccess::Postgresql::RecordWalletAction.!(wallet, 'WelcomeTextMessageSent')
  end
end

(There is also the possibility that the command gets issued every time the event is read, and that the command receiver decides whether the action has been done before. Since that is conceptually the same thing (checking that an action with a side effect has not already been performed), we will leave it aside for now.)

It is assumed that the relational db will be authoritative and persistent across service restarts.

But wait a minute, aren’t the command issuances Events? Something we already have a fantastic tool for storing and managing? So let’s put in a requirement that every time we send a command, we record an event that we sent that command. And then we can see if that stream's events contain the fact that the command was sent:

def !
  # Mutate internal state
  this_event = {type: 'WalletCreated'}
  wallet = {
    id: id,
    phone_number: phone_number,
    type: 'Wallet',
    events: [this_event]
  }
  Registration::Repository::Wallet.save(wallet)

  # Conditionally do things with side effects
  Registration::DataAccess::EventStore::ReadStreamEvents.!("wallets-#{id}") do |all_events_for_this_wallet|
    # Events are assumed to be hashes with a :type key, like this_event above
    unless all_events_for_this_wallet.any? { |event| event[:type] == 'WelcomeTextMessageSent' }
      Registration::Commands::SendWelcomeTextMessage.!(wallet)

      es_writer.!(type: 'WelcomeTextMessageSent', stream_name: "wallets-#{id}")
    end
  end
end

Ok, so now we are using the same data storage system to store our stream of state manipulations as well as our side effect-inducing actions. But now we seem to have a timing problem. We want to essentially look into the "future", but how far into the future should we look? I think that this has been my biggest stumbling block conceptually. Let's talk through this:

Let's say our system has been up for a while, and has accumulated 1000 events. Then we restart it and want it to rebuild its state. So we start streaming the events from the EventStore into the handler and at event 5, it sees a WalletCreated event. So it mutates its internal state and now it's time to see whether it has already sent the text message. There are two places you could envision querying for that piece of information: either the internal state or all of the information in the EventStore. It doesn't make sense to query the internal state of the system, since at this point it has only been rebuilt as far as event 5 and cannot know what happened afterwards, so let's consider reading the EventStore. Aren't we already querying the EventStore?

No.

Right now we have one process that is streaming the events from the EventStore into our application. If, during the course of processing some event, we go and query the EventStore again for a different set of information ("tell me all the events that have occurred on wallet-618"), we do not need to then load those into the event handler or anything like that. We can just treat it conceptually the same way we would a query to PostgreSQL. So we ask the EventStore in its current state (not way back in time at event 5) whether it has seen any events in the stream that suggest that the contemplated action should not be taken.
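To make the "side query" idea concrete, here is a minimal sketch using a hypothetical in-memory stand-in for the EventStore. `InMemoryEventStore`, `WalletCreatedHandler`, the event hash shape, and the phone number are all assumptions made for illustration, not the real EventStore client API or the registration service's actual classes. It shows a handler being replayed after a "restart": internal state is rebuilt, but the welcome message is not sent again because the store's current state already records it.

```ruby
# Hypothetical in-memory stand-in for the EventStore (not a real client API).
class InMemoryEventStore
  def initialize
    @streams = Hash.new { |hash, name| hash[name] = [] }
  end

  # Append an event (a plain hash) to the named stream.
  def append(stream_name, event)
    @streams[stream_name] << event
  end

  # Return the events in the stream as they are right now.
  def read_stream(stream_name)
    @streams[stream_name].dup
  end
end

# Hypothetical handler: rebuilds state on every read, sends at most once.
class WalletCreatedHandler
  attr_reader :wallets, :sent_messages

  def initialize(store)
    @store = store
    @wallets = {}
    @sent_messages = []
  end

  def call(event)
    stream_name = "wallet-#{event[:id]}"

    # Mutate internal state (always, so a replay rebuilds it).
    @wallets[event[:id]] = { id: event[:id], phone_number: event[:phone_number] }

    # Side query against the store's *current* state, not the replay position.
    events_now = @store.read_stream(stream_name)
    return if events_now.any? { |e| e[:type] == 'WelcomeTextMessageSent' }

    @sent_messages << event[:phone_number] # stand-in for issuing the command
    @store.append(stream_name, { type: 'WelcomeTextMessageSent' })
  end
end

store = InMemoryEventStore.new
created = { type: 'WalletCreated', id: 618, phone_number: '555-0100' }
store.append('wallet-618', created)

first_run = WalletCreatedHandler.new(store)
first_run.call(created)

# Simulate a restart: fresh handler, same store, same event replayed.
second_run = WalletCreatedHandler.new(store)
second_run.call(created)
```

After the replay, `second_run.wallets` contains wallet 618 again, while `second_run.sent_messages` stays empty.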

Now, of course you are subject to race conditions (what if you have two consumers of the stream and they read the event at the same time, so that both send the text message at the same time?). But they are the same race conditions you would face if you were storing the record of actions taken in any other storage system. You would need some kind of mutex/locking mechanism.
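As a rough illustration of the locking idea, the sketch below guards the check-then-record step with Ruby's `Mutex` so that only one of two concurrent consumers gets to send. `GuardedActionLog` and its `claim` method are hypothetical names invented for this example, and the log is an in-memory stand-in for whatever store holds the record of actions taken.

```ruby
# Hypothetical in-memory action log; the Mutex makes check-and-record atomic.
class GuardedActionLog
  def initialize
    @mutex = Mutex.new
    @actions = Hash.new { |hash, key| hash[key] = [] }
  end

  # Record the action if it is not there yet.
  # Returns true only for the one caller that recorded it.
  def claim(stream_name, action_name)
    @mutex.synchronize do
      next false if @actions[stream_name].include?(action_name)

      @actions[stream_name] << action_name
      true
    end
  end
end

log = GuardedActionLog.new
sent = Queue.new # thread-safe collection of "sent" messages

# Two consumers see the same WalletCreated event at the same time.
consumers = 2.times.map do |n|
  Thread.new do
    if log.claim('wallet-618', 'WelcomeTextMessageSent')
      sent << "consumer-#{n}" # stand-in for sending the text message
    end
  end
end
consumers.each(&:join)
```

Note that this sketch records the action before sending, so a failure between the two steps would mean the message is never sent. A `Mutex` also only protects threads inside one process; consumers in separate processes would need the equivalent guarantee from the shared store itself.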

This gives us a way to look at "now" when considering what actions to take, while keeping "replay-ability" and the ability to re-instantiate the current instance state.
