Skip to content

Instantly share code, notes, and snippets.

@ipcrm
Last active June 9, 2020 20:57
Show Gist options
  • Save ipcrm/638651d9b8c746b2426de468b15aa7bc to your computer and use it in GitHub Desktop.
Save ipcrm/638651d9b8c746b2426de468b15aa7bc to your computer and use it in GitHub Desktop.
Triggered Listener Coordination
/*
* Copyright © 2020 Atomist, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {AutomationContextAware, Configuration, guid, HandlerContext, logger} from "@atomist/automation-client";
import {WebSocketLifecycle} from "@atomist/automation-client/lib/internal/transport/websocket/WebSocketLifecycle";
import {
AbstractWebSocketMessageClient,
} from "@atomist/automation-client/lib/internal/transport/websocket/WebSocketMessageClient";
import * as namespace from "@atomist/automation-client/lib/internal/util/cls";
import {AnyPush} from "@atomist/sdm";
import {configure, GraphQLPreferenceStoreFactory} from "@atomist/sdm-core";
import * as cluster from "cluster";
import * as os from "os";
import {HelloWorldGoalConfigurer} from "./lib/goals/goalConfigurer";
import {HelloWorldGoalCreator} from "./lib/goals/goalCreator";
import {HelloWorldGoals} from "./lib/goals/goals";
class TriggeredMessageClient extends AbstractWebSocketMessageClient {
constructor(ws: WebSocketLifecycle,
workspaceId: string,
// tslint:disable-next-line:no-shadowed-variable
configuration: Configuration) {
super(ws, {} as any, guid(), { id: workspaceId }, {} as any, configuration);
}
}
interface ListenerActive {
name: string;
ts: number;
}
/**
* The main entry point into the SDM
*/
export const configuration = configure<HelloWorldGoals>(async sdm => {
sdm.addTriggeredListener({
trigger: { interval: 60000 },
listener: async l => {
// Only run this on the master, otherwise it will run this listener on every worker
if (cluster.isMaster) {
const workspaceIds = l.sdm.configuration.workspaceIds;
if (!!workspaceIds && workspaceIds.length > 0) {
for (const workspaceId of workspaceIds) {
const ses = namespace.create();
ses.run(async () => {
const id = guid();
namespace.set({
invocationId: id,
correlationId: id,
workspaceName: workspaceId,
workspaceId,
operation: "ManagePendingGoalSets",
ts: Date.now(),
name: l.sdm.configuration.name as string,
version: l.sdm.configuration.version as string,
});
try {
const graphClient = l.sdm.configuration.graphql?.client?.factory.create(
workspaceId,
l.sdm.configuration
);
const messageClient = new TriggeredMessageClient(
(l.sdm.configuration.ws as any).lifecycle,
workspaceId,
l.sdm.configuration
) as any;
const ctx: HandlerContext & AutomationContextAware = {
graphClient,
messageClient,
workspaceId,
correlationId: id,
invocationId: id,
context: {
name: l.sdm.configuration.name as string,
version: l.sdm.configuration.version as string,
operation: "RunTriggeredListener",
ts: Date.now(),
workspaceId,
workspaceName: workspaceId,
correlationId: id,
invocationId: id,
},
} as any;
const myId = `${os.hostname()}/${process.pid}`;
const key = `${l.sdm.configuration.name}/my-triggered-listener-active`;
const prefs = GraphQLPreferenceStoreFactory(ctx);
const current = await prefs.get<ListenerActive>(key);
// 3 minute delay. If an SDM hasn't checked in in 3 minutes that it is the active SDM for this
// triggered listener someone else should take over (60 second interval)
const delay = 180000;
if (current?.name === undefined) {
logger.info(`No one has registered yet, taking over...`);
await prefs.put<ListenerActive>(key, {
name: myId,
ts: Date.now(),
});
} else if (
current?.name === myId &&
current?.ts > Date.now() - delay
) {
logger.info(`It's me! Do something useful!`);
await prefs.put<ListenerActive>(key, {
name: myId,
ts: Date.now(),
});
} else if (
current?.name !== myId &&
current?.ts < Date.now() - delay
) {
logger.info(
`Delay Exceeeded; taking over from ${current?.name}`,
);
await prefs.put<ListenerActive>(key, {
name: myId,
ts: Date.now(),
});
} else {
logger.info(
`Instance ${current.name} is actively running this triggered listener`,
);
}
} catch (e) {
logger.debug("Error managing triggered listener: %s", e.stack);
}
});
}
}
}
},
});
// Use the sdm instance to configure commands etc
sdm.addCommand({
name: "HelloWorld",
description: "Command that responds with a 'hello world'",
listener: async ci => {
await ci.addressChannels("Hello World");
},
});
// Create goals and configure them
const goals = await sdm.createGoals(HelloWorldGoalCreator, [
HelloWorldGoalConfigurer,
]);
// Return all push rules
return {
hello: {
test: AnyPush,
goals: goals.helloWorld,
},
};
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment