Skip to content

Instantly share code, notes, and snippets.

@billhathaway
Forked from aviflax/lambda.go
Last active April 13, 2022 02:07
Show Gist options
  • Save billhathaway/5e1c0642d548924fb97b to your computer and use it in GitHub Desktop.
Save billhathaway/5e1c0642d548924fb97b to your computer and use it in GitHub Desktop.
// Package lambda helps create workers that run in AWS’ Lambda service.
// The Lambda service is designed to run Node.js programs, so we have a very thin
// Node.js wrapper which is essentially an adapter between our Go worker programs
// and Lambda. Lambda runs our Node.js wrapper/adapter which then in turn runs
// our Go worker programs as child processes, communicating with them via stdio pipes
// and Unix exit codes.
//
// The interaction between the Lambda wrapper and our Go programs works like this:
//
// * The Node.js function is invoked by Lambda. Lambda passes an `event` parameter to
// the function which is an object containing, among other things, a `Records` key,
// the value of which is an array of objects, each of which has the key `kinesis`,
// which is an object which has the key `data` which is a base64-encoded Kinesis record.
// * The Node.js function then spawns a Go worker as a child process, encodes the `event`
// object to a JSON string, and sends the JSON string to the Go worker process via stdin.
// * The Go worker attempts to parse the JSON string, deserialize each Kinesis record from
// Base64 and Avro, and then process each record.
// * If the Go worker is able to successfully process every record in the event, it will
// write nothing to stdout or stderr and will exit with exit code 0. The Node.js function will
// then call `context.succeed()` with no parameters.
// * If the Go worker is unable to process any of the records in the event, or encounters any
// other fatal error (such as a deserialization error) it will write a description of the error
// along with any pertinent debugging information to stderr in a free-form text format and then
// exit with exit code 1. The Node.js function will then call `context.fail(error)` with the
// value of `error` being the text that the Go worker wrote to stderr.
//
// TBD:
// * Logging. Maybe we should just use stdout? Or... have the Go workers write to CloudTrail,
// or CloudWatch, or Logentries?
package lambda
import (
"encoding/base64"
"encoding/json"
"errors"
"io/ioutil"
"os"
"github.com/timehop/golog/log"
"github.com/timehop/streams/models/events/appopen"
)
// Record is a single entry of the `Records` array in the event that the
// Node.js wrapper sends on stdin. Only the nested `kinesis` object is decoded.
type Record struct {
// Kinesis mirrors the record's `kinesis` JSON object.
Kinesis struct {
// Data is the Base64-encoded record payload; ProcessEvent decodes it
// from Base64 and then from Avro.
Data string `json:"data"`
// PartitionKey is read from the `partitionKey` JSON key.
PartitionKey string `json:"partitionKey"`
// SequenceNumber is read from the `sequenceNumber` JSON key.
SequenceNumber string `json:"sequenceNumber"`
} `json:"kinesis"`
}
// Event is the JSON document read from stdin by ReadEvent.
type Event struct {
// Records holds the entries of the event's `Records` array. The field has
// no JSON tag, so encoding/json matches it against the key by field name.
Records []Record
}
// Processor handles one decoded AppOpen record. ProcessEvent calls it once per
// record, passing the logger it was given; a non-nil return value stops
// processing and is returned to ProcessEvent's caller.
type Processor func(appopen.AppOpen, log.Logger) error
// ReadAndProcessEventThenExit reads an event from stdin, runs processor on
// every record in it, and then terminates the process. It never returns.
//
// streamName is used as the ID of the logger that is passed to ReadEvent,
// ProcessEvent and, through the latter, to processor.
//
// Exit protocol, as described in the package documentation:
//   - every record processed: nothing is written to stderr, exit code 0;
//   - any failure, including a failure to read or parse the event: the error
//     text is written to stderr and the process exits with code 1.
func ReadAndProcessEventThenExit(streamName string, processor Processor) {
	logger := *log.NewWithID(streamName)

	event, err := ReadEvent(logger)
	if err == nil {
		err = ProcessEvent(*event, processor, logger)
	}
	if err != nil {
		// Report read/parse failures the same way as processing failures.
		// A panic here would dump a goroutine trace to stderr and exit with
		// status 2, which is not the documented "message on stderr, exit
		// code 1" contract the wrapper relies on.
		os.Stderr.WriteString(err.Error())
		os.Exit(1)
	}
	os.Exit(0) // TODO: send something to stdout? Log something?
}
// ReadEvent consumes all of stdin and decodes it as a JSON-encoded Event.
//
// It returns the read error or the JSON decoding error unchanged, and a
// dedicated error when the decoded event carries no records. On success the
// decoded event is logged at debug level through logger and returned.
func ReadEvent(logger log.Logger) (*Event, error) {
	// The whole payload is buffered before decoding.
	raw, readErr := ioutil.ReadAll(os.Stdin) // TBD whether this makes sense...
	if readErr != nil {
		return nil, readErr
	}

	parsed := new(Event)
	if parseErr := json.Unmarshal(raw, parsed); parseErr != nil {
		return nil, parseErr
	}

	// An event without records is treated as malformed input.
	if len(parsed.Records) < 1 {
		return nil, errors.New("malformed event contains no records!")
	}

	logger.Debug("Read event from stdin", "event", *parsed)
	return parsed, nil
}
// ProcessEvent runs processor on every record of event, in order, and stops
// at the first failure.
//
// For each record the `data` field is decoded from Base64, the resulting bytes
// are decoded with appopen.FromAvro, and the decoded AppOpen is handed to
// processor together with logger.
//
// It returns nil when every record was processed. Otherwise it returns the
// error of the first failing step: an empty `data` field (also logged through
// logger with the offending record), a Base64 decoding error, an Avro decoding
// error, or the error returned by processor. Records after the failing one
// are not attempted.
func ProcessEvent(event Event, processor Processor, logger log.Logger) error {
	for _, record := range event.Records {
		if record.Kinesis.Data == "" {
			logger.Error("record key 'data' is empty", "record", record)
			return errors.New("record key 'data' is empty")
		}

		avroBytes, err := base64.StdEncoding.DecodeString(record.Kinesis.Data)
		if err != nil {
			return errors.New("could not decode data from Base64: " + err.Error())
		}

		appOpen, err := appopen.FromAvro(avroBytes)
		if err != nil {
			return err
		}

		// The main event
		if err := processor(*appOpen, logger); err != nil {
			return err
		}
	}

	// Every failure path returns from inside the loop, so reaching this point
	// means all records were processed.
	return nil
}
var child_process = require('child_process');
// Configuration used by the handler when no test config is passed in.
// The INSERT_ACTUAL_* values are placeholders that must be replaced before
// deploying.
var prod_config = {
// Path of the executable the handler spawns as a child process.
'child_path': './lastopens',
// Passed as the `env` option when spawning the child process.
'env': {
'REDIS_URL': 'redis://INSERT_ACTUAL_PROD_HOST_HERE:6379',
'STATHAT_KEY': 'INSERT_ACTUAL_PROD_KEY_HERE',
'LOG_LEVEL': 'DEBUG'
}
}
// When running in prod, Lambda passes only 2 params, so we use the
// above prod config. But in test, our test harness passes in a
// test config as the third param.
exports.handler = function(event, context, test_config) {
var config = test_config || prod_config;
var options = {
env: config.env,
input: JSON.stringify(event)
}
console.log("Calling ", config.child_path, "with options:", options)
var result = child_process.spawnSync(config.child_path, [], options);
if (result.status !== 0) {
console.log(result.stdout.toString());
console.log("Child process exited with non-zero code:", result.status);
return context.fail(new Error(result.stderr.toString()));
}
context.succeed();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment