Skip to content

Instantly share code, notes, and snippets.

@michaloo
Last active April 30, 2018 15:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save michaloo/fd334b1c67157dc02ad53d4b633247ac to your computer and use it in GitHub Desktop.
Save michaloo/fd334b1c67157dc02ad53d4b633247ac to your computer and use it in GitHub Desktop.
Next generation Hull Client
// @flow
/**
* This is an example of a handler which works on users in groups.
*/
/**
 * Stub, presumably for the call that pushes a group of users to the 3rd-party service.
 *
 * NOTE(review): the body is empty, so as written this returns `undefined`
 * rather than the annotated Promise. A real implementation has to return
 * a Promise; the notification handler in this file chains `.then()` on the
 * result and iterates over the resolved value.
 */
function updateUsersOn3rdParty(): Promise<*> {
}
function exampleNotificationHandler(ctx, messages): Promise<Array<>> {
const messagesToSkip = messages.filter(shouldSkip);
const messagesToProcess = messages.filter(shouldWork);
return updateUsersOn3rdPart(messages)
.then((processedMessages) => {
return flatten(concat(
messagesToSkip.map(({ user }) => hullClient.asUser(user).logger.debug("outgoing.user.skip")),
processedMessages.reduce(({ user, processedData, fetchedTraits }) => {
const scopedHullClient = hullClient.asUser(user);
return [
scopedHullClient.logger.info("outgoing.user.success", processedData),
scopedHullClient.traits(fetchedTraits)
];
});
));
});
}
// @flow
/**
* This is an example of a handler which works on users one by one.
*/
/**
 * Stub, presumably for the outgoing HTTP request made for a single message.
 *
 * NOTE(review): the body is empty, so as written this returns `undefined`
 * rather than the annotated Promise. The handler in this file calls it with
 * a message and chains `.then()` / `.catch()` on the result, so a real
 * implementation has to return a Promise.
 */
function performUrlRequest(): Promise<*> {
}
function exampleNotificationHandler(ctx, messages): Promise<Array<Object>> {
const urlToSendDataTo = ctx.connector.private_settings.url;
return messages.map((message) => {
const scopedClient = hullClient.asUser(message.user);
if (shouldSkip(message)) {
return scopedClient.logger.info("outgoing.user.skip", { reason: "" });
}
if (shouldSkipAndMarkUser(message)) {
return [
scopedClient.logger.info("outgoing.user.skip", { reason: "" });
scopedClient.traits({ should_skip_always: true });
];
}
return performUrlRequest(message)
.then(() => scopedClient.logger.info("outgoing.user.success", { reason: "" }))
.catch(() => scopedClient.logger.error("outgoing.user.error", { reason: "" }));
});
}
// @flow
// DATA STRUCTURES TO READ FROM PLATFORM
// Combined identifiers and attributes of an entity.
// (All shapes below are empty placeholders in this sketch.)
type Entity = {};
type Account = {};
type User = {};
// Entity event coming from the platform
type EntityEvent = {};
// DATA STRUCTURES TO WRITE TO PLATFORM
// Separate claims used to find an entity
type EntityClaims = {};
type AccountClaims = {};
type UserClaims = {};
// Extra claims passed next to the user/account claims when scoping a client
type AuxiliaryClaims = {
create: boolean,
active: boolean
};
// Traits setters - direct values or an `operation`
type EntityTraits = {};
type AccountTraits = {};
type UserTraits = {};
// Separate pieces needed to create/write an event
type EntityEventName = string;
type EntityEventProperties = {};
type EntityEventContext = {};
// Which kind of entity a scoped client (and its log lines) refers to
type SubjectType = "account" | "user";
// this is what HullClient produces when we call any log function
type LogLine = {
level: string,
message: string,
data: Object,
context: {
organization: string,
connector_id: string,
subject_type: SubjectType,
user_email?: string,
user_id?: string,
user_external_id?: string,
user_anonyomous_id?: string,
account_id?: string,
account_external_id?: string,
account_domain?: string
}
};
// This is what HullClient produces when we call the traits/track/alias methods
type FirehoseEvent = {
accessToken: string, // JWT token
type: "traits" | "track" | "alias",
body: {} // payload; its content depends on `type` (NOTE(review): the original comment here was left unfinished)
};
// Platform API response (placeholder shape)
type ApiResponse = {};
// Configuration accepted by the HullClient constructor. The optional fields
// are the ones filled in when the client gets scoped to a user or an account.
// NOTE(review): the call sites visible in this gist pass the secret under the
// key `secret`, not `connectorSecret` - confirm which name is intended.
type HullClientConfiguration = {
connectorId: string,
connectorSecret: string,
organization: string,
userClaims?: UserClaims,
accountClaims?: AccountClaims,
subjectType?: SubjectType,
auxiliaryClaims?: AuxiliaryClaims
};
/**
* Base HullClient class. Allows to
* - scope client to an user or anaccount
* - perform API calls,
* - log stuff
* - produce firehose events
*
* Except for API calls it does not have side effects!
* Logger and traits/track/alias just returns stuff
*/
class HullClient {
configuration: HullClientConfiguration;
agent: superagent;
logger: {};
constructor(configuration: HullClientConfiguration) {
// assert / parse configuration
this.configuration = configuration;
this.logger = {
debug: this.log.bind(this, "debug"),
warning: this.log.bind(this, "warning"),
info: this.log.bind(this, "info"),
error: this.log.bind(this, "error")
}
// this.agent = superagent.agent()
// .set("Content-Type", "application/json")
// .set("User-Agent", `Hull Node Client version: ${pkg.version}`)
// .set("Hull-App-Id", configuration.connectorId)
// .set("Hull-Organization", configuration.organization);
}
asUser(userClaims: UserClaims, auxiliaryClaims: AuxiliaryClaims): UserScopedHullClient {
const newConfiguration = {
...this.configuration,
userClaims,
subjectType: "user",
auxiliaryClaims
};
return new UserScopedHullClient(newConfiguration);
}
asAccount(accountClaims: AccountClaims, auxiliaryClaims: AuxiliaryClaims): AccountScopedHullClient {
const newConfiguration = {
...this.configuration,
accountClaims,
subjectType: "account",
auxiliaryClaims
};
return new AccountScopedHullClient(newConfiguration);
}
log(level: string, message: string, data: Object): LogLine {
const context = this.getLogLineContext(this.configuration);
return {
level,
message,
context,
data
};
}
get(url: string): Promise<ApiResponse> {
return this.agent.get(url);
}
post(url: string): Promise<ApiResponse> {
return this.agent.post(url);
}
put(url: string): Promise<ApiResponse> {
return this.agent.put(url);
}
delete(url: string): Promise<ApiResponse> {
return this.agent.delete(url);
}
/**
* generates access token for firehose events
*/
getAccessToken(configuration: HullClientConfiguration): string {
return "";
}
/**
* produces context object for log lines
*/
getLogLineContext(configuration: HullClientConfiguration): Object {
return {};
}
}
/**
* HullClient scoped to User or Account,
* allows to store attributes and events on the entity
*/
class EntityScopedHullClient extends HullClient {
constructor(configuration: HullClientConfiguration) {
super(configuration);
// assert / parse configuration
this.configuration = configuration;
}
traits(entityTraits: EntityTraits): FirehoseEvent {
const accessToken = this.getAccessToken(this.configuration);
return {
type: "traits",
accessToken,
body: entityTraits
};
}
track(
entityEventName: EntityEventName,
entityEventProperties: EntityEventProperties,
entityEventContext: EntityEventContext
): FirehoseEvent {
const accessToken = this.getAccessToken(this.configuration);
return {
type: "track",
accessToken,
body: {
event: entityEventName,
...entityEventContext,
properties: entityEventProperties
}
};
}
alias(): FirehoseEvent {
const accessToken = this.getAccessToken(this.configuration);
return {
type: "alias",
accessToken,
body: entityTraits
};
}
}
/**
* Additionaly when scoped to user we can link the account
* directly to user
* `hullClient.asUser().account().traits();`
*/
class UserScopedHullClient extends EntityScopedHullClient {
account(accountClaims): AccountScopedHullClient {
const newConfiguration = {
...this.configuration,
accountClaims,
subjectType: "account",
auxiliaryClaims
};
return new AccountScopedHullClient();
}
}
/**
 * Client scoped to an account. It adds no methods of its own:
 * everything it offers is inherited from EntityScopedHullClient.
 */
class AccountScopedHullClient extends EntityScopedHullClient {}
module.exports = HullClient;
// @flow
import type { FirehoseEvent, LogLine } from "./hull-client-node";
// This is what we expect to get back from handlers:
// a promise resolving to an array of log lines and/or firehose events
type HandlerResponse = Promise<Array<LogLine | FirehoseEvent>>;
class HullConnector {
constructor(options) {
// set the option
}
setupApp(app) {
// init just the base of request context
app.use((req, res, next) => {
req.hull = {};
req.hull.cache = new CacheAgent;
req.hull.metric = new MetricAgent;
req.hull.queue = new QueueAgent; // still would need for datanyze or mailchimp bulk jobs (for getting campaign activity)
next();
});
}
// start the application just as now
startApp(app) {
}
}
/**
* this how we build the rest of request context object
* in case of notifications and schedulers
*/
function buildNotificationRequestContext(req) {
const secret = req.query.secret;
const ctx = {
notification_id: req.body.notification_id,
connector: req.body.connector,
ship: req.body.connector, // legacy alias
accounts_segments: req.body.accounts_segments,
users_segments: req.body.segments,
segments: req.body.segments, // legacy alias
configuration: req.body.configuration,
cache: req.hull.cache
};
ctx.client = new HullClient({
connectorId: req.configuration.id,
secret
});
return ctx;
}
/**
* this is how we need to build request context
* for other requests where we need to fetch
* and cache connector object and segments lists
*/
async function buildOtherRequestContext(req) {
const secret = req.query.secret;
const connectorId = req.query.ship; // legacy!
const organization = req.query.organization;
const ctx = {
notification_id: "other:request",
cache: req.hull.cache,
configuration: {
connectorId,
secret,
organization
}
};
ctx.client = new HullClient(ctx.configuration);
const connector = await req.cache.wrap("connector", ctx.client.get(connectorId));
const usersSegments = await req.cache.wrap("users_segments", ctx.client.get("/users_segments"));
const accountsSegments = await req.cache.wrap("accounts_segments", ctx.client.get("/accounts_segments"));
ctx.connector = connector;
ctx.ship = connector;
ctx.segments = usersSegments;
ctx.users_segments = usersSegments;
ctx.accounts_segments = accountsSegments;
return ctx;
}
/**
* Works for scheduler endpoint,
* similar would work for status endpoint
*
* where we fail?
* in the response for long running processes like sql import
*/
function schedulerHandler(handler: () => HandlerResponse): () => void {
return function schedulerHandlerRouter(req, res) {
const ctx = buildNotificationRequestContext(req);
handler(ctx)
.then((results: Array<LogLine | FirehoseEvent>) => {
const firehosePayload = processResults(results);
firehosePayloads.map(payload => ctx.client.post("/firehose", firehosePayload))
})
.catch(() => {
// unhandled rejection
res.json({ error: true });
});
};
}
/**
* Works for kraken and batch endpoints,
* I've added one handler per one channel signature since it's a little bit easier
*
* where we fail?
* for long running processes like big batch
*/
function notificationHandler(channel: string, handler: () => HandlerResponse): () => void {
return function notificationHandlerRouter(req, res) {
const ctx = buildNotificationRequestContext(req);
handler(ctx, req.body.messages)
.then((results: Array<LogLine | FirehoseEvent>) => {
const firehosePayloads = processResults(results);
firehosePayloads.map(payload => ctx.client.post("/firehose", firehosePayload))
})
.catch(() => {
// unhandled rejection
res.json({ error: true });
});
}
}
/**
* A handler for UI settings dropdowns options
* or button on the dashboard to trigger an action.
*
*
* Where we fail?
* on a long running process like fetch all intercom users
*/
function dashboardHandler(handler: () => void): () => void {
return async function dashboardHandlerRouter(req, res) {
const ctx = await buildOtherRequestContext(req);
handler(req, res);
}
}
/**
* For the response of handlers for notificatons/batches/schedulers/manual triggers
* which results in any log line or firehose events
*
* We go over the resulting array, log stuff to stdout/stderr
* and return a chunked array to be sent to firehose endpoint
*/
function processResults(results: Array<LogLine | FirehoseEvent>): Array<Object> {
const logLines = results.filter(isLogLine);
logLines.forEach(processLogLine);
const firehoseEvents = results.filter(isFirehoseEvent);
return chunk(firehoseEvents).map(processFirehoseEvents);
}
function isLogLine(entry: LogLine | FirehoseEvent): boolean {
}
function processLogLine(entry: LogLine): void {
console.log(JSON.stringify(entry));
}
function isFirehoseEvent(entry: LogLine | FirehoseEvent): boolean {
}
function chunk(arr, chunkSize) {
const result = [];
const tmp = [...arr];
while (tmp.length) {
result.push(tmp.splice(0, chunkSize));
}
return result;
}
const jwt = require("jwt-simple");
const HullClient = require("./hull-client-node");
// Usage example: a base client, then the values expected back from a client
// scoped to a user.
const hullClient = new HullClient({
  connectorId: "1234",
  secret: "xxx",
  organization: "abc.hull.io"
});

// The `=== { ... }` expressions below are illustrative only: they describe the
// expected shape of the returned value. Comparing against a fresh object
// literal with `===` is never true at runtime.

// Logging returns the log line instead of writing it.
hullClient.asUser({ email: "foo@bar.com" }).logger.info("test", { coconuts: 10 })
=== {
  level: "info",
  message: "test",
  context: {
    connector_id: "1234",
    organization: "abc.hull.io",
    user_email: "foo@bar.com",
    subject_type: "user"
  },
  data: { coconuts: 10 }
};

// Setting traits returns the firehose event; the access token encodes the
// same user claims the client was scoped with.
hullClient.asUser({ email: "foo@bar.com" }).traits({ coconuts: 10 })
=== {
  type: "traits",
  accessToken: jwt.encode({
    "io.hull.subjectType": "user",
    "io.hull.asUser": {
      email: "foo@bar.com"
    }
  }, "xxx"),
  body: {
    coconuts: 10
  }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment