Skip to content

Instantly share code, notes, and snippets.

@hubgit
Last active December 10, 2023 22:31
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hubgit/021228f33c5938be71e4346415553866 to your computer and use it in GitHub Desktop.
Save hubgit/021228f33c5938be71e4346415553866 to your computer and use it in GitHub Desktop.
Reader and Writer web streams for Deno
import { TextLineStream } from 'https://deno.land/std@0.153.0/streams/mod.ts'
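// Example usage:
//
// const input = await jsonLinesReader('input.jsonl.gz')
// const output = await jsonLinesWriter('output.jsonl.gz')
// for await (const item of input) {
//   // do something
//   await output.write(item)
// }
// await output.close() // required: the gzip trailer is only written when the writer is closed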
/**
 * Opens a gzip-compressed JSON Lines file and streams one parsed value per line.
 *
 * `TextLineStream` strips line terminators, so a blank line arrives as an empty
 * string (never as `'\n'`). Empty and whitespace-only lines are therefore
 * skipped explicitly; otherwise `JSON.parse('')` would throw.
 *
 * @param path - Path to a gzip-compressed `.jsonl` file.
 * @returns A stream of parsed values. Values are not validated: `T` is the
 *   caller's assertion about their shape.
 * @throws SyntaxError (surfaced while the stream is read) if a non-blank line is
 *   not valid JSON; the message names the file and the 1-based line number.
 */
export const jsonLinesReader = async <T>(path: string): Promise<ReadableStream<T>> => {
  const file = await Deno.open(path, {
    read: true,
  })
  // Counts every line, including skipped blank ones, so error positions match the file.
  let lineNumber = 0
  return file.readable
    .pipeThrough(new DecompressionStream('gzip'))
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new TextLineStream())
    .pipeThrough(
      new TransformStream<string, T>({
        transform(line, controller) {
          lineNumber++
          if (line.trim() === '') {
            return
          }
          let value: T
          try {
            value = JSON.parse(line)
          } catch (error) {
            const reason = error instanceof Error ? error.message : String(error)
            throw new SyntaxError(`${path}:${lineNumber}: ${reason}`)
          }
          controller.enqueue(value)
        },
      })
    )
}
/**
 * Opens a gzip-compressed JSON document and streams the elements of an array in it.
 *
 * `JSON.parse` is not incremental, so the whole decompressed document is
 * buffered and parsed only once the input ends. Memory use is proportional to
 * the file size, and nothing is emitted until the file has been fully read.
 *
 * @param path - Path to a gzip-compressed JSON file.
 * @param key - Property holding the array when the document is an object;
 *   defaults to `'items'`. If the document itself is a JSON array, it is used
 *   directly and `key` is ignored.
 * @returns A stream of the array's elements, in order. Elements are not
 *   validated: `T` is the caller's assertion about their shape.
 * @throws SyntaxError (surfaced while the stream is read) if the file is not
 *   valid JSON.
 * @throws TypeError (surfaced while the stream is read) if no array is found.
 */
export const jsonArrayReader = async <T>(
  path: string,
  key = 'items',
): Promise<ReadableStream<T>> => {
  const file = await Deno.open(path, {
    read: true,
  })
  // Decoded text pieces, accumulated until flush; local to this call.
  const chunks: string[] = []
  return file.readable
    .pipeThrough(new DecompressionStream('gzip'))
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(
      new TransformStream<string, T>({
        transform: (chunk) => {
          chunks.push(chunk)
        },
        flush: (controller) => {
          const data: unknown = JSON.parse(chunks.join(''))
          const items: unknown = Array.isArray(data)
            ? data
            : typeof data === 'object' && data !== null
              ? (data as Record<string, unknown>)[key]
              : undefined
          if (!Array.isArray(items)) {
            throw new TypeError(
              `${path}: expected a JSON array or an object with an array at "${key}"`
            )
          }
          for (const item of items) {
            controller.enqueue(item)
          }
        },
      })
    )
}
/**
 * Opens `path` for writing (creating it, or truncating an existing file) and
 * returns a writer. Chunks written to it are converted to text by `toText`,
 * UTF-8 encoded, gzip-compressed and written to the file.
 *
 * A writer taken directly from the head of the pipeline would resolve
 * `close()` before compression and the file write had finished. The writer
 * returned here behaves differently:
 * - `close()` resolves only after every stage has completed.
 * - `close()` rejects if any stage failed.
 *
 * Callers must await `close()`: the gzip trailer is only produced once the
 * input is closed.
 */
const gzipTextFileWriter = async <I>(
  path: string,
  toText: (chunk: I) => string,
): Promise<WritableStreamDefaultWriter<I>> => {
  const file = await Deno.open(path, {
    create: true,
    write: true,
    truncate: true,
  })
  const serializer = new TransformStream<I, string>({
    transform(chunk, controller) {
      controller.enqueue(toText(chunk))
    },
  })
  const finished = serializer.readable
    .pipeThrough(new TextEncoderStream())
    .pipeThrough(new CompressionStream('gzip'))
    .pipeTo(file.writable)
  // A failing stage errors the whole pipe chain, including `serializer`, so the
  // caller still sees the failure from write()/close(). This handler only keeps
  // the runtime from reporting the rejection as unhandled before close() awaits it.
  finished.catch(() => {})
  const head = serializer.writable.getWriter()
  return new WritableStream<I>({
    write: (chunk) => head.write(chunk),
    close: async () => {
      await head.close()
      await finished
    },
    abort: (reason) => head.abort(reason),
  }).getWriter()
}

/**
 * Opens a gzip-compressed JSON Lines file for writing (created or truncated).
 *
 * Each written value is serialized with `JSON.stringify` and followed by `'\n'`.
 *
 * @param path - Destination path, e.g. `output.jsonl.gz`.
 * @returns A writer. `write(value)` appends one line; `close()` finishes the
 *   gzip stream and resolves once the file has been fully written.
 * @throws TypeError (from `write`) for values `JSON.stringify` cannot represent
 *   at top level (`undefined`, functions, symbols). Writing them would otherwise
 *   produce a line that is not valid JSON.
 */
export const jsonLinesWriter = async (
  path: string,
): Promise<WritableStreamDefaultWriter<unknown>> =>
  gzipTextFileWriter<unknown>(path, (value) => {
    // JSON.stringify returns undefined (despite its declared type) for these inputs.
    const json: string | undefined = JSON.stringify(value)
    if (json === undefined) {
      throw new TypeError(`jsonLinesWriter: cannot serialize ${typeof value} as JSON`)
    }
    return json + '\n'
  })

/**
 * Opens a gzip-compressed TSV file for writing (created or truncated).
 *
 * Each written array becomes one row: the fields are joined with `'\t'` and the
 * row is followed by `'\n'`. Fields are written verbatim, with no quoting or
 * escaping, so a field containing a tab or newline corrupts the row. As with
 * `Array.prototype.join`, `null`/`undefined` fields become empty strings.
 *
 * @param path - Destination path, e.g. `output.tsv.gz`.
 * @returns A writer. `close()` resolves once the file has been fully written.
 */
export const tsvLinesWriter = async (
  path: string,
): Promise<WritableStreamDefaultWriter<readonly unknown[]>> =>
  gzipTextFileWriter<readonly unknown[]>(path, (fields) => fields.join('\t') + '\n')

/**
 * Opens a gzip-compressed text file for writing (created or truncated).
 *
 * Chunks are written as given, with no separators added; non-string chunks are
 * converted with `String()`.
 *
 * @param path - Destination path, e.g. `output.txt.gz`.
 * @returns A writer. `close()` resolves once the file has been fully written.
 */
export const textWriter = async (
  path: string,
): Promise<WritableStreamDefaultWriter<unknown>> =>
  gzipTextFileWriter<unknown>(path, (chunk) => String(chunk))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment