@biggert
Last active August 31, 2018 15:44
audience-connect model(s)

Components

  • Task Scheduler
  • Router
  • Audience
    • Exports (via Exports-API)
    • Imports
  • Handlers
    • Start
    • (Step Handlers)
    • Update_Execution
  • Database Schema
    • Tasks
    • Executions

Data Model Flowbee

Task Definition Example

{
    "steps": {
        "0": "GetMembersFromAudienceExport",
        "1": "ImportMembersToAudience"
    },
    "parameters": {
        "0": {
            "static": {
                "account_id": 100,
                "group_id": 50
            }
        },
        "1": {
            "static": {
                "field_map": []
            },
            "dynamic": {
                "source_url": "0.s3_url"
            }
        }
    }
}

Task Execution Example

{
    "status": "p",
    "last_completed_step": 0,
    "outputs": {
        "0": {
            "s3_url": "http://sarahstoryengineering.com/"
        }
    }
}
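Before sending a step message, the Router has to turn a step's "static" and "dynamic" parameters into concrete values. The sketch below is a minimal illustration, not the actual implementation. It assumes a dynamic value such as "0.s3_url" means "the s3_url key in step 0's outputs", and that step indices are string keys (JSON object keys are always strings). The function name resolve_step_parameters is hypothetical.

```python
from typing import Any, Dict

# Example data mirroring the task definition and execution examples.
TASK_DEFINITION = {
    "steps": {"0": "GetMembersFromAudienceExport", "1": "ImportMembersToAudience"},
    "parameters": {
        "0": {"static": {"account_id": 100, "group_id": 50}},
        "1": {"static": {"field_map": []}, "dynamic": {"source_url": "0.s3_url"}},
    },
}
EXECUTION = {
    "status": "p",
    "last_completed_step": 0,
    "outputs": {"0": {"s3_url": "http://sarahstoryengineering.com/"}},
}


def resolve_step_parameters(task_definition: dict, outputs: dict, step_index: int) -> Dict[str, Any]:
    """Hypothetical helper: merge static params with dynamic refs read from prior outputs.

    A dynamic reference "<step>.<key>" is looked up as outputs["<step>"]["<key>"].
    """
    step_params = task_definition["parameters"].get(str(step_index), {})
    resolved: Dict[str, Any] = dict(step_params.get("static", {}))
    for name, reference in step_params.get("dynamic", {}).items():
        source_step, _, key = reference.partition(".")
        try:
            resolved[name] = outputs[source_step][key]
        except KeyError as exc:
            # The referenced step has not produced that output (yet).
            raise ValueError(f"step {step_index}: cannot resolve {reference!r}") from exc
    return resolved
```

For step 1, resolving against the execution's outputs yields {"field_map": [], "source_url": "http://sarahstoryengineering.com/"}. A missing output raises instead of silently passing None to the next handler.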

Task Scheduler

Very similar to what we have today with member-sync:

  • One-off tasks (including scheduled one-off tasks) go through eda-scheduler.
  • Recurring tasks are handled through the CW system we built with member-sync.
  • If a webhook implementation comes into play, we can deal with that in the Router.

Note: We'd love to avoid repeating the CW system, but we worked through LOTS of scenarios with eda-scheduler, the Spring scheduler, task steps, etc., and they all had problems (most of them solvable). It looks like we can get started with what we have and get by with it. It is not scalable, but it will work for now; if it becomes a scaling problem, that will be a good sign (from a revenue POV), and we can adjust it then.
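The recurring path can be sketched as an hourly trigger. The CW handler asks the Router for the task IDs scheduled for the current hour and starts an execution for each one. This is a minimal sketch only. It assumes the schedule fires once per hour and that runtime hours are stored in UTC. The callables get_recurring_task_ids and start_task are hypothetical stand-ins for the Router's HTTP endpoints.

```python
from datetime import datetime, timezone
from typing import Callable, Iterable, List


def run_start_syncs(
    now: datetime,
    get_recurring_task_ids: Callable[[int], Iterable[int]],
    start_task: Callable[[int], int],
) -> List[int]:
    """Hypothetical CW start-syncs handler: start every task scheduled for this hour.

    Returns the execution IDs reported back by the (stand-in) Router.
    """
    # Assumption: runtime_hour values are UTC hours, so normalize before reading .hour.
    hour = now.astimezone(timezone.utc).hour
    execution_ids = []
    for task_id in get_recurring_task_ids(hour):
        execution_ids.append(start_task(task_id))
    return execution_ids
```

Passing the Router calls in as functions keeps the handler trivially testable. Swapping in real HTTP calls later does not change its logic.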

Router

Router is an API. Its primary function is routing messages for the flow of task execution steps. Ideally it would be dumb, but for expediency it's going to be a little smarter in the first slice.

Responsibilities:

  • HTTP (API)
    • Webhook start task (Conceptual, Do not implement now)
    • Get Recurring Tasks (IDs) for specified Hour (CW's start-syncs handler)
    • Start Task (Create Execution via start-sync)
      • Parameters
        • Task ID
      • Action
        • Creates execution in DB
        • Sends first Step message
    • Update Execution (via update_execution handler)
      • Parameters
        • Execution ID
        • Step status
        • Step output
      • Action
        • Update execution outputs/last_completed_step in DB
    • Audience Exports update execution (via audience-export-completed)
      • Parameters
        • Execution ID
        • Step status
        • Step output
      • Action
        • Update execution outputs/last_completed_step in DB
    • Audience Imports update execution (via audience-import-finished)
      • Parameters
        • Execution ID
        • Step status
        • Step output
      • Action
        • Update execution outputs/last_completed_step in DB
  • SQS
    • Send Task/Execution info to next Step Handler
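The Router's core loop can be sketched as follows. Start Task creates an execution and sends the step 0 message. Each update records the step output and either sends the next step message or finishes the execution. This is an in-memory illustration, not the real service: dicts stand in for the DB and a list stands in for SQS. Several names are hypothetical: the message field names, the "success" step status, and the status codes "c" (completed) and "f" (failed). Only "p" appears in the schema. Dynamic parameter resolution is left out for brevity; only static parameters are forwarded.

```python
import itertools
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Execution:
    execution_id: int
    task_id: int
    execution_status: str = "p"
    last_completed_step: Optional[int] = None
    outputs: Dict[str, Any] = field(default_factory=dict)


class Router:
    """In-memory stand-in for the Router; dicts replace the DB, a list replaces SQS."""

    def __init__(self, tasks: Dict[int, dict]):
        self.tasks = tasks  # task_id -> task_definition
        self.executions: Dict[int, Execution] = {}
        self.sent_messages: List[str] = []  # message bodies that would go to SQS
        self._ids = itertools.count(1)

    def start_task(self, task_id: int) -> int:
        """Create an execution and send the first step message."""
        execution = Execution(next(self._ids), task_id)
        self.executions[execution.execution_id] = execution
        self._send_step(execution, 0)
        return execution.execution_id

    def update_execution(self, execution_id: int, step: int, status: str, output: dict) -> None:
        """Record a step result, then advance to the next step or finish."""
        execution = self.executions[execution_id]
        if status != "success":  # hypothetical step status value
            execution.execution_status = "f"  # hypothetical failure code
            return
        execution.outputs[str(step)] = output
        execution.last_completed_step = step
        if str(step + 1) in self.tasks[execution.task_id]["steps"]:
            self._send_step(execution, step + 1)
        else:
            execution.execution_status = "c"  # hypothetical completion code

    def _send_step(self, execution: Execution, step: int) -> None:
        definition = self.tasks[execution.task_id]
        message = {
            "execution_id": execution.execution_id,
            "step_index": step,
            "handler": definition["steps"][str(step)],
            "parameters": definition["parameters"].get(str(step), {}).get("static", {}),
        }
        self.sent_messages.append(json.dumps(message))
```

The same update_execution path can serve the Update_Execution handler and both Audience callbacks. They carry the same parameters (Execution ID, Step status, Step output).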

Audience

To avoid building infrastructure inside audience-connect to handle audience export/import completion events (as we did in member-sync via the jobs tables), we will enhance audience exports/imports to accept a callback URL. When an export/import completion event occurs, we will call that URL with the completion message body as the payload.

The callback URL passed to the export (via GetMembersFromAudienceExport) and to the import (via ImportMembersToAudience) will include the Execution ID and the current Step index. This lets the Router update the specified execution for the current step with the output from the export/import.
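A minimal sketch of encoding the Execution ID and Step index into the callback URL and reading them back on the Router side follows. The host, the path layout (the handler name as the path), and the query parameter names are all hypothetical; only the handler names audience-export-completed and audience-import-finished come from this doc.

```python
from typing import Tuple
from urllib.parse import parse_qs, urlencode, urlsplit

ROUTER_BASE_URL = "https://router.example.internal"  # hypothetical Router host


def build_callback_url(event: str, execution_id: int, step_index: int) -> str:
    """Build a hypothetical callback URL that identifies the execution and step."""
    query = urlencode({"execution_id": execution_id, "step_index": step_index})
    return f"{ROUTER_BASE_URL}/{event}?{query}"


def parse_callback_url(url: str) -> Tuple[int, int]:
    """Recover (execution_id, step_index) from a callback URL built above."""
    params = parse_qs(urlsplit(url).query)
    return int(params["execution_id"][0]), int(params["step_index"][0])
```

For example, build_callback_url("audience-export-completed", 7, 0) produces https://router.example.internal/audience-export-completed?execution_id=7&step_index=0. Audience only has to call that URL verbatim with the completion message body.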

Handlers

Step Handlers

This covers all the Step handlers (building blocks) that we add along the way. The general expectation is that each will need the following info:

  • Execution ID
  • Current Step Index
  • Parameters (Router builds this from Task Definition + Execution outputs)

When finished, the handler will send a new message to the Update_Execution handler with the following info:

  • Execution ID
  • Current Step Index
  • Output

Note: GetMembersFromAudienceExport and ImportMembersToAudience will not send a message to Update_Execution; their results reach the Router through the Audience callback URL instead.
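The step handler contract can be sketched as a thin wrapper. It reads the incoming message, runs the step's work on the Router-built parameters, and emits the message for Update_Execution. This is a sketch under assumptions: the JSON field names and the work callable are hypothetical, and only the three pieces of information listed above are carried through.

```python
import json
from typing import Any, Callable, Dict


def handle_step_message(body: str, work: Callable[[Dict[str, Any]], Dict[str, Any]]) -> str:
    """Run one step and build the message a step handler would send to Update_Execution."""
    message = json.loads(body)  # expects execution_id, step_index, parameters (hypothetical keys)
    output = work(message["parameters"])  # the step's actual building-block logic
    return json.dumps({
        "execution_id": message["execution_id"],
        "step_index": message["step_index"],
        "output": output,
    })
```

Each new building block then only supplies its work function. Message parsing and the reply shape stay identical across handlers.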

Update_Execution (previously named Update_Task)

This handler will accept the information from the step handlers and communicate that information back to the Router via HTTP. This gives us a fault-tolerant system so we can update the execution out of band if needed.
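Update_Execution can be sketched as a small forwarder that retries the HTTP call to the Router and lets a final failure propagate. If the handler consumes from SQS, not deleting the message means it becomes visible again after its visibility timeout, so the update can be retried later or replayed out of band. The URL, the PUT method, and the retry/backoff values below are hypothetical. The sender is injectable so the logic can be exercised without a network.

```python
import json
import time
import urllib.error
import urllib.request
from typing import Callable

ROUTER_UPDATE_URL = "https://router.example.internal/executions/{execution_id}"  # hypothetical


def build_update_request(update: dict) -> urllib.request.Request:
    """Wrap a step result as a JSON PUT to the (hypothetical) Router update endpoint."""
    return urllib.request.Request(
        ROUTER_UPDATE_URL.format(execution_id=update["execution_id"]),
        data=json.dumps(update).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def _send(request: urllib.request.Request) -> int:
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


def forward_update(update: dict, send: Callable = _send, attempts: int = 3, backoff_s: float = 1.0) -> None:
    """Forward a step result to the Router, retrying on URLError (HTTPError included).

    After the last failed attempt the exception is re-raised so the caller does not
    acknowledge the queue message.
    """
    request = build_update_request(update)
    for attempt in range(1, attempts + 1):
        try:
            send(request)
            return
        except urllib.error.URLError:
            if attempt == attempts:
                raise
            time.sleep(backoff_s * attempt)  # linear backoff between attempts
```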

Database Schema

CREATE SCHEMA connect;

CREATE TABLE connect.tasks (
    task_id SERIAL PRIMARY KEY,
    task_name TEXT NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    task_definition JSONB NOT NULL,
    integration_type INT NOT NULL
);

CREATE TABLE connect.ref_integration_types (
    integration_type_id SERIAL PRIMARY KEY,
    integration_type_name VARCHAR(150) NOT NULL,
    integration_enabled BOOLEAN NOT NULL DEFAULT true
);

CREATE TABLE connect.task_runtimes (
    task_id INTEGER NOT NULL,
    runtime_hour INTEGER NOT NULL CHECK (runtime_hour >= 0 AND runtime_hour <= 23),
    PRIMARY KEY (task_id, runtime_hour)
);

CREATE TABLE connect.task_executions (
    execution_id BIGSERIAL PRIMARY KEY,
    task_id INTEGER NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMP WITH TIME ZONE,
    execution_status VARCHAR(1) NOT NULL DEFAULT 'p',
    exception TEXT,
    last_completed_step INT,
    outputs JSONB
);
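The Router's "Get Recurring Tasks (IDs) for specified Hour" call maps naturally onto connect.task_runtimes. A sketch of that lookup follows. It runs against simplified SQLite stand-ins for the two tables (no connect schema, no JSONB) so it is self-contained. The query and function name are illustrative assumptions. Against PostgreSQL the placeholder style depends on the driver (for example %s with psycopg2).

```python
import sqlite3
from typing import List

# Simplified SQLite stand-ins for connect.tasks and connect.task_runtimes.
SCHEMA = """
CREATE TABLE tasks (task_id INTEGER PRIMARY KEY, task_name TEXT NOT NULL);
CREATE TABLE task_runtimes (
    task_id INTEGER NOT NULL,
    runtime_hour INTEGER NOT NULL CHECK (runtime_hour >= 0 AND runtime_hour <= 23),
    PRIMARY KEY (task_id, runtime_hour)
);
"""

RECURRING_TASKS_FOR_HOUR = """
SELECT t.task_id
FROM tasks AS t
JOIN task_runtimes AS r ON r.task_id = t.task_id
WHERE r.runtime_hour = ?
ORDER BY t.task_id
"""


def recurring_task_ids(conn: sqlite3.Connection, hour: int) -> List[int]:
    """Return the IDs of tasks scheduled to run at the given hour (0-23)."""
    return [row[0] for row in conn.execute(RECURRING_TASKS_FOR_HOUR, (hour,))]
```

Because (task_id, runtime_hour) is the primary key, a task can run at several hours a day but never appears twice for the same hour.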