Streams Sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { pipeline, Transform, TransformCallback } from 'stream'; | |
import { promisify } from 'util'; | |
import { env } from 'process'; | |
import axios, { | |
AxiosInstance, | |
AxiosRequestConfig, | |
AxiosResponse, | |
ResponseType, | |
} from 'axios'; | |
import * as JSONStream from 'jsonstream-next'; | |
import * as humanizeDuration from 'humanize-duration'; | |
import * as dotenv from 'dotenv'; | |
export class AxiosService { | |
private get dbUserName(): string { | |
return env.DB_USERNAME || ''; | |
} | |
private get dbPassword(): string { | |
return env.DB_PASSWORD || ''; | |
} | |
private get dbURL(): string { | |
return env.DB_URL || ''; | |
} | |
config: AxiosRequestConfig = { | |
auth: { | |
username: this.dbUserName, | |
password: this.dbPassword, | |
}, | |
headers: { 'User-Agent': `JDP-streams-example` }, | |
withCredentials: true, | |
baseURL: this.dbURL, | |
timeout: 67 * 1000, | |
}; | |
client: AxiosInstance; | |
async get<T>( | |
url: string, | |
responseType: ResponseType, | |
params?: any, | |
): Promise<AxiosResponse<T>> { | |
this.config.responseType = responseType; | |
this.config.params = params; | |
return await this.client.get<T>(url, this.config); | |
} | |
createInstance() { | |
this.client = axios.create(this.config); | |
console.log(`${new Date().toISOString()} - Axios client initialized.`); | |
} | |
} | |
export class CounterStream extends Transform { | |
count = 0; | |
constructor() { | |
super({ objectMode: true }); | |
} | |
_transform(chunk: any, _encoding: string, callback: TransformCallback) { | |
this.count++; | |
console.log(this.count); | |
console.log(`${new Date().toISOString()} - Before:`); | |
console.log(chunk); | |
callback(null, chunk); | |
} | |
} | |
export class ItemTransformStream extends Transform { | |
constructor() { | |
super({ objectMode: true }); | |
} | |
_transform(chunk: any, _encoding: string, callback: TransformCallback) { | |
if (chunk.myId) { | |
chunk.newId = chunk.myId; | |
delete chunk.myId; | |
} | |
if (chunk.name) { | |
chunk.myName = chunk.name; | |
delete chunk.name; | |
} | |
console.log(`${new Date().toISOString()} - After:`); | |
console.log(chunk); | |
callback(); | |
} | |
_final(callback: TransformCallback) { | |
callback(); | |
} | |
} | |
export function getDuration(startTime: Date) { | |
const startMs = startTime.getTime(); | |
const endTime = new Date(); | |
const endMs = endTime.getTime(); | |
const value = endMs - startMs; | |
const human = humanizeDuration(value); | |
return { | |
endTime, | |
human, | |
}; | |
} | |
export async function runDataLoadStream(): Promise<void> { | |
const startTime = new Date(); | |
const title = 'ItemLoader'; | |
try { | |
console.log(`${startTime.toISOString()} - Begin ${title}`); | |
console.log('--------------------'); | |
console.log(`${new Date().toISOString()} - Setup Axios service`); | |
dotenv.config(); | |
const axiosService = new AxiosService(); | |
axiosService.createInstance(); | |
console.log(`${new Date().toISOString()} - Setup pipeline and streams`); | |
const pipelineAsync = promisify(pipeline); | |
const counterStream = new CounterStream(); | |
const itemTransformStream = new ItemTransformStream(); | |
console.log(`${new Date().toISOString()} - Get offerings`); | |
const itemStream = await axiosService.get<any[]>( | |
`/items`, | |
'stream', | |
); | |
await pipelineAsync( | |
// @ts-ignore | |
itemStream.data, | |
JSONStream.parse('*'), | |
counterStream, | |
itemTransformStream, | |
); | |
console.log('--------------------'); | |
const jobTime = getDuration(startTime); | |
console.log(`${jobTime.endTime.toISOString()} - Completed ${title}`); | |
console.info('Total items loaded:', counterStream.count); | |
console.info('Duration was', jobTime.human); | |
} catch (err) { | |
console.error(title, err); | |
} | |
process.exit(0); | |
} | |
export async function runDataLoadJSON(): Promise<void> { | |
const startTime = new Date(); | |
const title = 'ItemLoader'; | |
try { | |
console.log(`${startTime.toISOString()} - Begin ${title}`); | |
console.log('--------------------'); | |
console.log(`${new Date().toISOString()} - Setup Axios service`); | |
dotenv.config(); | |
const axiosService = new AxiosService(); | |
axiosService.createInstance(); | |
console.log(`${new Date().toISOString()} - Get items`); | |
const itemJson = await axiosService.get<any[]>( | |
`/items`, | |
'json', | |
); | |
console.log('--------------------'); | |
const jobTime = getDuration(startTime); | |
console.log(`${jobTime.endTime.toISOString()} - Completed ${title}`); | |
console.info('Total items loaded:', itemJson.data.length); | |
console.info('Duration was', jobTime.human); | |
} catch (err) { | |
console.error(title, err); | |
} | |
process.exit(0); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment