Skip to content

Instantly share code, notes, and snippets.

@stefandanaita
Last active January 7, 2023 19:26
Show Gist options
  • Save stefandanaita/88c4d8b187400d5b07524cd0a12843b2 to your computer and use it in GitHub Desktop.
Save stefandanaita/88c4d8b187400d5b07524cd0a12843b2 to your computer and use it in GitHub Desktop.
Code for a Worker that receives Logpush HTTP requests and parses their contents using streams.
// SPDX-License-Identifier: MIT-0
/**
 * Type of the `env` argument passed to the worker's `fetch` handler.
 * No members are declared yet; add bindings here as the worker needs them.
 */
export interface Env {
}
export default {
  /**
   * Receives a gzip-compressed, newline-delimited JSON request body, decodes it
   * as a stream and handles each line as one event.
   *
   * The body is piped through gzip decompression, text decoding and line
   * splitting, so it is never buffered as a whole.
   *
   * @param request Incoming HTTP request whose body carries the compressed log lines.
   * @param env Worker bindings (unused here).
   * @param ctx Execution context (unused here).
   * @returns `500 "Oops"` when the request has no body, otherwise
   *          `200 "Hello World!"` once every line has been consumed.
   */
  async fetch(
    request: Request,
    env: Env,
    ctx: ExecutionContext
  ): Promise<Response> {
    // NOTE(review): a missing body is arguably a client error (400); the status
    // is left at 500 so existing senders observe the same response as before.
    if (!request.body) {
      return new Response("Oops", { status: 500 });
    }

    const events = request.body
      .pipeThrough(new DecompressionStream("gzip"))
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(readlineStream());

    for await (const event of streamAsyncIterator(events)) {
      // One malformed line must not abort the whole batch: skip it and go on.
      let parsedEvent: unknown;
      try {
        parsedEvent = JSON.parse(event);
      } catch (err) {
        console.warn("Skipping log line that is not valid JSON", err);
        continue;
      }

      // Do stuff with the event
      // Records without `Event.RayID` (for example a non-event test payload)
      // are tolerated instead of throwing a TypeError.
      const rayId = (parsedEvent as { Event?: { RayID?: unknown } } | null)
        ?.Event?.RayID;
      if (rayId !== undefined) {
        console.log(rayId);
      }
    }

    return new Response("Hello World!");
  },
};
/**
 * Exposes a `ReadableStream` as an async generator so it can be consumed with
 * `for await`.
 *
 * The stream is locked for the duration of the iteration. The lock is released
 * when the stream ends, when the consumer stops early, or when reading throws.
 * The stream itself is not cancelled on early exit.
 *
 * @typeParam T Type of the chunks produced by the stream.
 * @param stream Stream to read from; it must not already be locked.
 * @returns An async generator yielding each chunk in order.
 */
async function* streamAsyncIterator<T>(
  stream: ReadableStream<T>
): AsyncGenerator<T, void, undefined> {
  // Get a lock on the stream
  const reader = stream.getReader();
  try {
    while (true) {
      // Read from the stream
      const result = await reader.read();
      // Exit if we're done
      if (result.done) {
        return;
      }
      // Else yield the chunk
      yield result.value;
    }
  } finally {
    // Always give the lock back, even on early exit or error.
    reader.releaseLock();
  }
}
/** Options accepted by `ReadlineTransformer`. */
interface ReadlineTransformerOptions {
  /**
   * When `true` (the default) every line is trimmed before it is emitted and
   * lines that are empty after trimming are dropped. When `false` lines are
   * emitted untrimmed. Zero-length lines are never emitted in either mode,
   * because consecutive line breaks are treated as a single separator.
   */
  skipEmpty: boolean;
}

/** Options used for any setting the caller does not provide. */
const defaultOptions: ReadlineTransformerOptions = {
  skipEmpty: true,
};

/**
 * `TransformStream` transformer that re-chunks arbitrary string chunks into
 * complete lines.
 *
 * Lines are separated by any run of `\r` and/or `\n`. Text after the last
 * separator of a chunk is buffered and prepended to the next chunk; whatever
 * is still buffered when the stream closes is emitted by `flush`.
 */
export class ReadlineTransformer implements Transformer<string, string> {
  options: ReadlineTransformerOptions;
  /** Incomplete trailing text of the previous chunk, kept exactly as received. */
  lastString: string;
  separator: RegExp;

  /**
   * @param options Optional overrides; omitted settings fall back to the defaults.
   */
  public constructor(options?: Partial<ReadlineTransformerOptions>) {
    this.options = {
      skipEmpty: options?.skipEmpty ?? defaultOptions.skipEmpty,
    };
    this.lastString = '';
    this.separator = /[\r\n]+/;
  }

  /**
   * Splits the buffered remainder plus `chunk` into lines and enqueues every
   * complete one.
   *
   * @param chunk Next piece of decoded text; may start or end mid-line.
   * @param controller Controller that receives the complete lines.
   */
  public transform(chunk: string, controller: TransformStreamDefaultController<string>) {
    // prepend with previous string (empty if none)
    const str = `${this.lastString}${chunk}`;
    // Extract lines from chunk
    const lines = str.split(this.separator);
    // Save last line as it might be incomplete. It must NOT be trimmed here:
    // whitespace at a chunk boundary can belong to the middle of a line
    // (e.g. inside a JSON string) and trimming it would corrupt the data.
    this.lastString = lines.pop() ?? '';
    for (const line of lines) {
      this.emitLine(line, controller);
    }
  }

  /**
   * Emits the final buffered line, if any, when the input ends without a
   * trailing line break.
   *
   * @param controller Controller that receives the last line.
   */
  public flush(controller: TransformStreamDefaultController<string>) {
    this.emitLine(this.lastString, controller);
    this.lastString = '';
  }

  /** Applies the trimming policy to one complete line and enqueues it unless it is empty. */
  private emitLine(line: string, controller: TransformStreamDefaultController<string>) {
    const d = this.options.skipEmpty ? line.trim() : line;
    if (d.length > 0) controller.enqueue(d);
  }
}
/**
 * Creates a `TransformStream` backed by a new `ReadlineTransformer` built
 * with its default options.
 */
export const readlineStream = () => {
  const transformer = new ReadlineTransformer();
  return new TransformStream(transformer);
};
@ignoramous
Copy link

Please consider assigning a license, such as MIT-0 or 0-BSD? Thanks.

@stefandanaita
Copy link
Author

Please consider assigning a license, such as MIT-0 or 0-BSD? Thanks.

✔️

@ignoramous
Copy link

Nice. Thanks (: In case it is helpful, I wrote up a javascript impl based on this that can either filter on the encoded LogPush stream (u8) as-is or filter on its decoded stream (str): https://github.com/serverless-dns/serverless-dns/blob/7a6657afb8177bd7368b8ea5195fc974fafe69e5/src/commons/lf-transformer.js#L14-L175

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment