Skip to content

Instantly share code, notes, and snippets.

@turlockmike
Last active September 12, 2023 19:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save turlockmike/936b3c9cab4a1dd272e7645dce5d0b88 to your computer and use it in GitHub Desktop.
Save turlockmike/936b3c9cab4a1dd272e7645dce5d0b88 to your computer and use it in GitHub Desktop.
Event Handlers
//filepath: src/handlers/messages/contract-created.ts
import {vaidationMiddleware} from '@helloextend/api-utils'
//Handle a single Cloud Event.
function contractCreatedHandler(ev: CloudEvent, ctx: Context) {
const messageBody = JSON.parse(ev.message.body) //Or something. Whatever the cloudEvent specification says
const isRetry = ctx.get<number>('event.metadata.retries') > 0
const lastRetry = ctx.get<date>('event.metadata.lastTriedAt')
// Do Something
return new CloudEventResponse.ok()
}
//Add valdiation middleware
const mid = validationMiddleware({
req: {
schema: V1Schema
}
})
const messageHandler = smerf.cloudEvents().use(mid).handler(contractCreatedHandler)
const retryHandler = messageHandler
//const dlqHandler = some other handler with the same
export default {
handler:
smerf.use(mid).handler(contractCreatedHandler)
// Retry Logic would be based on the status code returned. The backoff timing would be customized in the smerf.config.ts file, same with dlq settings.
// in smerf.config.ts
```{
events: {
'contracts-created': {
backoff: [5, 10, 20, 30]
dlq: true,
schedule: '*/5 * * * *'
}
}
```}
/////////////////////////////////////////WIP//////////////////////////////////////////////////////////
//filepath: src/handlers/topics/contracts.ts
// For Handling an entire topic
//Filename /topics/contract.ts
function batchHandler(cloudEvents, context) {
foreach(cloudEvent in cloudEvents) {
switch(cloudEvent.type) {
case "ContractsCreated":
//Do Something
case "ContractsUpdated":
//Do Something
context.retrySyncEmitter.emit(cloudEvent) // I think providing these emmitters in context would be nice.
}
}
return BatchResponse.ok() // or BatchResponse.failed()
},)
export const createRetryHandler<ContractsCreated | ContractsUpdated>((cloudEvent, context) => {
switch(cloudEvent.type) {
case "ContractsCreated":
//Do Something
case "ContractsUpdated":
//Do Something
}
return CloudEventResponse.ok() // Or CloudEventResponse.failed()
})
export const createDLQHandler<ContractsCreated | ContractsUpdated>((cloudEvent, context) => {
switch(cloudEvent.type) {
case "ContractsCreated":
//Do Something
case "ContractsUpdated":
//Do Something
}
return CloudEventResponse.ok()
})
// Some configuration can happen in the config
// in smerf.config.ts
```{
topics: {
'contracts': {
backoff: [5, 10, 20, 30],
dlq: true,
// etc.
}
}
```}
@robpott
Copy link

robpott commented Aug 4, 2023

I would avoid implementing split batch in kafka

return new TopicResponse(successfulEvents, failedEvents)

@HeatherFlux
Copy link

I like the folder pathing strategy separating topics from events.

@turlockmike
Copy link
Author

turlockmike commented Aug 8, 2023

So, the intention behind the TopicResponse.ok(successfulEvents, failedEvents) would be that it would return a successful response to adapter so it can continue processing, and the adapter would determine what to do with failed events. This is just a high level interface, a lot of the complexity would be the adapters responsibility, but we allow devs to specify which events were successful vs failed, write retry and dlq handlers to support those events, but leave the adapter the responsibility for determining the workflow pattern and integration with infrastructure components. In the case of kafka, we could implement whatever pattern you think makes the most sense and covers the most use cases.

@robpott
Copy link

robpott commented Sep 12, 2023

code would be something like...

function contractCreatedHandler(
  ev: CloudEvent<"contracts-created", V1Schema>,
  ctx: Context
) {
  const messageBody = ev.data;

  const isRetry = ev.failure?.attemptCount > 0;
  const lastRetry = ev.failure?.timestamp;
  // Do Something
  return new CloudEventResponse.ok();
}

@robpott
Copy link

robpott commented Sep 12, 2023

what is the retryEmitter doing? Is it retry sending OR send the event for retry of processing?

context.retrySyncEmitter.emit(cloudEvent)

@robpott
Copy link

robpott commented Sep 12, 2023

should cloudEvents be an async iterable OR array is another debate... async iterable would allow for back pressure modeling

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment