Skip to content

Instantly share code, notes, and snippets.

@GavinRay97
Created August 27, 2020 16:59
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save GavinRay97/4d3c4540cf45db38b39a6c8e55777cb7 to your computer and use it in GitHub Desktop.
Save GavinRay97/4d3c4540cf45db38b39a6c8e55777cb7 to your computer and use it in GitHub Desktop.
Hasura GraphQL Tx over Websockets client
version: '3.6'
services:
postgres:
image: postgres:12
restart: always
volumes:
- db_data:/var/lib/postgresql/data
environment:
POSTGRES_PASSWORD: postgrespassword
graphql-engine:
image: hasura/graphql-engine:pull3557-2e391cf9
ports:
- '8080:8080'
depends_on:
- 'postgres'
restart: always
environment:
HASURA_GRAPHQL_DATABASE_URL: postgres://postgres:postgrespassword@postgres:5432/postgres
## enable the console served by server
HASURA_GRAPHQL_ENABLE_CONSOLE: 'true' # set to "false" to disable console
## enable debugging mode. It is recommended to disable this in production
HASURA_GRAPHQL_DEV_MODE: 'true'
HASURA_GRAPHQL_ENABLED_LOG_TYPES: startup, http-log, webhook-log, websocket-log, query-log
## uncomment next line to set an admin secret
# HASURA_GRAPHQL_ADMIN_SECRET: myadminsecretkey
volumes:
db_data:
import WebSocket from 'ws'
import {
InitTxPayload,
TxMessageType,
ExecuteTxPayload,
CommitTxPayload,
AbortTxPayload,
INIT_TX_SUCCESS_RESPONSE,
INIT_TX_RESPONSE,
GraphQLOperation,
EXECUTE_TX_SUCCESS_RESPONSE,
EXECUTE_TX_RESPONSE,
COMMIT_OR_ABORT_TX_RESPONSE,
} from './types'
const getUserQuery = `
query q {
users_by_pk(id: 1) {
id
name
}
}
`
const changeUserNameQuery = (id, newName) => `
mutation MyMutation {
__typename
update_users(where: {id: {_eq: ${id}}}, _set: {name: "${newName}"}) {
affected_rows
returning {
name
id
}
}
}`
const changeUserNumericMutation = (id, val) => `
mutation MyMutation {
__typename
update_users(where: {id: {_eq: ${id}} }, _set: { numeric_column: ${val} }) {
affected_rows
returning {
name
id
}
}
}`
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
class TxClient {
socket: WebSocket
ready = false
lastQueryTimestamp: number
constructor(public url: string, public headers?: Record<string, any>) {
this.socket = TxClient.makeSocket(url, headers)
this.socket.on('open', () => (this.ready = true))
// When a transaction is finished, it will have the socket closed
// So listen for this, and replace it. This needs to be slightly throttled
// Which we do by checking timeSinceLastQuery and enforcing a 100ms delay
// Without this, two back-to-back client.transaction() calls will fail
this.socket.on('close', () => {
this.socket = TxClient.makeSocket(url, headers)
})
}
static makeSocket(url, headers) {
return new WebSocket(url, 'graphql-tx', { headers })
}
timeSinceLastQuery() {
return Date.now() - this.lastQueryTimestamp
}
get txns() {
return {
init: (payload?: InitTxPayload['payload']) => {
const msg: InitTxPayload = {
type: TxMessageType.INIT,
payload: payload || {},
}
return this.socket.send(JSON.stringify(msg))
},
execute: (operation: ExecuteTxPayload['payload']['query']) => {
const msg: ExecuteTxPayload = {
type: TxMessageType.EXECUTE,
payload: { query: operation },
}
return this.socket.send(JSON.stringify(msg))
},
commit: () => {
const msg: CommitTxPayload = {
type: TxMessageType.COMMIT,
}
return this.socket.send(JSON.stringify(msg))
},
abort: () => {
const msg: AbortTxPayload = {
type: TxMessageType.ABORT,
}
return this.socket.send(JSON.stringify(msg))
},
}
}
async init(
payload?: InitTxPayload['payload']
): Promise<INIT_TX_SUCCESS_RESPONSE> {
while (!this.ready) await sleep(100)
return new Promise((resolve, reject) => {
this.txns.init(payload)
this.socket.once('message', (msg: WebSocket.Data) => {
const response: INIT_TX_RESPONSE = JSON.parse(String(msg))
switch (response.type) {
case 'initialised':
return resolve(response)
case 'init_error':
return reject(response)
}
})
this.socket.on('error', reject)
})
}
async execute(query: GraphQLOperation): Promise<EXECUTE_TX_SUCCESS_RESPONSE> {
return new Promise((resolve, reject) => {
this.txns.execute(query)
this.socket.once('message', (msg: WebSocket.Data) => {
const response: EXECUTE_TX_RESPONSE = JSON.parse(String(msg))
switch (response.type) {
case 'data':
return resolve(response)
case 'error':
return reject(response)
}
})
this.socket.once('error', reject)
})
}
async commit(): Promise<COMMIT_OR_ABORT_TX_RESPONSE> {
// Throttle
if (this.timeSinceLastQuery() < 100) await sleep(100)
// Set the last query time to now, used for throttling to ensure functionality
this.lastQueryTimestamp = Date.now()
return new Promise((resolve, reject) => {
this.txns.commit()
this.socket.once('close', (code, reason) => {
switch (reason) {
case "Closing connection after 'commit' and 'abort'":
return resolve({ code, reason } as COMMIT_OR_ABORT_TX_RESPONSE)
default:
return reject({ code, reason })
}
})
this.socket.once('error', reject)
})
}
async abort(): Promise<COMMIT_OR_ABORT_TX_RESPONSE> {
// Set the last query time to now, used for throttling to ensure functionality
this.lastQueryTimestamp = Date.now()
return new Promise((resolve, reject) => {
this.txns.abort()
this.socket.once('close', (code, reason) => {
switch (reason) {
case "Closing connection after 'commit' and 'abort'":
return resolve({ code, reason } as COMMIT_OR_ABORT_TX_RESPONSE)
default:
return reject({ code, reason })
}
})
this.socket.once('error', reject)
})
}
}
const tx = new TxClient('ws://localhost:8080/v1/graphql')
async function main() {
try {
await tx.init()
const op1 = await tx.execute({ query: getUserQuery })
console.log('op 1', op1)
const op2 = await tx.execute({
query: changeUserNameQuery(1, 'new name'),
})
console.log('op 2', op2)
const op3 = await tx.execute({ query: changeUserNumericMutation(2, 'asd') })
console.log('op 3', op3)
} catch (err) {
await tx.abort()
console.log('GOT ERROR IN MAIN:', err)
} finally {
await tx.commit()
}
}
main()
export enum TxMessageType {
INIT = 'init',
EXECUTE = 'execute',
COMMIT = 'commit',
ABORT = 'abort',
}
export interface GQLWebsocketTxPayload {
type: TxMessageType
}
export interface InitTxPayload extends GQLWebsocketTxPayload {
type: TxMessageType.INIT
payload: {
/** @default 'read-committed' */
isolation?: 'read-committed' | 'serializable' | 'repeatable-read'
headers?: Record<string, string>
}
}
export type INIT_TX_SUCCESS_RESPONSE = {
type: 'initialised'
}
export type INIT_TX_ERROR_RESPONSE = {
type: 'init_error'
payload: 'transaction cannot be initialised more than once in a single WebSocket session'
}
export type INIT_TX_RESPONSE = INIT_TX_SUCCESS_RESPONSE | INIT_TX_ERROR_RESPONSE
export interface GraphQLOperation {
operationName?: string
query: string
variables?: Record<string, any>
}
export interface ExecuteTxPayload extends GQLWebsocketTxPayload {
type: TxMessageType.EXECUTE
payload: {
request_id?: string
query: GraphQLOperation
}
}
export type EXECUTE_TX_SUCCESS_RESPONSE = {
type: 'data'
id: string
request_id: string
payload: {
data: {
[key: string]: object | object[]
}
}
}
export type EXECUTE_TX_ERROR_RESPONSE = {
type: 'error'
id: string
request_id: string
errors: Array<{
message: string
extension: {
code: string
path: string
}
}>
}
export type EXECUTE_TX_RESPONSE =
| EXECUTE_TX_SUCCESS_RESPONSE
| EXECUTE_TX_ERROR_RESPONSE
export interface CommitTxPayload extends GQLWebsocketTxPayload {
type: TxMessageType.COMMIT
}
export interface AbortTxPayload extends GQLWebsocketTxPayload {
type: TxMessageType.ABORT
}
export type COMMIT_OR_ABORT_TX_RESPONSE = {
code: 1000
reason: "Closing connection after 'commit' and 'abort'"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment