Last active
September 12, 2023 19:11
-
-
Save turlockmike/936b3c9cab4a1dd272e7645dce5d0b88 to your computer and use it in GitHub Desktop.
Event Handlers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//filepath: src/handlers/messages/contract-created.ts | |
import {vaidationMiddleware} from '@helloextend/api-utils' | |
//Handle a single Cloud Event. | |
/**
 * Handles a single Cloud Event.
 *
 * @param ev - The incoming cloud event; the payload is parsed from `ev.message.body`.
 * @param ctx - Handler context; event metadata is read through `ctx.get`.
 * @returns The value produced by `CloudEventResponse.ok()`.
 */
function contractCreatedHandler(ev: CloudEvent, ctx: Context) {
  // Or something. Whatever the CloudEvent specification says about where the payload lives.
  // Typed as `unknown` so the payload must be validated/narrowed before use.
  const messageBody: unknown = JSON.parse(ev.message.body)
  const isRetry = ctx.get<number>('event.metadata.retries') > 0
  const lastRetry = ctx.get<Date>('event.metadata.lastTriedAt')
  // Do Something
  // `ok()` is called as a static factory (no `new`), as the retry/DLQ handler sketches do.
  return CloudEventResponse.ok()
}
// Add validation middleware
// NOTE(review): the import at the top of this file spells the name `vaidationMiddleware`;
// it must match the identifier used here.
const mid = validationMiddleware({
  req: {
    schema: V1Schema,
  },
})
const messageHandler = smerf.cloudEvents().use(mid).handler(contractCreatedHandler)
const retryHandler = messageHandler
// const dlqHandler = some other handler with the same
// NOTE(review): the exported handler is built with `smerf.use(...)` while `messageHandler`
// above uses `smerf.cloudEvents().use(...)` — confirm which one is intended.
export default {
  handler: smerf.use(mid).handler(contractCreatedHandler),
}
// Retry Logic would be based on the status code returned. The backoff timing would be customized in the smerf.config.ts file, same with dlq settings. | |
// in smerf.config.ts | |
```
{
  events: {
    'contracts-created': {
      backoff: [5, 10, 20, 30],
      dlq: true,
      schedule: '*/5 * * * *'
    }
  }
}
```
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/////////////////////////////////////////WIP////////////////////////////////////////////////////////// | |
//filepath: src/handlers/topics/contracts.ts | |
// For Handling an entire topic | |
//Filename /topics/contract.ts | |
/**
 * Handles a batch of cloud events delivered for an entire topic.
 *
 * Each event is dispatched on its `type`. Only "ContractsUpdated" events are
 * handed to `context.retrySyncEmitter`; every case ends with an explicit
 * `break` so one event type never runs another type's logic.
 *
 * @param cloudEvents - The events in the batch, iterated in order.
 * @param context - Handler context; must expose `retrySyncEmitter.emit`.
 * @returns The value produced by `BatchResponse.ok()`.
 */
function batchHandler(
  cloudEvents: Iterable<{ type: string }>,
  context: { retrySyncEmitter: { emit(event: unknown): unknown } }
) {
  for (const cloudEvent of cloudEvents) {
    switch (cloudEvent.type) {
      case "ContractsCreated":
        // Do Something
        break
      case "ContractsUpdated":
        // Do Something
        context.retrySyncEmitter.emit(cloudEvent) // I think providing these emitters in context would be nice.
        break
    }
  }
  return BatchResponse.ok() // or BatchResponse.failed()
}
/**
 * Retry handler built with `createRetryHandler`; it is called with a single
 * cloud event and the handler context and dispatches on `cloudEvent.type`.
 *
 * NOTE(review): the original sketch had no binding name for this export;
 * `retryHandler` is an assumed name — confirm what the framework expects.
 */
export const retryHandler = createRetryHandler<ContractsCreated | ContractsUpdated>((cloudEvent, context) => {
  switch (cloudEvent.type) {
    case "ContractsCreated":
      // Do Something
      break
    case "ContractsUpdated":
      // Do Something
      break
  }
  return CloudEventResponse.ok() // Or CloudEventResponse.failed()
})
/**
 * Dead-letter-queue handler built with `createDLQHandler`; it is called with a
 * single cloud event and the handler context and dispatches on `cloudEvent.type`.
 *
 * NOTE(review): the original sketch had no binding name for this export;
 * `dlqHandler` is an assumed name — confirm what the framework expects.
 */
export const dlqHandler = createDLQHandler<ContractsCreated | ContractsUpdated>((cloudEvent, context) => {
  switch (cloudEvent.type) {
    case "ContractsCreated":
      // Do Something
      break
    case "ContractsUpdated":
      // Do Something
      break
  }
  return CloudEventResponse.ok()
})
// Some configuration can happen in the config | |
// in smerf.config.ts | |
```
{
  topics: {
    'contracts': {
      backoff: [5, 10, 20, 30],
      dlq: true,
      // etc.
    }
  }
}
```
code would be something like...
/**
 * Handles a single "contracts-created" cloud event whose payload is typed by `V1Schema`.
 *
 * @param ev - The incoming cloud event; the payload is read from `ev.data` and
 *   retry information from the optional `ev.failure`.
 * @param ctx - Handler context (not used by this sketch).
 * @returns The value produced by `CloudEventResponse.ok()`.
 */
function contractCreatedHandler(
  ev: CloudEvent<"contracts-created", V1Schema>,
  ctx: Context
) {
  const messageBody = ev.data;
  // `failure` is accessed optionally, so default the attempt count to 0
  // rather than comparing a possibly-undefined value with `>`.
  const isRetry = (ev.failure?.attemptCount ?? 0) > 0;
  const lastRetry = ev.failure?.timestamp;
  // Do Something
  // `ok()` is called as a static factory, so no `new`.
  return CloudEventResponse.ok();
}
What is the retryEmitter doing? Does it retry sending the event, or does it send the event off to be retried for processing?
context.retrySyncEmitter.emit(cloudEvent)
Whether cloudEvents should be an async iterable or an array is another debate... an async iterable would allow for back-pressure modeling.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
So, the intention behind TopicResponse.ok(successfulEvents, failedEvents) is that it would return a successful response to the adapter so it can continue processing, and the adapter would determine what to do with the failed events. This is just a high-level interface; a lot of the complexity would be the adapter's responsibility. We allow devs to specify which events were successful vs. failed and to write retry and DLQ handlers to support those events, but we leave the adapter responsible for determining the workflow pattern and the integration with infrastructure components. In the case of Kafka, we could implement whatever pattern you think makes the most sense and covers the most use cases.