Skip to content

Instantly share code, notes, and snippets.

@mpj
Last active March 21, 2017 09:58
Show Gist options
  • Save mpj/cbd212275d95f8c401788524184b7103 to your computer and use it in GitHub Desktop.
Save mpj/cbd212275d95f8c401788524184b7103 to your computer and use it in GitHub Desktop.
Request pattern modeled on top of pubsub and cloud functions

Request pattern modeled on top of pubsub and cloud functions

Below we see a message that represents an request that is "in flight" so to speak. You can imagine that 'get-user' is a function that is responsible for fetching a user and then sending it as the response to a http-request that is pending on a rest service.

Earlier, get-user has fetched user1 and their friends from the database, but the database response does not contain the names of the friends, just their ids. Therefore, get-user has made a request to decorate-user that in turn will call get-name for each user.

{
  "receiver": "get-name",
  "message": {
    "user_id": "user4"
  },
  "waiting": {
    "receiver": "decorate-user",
    "state": {
      "user_id": "user1",
      "friends": {
        "user5": {},
        "user4": {},
        "user3": { "name": "wallace" },
        "user2": { "name": "katja" },
      },
      "waiting": {
        "receiver": "get-user",
        "state": {
          "http_request_id": "correlator-123"
        }
      }
    }
  }
}

As you see, a limitation of this pattern is that we cannot issue multiple asyncronous calls simultaneously. This is both a benefit and a drawback in the sense that it will make race conditions etc impossible, but it means that we will have to create some kind of adapters that perform multigets for us, as the frameworks itself is not allowed to.

So what does the actual 'get-name' receiver look like?

const db = require('mydatabase')
bus.impure.receive('get-name', (envelope, commands) => {
  db.get('names', envelope.message.user_id, name => commands.dispatch({
    receiver: envelope.message.waiting.receiver,
    state: envelope.message.waiting.state,
    response: {
      name: 'get-name',
      message: {
        user_id: envelope.message.user_id,
        name: name
      }
    },
  }))
})

The receiver above will fetch the name from a fictious database, and dispatch the fetched name back to the callback, i.e. 'decorate-user'. The above is very verbose, and shown in it's raw form in order to give you a sense of the internals and flow. However, if the bus provides a little bit of sugar using a "respond" helper, and with a little help of destructuring, we can make the exact same code quite terse:

const db = require('mydatabase')
bus.impure.receive('get-name', ({ user_id }, { respond }) =>
  db.get('names', user_id, name => respond({ user_id, name })))

...or with async/await:

const db = require('mydatabase')
bus.impure.receive('get-name', async ({ user_id }) => {
  const name = await db.get('names', user_id)
  return { user_id, name }
}

You'll notice that the stuff above are 'impure'. This is a special type of receiver that the system knows has some kind of side effect. It might be that it request data from the database, or that it does some credit card transaction. It is expected to return a promise that resolves to a bus envelope with a response. This separation allows us to know which receivers we can execute wildly thousands of times and look at input/output for automatic regression testing.

And below is the actual message that the receiver dispatches, which decorate-user will use to fill in user4 in it's state, and then dispatch another get-name message for user5.

{
  "receiver": "decorate-user",
  "responses": {
    "get-name": {
      "user_id": "user4",
      "name": "david"
    }
  },
  "state": {
    "user_id": "user1", 
    "friends": {
      "user5": {},
      "user4": {},
      "user3": { "name": "wallace" },
      "user2": { "name": "katja" },
    },
    "waiting": {
      "receiver": "get-user",
      "request_id": "correlator-123",
    }
  }
}
bus.receive('decorate-user', (message, commands) => {
  if (message.id) {
    message.state.user_id = message.id
  }
  if (message.responses['get-name']) {
    let { user_id, name } = message.responses['get-name']
    message.state.friends[user_id] = { name }
  }
  let friendIdLackingName = Object.keys(m.state.friends).find(id => !m.state.friends[id].name)
  
  return commands.dispatch({
    receiver: 'get-name',
    message: {
      id: friendIdLackingName
    },
    waiting: {
      receiver: 'decorate-user',
      state: message.state,
    }
  })
  
})

... a bit verbose, lets see what we can do..

bus.receive('decorate-user', ({ message: { id }, state, responses, request }) => {
  if (id) {
    state.user_id = id
  }
  if (responses['get-name']) {
    let { user_id, name } = responses['get-name']
    state.friends[user_id] = { name }
  }
  let friendIdLackingName = Object.keys(m.state.friends).find(id => !m.state.friends[id].name)
  // TODO respond if no friend lacking in name
  return request('get-name', { id: friendIdLackingName })
})

correlation ids

It should be noted here that get-name is modeled to send user_id as part of it's response, to provide correlator for decorate-user (otherwise it cannot know what get-name response corresponds to what user). It might be necessary for the bus to provide a mechanism to easily generate correlation id's for scenarios where they are not as "naturally occuring" as in this case.

So what's the point of all this?

Because all I/O of the receivers are now in the form of simple object messages that contain state+commands, it means that mocking is a thing of the past, and instead of manually writing unit tests from scratch, we can capture the input and output the code that we've verified works and use as tests. It also allows us to model request chains on top of a pub-sub/cloud functions architecture. Hypothetically, we could also sample the I/O of receivers in a database and use it as regression test by running the same input on new versions of the receivers to verify that we get the same output.

bus.sandbox('get-user', { id: 'user123' })

Running the above will create files that log the entire interaction, in a copy-paste format for testing:

sandbox-result.js

bus.given({
  receiver: 'get-user',
  message: { id: 'user1' }
  state: {},
}).requests({
  receiver: 'decorate-user',
  message: { id: 'user1' }
  state: { user_id: 'user1'}
})




bus
  .when
    .state({})
    .request('decorate-user', { id: 'user1' })
  .then
    .state({ user_id: '1' })
    .request('get-friends', { user_id: 'user1' })

bus
  .when('decorate-user')
    .state({
      "friends": {
        "user5": {},
        "user4": {},
        "user3": { "name": "wallace" },
        "user2": { "name": "katja" },
      }
    })
    .response({ user_id: 'user4', name: 'nina' })
  .then
    .state({
      "friends": {
        "user5": {},
        "user4": { "name": "nina" },
        "user3": { "name": "wallace" },
        "user2": { "name": "katja" },
      }
    })
    .request('get-name', { user_id: 'user5' })

Sampling and auto-regression

if (process.env.NODE_ENV === 'production') {
  bus.sample({
    'project': 'myapp',
    'version': require('package.json').version 
    'secret': '8217812usdh8fhaduidsjsdaidsau23ne2jh',
    'rate': 1/1000
  })
}

// will generate a regression report on the SaaS 
// sampler, will pull a stream of sampled messages from the sampler
// and push through the local bus, and upload any diffs. The sampler 
// interface will visualize the deltas.
bus.regress({
  'project': 'myapp',
  'report-name': 'mpjtest', // optional
  'secret': '8217812usdh8fhaduidsjsdaidsau23ne2jh',
})

// Future - 
// the sampler could compare versions of the app in production, to perform automatic regression

// the sampler might also do some automatic machine learning and grouping of
// input and output ("this field used to vary between 1-69999 but now has some values in an higher range")
// 
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment