- Task Scheduler
- Router
- Audience
- Exports (via Exports-API)
- Imports
- Handlers
- Start
- (Step Handlers)
- Update_Execution
- Database Schema
- Tasks
- Executions
{
  "steps": {
    "0": "GetMembersFromAudienceExport",
    "1": "ImportMembersToAudience"
  },
  "parameters": {
    "0": {
      "static": {
        "account_id": 100,
        "group_id": 50
      }
    },
    "1": {
      "static": {
        "field_map": []
      },
      "dynamic": {
        "source_url": "0.s3_url"
      }
    }
  }
}
{
  "status": "p",
  "last_completed_step": 0,
  "outputs": {
    "0": {
      "s3_url": "http://sarahstoryengineering.com/"
    }
  }
}
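The `dynamic` parameter values reference a prior step's output by `<step index>.<key>`. As a minimal sketch (the helper name and shapes here are assumptions, not existing code), the Router could resolve a step's parameters from the task definition plus the execution's accumulated outputs like this:

```python
def resolve_parameters(task_def, execution_outputs, step_index):
    """Merge a step's static parameters with dynamic values pulled
    from earlier steps' outputs (referenced as "<step>.<key>")."""
    spec = task_def["parameters"][str(step_index)]
    params = dict(spec.get("static", {}))
    for name, ref in spec.get("dynamic", {}).items():
        src_step, key = ref.split(".", 1)
        params[name] = execution_outputs[src_step][key]
    return params

task_def = {
    "parameters": {
        "1": {
            "static": {"field_map": []},
            "dynamic": {"source_url": "0.s3_url"},
        }
    }
}
outputs = {"0": {"s3_url": "http://sarahstoryengineering.com/"}}
print(resolve_parameters(task_def, outputs, 1))
# → {'field_map': [], 'source_url': 'http://sarahstoryengineering.com/'}
```

Resolving step 1's `source_url` from step 0's `s3_url` output is exactly the hand-off shown in the two JSON examples above.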
Very similar to what we have today with member-sync:
One-off (including scheduled one-off) tasks go through eda-scheduler
Recurring tasks are handled through our CW system built with member-sync
If a webhook implementation comes into play, we can deal with that in the Router
Note: We'd love to avoid repeating the CW system, but we worked through LOTS of scenarios with eda-scheduler, Spring scheduler, task steps, etc., and they all had problems (most solvable). It looks like we can get started with what we have and get by with it. It is not scalable, but it'll work for now; when it becomes a scale problem, that will be a good thing (from a revenue POV) and we can adjust it then.
Router is an API. Its primary function is routing messages for the flow of task execution steps. Ideally it would be dumb but, for expediency, it's going to be a little smarter in the first slice.
Responsibilities:
- HTTP (API)
- Webhook start task (Conceptual, Do not implement now)
- Get Recurring Tasks (IDs) for specified Hour (CW's start-syncs handler)
- Start Task (Create Execution via start-sync)
- Parameters
- Task ID
- Action
- Creates execution in DB
- Sends first Step message
- Update Execution (via update_execution handler)
- Parameters
- Execution ID
- Step status
- Step output
- Action
- Update execution outputs/last_completed_step in DB
- Audience Exports update execution (via audience-export-completed)
- Parameters
- Execution ID
- Step status
- Step output
- Action
- Update execution outputs/last_completed_step in DB
- Audience Imports update execution (via audience-import-finished)
- Parameters
- Execution ID
- Step status
- Step output
- Action
- Update execution outputs/last_completed_step in DB
- SQS
- Send Task/Execution info to next Step Handler
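The Start Task action above can be sketched roughly as follows. Everything here is a stand-in: `db`, `sqs`, and the `queues` map (step handler name to queue URL) are hypothetical, and the message shape is an assumption rather than a final contract.

```python
import json

def start_task(task_id, db, sqs, queues):
    """Sketch of Start Task: create a pending ('p') execution row,
    then send the first Step message to that step's handler queue.
    `db`, `sqs`, and `queues` are hypothetical stand-ins."""
    task = db.get_task(task_id)
    execution_id = db.create_execution(task_id, status="p")
    first_step = task["task_definition"]["steps"]["0"]  # e.g. "GetMembersFromAudienceExport"
    message = {
        "execution_id": execution_id,
        "step_index": 0,
        "parameters": task["task_definition"]["parameters"]["0"]["static"],
    }
    sqs.send_message(QueueUrl=queues[first_step], MessageBody=json.dumps(message))
    return execution_id
```

Step 0 has no `dynamic` parameters by construction (there are no prior outputs yet), which is why only `static` is sent here.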
In order to avoid building infrastructure inside audience-connect to coordinate audience export/import completion events (like we did in member-sync via the jobs tables), we enhance audience exports/imports to accept a callback URL. When the export/import completion events occur, we will call that URL with the payload of the completion message body.
The callback URL passed to the export (via GetMembersFromAudienceExport) and the import (via ImportMembersToAudience) will include the Execution ID and current Step index. This will allow the Router to be able to update the specified execution for the current step with the output from the export/import.
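For example, the callback URL might encode both identifiers in the path. The route shape below is purely illustrative, not a final Router route:

```python
def build_callback_url(router_base, execution_id, step_index):
    # Illustrative route shape; the real Router route may differ.
    return f"{router_base}/executions/{execution_id}/steps/{step_index}"

print(build_callback_url("https://router.internal", 42, 0))
# → https://router.internal/executions/42/steps/0
```

When the export/import completion fires and calls this URL, the Router has everything it needs to update execution 42's outputs for step 0.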
These are the Step handlers (building blocks) that we add along the way. The general expectation is that they'll need the following info:
- Execution ID
- Current Step Index
- Parameters (Router builds this from Task Definition + Execution outputs)
When finished, the handler will send a new message to the Update_Execution handler with the following info:
- Execution ID
- Current Step Index
- Output
Note: GetMembersFromAudienceExport and ImportMembersToAudience will not send a message to Update_Execution
This handler will accept the information from the step handlers and communicate that information back to the Router via HTTP. This gives us a fault-tolerant system so we can update the execution out of band if needed.
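A rough sketch of the Update_Execution side, assuming it consumes an SQS record and relays it to the Router over HTTP. The route shape and payload keys below are assumptions; only the message-to-HTTP translation is the point:

```python
import json

def build_router_update(sqs_record):
    """Turn a step handler's SQS message into the Router HTTP call.
    Route shape and payload keys are assumptions, not a final contract."""
    body = json.loads(sqs_record["body"])
    path = f"/executions/{body['execution_id']}"
    payload = {
        "step_index": body["step_index"],
        "status": body["status"],
        "output": body["output"],
    }
    return path, payload

record = {"body": json.dumps({
    "execution_id": 42, "step_index": 1,
    "status": "c", "output": {"rows_imported": 100},
})}
print(build_router_update(record))
```

The actual HTTP call (e.g. a PATCH of `payload` to the Router at `path`) is kept out of this sketch; decoupling it from message parsing is also what makes out-of-band replays straightforward.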
CREATE SCHEMA connect;
CREATE TABLE connect.tasks (
task_id SERIAL PRIMARY KEY,
task_name TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
task_definition JSONB NOT NULL,
integration_type INT NOT NULL
);
CREATE TABLE connect.ref_integration_types (
integration_type_id SERIAL PRIMARY KEY,
integration_type_name VARCHAR(150) NOT NULL,
integration_enabled BOOLEAN NOT NULL DEFAULT true
);
CREATE TABLE connect.task_runtimes (
task_id INTEGER NOT NULL,
runtime_hour INTEGER NOT NULL CHECK (runtime_hour >= 0 AND runtime_hour <= 23),
PRIMARY KEY (task_id, runtime_hour)
);
CREATE TABLE connect.task_executions (
execution_id BIGSERIAL PRIMARY KEY,
task_id INTEGER NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
completed_at TIMESTAMP WITH TIME ZONE,
execution_status VARCHAR(1) NOT NULL DEFAULT 'p',
exception TEXT,
last_completed_step INT,
outputs JSONB
);
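The Get Recurring Tasks lookup reduces to a join against `task_runtimes`. A sqlite stand-in (the real schema above is Postgres, so types are simplified here) demonstrating the query shape:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- simplified sqlite stand-in for connect.tasks / connect.task_runtimes
    CREATE TABLE tasks (task_id INTEGER PRIMARY KEY, task_name TEXT NOT NULL);
    CREATE TABLE task_runtimes (
        task_id INTEGER NOT NULL,
        runtime_hour INTEGER NOT NULL CHECK (runtime_hour BETWEEN 0 AND 23),
        PRIMARY KEY (task_id, runtime_hour)
    );
    INSERT INTO tasks VALUES (1, 'nightly export'), (2, 'hourly import');
    INSERT INTO task_runtimes VALUES (1, 3), (2, 3), (2, 15);
""")

def tasks_for_hour(conn, hour):
    """Return the task IDs scheduled to run at the given hour (0-23)."""
    rows = conn.execute(
        "SELECT t.task_id FROM tasks t "
        "JOIN task_runtimes r ON r.task_id = t.task_id "
        "WHERE r.runtime_hour = ? ORDER BY t.task_id",
        (hour,),
    ).fetchall()
    return [task_id for (task_id,) in rows]

print(tasks_for_hour(conn, 3))   # → [1, 2]
print(tasks_for_hour(conn, 15))  # → [2]
```

The composite primary key on `(task_id, runtime_hour)` lets one task run at several hours per day while preventing duplicate rows for the same hour.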