Skip to content

Instantly share code, notes, and snippets.

@lyluongthien
Last active June 28, 2024 03:27
Show Gist options
  • Save lyluongthien/4dceaa3e589939da554b70633331428c to your computer and use it in GitHub Desktop.
Save lyluongthien/4dceaa3e589939da554b70633331428c to your computer and use it in GitHub Desktop.
Gist used in Kai's presentation at GT: Optimization and fault tolerance in distributed transactions with Node.js GraphQL servers
// Example: two-phase commit transaction across PostgreSQL and Salesforce
const { Client } = require("pg");
const jsforce = require("jsforce");
// PostgreSQL client used by the two-phase commit example.
// NOTE(review): the connection string looks like a placeholder — replace before running.
const pgClient = new Client({
connectionString: "postgres://user:password@localhost:5432/db1",
});
// Salesforce connection (jsforce) used by the two-phase commit example.
// NOTE(review): login URL and credentials look like placeholders — replace before running.
const sfConn = new jsforce.Connection({
loginUrl: "https://the-salesforce-instance-url.com",
username: "the-salesforce-username",
password: "the-salesforce-password",
});
async function twoPhaseCommit(pgClient, sfConn, pgTransaction, sfTransaction) {
let data;
try {
// Prepare phase: Begin transaction in PostgreSQL
await pgClient.connect();
await sfConn.connect()
await pgClient.query("BEGIN");
// Perform PostgreSQL transaction operation
const pgVote = await pgTransaction(pgClient);
// Perform Salesforce transaction operation
const [sfVote, sfdata] = await sfTransaction(sfConn);
data = sfdata;
// Commit phase: If both votes are 'yes', commit; otherwise, rollback
if (pgVote === "yes" && sfVote === "yes") {
await pgClient.query("COMMIT");
console.log("Transaction committed");
} else {
throw new Error("Transaction aborted");
}
} catch (error) {
// Rollback on failure
await pgClient.query("ROLLBACK");
if (data) await sfConn.sobject("Contact").delete(data.id);
console.error("Transaction failed", error);
} finally {
// Close connections
await pgClient.end();
}
}
async function pgTransaction(client) {
try {
await client.query("INSERT INTO users (id, name) VALUES ($1, $2)", [1, "John Doe"]);
return "yes";
} catch (error) {
console.error("PostgreSQL transaction failed", error);
return "no";
}
}
async function sfTransaction(conn) {
try {
const record = { Name: "John Doe" };
const data = await conn.sobject("Contact").create(record);
return ["yes", data];
} catch (error) {
console.error("Salesforce transaction failed", error);
return ["no"];
}
}
// Start the transaction. The returned promise is observed so that a failure
// escaping twoPhaseCommit is reported and reflected in the exit code instead
// of surfacing as an unhandled promise rejection.
twoPhaseCommit(pgClient, sfConn, pgTransaction, sfTransaction).catch((error) => {
  console.error("Two-phase commit example terminated unexpectedly", error);
  process.exitCode = 1;
});
// Example: the DMU build-tree function converted to a native C++ Node.js addon
#include <node.h>
#include <vector>
#include <string>
#include <unordered_map>
using namespace v8;
// Payload stored on each tree node. BuildTree fills parentId from the
// "parentId" property of the corresponding JavaScript item.
struct Data {
std::string parentId;
// Add other fields as needed
};
// One node of the decision-making-unit tree assembled by BuildTree.
//
// `parent` and `depth` carry default member initializers so that a
// default-initialized node (e.g. `DecisionMakingUnit n;` or
// `new DecisionMakingUnit`) is a well-defined parentless node at depth 0
// rather than holding indeterminate values.
struct DecisionMakingUnit {
  std::string id;                             // value of the item's "id" property
  Data data;                                  // per-node payload
  DecisionMakingUnit* parent = nullptr;       // non-owning; nullptr when no parent is linked
  std::vector<DecisionMakingUnit*> children;  // non-owning pointers to child nodes
  int depth = 0;                              // set to parent's depth + 1 when a parent is linked
};
void BuildTree(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate = args.GetIsolate();
// Check the arguments
if (args.Length() < 2 || !args[0]->IsArray() || !args[1]->IsArray()) {
isolate->ThrowException(Exception::TypeError(
String::NewFromUtf8(isolate, "Invalid arguments")));
return;
}
// Extract JavaScript arrays
Local<Array> allItemsArray = Local<Array>::Cast(args[0]);
Local<Array> rootsArray = Local<Array>::Cast(args[1]);
std::unordered_map<std::string, DecisionMakingUnit*> nodeMap;
// Step 1: Create all nodes and store them in the map
for (uint32_t i = 0; i < allItemsArray->Length(); ++i) {
Local<Object> item = Local<Object>::Cast(allItemsArray->Get(i));
DecisionMakingUnit* node = new DecisionMakingUnit();
node->id = *String::Utf8Value(item->Get(String::NewFromUtf8(isolate, "id"))->ToString());
node->data.parentId = *String::Utf8Value(item->Get(String::NewFromUtf8(isolate, "parentId"))->ToString());
// Initialize other fields as needed
nodeMap[node->id] = node;
}
// Step 2: Establish parent-child relationships
for (uint32_t i = 0; i < allItemsArray->Length(); ++i) {
Local<Object> item = Local<Object>::Cast(allItemsArray->Get(i));
std::string itemId = *String::Utf8Value(item->Get(String::NewFromUtf8(isolate, "id"))->ToString());
DecisionMakingUnit* node = nodeMap[itemId];
std::string parentId = *String::Utf8Value(item->Get(String::NewFromUtf8(isolate, "parentId"))->ToString());
DecisionMakingUnit* parent = nodeMap[parentId];
if (parentId != "" && parent) {
node->parent = parent;
node->depth = parent->depth + 1;
parent->children.push_back(node);
} else {
// If no parent found, it's a root node
rootsArray->Set(rootsArray->Length(), item);
}
}
args.GetReturnValue().Set(rootsArray);
}
// Addon initializer: registers BuildTree on the exports object under the
// name "buildTree".
void Initialize(Local<Object> exports) {
NODE_SET_METHOD(exports, "buildTree", BuildTree);
}
// Declares the addon module, using Initialize as its initialization function.
NODE_MODULE(NODE_GYP_MODULE_NAME, Initialize)
/**
 * Wraps a GraphQL resolver so that it stops waiting for the resolver when
 * either a timeout elapses or the client goes away.
 *
 * The wrapped resolver races the original resolver against:
 *  - a timer of `timeout` milliseconds,
 *  - the "close" event of `context.req` (third resolver argument), and
 *  - the "close" event of the optional `cancelEvent` emitter.
 */
class CancelableCircuitBreaker {
  constructor(
    private resolver: GraphQLResolver,
    private timeout: number,
    public cancelEvent: null | EventEmitter = null
  ) {}

  /**
   * @returns A resolver with the same call signature as the wrapped one. It
   *   resolves to the wrapped resolver's result, resolves to `null` when the
   *   request was closed first, and throws `http.RequestTimeoutError` when
   *   the timeout fires first.
   */
  wrap() {
    return <GraphQLResolver>(async (...args) => {
      // Unique sentinels: a resolver that legitimately returns the string
      // "Timeout" or "ClientRequestClosed" can no longer be mistaken for one.
      const TIMEOUT = Symbol("Timeout");
      const CLOSED = Symbol("ClientRequestClosed");

      // Resolvers are called as (parent, args, context, info); the request
      // object lives on the context. It may be absent, hence the guards.
      const request = args[2]?.req;

      let timer: ReturnType<typeof setTimeout> | undefined;
      let signalClosed: () => void = () => {};

      const timedOut = new Promise<symbol>((resolve) => {
        timer = setTimeout(() => resolve(TIMEOUT), this.timeout);
      });
      const closed = new Promise<symbol>((resolve) => {
        signalClosed = () => resolve(CLOSED);
      });
      request?.on?.("close", signalClosed);
      this.cancelEvent?.on("close", signalClosed);

      try {
        const winner = await Promise.race([
          timedOut,
          closed,
          this.resolver(...args),
        ]);
        if (winner === TIMEOUT) {
          throw new http.RequestTimeoutError();
        }
        if (winner === CLOSED) {
          console.log("Client cancelled request before it complete");
          return null;
        }
        return winner; // return data of resolver as normal
      } finally {
        // Release the timer and listeners whichever branch won, so finished
        // calls do not keep the event loop alive or pile up "close" handlers.
        clearTimeout(timer);
        request?.removeListener?.("close", signalClosed);
        this.cancelEvent?.removeListener("close", signalClosed);
      }
    });
  }
}
/**
 * Minimal multicast observable.
 *
 * The `process` callback is started by the first `subscribe` call and pushes
 * values, errors and completion to every current subscriber. The most recent
 * value is remembered and replayed to subscribers that join later.
 *
 * With `option.mergeValue` enabled, each emitted value is shallow-merged on
 * top of the previously emitted (merged) value before being delivered.
 *
 * NOTE(review): `start()` only guards against the "started" state, so
 * subscribing after the observable ended or errored runs `process` again.
 * That behaviour is preserved as-is — confirm it is intended.
 */
export class Observable<T> {
  private observers: Observer<T>[] = [];
  public lastError?: Error;
  public lastValue?: T;
  public state: "idle" | "started" | "ended" | "error" = "idle";

  /**
   * @param process Producer invoked with an observer whose callbacks feed
   *   this observable.
   * @param option `mergeValue`: shallow-merge each new value into the
   *   previous one.
   */
  constructor(
    private process: (o: Observer<T>) => void,
    private option = { mergeValue: false }
  ) {}

  /** Runs `process` unless a run is already in progress. */
  private start() {
    if (this.state === "started") return;
    this.state = "started";
    this.process({
      next: (d) => this.next(d),
      error: (e) => this.error(e),
      complete: () => {
        this.complete();
      },
    });
  }

  /**
   * Registers an observer, replays the last value (if any) to it and makes
   * sure the producer is running.
   *
   * @returns A function that removes this observer again.
   */
  subscribe(observer: Observer<T>) {
    this.observers.push(observer);
    if (this.lastValue !== undefined) observer.next?.(this.lastValue);
    this.start();
    return () => {
      this.unsubscribe(observer);
    };
  }

  /** Removes the given observer; all other observers stay subscribed. */
  unsubscribe(observer: Observer<T>) {
    this.observers = this.observers.filter((o) => o !== observer);
  }

  /** Records `newValue` (merged when configured) and delivers it to all observers. */
  next(newValue: T) {
    // Merge against the PREVIOUS value, then remember the merged result so
    // successive partial updates accumulate.
    const value: T = this.option.mergeValue
      ? Object.assign({}, this.lastValue || {}, newValue)
      : newValue;
    this.lastValue = value;
    this.observers.forEach((observer) => observer.next?.(value));
  }

  /** Records the error, switches to the "error" state and notifies all observers. */
  error(error: Error) {
    this.state = "error";
    this.lastError = error;
    this.observers.forEach((observer) => observer.error?.(error));
  }

  /** Switches to the "ended" state, notifies all observers and drops them. */
  complete(error?: Error) {
    this.state = "ended";
    this.observers.forEach((observer) => observer.complete?.(this.lastValue, error));
    this.observers = [];
  }

  /**
   * Adapts the observable to a promise.
   *
   * Settles immediately when the observable is already "ended" (resolves with
   * the last value) or in "error" (rejects with the last error). Otherwise it
   * settles on completion: resolves with the last value when there is one,
   * rejects with the reported error when the run failed without a value, and
   * resolves with `undefined` when the run completed without value or error.
   */
  toPromise() {
    return new Promise<T>((resolve, reject) => {
      if (this.state === "ended") {
        resolve(this.lastValue as T);
      }
      if (this.state === "error") {
        reject(this.lastError);
      }
      // Remember an error reported before completion so the promise can
      // reject with the real cause instead of `undefined`.
      let failure: Error | undefined;
      this.subscribe({
        error: (reported) => {
          failure = reported;
        },
        complete: (lastData, error) => {
          const cause = error ?? failure;
          if (lastData !== undefined) {
            resolve(lastData);
          } else if (cause) {
            reject(cause);
          } else {
            resolve(lastData as T);
          }
        },
      });
    });
  }
}
/**
 * Registry of shared observables, keyed by caller-supplied string.
 *
 * Created without a prototype so that keys such as "constructor",
 * "toString" or "__proto__" are treated as ordinary entries instead of
 * resolving to members inherited from Object.prototype.
 */
const Observables: Record<string, Observable<any>> = Object.create(null);

/**
 * Returns the observable registered under `key`, creating it from
 * `getPromise` on first use. Later calls with the same key return the same
 * instance and ignore their `getPromise` argument.
 *
 * @param key Identifier shared by all callers that want the same observable.
 * @param getPromise Factory for the promise that feeds the observable.
 */
export const createUniqObservable = <T>(key: string, getPromise: () => Promise<T>) => {
  let shared = Observables[key];
  if (!shared) {
    shared = new Observable(promiseToObserverProcess(getPromise));
    Observables[key] = shared;
  }
  return shared as Observable<T>;
};
/**
 * Adapts a promise factory to an observer-driven process.
 *
 * The returned function calls `getPromise` and forwards the outcome to the
 * observer: `next` with the resolved value, or `error` with the rejection
 * reason, followed in both cases by `complete`.
 *
 * Callbacks are invoked on the observer object (so handlers relying on
 * `this` work), missing callbacks are skipped, and a synchronous throw from
 * `getPromise` is reported through `error` like a rejection.
 */
export const promiseToObserverProcess = <T>(getPromise: () => Promise<T>) => {
  return (observer: Observer<T>) => {
    let pending: Promise<T>;
    try {
      pending = getPromise();
    } catch (thrown) {
      pending = Promise.reject(thrown);
    }
    pending
      .then((value) => observer.next?.(value))
      .catch((reason) => observer.error?.(reason))
      .finally(() => observer.complete?.());
  };
};
/**
 * Callbacks through which a consumer receives notifications.
 * Every callback is optional.
 */
export type Observer<T> = {
/** Called with a data value. */
next?: (data: T) => void;
/** Called with an error. */
error?: (error: Error) => void;
/** Called on completion, optionally with the last data value and an error. */
complete?: (lastData?: T, error?: Error) => void;
};
// Example: saga transaction with compensating actions
const { Client } = require('pg');
const jsforce = require('jsforce');
// PostgreSQL client used by the saga example.
// NOTE(review): the connection string looks like a placeholder — replace before running.
const pgClient = new Client({
connectionString: 'postgres://user:password@localhost:5432/db1',
});
// Salesforce connection (jsforce) used by the saga example.
// NOTE(review): login URL and credentials look like placeholders — replace before running.
const sfConn = new jsforce.Connection({
loginUrl: 'https://your-salesforce-instance-url.com',
username: 'your-salesforce-username',
password: 'your-salesforce-password'
});
async function createUserSaga(pgClient, sfConn) {
try {
// Step 1: Create user in PostgreSQL
await pgClient.connect();
await pgClient.query('BEGIN');
await pgClient.query('INSERT INTO users (id, name) VALUES ($1, $2)', [1, 'John Doe']);
// Step 2: Create contact in Salesforce
const sfContact = { FirstName: 'John', LastName: 'Doe' };
const sfRecord = await sfConn.sobject('Contact').create(sfContact);
// Commit PostgreSQL transaction if everything is successful
await pgClient.query('COMMIT');
console.log('Saga completed successfully');
} catch (error) {
// Compensating actions in case of failure
console.error('Saga failed, performing compensating actions', error);
// Rollback PostgreSQL transaction
await pgClient.query('ROLLBACK');
// Delete the Salesforce contact if it was created
try {
if (sfRecord && sfRecord.id) {
await sfConn.sobject('Contact').delete(sfRecord.id);
}
} catch (deleteError) {
console.error('Failed to delete Salesforce contact', deleteError);
}
} finally {
// Close PostgreSQL connection
await pgClient.end();
}
}
// Start the saga. The returned promise is observed so that a failure
// escaping createUserSaga is reported and reflected in the exit code instead
// of surfacing as an unhandled promise rejection.
createUserSaga(pgClient, sfConn).catch((error) => {
  console.error('Saga example terminated unexpectedly', error);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment