Last active
June 28, 2024 03:27
-
-
Save lyluongthien/4dceaa3e589939da554b70633331428c to your computer and use it in GitHub Desktop.
Gist used in Kai's presentation at GT: Optimization and fault tolerance in distributed transactions with Node.js GraphQL servers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// example for 2 phase commit transaction | |
const { Client } = require("pg"); | |
const jsforce = require("jsforce"); | |
// Settings for the PostgreSQL client used by the two-phase-commit example.
const pgSettings = {
  connectionString: "postgres://user:password@localhost:5432/db1",
};

// Settings for the Salesforce connection used by the two-phase-commit example.
const sfSettings = {
  loginUrl: "https://the-salesforce-instance-url.com",
  username: "the-salesforce-username",
  password: "the-salesforce-password",
};

// The client is only constructed here; twoPhaseCommit() opens and closes it.
const pgClient = new Client(pgSettings);
const sfConn = new jsforce.Connection(sfSettings);
/**
 * Runs a PostgreSQL operation and a Salesforce operation as one unit: the
 * PostgreSQL transaction is committed only when both operations vote "yes".
 * Otherwise the PostgreSQL transaction is rolled back and the Salesforce
 * Contact created by `sfTransaction` (if any) is deleted as compensation.
 *
 * Failures are logged, not rethrown. The PostgreSQL client is always closed.
 *
 * @param {object} pgClient - PostgreSQL client exposing connect/query/end.
 * @param {object} sfConn - Salesforce connection exposing connect/sobject.
 * @param {(client: object) => Promise<string>} pgTransaction - Resolves to "yes" or "no".
 * @param {(conn: object) => Promise<[string, object?]>} sfTransaction - Resolves to
 *   [vote, createdRecord]; `createdRecord.id` is used for compensation.
 * @returns {Promise<void>}
 */
async function twoPhaseCommit(pgClient, sfConn, pgTransaction, sfTransaction) {
  let data;
  // Only issue ROLLBACK when BEGIN actually succeeded.
  let pgTransactionOpen = false;
  try {
    // Prepare phase: open both connections and begin the PostgreSQL transaction.
    await pgClient.connect();
    // NOTE(review): jsforce documents `login()` as its sign-in call; confirm the
    // connection object passed in really exposes `connect()`.
    await sfConn.connect();
    await pgClient.query("BEGIN");
    pgTransactionOpen = true;

    // Collect the vote of each participant.
    const pgVote = await pgTransaction(pgClient);
    const [sfVote, sfdata] = await sfTransaction(sfConn);
    data = sfdata;

    // Commit phase: commit only when both participants voted "yes".
    if (pgVote === "yes" && sfVote === "yes") {
      await pgClient.query("COMMIT");
      pgTransactionOpen = false;
      console.log("Transaction committed");
    } else {
      throw new Error("Transaction aborted");
    }
  } catch (error) {
    // Each recovery step is guarded on its own so that a failing ROLLBACK can
    // no longer prevent the Salesforce compensation from running.
    if (pgTransactionOpen) {
      try {
        await pgClient.query("ROLLBACK");
      } catch (rollbackError) {
        console.error("PostgreSQL rollback failed", rollbackError);
      }
    }
    if (data) {
      try {
        await sfConn.sobject("Contact").delete(data.id);
      } catch (compensationError) {
        console.error("Salesforce compensation failed", compensationError);
      }
    }
    console.error("Transaction failed", error);
  } finally {
    // Close connections
    await pgClient.end();
  }
}
/**
 * PostgreSQL participant: inserts the sample user row and reports its vote.
 *
 * @param {object} client - PostgreSQL client exposing `query(sql, params)`.
 * @returns {Promise<string>} "yes" when the insert succeeded, "no" when it threw.
 */
async function pgTransaction(client) {
  const insertSql = "INSERT INTO users (id, name) VALUES ($1, $2)";
  const insertParams = [1, "John Doe"];
  let vote = "yes";
  try {
    await client.query(insertSql, insertParams);
  } catch (failure) {
    // A failed insert is reported as a "no" vote instead of being rethrown.
    console.error("PostgreSQL transaction failed", failure);
    vote = "no";
  }
  return vote;
}
/**
 * Salesforce participant: creates the sample Contact and reports its vote.
 *
 * @param {object} conn - Salesforce connection exposing `sobject(name).create(record)`.
 * @returns {Promise<Array>} `["yes", createdRecord]` on success, `["no"]` when the
 *   create call threw.
 */
async function sfTransaction(conn) {
  let outcome;
  try {
    const contacts = conn.sobject("Contact");
    const created = await contacts.create({ Name: "John Doe" });
    outcome = ["yes", created];
  } catch (failure) {
    // A failed create is reported as a "no" vote instead of being rethrown.
    console.error("Salesforce transaction failed", failure);
    outcome = ["no"];
  }
  return outcome;
}
// Start the transaction. The returned promise is handled explicitly so that a
// rejection escaping twoPhaseCommit (for example from closing the client) is
// reported instead of surfacing as an unhandled rejection.
twoPhaseCommit(pgClient, sfConn, pgTransaction, sfTransaction).catch((error) => {
  console.error("Two-phase commit terminated unexpectedly", error);
  process.exitCode = 1;
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// example of converting build tree function of DMU | |
#include <node.h> | |
#include <vector> | |
#include <string> | |
#include <unordered_map> | |
using namespace v8; | |
// Payload attached to every tree node.
struct Data {
  // String form of the item's "parentId" property.
  std::string parentId;

  // Add other fields as needed
};
struct DecisionMakingUnit { | |
std::string id; | |
Data data; | |
DecisionMakingUnit* parent; | |
std::vector<DecisionMakingUnit*> children; | |
int depth; | |
}; | |
void BuildTree(const FunctionCallbackInfo<Value>& args) { | |
Isolate* isolate = args.GetIsolate(); | |
// Check the arguments | |
if (args.Length() < 2 || !args[0]->IsArray() || !args[1]->IsArray()) { | |
isolate->ThrowException(Exception::TypeError( | |
String::NewFromUtf8(isolate, "Invalid arguments"))); | |
return; | |
} | |
// Extract JavaScript arrays | |
Local<Array> allItemsArray = Local<Array>::Cast(args[0]); | |
Local<Array> rootsArray = Local<Array>::Cast(args[1]); | |
std::unordered_map<std::string, DecisionMakingUnit*> nodeMap; | |
// Step 1: Create all nodes and store them in the map | |
for (uint32_t i = 0; i < allItemsArray->Length(); ++i) { | |
Local<Object> item = Local<Object>::Cast(allItemsArray->Get(i)); | |
DecisionMakingUnit* node = new DecisionMakingUnit(); | |
node->id = *String::Utf8Value(item->Get(String::NewFromUtf8(isolate, "id"))->ToString()); | |
node->data.parentId = *String::Utf8Value(item->Get(String::NewFromUtf8(isolate, "parentId"))->ToString()); | |
// Initialize other fields as needed | |
nodeMap[node->id] = node; | |
} | |
// Step 2: Establish parent-child relationships | |
for (uint32_t i = 0; i < allItemsArray->Length(); ++i) { | |
Local<Object> item = Local<Object>::Cast(allItemsArray->Get(i)); | |
std::string itemId = *String::Utf8Value(item->Get(String::NewFromUtf8(isolate, "id"))->ToString()); | |
DecisionMakingUnit* node = nodeMap[itemId]; | |
std::string parentId = *String::Utf8Value(item->Get(String::NewFromUtf8(isolate, "parentId"))->ToString()); | |
DecisionMakingUnit* parent = nodeMap[parentId]; | |
if (parentId != "" && parent) { | |
node->parent = parent; | |
node->depth = parent->depth + 1; | |
parent->children.push_back(node); | |
} else { | |
// If no parent found, it's a root node | |
rootsArray->Set(rootsArray->Length(), item); | |
} | |
} | |
args.GetReturnValue().Set(rootsArray); | |
} | |
void Initialize(Local<Object> exports) { | |
NODE_SET_METHOD(exports, "buildTree", BuildTree); | |
} | |
NODE_MODULE(NODE_GYP_MODULE_NAME, Initialize) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Wraps a GraphQL resolver so that it gives up when any of these happens
 * before the resolver settles:
 *  - `timeout` milliseconds elapse            -> throws `http.RequestTimeoutError`
 *  - the request on the resolver context emits "close" -> resolves to `null`
 *  - the optional `cancelEvent` emits "close"          -> resolves to `null`
 * Otherwise the resolver's own result (or rejection) is passed through.
 */
class CancelableCircuitBreaker {
  /**
   * @param resolver    The resolver to guard.
   * @param timeout     Time budget in milliseconds.
   * @param cancelEvent Optional emitter whose "close" event cancels the call.
   */
  constructor(
    private resolver: GraphQLResolver,
    private timeout: number,
    public cancelEvent: null | EventEmitter = null
  ) {}

  /** Returns the guarded resolver. */
  wrap() {
    return <GraphQLResolver>(async (...args) => {
      // Unique sentinels: a resolver that legitimately returns the string
      // "Timeout" can no longer be mistaken for a timeout.
      const TIMED_OUT = Symbol("Timeout");
      const CLOSED = Symbol("ClientRequestClosed");

      // Undo actions for the timer and listeners registered below.
      const cleanups: Array<() => void> = [];

      // Resolves with CLOSED once `emitter` fires "close".
      const watchClose = (emitter: EventEmitter) =>
        new Promise((resolve) => {
          const onClose = () => resolve(CLOSED);
          emitter.on("close", onClose);
          cleanups.push(() => emitter.removeListener("close", onClose));
        });

      // The context is read from the third resolver argument (the standard
      // (parent, args, context, info) order) rather than from an undeclared
      // `context` name. A context without `req` simply skips that watcher.
      const requestEmitter = args[2]?.req;

      try {
        const winner = await Promise.race([
          new Promise((resolve) => {
            const timer = setTimeout(() => resolve(TIMED_OUT), this.timeout);
            cleanups.push(() => clearTimeout(timer));
          }),
          ...[requestEmitter, this.cancelEvent].filter(Boolean).map(watchClose),
          // The resolver result is never filtered, so a synchronous falsy
          // result (null, 0, "") still takes part in the race.
          this.resolver(...args),
        ]);

        if (winner === TIMED_OUT) {
          throw new http.RequestTimeoutError();
        }
        if (winner === CLOSED) {
          console.log("Client cancelled request before it complete");
          return null;
        }
        return winner; // return data of resolver as normal
      } finally {
        // Always release the timer and listeners so they do not outlive the call.
        cleanups.forEach((cleanup) => cleanup());
      }
    });
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Minimal multicast observable. The `process` callback is started by the first
 * subscription; the most recent value is kept in `lastValue` and replayed to
 * late subscribers.
 *
 * With `option.mergeValue` each emitted value is shallow-merged onto the
 * previous one before being stored and delivered.
 */
export class Observable<T> {
  private observers: Observer<T>[] = [];
  public lastError?: Error;
  public lastValue?: T;
  public state: "idle" | "started" | "ended" | "error" = "idle";

  /**
   * @param process Producer; receives an observer wired to this instance.
   * @param option  `mergeValue: true` enables shallow merging of emitted values.
   */
  constructor(private process: (o: Observer<T>) => void, private option = { mergeValue: false }) {}

  /**
   * Runs the producer unless it is currently running.
   * NOTE(review): an instance in the "ended" or "error" state is started again
   * by the next subscribe() call; confirm that restart is intended.
   */
  private start() {
    if (this.state === "started") return;
    this.state = "started";
    this.process({
      next: (d) => this.next(d),
      error: (e) => this.error(e),
      complete: () => {
        this.complete();
      },
    });
  }

  /**
   * Registers `observer`, replays the last value if there is one, and starts
   * the producer.
   * @returns a function that removes this observer again.
   */
  subscribe(observer: Observer<T>) {
    this.observers.push(observer);
    if (this.lastValue !== undefined) observer.next?.(this.lastValue);
    this.start();
    return () => {
      this.unsubscribe(observer);
    };
  }

  /** Removes `observer`; all other observers stay subscribed. */
  unsubscribe(observer: Observer<T>) {
    // Keep everything EXCEPT the given observer (the comparison was inverted).
    this.observers = this.observers.filter((o) => o !== observer);
  }

  /** Stores `newValue` (merged onto the previous value when enabled) and delivers it. */
  next(newValue: T) {
    // Merge against the PREVIOUS value, before lastValue is overwritten.
    const value = this.option.mergeValue
      ? (Object.assign({}, this.lastValue, newValue) as T)
      : newValue;
    this.lastValue = value;
    this.observers.forEach((observer) => observer.next?.(value));
  }

  /** Records `error`, moves to the "error" state and notifies observers. */
  error(error: Error) {
    this.state = "error";
    this.lastError = error;
    this.observers.forEach((observer) => observer.error?.(error));
  }

  /** Moves to the "ended" state, notifies observers and drops them. */
  complete(error?: Error) {
    this.state = "ended";
    this.observers.forEach((observer) => observer.complete?.(this.lastValue, error));
    this.observers = [];
  }

  /**
   * Promise view of this observable.
   *  - already ended    -> resolves with `lastValue`
   *  - already in error -> rejects with `lastError`
   *  - otherwise        -> subscribes; rejects on error, resolves with the last
   *                        value on completion, rejects if nothing was emitted.
   */
  toPromise() {
    return new Promise<T>((resolve, reject) => {
      // Settled states return immediately: subscribing here would run the
      // producer a second time.
      if (this.state === "ended") {
        resolve(this.lastValue as T);
        return;
      }
      if (this.state === "error") {
        reject(this.lastError);
        return;
      }
      this.subscribe({
        // Reject with the real error instead of `undefined` at completion.
        error: (failure) => reject(failure),
        complete: (lastData, error) => {
          if (lastData !== undefined) {
            resolve(lastData);
            return;
          }
          reject(error ?? new Error("Observable completed without emitting a value"));
        },
      });
    });
  }
}
// Registry of shared observables, keyed by caller-supplied string.
// Created without a prototype so that keys such as "constructor" or "toString"
// are not mistaken for existing entries.
const Observables: Record<string, Observable<any>> = Object.create(null);

/**
 * Returns the observable registered under `key`, creating it from `getPromise`
 * on first use. Later calls with the same key return the same instance and
 * ignore their `getPromise` argument.
 *
 * @param key        Registry key.
 * @param getPromise Factory for the promise that feeds a newly created observable.
 */
export const createUniqObservable = <T>(key: string, getPromise: () => Promise<T>) => {
  let shared = Observables[key];
  if (!shared) {
    shared = new Observable(promiseToObserverProcess(getPromise));
    Observables[key] = shared;
  }
  return shared as Observable<T>;
};
/**
 * Adapts a promise factory to an observer-driven process: the fulfilled value
 * goes to `observer.next`, a rejection goes to `observer.error`, and
 * `observer.complete` is called afterwards in both cases.
 *
 * @param getPromise Factory invoked once each time the returned process runs.
 * @returns a function that drives the given observer from `getPromise()`.
 */
export const promiseToObserverProcess = <T>(getPromise: () => Promise<T>) => {
  return (observer: Observer<T>) => {
    // A factory that throws synchronously is reported through the observer
    // like any other failure instead of escaping to the caller.
    let pending: Promise<T>;
    try {
      pending = getPromise();
    } catch (failure) {
      pending = Promise.reject(failure);
    }

    // Callbacks are invoked ON the observer so that handlers relying on `this`
    // keep working; passing `observer.next` directly would detach them.
    pending
      .then((value) => observer.next?.(value))
      .catch((failure) => {
        // Without an error handler the rejection keeps propagating, as before.
        if (!observer.error) throw failure;
        observer.error(failure);
      })
      .finally(() => observer.complete?.());
  };
};
/**
 * Callback set accepted by Observable. Every member is optional.
 */
export type Observer<T> = {
  /** Called with each emitted value. */
  next?: (data: T) => void;

  /** Called when the source reports an error. */
  error?: (error: Error) => void;

  /** Called on completion with the last value (if any) and an optional error. */
  complete?: (lastData?: T, error?: Error) => void;
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// example for saga transaction | |
const { Client } = require('pg'); | |
const jsforce = require('jsforce'); | |
// Settings for the PostgreSQL client used by the saga example.
const pgSettings = {
  connectionString: 'postgres://user:password@localhost:5432/db1',
};

// Settings for the Salesforce connection used by the saga example.
const sfSettings = {
  loginUrl: 'https://your-salesforce-instance-url.com',
  username: 'your-salesforce-username',
  password: 'your-salesforce-password',
};

// The client is only constructed here; createUserSaga() opens and closes it.
const pgClient = new Client(pgSettings);
const sfConn = new jsforce.Connection(sfSettings);
/**
 * Saga that creates a user row in PostgreSQL and a matching Contact in
 * Salesforce. When any step fails, the PostgreSQL transaction is rolled back
 * and the Contact, if it was already created, is deleted as compensation.
 *
 * Failures are logged, not rethrown. The PostgreSQL client is always closed.
 *
 * @param {object} pgClient - PostgreSQL client exposing connect/query/end.
 * @param {object} sfConn - Salesforce connection exposing `sobject(name)`.
 * @returns {Promise<void>}
 */
async function createUserSaga(pgClient, sfConn) {
  // Declared outside `try` so the compensation in `catch` can see it; as a
  // block-scoped const inside `try` it was never visible there.
  let sfRecord;
  // Only issue ROLLBACK when BEGIN actually succeeded.
  let pgTransactionOpen = false;
  try {
    // Step 1: Create user in PostgreSQL
    await pgClient.connect();
    await pgClient.query('BEGIN');
    pgTransactionOpen = true;
    await pgClient.query('INSERT INTO users (id, name) VALUES ($1, $2)', [1, 'John Doe']);

    // Step 2: Create contact in Salesforce
    const sfContact = { FirstName: 'John', LastName: 'Doe' };
    sfRecord = await sfConn.sobject('Contact').create(sfContact);

    // Commit PostgreSQL transaction if everything is successful
    await pgClient.query('COMMIT');
    pgTransactionOpen = false;
    console.log('Saga completed successfully');
  } catch (error) {
    // Compensating actions in case of failure
    console.error('Saga failed, performing compensating actions', error);

    // Rollback PostgreSQL transaction; guarded so that a failing rollback
    // cannot prevent the Salesforce compensation below.
    if (pgTransactionOpen) {
      try {
        await pgClient.query('ROLLBACK');
      } catch (rollbackError) {
        console.error('Failed to roll back PostgreSQL transaction', rollbackError);
      }
    }

    // Delete the Salesforce contact if it was created
    try {
      if (sfRecord && sfRecord.id) {
        await sfConn.sobject('Contact').delete(sfRecord.id);
      }
    } catch (deleteError) {
      console.error('Failed to delete Salesforce contact', deleteError);
    }
  } finally {
    // Close PostgreSQL connection
    await pgClient.end();
  }
}
// Start the saga. The returned promise is handled explicitly so that a
// rejection escaping createUserSaga (for example from closing the client) is
// reported instead of surfacing as an unhandled rejection.
createUserSaga(pgClient, sfConn).catch((error) => {
  console.error('Saga terminated unexpectedly', error);
  process.exitCode = 1;
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment