Last active
April 30, 2018 15:26
-
-
Save michaloo/fd334b1c67157dc02ad53d4b633247ac to your computer and use it in GitHub Desktop.
Next generation Hull Client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// @flow | |
/**
 * This is an example of a handler which works on users in groups.
 */
function updateUsersOn3rdParty(): Promise<*> { | |
} | |
function exampleNotificationHandler(ctx, messages): Promise<Array<>> { | |
const messagesToSkip = messages.filter(shouldSkip); | |
const messagesToProcess = messages.filter(shouldWork); | |
return updateUsersOn3rdPart(messages) | |
.then((processedMessages) => { | |
return flatten(concat( | |
messagesToSkip.map(({ user }) => hullClient.asUser(user).logger.debug("outgoing.user.skip")), | |
processedMessages.reduce(({ user, processedData, fetchedTraits }) => { | |
const scopedHullClient = hullClient.asUser(user); | |
return [ | |
scopedHullClient.logger.info("outgoing.user.success", processedData), | |
scopedHullClient.traits(fetchedTraits) | |
]; | |
}); | |
)); | |
}); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// @flow | |
/**
 * This is an example of a handler which works on users one by one.
 */
function performUrlRequest(): Promise<*> { | |
} | |
function exampleNotificationHandler(ctx, messages): Promise<Array<Object>> { | |
const urlToSendDataTo = ctx.connector.private_settings.url; | |
return messages.map((message) => { | |
const scopedClient = hullClient.asUser(message.user); | |
if (shouldSkip(message)) { | |
return scopedClient.logger.info("outgoing.user.skip", { reason: "" }); | |
} | |
if (shouldSkipAndMarkUser(message)) { | |
return [ | |
scopedClient.logger.info("outgoing.user.skip", { reason: "" }); | |
scopedClient.traits({ should_skip_always: true }); | |
]; | |
} | |
return performUrlRequest(message) | |
.then(() => scopedClient.logger.info("outgoing.user.success", { reason: "" })) | |
.catch(() => scopedClient.logger.error("outgoing.user.error", { reason: "" })); | |
}); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// @flow | |
// DATA STRUCTURES TO READ FROM PLATFORM | |
// combined ident and attributes | |
type Entity = {}; | |
type Account = {}; | |
type User = {}; | |
// Entity event coming from platform | |
type EntityEvent = {}; | |
// DATA STRUCTURES TO WRITE TO PLATFORM | |
// separate claims to find entity | |
type EntityClaims = {}; | |
type AccountClaims = {}; | |
type UserClaims = {}; | |
type AuxiliaryClaims = { | |
create: boolean, | |
active: boolean | |
}; | |
// traits setters - direct values or `operation` | |
type EntityTraits = {}; | |
type AccountTraits = {}; | |
type UserTraits = {}; | |
// separate stuff to create/write an event | |
type EntityEventName = string; | |
type EntityEventProperties = {}; | |
type EntityEventContext = {}; | |
type SubjectType = "account" | "user"; | |
// this is what HullClient produces when we call any log function | |
type LogLine = { | |
level: string, | |
message: string, | |
data: Object, | |
context: { | |
organization: string, | |
connector_id: string, | |
subject_type: SubjectType, | |
user_email?: string, | |
user_id?: string, | |
user_external_id?: string, | |
user_anonyomous_id?: string, | |
account_id?: string, | |
account_external_id?: string, | |
account_domain?: string | |
} | |
}; | |
// this is what HullClient produces when we call traits/track method | |
type FirehoseEvent = { | |
accessToken: string, // JWT token | |
type: "traits" | "track" | "alias", | |
body: {} // this can contain | |
}; | |
// platform api response | |
type ApiResponse = {}; | |
type HullClientConfiguration = { | |
connectorId: string, | |
connectorSecret: string, | |
organization: string, | |
userClaims?: UserClaims, | |
accountClaims?: AccountClaims, | |
subjectType?: SubjectType, | |
auxiliaryClaims?: AuxiliaryClaims | |
}; | |
/** | |
* Base HullClient class. Allows to | |
* - scope client to an user or anaccount | |
* - perform API calls, | |
* - log stuff | |
* - produce firehose events | |
* | |
* Except for API calls it does not have side effects! | |
* Logger and traits/track/alias just returns stuff | |
*/ | |
class HullClient { | |
configuration: HullClientConfiguration; | |
agent: superagent; | |
logger: {}; | |
constructor(configuration: HullClientConfiguration) { | |
// assert / parse configuration | |
this.configuration = configuration; | |
this.logger = { | |
debug: this.log.bind(this, "debug"), | |
warning: this.log.bind(this, "warning"), | |
info: this.log.bind(this, "info"), | |
error: this.log.bind(this, "error") | |
} | |
// this.agent = superagent.agent() | |
// .set("Content-Type", "application/json") | |
// .set("User-Agent", `Hull Node Client version: ${pkg.version}`) | |
// .set("Hull-App-Id", configuration.connectorId) | |
// .set("Hull-Organization", configuration.organization); | |
} | |
asUser(userClaims: UserClaims, auxiliaryClaims: AuxiliaryClaims): UserScopedHullClient { | |
const newConfiguration = { | |
...this.configuration, | |
userClaims, | |
subjectType: "user", | |
auxiliaryClaims | |
}; | |
return new UserScopedHullClient(newConfiguration); | |
} | |
asAccount(accountClaims: AccountClaims, auxiliaryClaims: AuxiliaryClaims): AccountScopedHullClient { | |
const newConfiguration = { | |
...this.configuration, | |
accountClaims, | |
subjectType: "account", | |
auxiliaryClaims | |
}; | |
return new AccountScopedHullClient(newConfiguration); | |
} | |
log(level: string, message: string, data: Object): LogLine { | |
const context = this.getLogLineContext(this.configuration); | |
return { | |
level, | |
message, | |
context, | |
data | |
}; | |
} | |
get(url: string): Promise<ApiResponse> { | |
return this.agent.get(url); | |
} | |
post(url: string): Promise<ApiResponse> { | |
return this.agent.post(url); | |
} | |
put(url: string): Promise<ApiResponse> { | |
return this.agent.put(url); | |
} | |
delete(url: string): Promise<ApiResponse> { | |
return this.agent.delete(url); | |
} | |
/** | |
* generates access token for firehose events | |
*/ | |
getAccessToken(configuration: HullClientConfiguration): string { | |
return ""; | |
} | |
/** | |
* produces context object for log lines | |
*/ | |
getLogLineContext(configuration: HullClientConfiguration): Object { | |
return {}; | |
} | |
} | |
/** | |
* HullClient scoped to User or Account, | |
* allows to store attributes and events on the entity | |
*/ | |
class EntityScopedHullClient extends HullClient { | |
constructor(configuration: HullClientConfiguration) { | |
super(configuration); | |
// assert / parse configuration | |
this.configuration = configuration; | |
} | |
traits(entityTraits: EntityTraits): FirehoseEvent { | |
const accessToken = this.getAccessToken(this.configuration); | |
return { | |
type: "traits", | |
accessToken, | |
body: entityTraits | |
}; | |
} | |
track( | |
entityEventName: EntityEventName, | |
entityEventProperties: EntityEventProperties, | |
entityEventContext: EntityEventContext | |
): FirehoseEvent { | |
const accessToken = this.getAccessToken(this.configuration); | |
return { | |
type: "track", | |
accessToken, | |
body: { | |
event: entityEventName, | |
...entityEventContext, | |
properties: entityEventProperties | |
} | |
}; | |
} | |
alias(): FirehoseEvent { | |
const accessToken = this.getAccessToken(this.configuration); | |
return { | |
type: "alias", | |
accessToken, | |
body: entityTraits | |
}; | |
} | |
} | |
/** | |
* Additionaly when scoped to user we can link the account | |
* directly to user | |
* `hullClient.asUser().account().traits();` | |
*/ | |
class UserScopedHullClient extends EntityScopedHullClient { | |
account(accountClaims): AccountScopedHullClient { | |
const newConfiguration = { | |
...this.configuration, | |
accountClaims, | |
subjectType: "account", | |
auxiliaryClaims | |
}; | |
return new AccountScopedHullClient(); | |
} | |
} | |
/**
 * Account scoped client: it has no methods beyond the ones
 * inherited from EntityScopedHullClient.
 */
class AccountScopedHullClient extends EntityScopedHullClient {}

module.exports = HullClient;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// @flow | |
import type { FirehoseEvent, LogLine } from "./hull-client-node"; | |
// this is what we expect to get from handlers | |
// array of log lines or firehose events | |
type HandlerResponse = Promise<Array<LogLine | FirehoseEvent>>; | |
/**
 * Connector skeleton: prepares the base of the request context.
 */
class HullConnector {
  constructor(options) {
    // set the option
  }

  /**
   * Registers a middleware which creates `req.hull` with fresh
   * cache, metric and queue agents for every request.
   */
  setupApp(app) {
    // init just the base of request context
    app.use((req, res, next) => {
      req.hull = {
        cache: new CacheAgent(),
        metric: new MetricAgent(),
        // still would need for datanyze or mailchimp bulk jobs (for getting campaign activity)
        queue: new QueueAgent()
      };
      next();
    });
  }

  // start the application just as now
  startApp(app) {
  }
}
/**
 * This is how we build the rest of the request context object
 * in case of notifications and schedulers.
 *
 * Everything is taken from the request body, except for the secret (query
 * string) and the cache (base context prepared on `req.hull`).
 *
 * @param req incoming request with `body`, `query` and `hull` set
 * @return context object including a HullClient under `client`
 */
function buildNotificationRequestContext(req) {
  const { body } = req;
  const secret = req.query.secret;
  // The configuration comes with the request body; the sketch read it from
  // a non-existent `req.configuration`.
  // NOTE(review): `organization` is not passed to the client here - confirm
  // whether `body.configuration` carries it.
  const client = new HullClient({
    connectorId: body.configuration.id,
    secret
  });
  return {
    notification_id: body.notification_id,
    connector: body.connector,
    ship: body.connector, // legacy alias
    accounts_segments: body.accounts_segments,
    users_segments: body.segments,
    segments: body.segments, // legacy alias
    configuration: body.configuration,
    cache: req.hull.cache,
    client
  };
}
/**
 * This is how we need to build the request context
 * for other requests, where we need to fetch
 * and cache the connector object and the segments lists.
 *
 * @param req incoming request with `query` and `hull` set
 * @return promise of the context object
 */
async function buildOtherRequestContext(req) {
  const { secret, organization } = req.query;
  const connectorId = req.query.ship; // legacy!
  // the cache agent lives on the base context (`req.hull`), not on `req`
  const cache = req.hull.cache;
  const configuration = {
    connectorId,
    secret,
    organization
  };
  const client = new HullClient(configuration);

  // The three lookups are independent, so they run in parallel.
  // NOTE(review): assumes `cache.wrap(key, fn)` calls `fn` only on a cache
  // miss; passing an already started request would defeat the cache - confirm
  // against CacheAgent.
  const [connector, usersSegments, accountsSegments] = await Promise.all([
    cache.wrap("connector", () => client.get(connectorId)),
    cache.wrap("users_segments", () => client.get("/users_segments")),
    cache.wrap("accounts_segments", () => client.get("/accounts_segments"))
  ]);

  return {
    notification_id: "other:request",
    cache,
    configuration,
    client,
    connector,
    ship: connector, // legacy alias
    segments: usersSegments, // legacy alias
    users_segments: usersSegments,
    accounts_segments: accountsSegments
  };
}
/** | |
* Works for scheduler endpoint, | |
* similar would work for status endpoint | |
* | |
* where we fail? | |
* in the response for long running processes like sql import | |
*/ | |
function schedulerHandler(handler: () => HandlerResponse): () => void { | |
return function schedulerHandlerRouter(req, res) { | |
const ctx = buildNotificationRequestContext(req); | |
handler(ctx) | |
.then((results: Array<LogLine | FirehoseEvent>) => { | |
const firehosePayload = processResults(results); | |
firehosePayloads.map(payload => ctx.client.post("/firehose", firehosePayload)) | |
}) | |
.catch(() => { | |
// unhandled rejection | |
res.json({ error: true }); | |
}); | |
}; | |
} | |
/** | |
* Works for kraken and batch endpoints, | |
* I've added one handler per one channel signature since it's a little bit easier | |
* | |
* where we fail? | |
* for long running processes like big batch | |
*/ | |
function notificationHandler(channel: string, handler: () => HandlerResponse): () => void { | |
return function notificationHandlerRouter(req, res) { | |
const ctx = buildNotificationRequestContext(req); | |
handler(ctx, req.body.messages) | |
.then((results: Array<LogLine | FirehoseEvent>) => { | |
const firehosePayloads = processResults(results); | |
firehosePayloads.map(payload => ctx.client.post("/firehose", firehosePayload)) | |
}) | |
.catch(() => { | |
// unhandled rejection | |
res.json({ error: true }); | |
}); | |
} | |
} | |
/** | |
* A handler for UI settings dropdowns options | |
* or button on the dashboard to trigger an action. | |
* | |
* | |
* Where we fail? | |
* on a long running process like fetch all intercom users | |
*/ | |
function dashboardHandler(handler: () => void): () => void { | |
return async function dashboardHandlerRouter(req, res) { | |
const ctx = await buildOtherRequestContext(req); | |
handler(req, res); | |
} | |
} | |
/** | |
* For the response of handlers for notificatons/batches/schedulers/manual triggers | |
* which results in any log line or firehose events | |
* | |
* We go over the resulting array, log stuff to stdout/stderr | |
* and return a chunked array to be sent to firehose endpoint | |
*/ | |
function processResults(results: Array<LogLine | FirehoseEvent>): Array<Object> { | |
const logLines = results.filter(isLogLine); | |
logLines.forEach(processLogLine); | |
const firehoseEvents = results.filter(isFirehoseEvent); | |
return chunk(firehoseEvents).map(processFirehoseEvents); | |
} | |
function isLogLine(entry: LogLine | FirehoseEvent): boolean { | |
} | |
function processLogLine(entry: LogLine): void { | |
console.log(JSON.stringify(entry)); | |
} | |
function isFirehoseEvent(entry: LogLine | FirehoseEvent): boolean { | |
} | |
function chunk(arr, chunkSize) { | |
const result = []; | |
const tmp = [...arr]; | |
while (tmp.length) { | |
result.push(tmp.splice(0, chunkSize)); | |
} | |
return result; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const jwt = require("jwt-simple");
const HullClient = require("./hull-client-node");

const hullClient = new HullClient({
  connectorId: "1234",
  secret: "xxx",
  organization: "abc.hull.io"
});

// The comparisons below only illustrate the expected shape of the returned
// objects: `===` against an object literal is never true and the result of
// the expression is not used.

// log line produced by a user scoped client
hullClient.asUser({ email: "foo@bar.com" }).logger.info("test", { coconuts: 10 })
  === {
    level: "info",
    message: "test",
    context: {
      connector_id: "1234",
      organization: "abc.hull.io",
      user_email: "foo@bar.com",
      subject_type: "user"
    },
    data: { coconuts: 10 }
  };

// firehose event produced by a user scoped client; the access token
// carries the same claims the client was scoped with
hullClient.asUser({ email: "foo@bar.com" }).traits({ coconuts: 10 })
  === {
    type: "traits",
    accessToken: jwt.encode({
      "io.hull.subjectType": "user",
      "io.hull.asUser": {
        email: "foo@bar.com"
      }
    }, "xxx"),
    body: {
      coconuts: 10
    }
  };
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment