a quick overview of key services

This is a summary of and reference for a few of the various things I've worked on at Constellation.


New Logging Infra

Before getting into the new logging infrastructure, it's probably worth first reviewing our older logging infra.

Because of the scaling issues we encountered with the persistence and querying of large quantities of logging data in relational databases, we had to come up with a new approach.

The New Approach

The initial objective was to persist logs in S3 and query them with Athena, but exactly how we were going to land the data in S3 and whether we'd be able to effectively query the landed data were still open questions.

Our idea was to leverage a Kinesis Data Firehose delivery stream with dynamic partitioning enabled and have this stream subscribed to an SNS logging topic.

So, logging sources would publish to the SNS topic. These published log messages would then be sent to the delivery stream, buffered, and partitioned by environment, organization, logtype, year, month, day, and hour … then written in batch files to the appropriate S3 partition.
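As a rough sketch of the publishing side (the topic ARN environment variable and message fields here are assumptions, not the actual platform contract):

```js
// Sketch: a logging source publishing a log message to the SNS logging topic.
// The topic ARN env var and message fields are placeholders, not the real contract.
const { SNSClient, PublishCommand } = require("@aws-sdk/client-sns");

const sns = new SNSClient({});

async function publishLog(message) {
  await sns.send(
    new PublishCommand({
      TopicArn: process.env.LOGGING_TOPIC_ARN, // hypothetical env var
      Message: JSON.stringify(message),
    })
  );
}

// Example: publish a "web" log message destined for the web log type partition.
publishLog({ logType: "web", description: "request received" }).catch(console.error);
```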

But we first needed to ensure this approach for delivering logging messages to S3 would work as expected.

We verified the basic approach with this spike. See the Key Findings in this comment for details.

Initial testing of query performance against realistic data samples was undertaken with this spike and further investigated in this follow-up spike.

Once the main resources (e.g., the SNS topic and kinesis delivery stream) for the logging stack were in place, we defined tables for each log type based on the conclusions and recommendations of this spike. See this comment for sample queries run against the newly defined tables.
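For reference, here's a hedged sketch of running such a query programmatically; the database, table, and result bucket names below are placeholders, not the actual table definitions:

```js
// Sketch: run an Athena query against a partitioned log table.
// Database, table, and output bucket names are placeholders.
const {
  AthenaClient,
  StartQueryExecutionCommand,
} = require("@aws-sdk/client-athena");

const athena = new AthenaClient({});

async function queryWebLogs() {
  const { QueryExecutionId } = await athena.send(
    new StartQueryExecutionCommand({
      QueryString: `
        SELECT *
        FROM web                       -- hypothetical table for the "web" log type
        WHERE environment = 'dev'
          AND year = '2023' AND month = '02' AND day = '17'
        LIMIT 100`,
      QueryExecutionContext: { Database: "logging" },        // placeholder database
      ResultConfiguration: {
        OutputLocation: "s3://example-athena-results/",      // placeholder bucket
      },
    })
  );
  // Poll GetQueryExecutionCommand until SUCCEEDED, then fetch results.
  return QueryExecutionId;
}
```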

Log Types

Here's the mapping of the log type names used in s3 to the names used in the UI search log interface and the sql server logging tables:

  • web – Management Portal (WebRequestLog): logs server-side actions taken by dsp-web
  • app – Requests (AppRequestLog): logs server-side actions taken by dsp-app
  • agw – Transactions (TransactionLog): logs interactions between dsp-agw and individual connectors
  • hub – Connector Hub (ConnectorHubLog): logs connector calls mediated by the hub
  • connector – Connectors (ConnectorLog): logs generated from a connector
  • container – Application Container (ContainerLog): logs activity in the CU app container

Logging of event flows

The web and app log type messages are generated by our platform logging utility.

This logger generates log messages that each contain a set of “events”, giving a timeline of server-side events, viz., the set of correlated requests made across the dsp-web, dsp-agw, and dsp-apen web servers.

For example, the logs that have been going into the WebRequestLog table (and now going into s3 under the web log type partition) contain details relating to the server-side actions taken when dsp-web handles a request resulting from an action taken in the Management Portal.

Similarly, the logs that have been going into the AppRequestLog table (and now going into s3 under the app log type partition) contain details relating to the server-side actions taken when dsp-app handles a request resulting from an action taken in the app container.

Each event contains the following details:

  • action: what happened? (received request)
  • description: the actual log message (request received on agw)
  • detail: optional object with supplementary details ({})
  • caller: where is this getting logged from? (/home/ec2-user/agw/bin/helpers/routes/authorization.js:278:531)
  • server: the server where the log was generated (dspagw)
  • success: boolean indicating success or failure of request (true)
  • time: time stamp of logged event (2023-02-17T16:38:41.520Z)
  • transactionId: internal id for tracing flow of requests (4cbda106-68ba-44ca-b23b-143cafe8f023-07772ba3-f87a-4408-8eba-05ccc476128c)
  • type: the log event type (request)
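Putting those fields together, a single log message looks roughly like this (the outer envelope is an assumption; the event fields and example values come from the list above):

```js
// Illustrative shape only: a log message containing its "events" timeline.
// The outer envelope is an assumption; the event fields come from the list above.
const logMessage = {
  events: [
    {
      type: "request",
      action: "received request",
      description: "request received on agw",
      detail: {},
      caller: "/home/ec2-user/agw/bin/helpers/routes/authorization.js:278:531",
      server: "dspagw",
      success: true,
      time: "2023-02-17T16:38:41.520Z",
      transactionId:
        "4cbda106-68ba-44ca-b23b-143cafe8f023-07772ba3-f87a-4408-8eba-05ccc476128c",
    },
    // ...further correlated events from dsp-web, dsp-agw, and dsp-apen
  ],
};
```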

Dynamic Partitioning

For all log types, each message is now required to minimally contain the following fields, which are used for the dynamic partitioning when writing to s3:

  • organizationID — organization ID
  • logType — type of log message
  • environment — dev, hot, xat, etc.
  • year
  • month
  • day
  • hour

These required fields are added to each log type by the logging service. (Here's an example of the logging service getting added to dsp-agw.)
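A minimal sketch of that decoration, assuming a simple wrapper function (the function name and publish mechanism are hypothetical; only the required field names come from the list above):

```js
// Sketch: decorate an outgoing log message with the fields Firehose uses for
// dynamic partitioning. The wrapper itself is hypothetical; the required
// fields are the ones listed above.
function withPartitionFields(message, { organizationID, logType, environment }) {
  const now = new Date();
  const pad = (n) => String(n).padStart(2, "0");
  return {
    ...message,
    organizationID,
    logType,      // e.g. "web", "app", "agw", "hub", "connector", "container"
    environment,  // e.g. "dev", "hot", "xat"
    year: String(now.getUTCFullYear()),
    month: pad(now.getUTCMonth() + 1),
    day: pad(now.getUTCDate()),
    hour: pad(now.getUTCHours()),
  };
}
```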


Group Assignment Service

This service enables Credit Unions to upload a CSV file with bulk updates to their members’ tile group assignments, adding (or removing) specific members to (from) the CU’s pre-defined tile groups.

The service was implemented as a fargate task. It's kicked off by a lambda that's triggered by s3 file uploads to a CU's data delivery bucket. The service's resources are provisioned by running a cloud formation template found in the constellation_infrastructure repo.
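Here's a hedged sketch of that trigger lambda; the cluster, task definition, container name, and networking values are placeholders, not what's in the actual template:

```js
// Sketch: lambda triggered by an s3 upload that kicks off the Fargate task.
// Cluster, task definition, container name, and networking values are placeholders.
const { ECSClient, RunTaskCommand } = require("@aws-sdk/client-ecs");

const ecs = new ECSClient({});

exports.handler = async (event) => {
  const record = event.Records[0];
  const bucket = record.s3.bucket.name;
  const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, " "));

  await ecs.send(
    new RunTaskCommand({
      cluster: process.env.CLUSTER_ARN,                 // hypothetical env vars
      taskDefinition: process.env.TASK_DEFINITION_ARN,
      launchType: "FARGATE",
      networkConfiguration: {
        awsvpcConfiguration: {
          subnets: process.env.SUBNET_IDS.split(","),
          securityGroups: [process.env.SECURITY_GROUP_ID],
        },
      },
      overrides: {
        containerOverrides: [
          {
            name: "group-assignment",                   // hypothetical container name
            environment: [
              { name: "SOURCE_BUCKET", value: bucket },
              { name: "SOURCE_KEY", value: key },
            ],
          },
        ],
      },
    })
  );
};
```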

The main logic for the service can be found here; it consists of ...

  • fetching and validating the CU's uploaded s3 file to get individual member assignment changes
  • making the set of tile group assignment updates that need to be done
  • processing the set of tile group assignment updates that need to be done
  • saving a summary report of the updates executed

Note that when we create the set of tile group assignment updates, we check to see if a member being added to or removed from a group is already enrolled. If not, we first do a partial enrollment.


Realtime Event Service

This service provides realtime notifications of key platform events to connected clients. The service exposes a websocket endpoint that clients can connect to and use to subscribe to various topics (event types).
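For illustration only (the token query parameter and subscribe message shape are assumptions; the README covers the real contract), a client interaction looks roughly like:

```js
// Sketch: connect to the realtime websocket endpoint and subscribe to a topic.
// The token query parameter and subscribe message shape are assumptions;
// consult the service README for the actual contract.
const WebSocket = require("ws");

const token = process.env.SESSION_TOKEN; // end user session token (see Auth below)
const ws = new WebSocket(`wss://dev.cdp-realtime.com?token=${token}`);

ws.on("open", () => {
  ws.send(JSON.stringify({ action: "subscribe", topic: "account_transactionadded" }));
});

ws.on("message", (data) => {
  console.log("event received:", data.toString());
});
```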

Background

For essential background and context, see this discovery recording, where a proof-of-concept for the service was first discussed and demo-ed. In this follow-up discovery session we discussed the steps required for implementation.

Most of the work done for this service was tracked as part of this epic.

For some background on the use of EventBridge as an event bus, see this spike.

Implementation

An initial proof-of-concept for the service was implemented in this repo.

This was copied over into a new repo (realtime-service) and incrementally updated, now reflecting what we have in production today. See the README for an overview of the service, along with key details on how to use and test it.

However, in order to standardize deployment, it was thought wise to set up dedicated repositories for each of the lambdas comprising the service. The overall provisioning of the service is now controlled via this cloudformation stack.

Here are the new repos for the lambdas making up the service (mirroring what's in the unified repo):

Note that while lambdas exist for handling subscribe and unsubscribe actions via the websocket connection, there was a desire to have a more conventional API for subscribing (or unsubscribing) an end user to (or from) a topic. To support this, subscribe and unsubscribe routes were exposed in dsp-agw. The relevant controller can be found here. These endpoints update the subscription records directly in the dynamodb table that the service uses for tracking connections and subscriptions.

This diagram provides a nice overview of the service's high-level architecture.

The definition of the service's connections table is key for understanding the service's data model.

Events

The service's relaymessage lambda is triggered when any of the following events are sent by the platform to the realtime event bus:

  • account_balanceupdated – an account's balance was updated
  • account_nicknameupdated – the nickname of an account was updated
  • account_transactionadded – a transaction was added to an account

When handling these events, the lambda fetches any end user sessions with connections that have subscribed to the given event topic, then relays the event to each subscriber.
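In outline, the relay step looks something like the sketch below; the table, index, and attribute names are placeholders rather than the service's actual schema:

```js
// Sketch of the relay step: look up subscribed connections and push the
// event over each one. Table, index, and attribute names are placeholders.
const { DynamoDBClient } = require("@aws-sdk/client-dynamodb");
const { DynamoDBDocumentClient, QueryCommand } = require("@aws-sdk/lib-dynamodb");
const {
  ApiGatewayManagementApiClient,
  PostToConnectionCommand,
} = require("@aws-sdk/client-apigatewaymanagementapi");

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const gateway = new ApiGatewayManagementApiClient({
  endpoint: process.env.WEBSOCKET_API_ENDPOINT, // hypothetical env var
});

exports.handler = async (event) => {
  const topic = event["detail-type"]; // e.g. "account_transactionadded"

  // Hypothetical GSI keyed by topic; the real schema is in the connections table definition.
  const { Items = [] } = await ddb.send(
    new QueryCommand({
      TableName: process.env.CONNECTIONS_TABLE,
      IndexName: "byTopic",
      KeyConditionExpression: "#topic = :topic",
      ExpressionAttributeNames: { "#topic": "topic" },
      ExpressionAttributeValues: { ":topic": topic },
    })
  );

  // Relay the event payload to every subscribed connection.
  await Promise.all(
    Items.map(({ connectionId }) =>
      gateway.send(
        new PostToConnectionCommand({
          ConnectionId: connectionId,
          Data: Buffer.from(JSON.stringify(event.detail)),
        })
      )
    )
  );
};
```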

There is also one unique event that connectors can send to the realtime event bus directly (viz. platform_account_transactionadded) to indicate that a transaction or transactions have been added to the CU's core for a given set of accounts.

The CDP Transfer connector currently sends this event to the event bus when handling a transfer. Payrailz will also be sending this event in the future.

When this event is sent to the realtime event bus, it triggers and is handled by the realtime-platformtransactionadded lambda.

Handling this event results in an attempt to fetch any new transactions for the given account(s) and load them into our transaction warehouse.

Note that after an account's transactions have been loaded, the load-transaction-database lambda will send out an account_transactionadded event to the realtime event bus.
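A hedged sketch of publishing that event; the bus name, source, and detail payload shape are assumptions:

```js
// Sketch: publish an account_transactionadded event to the realtime event bus.
// The bus name, source, and detail payload shape are assumptions.
const { EventBridgeClient, PutEventsCommand } = require("@aws-sdk/client-eventbridge");

const eventbridge = new EventBridgeClient({});

async function notifyTransactionsAdded(accountId) {
  await eventbridge.send(
    new PutEventsCommand({
      Entries: [
        {
          EventBusName: "realtime",               // placeholder bus name
          Source: "load-transaction-database",    // placeholder source
          DetailType: "account_transactionadded",
          Detail: JSON.stringify({ accountId }),  // placeholder detail shape
        },
      ],
    })
  );
}
```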

Auth

How are we securing the websocket endpoints that the realtime service exposes for setting up connections and getting notifications?

The overall auth strategy for the service is covered here, but the key thing to know is that in order to establish a realtime websocket connection, you need an end user session token, which is typically provided to the container app after a user logs in.

Testing

The Testing section in the README describes how to interactively test the realtime notification service from the CLI.

However, as indicated in the auth section above, you need an end user session token in order to establish a connection.

The section How to generate a session token in the README describes how you can do just that ... viz., generate a session entry in the end user session cache and get the corresponding session token independently of a container login.

WebSocket Connection URIs

Here are the websocket connection URIs for the service for each of our test and production environments:

  • DEV – wss://dev.cdp-realtime.com
  • QA – wss://qa.cdp-realtime.com
  • HOT – wss://hot.cdp-realtime.com
  • XAT – wss://xat.cdp-realtime.com
  • CAT – wss://cat.cdp-realtime.com
  • PROD – wss://prod.cdp-realtime.com
  • DEV ADP – wss://devadp.cdp-realtime.com
  • QA ADP – wss://qaadp.cdp-realtime.com
  • HOT ADP – wss://hotadp.cdp-realtime.com
  • XAT ADP – wss://xatadp.cdp-realtime.com
  • CAT ADP – wss://catadp.cdp-realtime.com
  • PROD ADP – wss://prodadp.cdp-realtime.com

Transaction Ingestion

See this section of our transaction warehouse guide for an overview of transaction ingestion.

A refactor of the member-transaction-chunker lambda is badly needed.

The chunker performs the following steps when a CU uploads a CSV file with nightly transactions to their data-delivery bucket:

  • fetch a csv file with transaction records from s3
  • parse the csv file line by line
  • validate each line/transaction
  • convert each line/transaction to a JSON representation
  • save a "chunk" of converted transactions to s3 (for subsequent loading step)

A partial refactor can be found here. The processing logic in this refactored lambda is very straightforward.
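In outline, that processing looks something like the following sketch (the chunk key scheme, validation rules, field mapping, and chunk size are all placeholders):

```js
// Sketch of the chunker flow: stream a CSV from s3, validate and convert each
// line, and write fixed-size chunks back to s3 for the subsequent loading step.
// Validation rules, field mapping, key scheme, and chunk size are placeholders.
const readline = require("node:readline");
const { S3Client, GetObjectCommand, PutObjectCommand } = require("@aws-sdk/client-s3");

const s3 = new S3Client({});
const CHUNK_SIZE = 1000;

async function chunkTransactions(bucket, key) {
  const { Body } = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
  const lines = readline.createInterface({ input: Body, crlfDelay: Infinity });

  let chunk = [];
  let chunkIndex = 0;

  for await (const line of lines) {
    const fields = line.split(",");        // real parsing handles quoting, headers, etc.
    if (fields.length < 3) continue;       // placeholder validation
    chunk.push({ accountId: fields[0], amount: fields[1], postedAt: fields[2] });

    if (chunk.length === CHUNK_SIZE) {
      await saveChunk(bucket, key, chunkIndex++, chunk);
      chunk = [];
    }
  }
  if (chunk.length) await saveChunk(bucket, key, chunkIndex, chunk);
}

async function saveChunk(bucket, key, index, records) {
  await s3.send(
    new PutObjectCommand({
      Bucket: bucket,
      Key: `chunks/${key}.${index}.json`,  // placeholder key scheme
      Body: JSON.stringify(records),
    })
  );
}
```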

Compare with the main processing logic for the existing chunker.


Transaction Cleansing

See this section of our transaction warehouse guide for an overview of transaction cleansing.

The main repo for the cleanser can be found here and the main logic for the cleanser can be found here.

This script demonstrates how you can send requests to the Segmint API.


Transaction Paging

This PR for dsp-transactions provides an initial implementation of paging for getTransactions requests.

You'll want to focus on the code starting here.

The primary aim was to return a non-null paging token on getTransactions requests when additional transactions are available but couldn’t be included because of dynamodb limits on query response sizes.

The secondary aim was to put in place the logic for getting subsequent “pages” of transactions when the paging token is included in subsequent getTransactions requests.

Note that the paging token sent and received is a stringified copy of DynamoDB’s LastEvaluatedKey/ExclusiveStartKey with a base64url encoding to make it safe for passing as a query string parameter on subsequent requests.
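Concretely, the encoding and decoding look like this:

```js
// Encode DynamoDB's LastEvaluatedKey as a query-string-safe paging token,
// and decode it back into an ExclusiveStartKey on the next request.
function encodePagingToken(lastEvaluatedKey) {
  if (!lastEvaluatedKey) return null;
  return Buffer.from(JSON.stringify(lastEvaluatedKey)).toString("base64url");
}

function decodePagingToken(token) {
  if (!token) return undefined;
  return JSON.parse(Buffer.from(token, "base64url").toString("utf8"));
}
```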

Note that we're utilizing a particular version of electrodb for querying the dynamodb tables used to persist transactions. However, the current version of ElectroDB handles the serialization/deserialization of paging tokens automagically. (In electro terms, the paging token is known as a cursor.) So some of the SerDe logic added in this PR is only necessary until we upgrade.
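For reference, here's a sketch of the newer cursor style; the entity and access pattern names are hypothetical:

```js
// Sketch: paging with a current ElectroDB version, where the paging token is a
// "cursor" handled by the library. `Transaction` is a hypothetical ElectroDB
// entity with a hypothetical `byAccount` access pattern.
async function getTransactionPage(Transaction, accountId, incomingCursor) {
  const page = await Transaction.query
    .byAccount({ accountId })
    .go({ cursor: incomingCursor ?? null, limit: 50 });

  return {
    transactions: page.data,   // the records for this page
    pagingToken: page.cursor,  // opaque token to return to the caller (null when exhausted)
  };
}
```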

See this comment for thoughts on how the getTransactions call could keep its existing call signature, minimizing the amount of work required to implement paging in the front-end.

See this comment for a summary of the sorting and filtering of transactions that get returned by a getTransactions call ... and the various implications/caveats for paging. In particular, we won’t be able to simply rely on dynamodb’s LIMIT parameter when issuing queries to get back records with a fixed page size.


URL Shortening Service

This service provides a simple api endpoint for generating a shortened uri.
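Purely for illustration (the endpoint URL, route, and request/response shapes below are hypothetical placeholders, not the real API; see the example client and script referenced below), a client call might look like:

```js
// Hypothetical call to the shortening endpoint; the URL, route, and
// request/response shapes are placeholders, not the real API.
async function shorten(longUrl) {
  const res = await fetch("https://cdp-i.com/shorten", {   // placeholder endpoint
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ url: longUrl }),
  });
  const { shortUrl } = await res.json();                   // placeholder response shape
  return shortUrl;
}
```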

Note the example client here and example script demonstrating usage.

Here's an example of the service being used in the SMS notifier.

Misc

  • Links currently expire in 10 days, after which a 403 response is returned. We can increase the expiry if needed.

  • The service does support org-prefixed urls (i.e., vanity subdomains) like tcu.cdp-i.com.

  • Various questions about the service were answered in this spike.

  • We looked into how the service could be utilized when processing transfer notifications in this spike


Utility Scripts

I've created various utility scripts for one-off tasks, mostly for creating "mock" transactions in our transaction warehouse for testing purposes.

When appropriate, I've tried to encapsulate these scripts as snippets, with READMEs providing descriptions and usage notes.

These scripts can be re-used with slight tweaks to perform similar tasks. They also demonstrate how we use the electrodb models for interacting with our dynamodb tables.
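As a flavor of what those scripts look like, here's a simplified, hypothetical entity definition and write (not our actual transaction model):

```js
// Simplified, hypothetical ElectroDB entity illustrating the pattern the
// utility scripts use for writing mock transactions to dynamodb.
const { Entity } = require("electrodb");
const { DynamoDBClient } = require("@aws-sdk/client-dynamodb");

const client = new DynamoDBClient({});

const MockTransaction = new Entity(
  {
    model: { entity: "transaction", version: "1", service: "warehouse" },
    attributes: {
      accountId: { type: "string", required: true },
      transactionId: { type: "string", required: true },
      amount: { type: "number", required: true },
      postedAt: { type: "string", required: true },
    },
    indexes: {
      byAccount: {
        pk: { field: "pk", composite: ["accountId"] },
        sk: { field: "sk", composite: ["postedAt", "transactionId"] },
      },
    },
  },
  { client, table: process.env.TRANSACTIONS_TABLE } // placeholder table name
);

// Example: seed a single mock transaction for testing.
async function seedMockTransaction() {
  await MockTransaction.create({
    accountId: "acct-123",
    transactionId: "txn-001",
    amount: -42.5,
    postedAt: "2023-02-17T16:38:41.520Z",
  }).go();
}
```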


Call Connector Lambda

The call-connector lambda is used to make designated connector calls on a scheduled basis.

The lambda is the target of a set of EventBridge rules that specify a schedule for invoking the lambda, passing as the event payload the relevant connector details (name, version, method) and parameters for the call.

The lambda merely proxies these connector calls when invoked by making the appropriate HTTP request (given the provided connector call details) to the Connector Hub.
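In outline (the hub URL, route shape, and payload fields are placeholders), the handler looks something like:

```js
// Sketch of the call-connector lambda: take connector call details from the
// EventBridge payload and proxy them to the Connector Hub over HTTP.
// The hub URL, route shape, and payload fields are placeholders.
exports.handler = async (event) => {
  const { name, version, method, parameters } = event; // connector call details from the rule

  // Placeholder route; the actual hub API may differ.
  const url = `${process.env.CONNECTOR_HUB_URL}/connectors/${name}/${version}/${method}`;

  const res = await fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(parameters ?? {}),
  });

  if (!res.ok) {
    throw new Error(`connector call failed: ${res.status}`);
  }
  return res.json();
};
```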

Note that apart from running scheduled connector jobs, the call-connector lambda could potentially be used by other internal services to call a connector when triggered by a particular event.
