Skip to content

Instantly share code, notes, and snippets.

@luo3house
Last active April 29, 2022 06:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save luo3house/2e31cc7e5e57f6a3e3546f3ebde5d9ce to your computer and use it in GitHub Desktop.
Save luo3house/2e31cc7e5e57f6a3e3546f3ebde5d9ce to your computer and use it in GitHub Desktop.
Promisify an event-based channel; makes it easy to build context-isolated APIs.
/**
* @name promisifychannel
* @brief Promisify an event-based channel
* @version 2
* @license MIT
* @copyright luo3house
* @requires JSON, ES2015 or later (Promise lib)
*/
// util types
/** Key that travels with a command and its response (see `WrappedCmd` / `WrappedRsp`). */
export type KeyType = string
/** Name carried by an event payload (see `WrappedEvent`). */
export type EventName = string
/** The payload kinds a probe can report for an incoming raw message. */
export type PayloadType = 'cmd' | 'rsp' | 'event'
/** Produces the key for an outgoing command; returns it as a string. */
export type KeyGenerator<Cmd = any> = (cmd: Cmd) => string
/**
 * Minimal emitter contract the channel uses to manage response callbacks.
 *
 * - `once` registers `callback` under `key` and returns a handle whose
 *   `remove` unregisters it.
 * - `emit` delivers `rsp` to whatever is registered under `key`.
 * - `clear` drops every registered callback.
 */
export type EventEmitterLike<Rsp = any> = {
once(key: KeyType, callback: (rsp: Rsp) => void): { remove: () => void }
emit(key: KeyType, rsp: Rsp): void
clear(): void
}
// major types
/** A command together with the key used to match its response. */
export type WrappedCmd<Cmd = any> = { key: KeyType; cmd: Cmd }
/** A response together with the key of the command it answers. */
export type WrappedRsp<Rsp = any> = { key: KeyType; rsp: Rsp }
/** An event together with its name. */
export type WrappedEvent<Event = any> = { name: EventName; event: Event }
/** Any already-decoded payload a probe may hand back alongside the payload type. */
export type PreDecoded<Cmd, Rsp, Event> = WrappedCmd<Cmd> | WrappedRsp<Rsp> | WrappedEvent<Event>
/**
 * Inspects an incoming raw message and reports which kind of payload it holds.
 * The probe may also return the payload already decoded to save a second pass.
 */
export type ProbeFn<Raw, Cmd, Rsp, Event> = (raw: Raw) => Promise<{
type: PayloadType
// When preDecoded is non-empty it is used directly and the matching
// decodeCmd / decodeRsp / decodeEvent is not called.
preDecoded: PreDecoded<Cmd, Rsp, Event> | null | undefined
}>
/**
 * Signatures of the asynchronous encoders/decoders that convert between the
 * wire representation (`Raw`) and wrapped commands, responses and events.
 */
export declare module ChannelEncoding {
/** Encodes a command and its key into a raw message. */
export type CmdEncoder<Raw, Cmd> = (key: KeyType, cmd: Cmd) => Promise<Raw>
/** Decodes a raw message into a wrapped command. */
export type CmdDecoder<Raw, Cmd> = (raw: Raw) => Promise<WrappedCmd<Cmd>>
/** Encodes a response and the key of the command it answers into a raw message. */
export type RspEncoder<Raw, Rsp> = (key: KeyType, rsp: Rsp) => Promise<Raw>
/** Decodes a raw message into a wrapped response. */
export type RspDecoder<Raw, Rsp> = (raw: Raw) => Promise<WrappedRsp<Rsp>>
/**
 * Encodes an event into a raw message.
 * NOTE(review): the first parameter is named `key`, but createChannel passes
 * the event name here.
 */
export type EventEncoder<Raw, Event> = (key: KeyType, ev: Event) => Promise<Raw>
/** Decodes a raw message into a wrapped event. */
export type EventDecoder<Raw, Event> = (raw: Raw) => Promise<WrappedEvent<Event>>
}
/** Signatures of the transport and handler callbacks plugged into a channel. */
export declare module ChannelMessaging {
/** Sends an encoded raw message; also receives the wrapped payload it was encoded from. */
export type SendRawFn<Raw, Cmd, Rsp, Event> = (raw: Raw, payload: WrappedCmd<Cmd> | WrappedRsp<Rsp> | WrappedEvent<Event>) => Promise<unknown>
/** Accepts an incoming raw message. */
export type HandleRawFn<Raw> = (raw: Raw) => void
/** Handles an incoming command; receives its key, the command and the channel. */
export type HandleCmdFunc<Cmd, Channel> = (key: KeyType, cmd: Cmd, channel: Channel) => void
/** Handles an incoming event; receives its name, the event and the channel. */
export type HandleEventFunc<Event, Channel> = (name: EventName, event: Event, channel: Channel) => void
}
/** Everything `createChannel` needs: a codec, a transport and optional handlers. */
export type ChannelCreateOptions<Raw = any, Cmd = any, Rsp = any, Event = any> = {
/** probe whether an incoming raw message is a cmd, a rsp or an event */
probe: ProbeFn<Raw, Cmd, Rsp, Event>
/** encode a cmd */
encodeCmd: ChannelEncoding.CmdEncoder<Raw, Cmd>
/** decode a cmd */
decodeCmd: ChannelEncoding.CmdDecoder<Raw, Cmd>
/** encode a rsp */
encodeRsp: ChannelEncoding.RspEncoder<Raw, Rsp>
/** decode a rsp */
decodeRsp: ChannelEncoding.RspDecoder<Raw, Rsp>
/** encode an event */
encodeEvent: ChannelEncoding.EventEncoder<Raw, Event>
/** decode an event */
decodeEvent: ChannelEncoding.EventDecoder<Raw, Event>
/** provide a function to send encoded data */
sendRaw: ChannelMessaging.SendRawFn<Raw, Cmd, Rsp, Event>
/** provide a function to handle an incoming cmd */
handleCmdFunc?: ChannelMessaging.HandleCmdFunc<Cmd, Channel<Raw, Cmd, Rsp, Event>>
/** provide a function to handle an incoming event */
handleEventFunc?: ChannelMessaging.HandleEventFunc<Event, Channel<Raw, Cmd, Rsp, Event>>
/** receives errors raised while probing or decoding incoming raw data */
onHandleError?: (err: any) => void
/** event-emitter-like object that manages response callbacks; an internal one is used by default */
rspEventEmitter?: EventEmitterLike<Rsp>
/** generates a request key (promise key), which should be unique; an internal generator is used by default */
keyGenerator?: KeyGenerator<Cmd>
}
/** The promisified channel returned by `createChannel`. */
export type Channel<Raw = any, Cmd = any, Rsp = any, Event = any> = {
/** feed an incoming raw message into the channel; it is probed and dispatched as a cmd, rsp or event */
handleRaw(raw: Raw): void
/** send a stateless command and get the wrapped cmd (with its key), resolved once sendRaw resolves */
sendCmd(cmd: Cmd): Promise<WrappedCmd<Cmd>>
/** send back a response for the command identified by `key` */
sendRsp(key: KeyType, rsp: Rsp): Promise<unknown>
/** send a stateless event, resolved once sendRaw resolves */
sendEvent(name: EventName, event: Event): Promise<unknown>
/** send a command and wait for the response */
invoke(cmd: Cmd): Promise<Rsp>
/** send a command and wait for the response, with a deadline of `ms` milliseconds */
invokeWithTimeout(cmd: Cmd, ms: number): Promise<Rsp>
/** clear the list of pending response callbacks */
clean(): void
}
// implements
/**
 * Build a key generator backed by a private counter.
 *
 * Each generator starts at 1 and yields the decimal string of the next
 * integer on every call ("1", "2", "3", ...). The command argument is ignored.
 *
 * @returns a generator whose keys are unique within that generator instance
 */
export function newSimpleKeyGenerator<Cmd = any>(): KeyGenerator<Cmd> {
  let nextId = 1
  return function generateKey(): string {
    const key = String(nextId)
    nextId += 1
    return key
  }
}
/**
 * Create the default in-memory emitter used to hold pending response callbacks.
 *
 * One callback is kept per key; registering again under the same key replaces
 * the previous callback.
 *
 * - `once(key, callback)` registers `callback` and returns a handle whose
 *   `remove()` unregisters that registration (and only that one).
 * - `emit(key, rsp)` invokes the callback registered under `key`, if any, and
 *   unregisters it first so it fires at most once.
 * - `clear()` drops every registered callback.
 *
 * @returns a fresh emitter with no registered callbacks
 */
function newSimpleEventEmitterLike<Rsp>(): EventEmitterLike<Rsp> {
  type Callback = (rsp: Rsp) => void
  // A Map (rather than a plain object) keeps keys such as "__proto__" or
  // "constructor" from resolving to inherited Object.prototype members.
  const callbacks = new Map<KeyType, Callback>()
  return {
    once(key: KeyType, callback: Callback) {
      // Wrap so every registration has its own identity, even when the same
      // function is registered twice.
      const entry: Callback = (rsp) => {
        callback(rsp)
      }
      callbacks.set(key, entry)
      const remove = () => {
        // A stale handle must not remove a newer registration for the same key.
        if (callbacks.get(key) === entry) {
          callbacks.delete(key)
        }
      }
      return { remove }
    },
    emit(key: KeyType, rsp: Rsp) {
      const entry = callbacks.get(key)
      if (entry === undefined) {
        return
      }
      // Unregister before invoking: this gives real "once" semantics, stops
      // answered keys from accumulating, and stays correct if the callback
      // re-enters emit/once for the same key.
      callbacks.delete(key)
      entry(rsp)
    },
    clear() {
      callbacks.clear()
    },
  }
}
export module BuiltinChannelEncoding {
  /**
   * Parse a raw JSON string.
   * Rejects with a string that holds the parse error and the offending payload.
   */
  const parseJson = (raw: string): Promise<any> => new Promise((resolve, reject) => {
    try {
      resolve(JSON.parse(`${raw}`))
    } catch (parseErr) {
      reject(`${parseErr} (payload: ${raw})`)
    }
  })

  /**
   * string based channel encoding
   *
   * Wire format (JSON text):
   * - command:  `{ "key": ..., "cmd": ... }`
   * - response: `{ "key": ..., "rsp": ... }`
   * - event:    `{ "name": ..., "event": ... }`
   *
   * `probe` classifies a message by which of the `cmd` / `rsp` / `event`
   * fields is present (checked in that order) and returns the parsed object
   * as `preDecoded`. It rejects with a descriptive string when the text is
   * not valid JSON, is not a JSON object, or has none of those fields.
   *
   * Note: JSON.stringify omits `undefined` values, so a payload whose
   * cmd/rsp/event is `undefined` is encoded without that field and cannot be
   * classified by `probe`; use `null` instead.
   *
   * @example createChannel({ ...BuiltinChannelEncoding.JsonString, ... })
   */
  export const JsonString: Pick<ChannelCreateOptions<string, any, any, any>,
    | 'probe'
    | 'encodeCmd'
    | 'decodeCmd'
    | 'encodeRsp'
    | 'decodeRsp'
    | 'encodeEvent'
    | 'decodeEvent'> = {
    probe: (raw) => new Promise((resolve, reject) => {
      let parsed: any
      try {
        parsed = JSON.parse(`${raw}`)
      } catch (err) {
        // `${err}` keeps the error text; JSON.stringify(err) would yield "{}".
        reject(`error JSON.parse raw: ${err}`)
        return
      }
      if (typeof parsed !== 'object' || parsed === null) {
        reject(`unrecognized payload (expected a JSON object): ${raw}`)
        return
      }
      // Test for field presence, not truthiness: 0, "", false and null are
      // legitimate cmd/rsp/event values and must still be classified.
      const has = (field: string): boolean => Object.prototype.hasOwnProperty.call(parsed, field)
      if (has('cmd')) {
        resolve({ type: 'cmd', preDecoded: parsed })
      } else if (has('rsp')) {
        resolve({ type: 'rsp', preDecoded: parsed })
      } else if (has('event')) {
        resolve({ type: 'event', preDecoded: parsed })
      } else {
        // Always settle: leaving the promise pending would hide the problem.
        reject(`unrecognized payload (no cmd, rsp or event field): ${raw}`)
      }
    }),
    encodeCmd: (key, cmd) => new Promise((resolve) => resolve(JSON.stringify({ key, cmd }))),
    encodeRsp: (key, rsp) => new Promise((resolve) => resolve(JSON.stringify({ key, rsp }))),
    encodeEvent: (name, event) => new Promise((resolve) => resolve(JSON.stringify({ name, event }))),
    decodeCmd: (raw) => parseJson(raw),
    decodeRsp: (raw) => parseJson(raw),
    decodeEvent: (raw) => parseJson(raw),
  }
}
/**
 * Create a promisified channel on top of an event-based transport.
 *
 * Incoming data is fed in through `handleRaw`; it is probed, decoded if the
 * probe did not pre-decode it, and dispatched to `handleCmdFunc`,
 * `handleEventFunc` or the pending-response emitter. Outgoing data is encoded
 * and handed to `options.sendRaw`.
 *
 * Behavior worth knowing:
 * - `invoke` / `invokeWithTimeout` register the response callback BEFORE the
 *   command is sent, so a response that arrives before `sendRaw` settles is
 *   not lost.
 * - The `invokeWithTimeout` deadline covers the whole round trip, including
 *   the send, and rejects with the string `'timeout'`.
 * - If sending fails, the pending callback is unregistered and the promise
 *   rejects with the send error.
 * - Errors while probing or decoding incoming data are reported to
 *   `options.onHandleError` as strings, never thrown.
 * - The returned methods do not depend on `this`, so they can be destructured.
 *
 * @param options codec, transport and optional handlers; see `ChannelCreateOptions`
 * @returns the channel object
 */
export function createChannel<Raw = any, Cmd = any, Rsp = any, Event = any>
(options: ChannelCreateOptions<Raw, Cmd, Rsp, Event>): Channel<Raw, Cmd, Rsp, Event> {
  const {
    probe,
    encodeCmd,
    decodeCmd,
    encodeRsp,
    decodeRsp,
    encodeEvent,
    decodeEvent,
    sendRaw,
    handleCmdFunc,
    handleEventFunc,
    onHandleError,
  } = options
  const keyGenerator = options.keyGenerator ?? newSimpleKeyGenerator()
  const rspEventEmitter = options.rspEventEmitter ?? newSimpleEventEmitterLike<Rsp>()

  /** JSON.stringify that never throws (e.g. on cyclic values) and never yields undefined. */
  const toStr = (value: unknown): string => {
    try {
      return JSON.stringify(value) ?? String(value)
    } catch {
      return String(value)
    }
  }
  /** Error instances stringify to "{}" under JSON.stringify, so render them by name and message. */
  const describeError = (err: unknown): string =>
    err instanceof Error ? `${err.name}: ${err.message}` : toStr(err)
  /** String raws are shown verbatim; anything else as JSON instead of "[object Object]". */
  const describeRaw = (raw: Raw): string => (typeof raw === 'string' ? raw : toStr(raw))
  /** Forward a diagnostic message to the optional error hook. */
  const report = (message: string): void => {
    onHandleError?.(message)
  }
  /** Run `start` inside a promise so that a synchronous throw becomes a rejection. */
  const attempt = <T>(start: () => Promise<T>): Promise<T> =>
    new Promise<T>((resolve) => resolve(start()))

  /** Encode and send `cmd` under an already-chosen key. */
  const sendKeyedCmd = (key: KeyType, cmd: Cmd): Promise<WrappedCmd<Cmd>> =>
    attempt(() => encodeCmd(key, cmd))
      .then((cmdRaw) => sendRaw(cmdRaw, { key, cmd }))
      .then(() => ({ key, cmd }))

  /**
   * Send `cmd` and wait for its response.
   * `deadline` is null for no timeout, otherwise the time budget in milliseconds.
   */
  const awaitResponse = (cmd: Cmd, deadline: { ms: number } | null): Promise<Rsp> =>
    new Promise<Rsp>((resolve, reject) => {
      const key = keyGenerator(cmd)
      let settled = false
      let timer: ReturnType<typeof setTimeout> | undefined
      const settle = () => {
        settled = true
        if (timer !== undefined) {
          clearTimeout(timer)
        }
      }
      // Register first, send second: the peer may answer before sendRaw settles.
      const subscription = rspEventEmitter.once(key, (rsp) => {
        if (settled) {
          return
        }
        settle()
        resolve(rsp)
      })
      const abandon = (reason: unknown) => {
        if (settled) {
          return
        }
        settle()
        // Custom emitters may return a handle without `remove`; stay defensive.
        subscription?.remove?.()
        reject(reason)
      }
      if (deadline !== null && !settled) {
        timer = setTimeout(() => abandon('timeout'), deadline.ms)
      }
      sendKeyedCmd(key, cmd).catch(abandon)
    })

  const channel: Channel<Raw, Cmd, Rsp, Event> = {
    handleRaw(raw: Raw) {
      attempt(() => probe(raw))
        .then((probeResult) => {
          const preDecoded = probeResult.preDecoded
          switch (probeResult.type) {
            case 'cmd': {
              const deliver = (wrapped: WrappedCmd<Cmd>) =>
                handleCmdFunc?.(wrapped.key, wrapped.cmd, channel)
              if (preDecoded) {
                deliver(preDecoded as WrappedCmd<Cmd>)
              } else {
                decodeCmd(raw)
                  .then(deliver)
                  .catch((err) => report(`error decode cmd: ${describeError(err)}, raw: ${describeRaw(raw)}`))
              }
              break
            }
            case 'rsp': {
              const deliver = (wrapped: WrappedRsp<Rsp>) =>
                rspEventEmitter.emit(wrapped.key, wrapped.rsp)
              if (preDecoded) {
                deliver(preDecoded as WrappedRsp<Rsp>)
              } else {
                decodeRsp(raw)
                  .then(deliver)
                  .catch((err) => report(`error decode rsp: ${describeError(err)}, raw: ${describeRaw(raw)}`))
              }
              break
            }
            case 'event': {
              const deliver = (wrapped: WrappedEvent<Event>) =>
                handleEventFunc?.(wrapped.name, wrapped.event, channel)
              if (preDecoded) {
                deliver(preDecoded as WrappedEvent<Event>)
              } else {
                decodeEvent(raw)
                  .then(deliver)
                  .catch((err) => report(`error decode event: ${describeError(err)}, raw: ${describeRaw(raw)}`))
              }
              break
            }
            default: {
              report(`unknown probe result: ${toStr(probeResult.type)}`)
              break
            }
          }
        })
        .catch((err) => report(`error probe raw: ${describeError(err)}, raw: ${toStr(raw)}`))
    },
    invoke(cmd: Cmd) {
      return awaitResponse(cmd, null)
    },
    invokeWithTimeout(cmd: Cmd, millis: number) {
      return awaitResponse(cmd, { ms: millis })
    },
    sendCmd(cmd: Cmd) {
      // keyGenerator runs inside `attempt` so a throwing generator rejects.
      return attempt(() => sendKeyedCmd(keyGenerator(cmd), cmd))
    },
    sendRsp(key: KeyType, rsp: Rsp) {
      return attempt(() => encodeRsp(key, rsp))
        .then((rspRaw) => sendRaw(rspRaw, { key, rsp }))
    },
    sendEvent(name: EventName, event: Event) {
      return attempt(() => encodeEvent(name, event))
        .then((eventRaw) => sendRaw(eventRaw, { name, event }))
    },
    clean: () => rspEventEmitter.clear(),
  }
  return channel
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment