@unscriptable
Last active April 21, 2023 18:53
Most + redis + docker
node_modules
.DS_Store

Most + Redis + Docker

A suite of simple apps to illustrate how to use most to handle redis events.

This suite may be run using docker or node.js 10.x. If you have docker installed already or would like to try it, follow the docker instructions. Otherwise, you will need to be running redis locally. Follow the node instructions to install node and redis.

Install

Pre-requisites

  1. Install git
  2. Clone this repo:
git clone git@github.com:mostjs/core.git
cd core/examples/most-redis-docker

Assumptions

The remainder of this document assumes your working directory is most-redis-docker. You may have to modify some of the following directions if you chose another folder name.

This project uses environment variables. If you are using the Windows command prompt, you will have to change some commands slightly to set env vars. For example, rather than typing REDIS_CHANNEL=my_channel npm run consumer, you would type set REDIS_CHANNEL=my_channel&&npm run consumer.

Option 1: Docker

  1. Install docker. (You will need to create a DockerHub account.)
  2. Build the docker image for this project:
docker build -t most-redis .

Option 2: node.js

  1. Install node 10.x or the latest LTS version.
  2. Install the dependencies for this project:
npm install
  3. Install redis.

If you already have an instance of redis running, you may edit the docker-compose.yml file in the most-redis-docker folder. Change the values of REDIS_HOST and REDIS_PORT to match your redis instance.

Run

When running this example, the publish service periodically sends JSON messages to redis, and the consume services receive the JSON messages and transform them. In your terminal(s), you should expect to see informational logs from each service as they start up and as they process the JSON messages.
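The message format exchanged over redis can be sketched in isolation. This is a minimal illustration that mirrors the JSON.stringify and JSON.parse steps used by the publish and consume services. The helper name fromJsonMessage is an assumption made for this example, and no redis connection is involved.

```javascript
// Encode a value the way the publish service does: as a JSON object
// with a single `value` property.
const toJsonMessage = value => JSON.stringify({ value })

// Decode a message the way the consume service does.
// (`fromJsonMessage` is a hypothetical name used only in this sketch.)
const fromJsonMessage = message => JSON.parse(message).value

const message = toJsonMessage(3)
console.log(message)                  // {"value":3}
console.log(fromJsonMessage(message)) // 3
```

Because both sides agree on the { value } envelope, extra fields could be added to the object later without breaking consumers that only read value.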

Option 1: Docker

  1. From the most-redis-docker folder, create and run the docker-compose network defined in docker-compose.yml:
docker-compose up

Use Ctrl-C to stop the docker-compose network.

You may also use docker-compose up -d and docker-compose down to start and stop the docker-compose network in the background.

Option 2: node.js

  1. From the most-redis-docker folder:
REDIS_CHANNEL=my_channel npm run consumer
  2. Open a new terminal or shell, and from the most-redis-docker folder type:
REDIS_CHANNEL=my_channel npm run consumer
  3. Open a third terminal or shell, and from the most-redis-docker folder type:
REDIS_CHANNEL=my_channel npm run publisher

Experiment!

Env vars

Both the consume and publish services accept env vars. If you are using Docker, these may be modified in the docker-compose.yml file. The following env vars are used:

  • REDIS_PORT: integer, default is 6379
  • REDIS_HOST: string, default is '127.0.0.1'
  • REDIS_CHANNEL: string, required
  • TIMEOUT: integer, msec to wait for redis to respond before failing, default is 2500
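To see what TIMEOUT controls, here is a minimal sketch of racing a connection against a timer, in the same spirit as the Promise.race in the redis helper module. The fakeConnect function is a hypothetical stand-in for a real redis connection, and the delays are arbitrary values chosen for illustration.

```javascript
// Reject after `msec` with an Error carrying `message`.
const rejectAfter = (msec, message) =>
  new Promise((_, reject) =>
    setTimeout(() => reject(new Error(message)), msec))

// Hypothetical stand-in for a redis client that becomes ready after `delay` msec.
const fakeConnect = delay =>
  new Promise(resolve => setTimeout(() => resolve('client'), delay))

// Whichever promise settles first decides the outcome.
const connectWithTimeout = (delay, timeout) =>
  Promise.race([
    fakeConnect(delay),
    rejectAfter(timeout, `Redis failed to respond after ${timeout} msec!`)
  ])

// The connection is ready well before the timeout, so this resolves.
connectWithTimeout(10, 200)
  .then(client => console.log('connected:', client))

// The timeout fires before the connection is ready, so this rejects.
connectWithTimeout(50, 0)
  .catch(err => console.log('failed:', err.message))
```

A larger TIMEOUT tolerates a slower redis startup. A very small one makes the service fail fast, which is worth keeping in mind for the scenarios below.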

Try these scenarios

  • Easy: Change the channel name for one or more of the services.
  • Easy: Change the timeout to 0 (there are two ways to do this). What happens?
  • Master: Handle some messages differently from others. Hint: try most.js's filter, skipWhile, or takeWhile in the consume service.
  • Master: Modify and configure the publish service to publish to 2 channels, then configure the consume services to each listen to one of those channels. Hint: accept a comma-separated list of channels via REDIS_CHANNEL?
  • Guru: Send invalid JSON from the publish service and use recoverWith in the consume service to handle it.
  • Guru: Create a selective forwarding service by re-publishing some of the messages from the consume service.
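As a starting point for the multi-channel scenario, the hint about a comma-separated REDIS_CHANNEL could be sketched like this. The helper name parseChannels and the channel names are assumptions for illustration only. Wiring the result into the publish and consume services is left as part of the exercise.

```javascript
// Split a comma-separated REDIS_CHANNEL value into a list of channel names.
// (`parseChannels` is a hypothetical helper, not part of this project.)
const parseChannels = value =>
  String(value || '')
    .split(',')
    .map(name => name.trim())           // tolerate spaces around commas
    .filter(name => name.length > 0)    // drop empty entries

console.log(parseChannels('orders, payments')) // [ 'orders', 'payments' ]
console.log(parseChannels('my_channel'))       // [ 'my_channel' ]
```

Returning an array even for a single channel keeps the calling code uniform: it can always iterate over the result.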
// Functions to handle app configuration.
const config
  = exports.config
  = env => {
    const port = Number(env.REDIS_PORT || 6379)
    const host = String(env.REDIS_HOST || '127.0.0.1')
    const timeout = Number(env.TIMEOUT || 2500)
    const channel = env.REDIS_CHANNEL
    if (!channel) throw new Error(`REDIS_CHANNEL not specified.`)
    return { port, host, timeout, channel }
  }
// Composition plan to consume a redis stream
const redis = require('redis')
const { init: initRedisClient, subscribe } = require('./redis')
const { runEffects, map, tap } = require("@most/core")
const { createAdapter } = require("@most/adapter")
const { newDefaultScheduler } = require("@most/scheduler")
const { config } = require('./config')
// Create a logging abstraction. Could also be graylog, logstash, etc.
const { log, error } = console
const crash = err => { error(err); process.exit(1) }
// Read config from env vars.
const { host, port, timeout, channel } = config(process.env)
// Create and initialize a redis client.
const client
  = initRedisClient(redis, { host, port, timeout }, error)
    .catch(crash) // fail hard if we can't connect!
// Create an adapter to "push" events from redis.
const [ onMessage, messageStream ] = createAdapter()
// Create a most.js stream
const fromMessage
  = map(message => JSON.parse(message).value)
const logValue
  = tap(value => log('received value =', value))
const receiveNewValues
  = fromMessage(logValue(messageStream))
// TODO: transform or something?
runEffects(receiveNewValues, newDefaultScheduler())
client
  .then(subscribe(channel, onMessage))
  .then(() => log(`Consumer listening on ${channel}.`))
  .catch(crash)
version: '3.2'
networks:
  default: null
services:
  redis:
    image: redis:5.0
    ports:
      - 6379:6379
  consumer1:
    image: most-redis
    entrypoint:
      - node
      - consume
    environment:
      REDIS_HOST: redis
      REDIS_PORT: 6379
      REDIS_CHANNEL: my_channel
    volumes:
      - $PWD:/app
    depends_on:
      - redis
  consumer2:
    image: most-redis
    entrypoint:
      - node
      - consume
    environment:
      REDIS_HOST: redis
      REDIS_PORT: 6379
      REDIS_CHANNEL: my_channel
    volumes:
      - $PWD:/app
    depends_on:
      - redis
  publisher:
    image: most-redis
    entrypoint:
      - node
      - publish
    environment:
      REDIS_HOST: redis
      REDIS_PORT: 6379
      REDIS_CHANNEL: my_channel
    volumes:
      - $PWD:/app
    depends_on:
      - redis
FROM node:10.12.0-jessie
# https://github.com/Yelp/dumb-init
RUN wget -O /usr/local/bin/dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.2/dumb-init_1.2.2_amd64
RUN chmod +x /usr/local/bin/dumb-init
RUN mkdir -p /app
WORKDIR /app
ADD . /app/
RUN npm install
# CMD ["/usr/local/bin/dumb-init", "npm", "run", "start"]
{
"name": "most-redis",
"version": "1.0.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
"@most/adapter": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/@most/adapter/-/adapter-1.0.0.tgz",
"integrity": "sha512-+ECkLCTgKqc+uFK7q/fp2elaZahNRS1Tjz0XAu5QOiMynxJt8qN8LR+fuSoRZdhzQ0/+4nWf44X/x2fWUKe8fQ==",
"requires": {
"@most/types": "^1.0.1"
}
},
"@most/core": {
"version": "1.3.2",
"resolved": "https://registry.npmjs.org/@most/core/-/core-1.3.2.tgz",
"integrity": "sha512-oI2sjIJgyluzf7xWuGMl8qPo/gAAMtArEOLcIiML0QWetOySpgX8OMikrLjsy+K6x+gZm5rOpsITbLTbJo5Z9Q==",
"requires": {
"@most/disposable": "^1.2.1",
"@most/prelude": "^1.7.2",
"@most/scheduler": "^1.2.1",
"@most/types": "^1.0.1"
}
},
"@most/disposable": {
"version": "1.2.1",
"resolved": "https://registry.npmjs.org/@most/disposable/-/disposable-1.2.1.tgz",
"integrity": "sha512-VQ5gGZd5VFqYyP3sfYmVbrb4MX7j7tqIQKpoATOz5loDXE/9Y/BU5SzD2pGhVtui62kUdK13UFSom4njtmQ1Lw==",
"requires": {
"@most/prelude": "^1.7.2",
"@most/types": "^1.0.1"
}
},
"@most/prelude": {
"version": "1.7.2",
"resolved": "https://registry.npmjs.org/@most/prelude/-/prelude-1.7.2.tgz",
"integrity": "sha512-GM5ec7+xpkuXiCMyzhyENgH/xZ8t0nAMDBY0QOsVVD6TrZYjJKUnW1eaI18HHX8W+COWMwWR9c0zoPiBp9+tUg=="
},
"@most/scheduler": {
"version": "1.2.2",
"resolved": "https://registry.npmjs.org/@most/scheduler/-/scheduler-1.2.2.tgz",
"integrity": "sha512-TcbdJMso8MBX9NO4/5LTitsp1qTKIWMHBXL3aaEqHsCSvHgJNwNOCEG9PrFrBhIoTkVgHVghP8KaoEVqqhxmmg==",
"requires": {
"@most/prelude": "^1.7.2",
"@most/types": "^1.0.1"
}
},
"@most/types": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/@most/types/-/types-1.0.1.tgz",
"integrity": "sha512-fCLkp1Yzp+Kamr00i1yG3GPhekOE150TSeoqUAy1cslmJ/a0+x7Y6GCwO26ke614sUbV9BdL0gTGCZJA4V1JUw=="
},
"double-ended-queue": {
"version": "2.1.0-0",
"resolved": "https://registry.npmjs.org/double-ended-queue/-/double-ended-queue-2.1.0-0.tgz",
"integrity": "sha1-ED01J/0xUo9AGIEwyEHv3XgmTlw="
},
"redis": {
"version": "2.8.0",
"resolved": "https://registry.npmjs.org/redis/-/redis-2.8.0.tgz",
"integrity": "sha512-M1OkonEQwtRmZv4tEWF2VgpG0JWJ8Fv1PhlgT5+B+uNq2cA3Rt1Yt/ryoR+vQNOQcIEgdCdfH0jr3bDpihAw1A==",
"requires": {
"double-ended-queue": "^2.1.0-0",
"redis-commands": "^1.2.0",
"redis-parser": "^2.6.0"
}
},
"redis-commands": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/redis-commands/-/redis-commands-1.4.0.tgz",
"integrity": "sha512-cu8EF+MtkwI4DLIT0x9P8qNTLFhQD4jLfxLR0cCNkeGzs87FN6879JOJwNQR/1zD7aSYNbU0hgsV9zGY71Itvw=="
},
"redis-parser": {
"version": "2.6.0",
"resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-2.6.0.tgz",
"integrity": "sha1-Uu0J2srBCPGmMcB+m2mUHnoZUEs="
}
}
}
{
  "name": "most-redis",
  "version": "1.0.0",
  "description": "Coordinate redis events with most.js",
  "scripts": {
    "publisher": "node publish",
    "consumer": "node consume"
  },
  "author": "@unscriptable",
  "license": "MIT",
  "dependencies": {
    "@most/adapter": "^1.0.0",
    "@most/core": "^1.3.2",
    "@most/scheduler": "^1.2.2",
    "redis": "^2.8.0"
  },
  "keywords": [
    "most",
    "redis",
    "event",
    "stream"
  ]
}
// Composition plan to publish to a redis stream
const redis = require('redis')
const { init: initRedisClient, publisher } = require('./redis')
const { runEffects, periodic, scan, map, tap } = require("@most/core")
const { newDefaultScheduler } = require("@most/scheduler")
const { config } = require('./config')
// Create a logging abstraction. Could also be graylog, logstash, etc.
const { log, error } = console
const crash = err => { error(err); process.exit(1) }
// Read config from env vars.
const { host, port, timeout, channel } = config(process.env)
// Create and initialize a redis client.
const client
  = initRedisClient(redis, { host, port, timeout }, error)
    .catch(crash) // fail hard if we can't connect!
// Create stream
const everyThreeSeconds
  = periodic(3000)
const monotonicallyIncrease
  = scan(b => b + 1)
const toJsonMessage
  = map(value => JSON.stringify({ value }))
const logValue
  = tap(value => log('sending value =', value))
const emitPeriodicMonotonicNumbers
  = logValue(toJsonMessage(monotonicallyIncrease(0)(everyThreeSeconds)))
client
  .then(publisher(channel))
  .then(publish => tap(publish, emitPeriodicMonotonicNumbers))
  .then(stream => {
    // Log before running: the promise returned by runEffects only settles
    // when the stream ends, and a periodic stream never ends.
    log(`Publisher publishing on ${channel}.`)
    return runEffects(stream, newDefaultScheduler())
  })
  .catch(crash)
// Functions to work with redis clients.
// Create and initialize a redis client using the provided host, port, timeout,
// and onError handler.
const init
  = exports.init
  = ({ createClient }, { port, host, timeout }, onError) => {
    // Create a promise for a redis client
    const clientP
      = new Promise(
        (resolve, reject) => {
          const client = createClient({ port, host })
          client.on('ready', () => resolve(client))
          client.on('error', reject)
        }
      )
    // Create redis timeout detector (() => Promise)
    const redisTimeout
      = rejectAfter(timeout, `Redis failed to respond after ${timeout} msec!`)
    return Promise.race([ clientP, redisTimeout() ])
      .then(client => client.on('error', onError))
  }
// Subscribe a client to a channel by name (string), sending received messages
// to the onMessage handler.
const subscribe
  = exports.subscribe
  = (channel, onMessage) => client => {
    client.on('message', (_, message) => onMessage(message))
    client.subscribe(channel)
    return client
  }
// Create a publish function for a redis client that sends messages on a
// specific channel by name (string).
const publisher
  = exports.publisher
  = channel => client => {
    return message => client.publish(channel, message)
  }
// Helper: reject after the given msec with an Error having the given message.
const rejectAfter
  = (msec, message) => () =>
    new Promise(
      (_, reject) =>
        setTimeout(() => reject(new Error(message)), msec)
    )