Skip to content

Instantly share code, notes, and snippets.

@jesatrix
Last active August 3, 2021 19:44
Embed
What would you like to do?
Streams Sample
import { pipeline, Transform, TransformCallback } from 'stream';
import { promisify } from 'util';
import { env } from 'process';
import axios, {
AxiosInstance,
AxiosRequestConfig,
AxiosResponse,
ResponseType,
} from 'axios';
import * as JSONStream from 'jsonstream-next';
import * as humanizeDuration from 'humanize-duration';
import * as dotenv from 'dotenv';
export class AxiosService {
private get dbUserName(): string {
return env.DB_USERNAME || '';
}
private get dbPassword(): string {
return env.DB_PASSWORD || '';
}
private get dbURL(): string {
return env.DB_URL || '';
}
config: AxiosRequestConfig = {
auth: {
username: this.dbUserName,
password: this.dbPassword,
},
headers: { 'User-Agent': `JDP-streams-example` },
withCredentials: true,
baseURL: this.dbURL,
timeout: 67 * 1000,
};
client: AxiosInstance;
async get<T>(
url: string,
responseType: ResponseType,
params?: any,
): Promise<AxiosResponse<T>> {
this.config.responseType = responseType;
this.config.params = params;
return await this.client.get<T>(url, this.config);
}
createInstance() {
this.client = axios.create(this.config);
console.log(`${new Date().toISOString()} - Axios client initialized.`);
}
}
export class CounterStream extends Transform {
count = 0;
constructor() {
super({ objectMode: true });
}
_transform(chunk: any, _encoding: string, callback: TransformCallback) {
this.count++;
console.log(this.count);
console.log(`${new Date().toISOString()} - Before:`);
console.log(chunk);
callback(null, chunk);
}
}
export class ItemTransformStream extends Transform {
constructor() {
super({ objectMode: true });
}
_transform(chunk: any, _encoding: string, callback: TransformCallback) {
if (chunk.myId) {
chunk.newId = chunk.myId;
delete chunk.myId;
}
if (chunk.name) {
chunk.myName = chunk.name;
delete chunk.name;
}
console.log(`${new Date().toISOString()} - After:`);
console.log(chunk);
callback();
}
_final(callback: TransformCallback) {
callback();
}
}
export function getDuration(startTime: Date) {
const startMs = startTime.getTime();
const endTime = new Date();
const endMs = endTime.getTime();
const value = endMs - startMs;
const human = humanizeDuration(value);
return {
endTime,
human,
};
}
export async function runDataLoadStream(): Promise<void> {
const startTime = new Date();
const title = 'ItemLoader';
try {
console.log(`${startTime.toISOString()} - Begin ${title}`);
console.log('--------------------');
console.log(`${new Date().toISOString()} - Setup Axios service`);
dotenv.config();
const axiosService = new AxiosService();
axiosService.createInstance();
console.log(`${new Date().toISOString()} - Setup pipeline and streams`);
const pipelineAsync = promisify(pipeline);
const counterStream = new CounterStream();
const itemTransformStream = new ItemTransformStream();
console.log(`${new Date().toISOString()} - Get offerings`);
const itemStream = await axiosService.get<any[]>(
`/items`,
'stream',
);
await pipelineAsync(
// @ts-ignore
itemStream.data,
JSONStream.parse('*'),
counterStream,
itemTransformStream,
);
console.log('--------------------');
const jobTime = getDuration(startTime);
console.log(`${jobTime.endTime.toISOString()} - Completed ${title}`);
console.info('Total items loaded:', counterStream.count);
console.info('Duration was', jobTime.human);
} catch (err) {
console.error(title, err);
}
process.exit(0);
}
export async function runDataLoadJSON(): Promise<void> {
const startTime = new Date();
const title = 'ItemLoader';
try {
console.log(`${startTime.toISOString()} - Begin ${title}`);
console.log('--------------------');
console.log(`${new Date().toISOString()} - Setup Axios service`);
dotenv.config();
const axiosService = new AxiosService();
axiosService.createInstance();
console.log(`${new Date().toISOString()} - Get items`);
const itemJson = await axiosService.get<any[]>(
`/items`,
'json',
);
console.log('--------------------');
const jobTime = getDuration(startTime);
console.log(`${jobTime.endTime.toISOString()} - Completed ${title}`);
console.info('Total items loaded:', itemJson.data.length);
console.info('Duration was', jobTime.human);
} catch (err) {
console.error(title, err);
}
process.exit(0);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment