Skip to content

Instantly share code, notes, and snippets.

@tstelzer
Last active May 6, 2022 16:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tstelzer/8e1dc14a8c390fbdb825e6638b31ef73 to your computer and use it in GitHub Desktop.
Save tstelzer/8e1dc14a8c390fbdb825e6638b31ef73 to your computer and use it in GitHub Desktop.
import * as T from '@effect/core';
import {pipeEffect} from '@effect/core/io/Effect';
import {millis} from '@tsplus/stdlib/data/Duration';
import fs from 'fs';
import fetch from 'node-fetch';
import os from 'os';
import path from 'path';
import {fileURLToPath} from 'url';
/**
 * Format a size as a count of 2^20-sized units with two decimals and an
 * " MB" suffix.
 *
 * @param n - Raw size; in this file it is fed `process.memoryUsage().heapUsed`.
 * @returns The formatted text, e.g. `1048576` becomes `"1.00 MB"`.
 */
const formatUsage = (n: number): string => {
    const unitsPerMb = 1024 * 1024;
    const scaled = n / unitsPerMb;
    return scaled.toFixed(2) + ' MB';
};
// Rebuild `__dirname` from this module's URL.
const moduleFile = fileURLToPath(import.meta.url);
const __dirname = path.dirname(moduleFile);
// File that receives the heap samples, resolved relative to this module's directory.
const LOG_FILE = path.join(__dirname, '../memory-usage.csv');
// Synchronously (re)write the file so it holds only the header line before sampling starts.
fs.writeFileSync(LOG_FILE, `heap usage in mb${os.EOL}`);
/**
 * Do-nothing callback. It is handed to `fs.appendFile` in this file, so the
 * outcome of each log write is ignored.
 *
 * @returns Always `undefined`.
 */
function noop(): undefined {
    return void 0;
}
/**
 * Memory experiment over an unbounded stream of HTTP responses.
 *
 * While the pipeline runs, the current heap usage (rounded to whole MB) is
 * appended to `LOG_FILE` every 200 ms. The pipeline itself repeats a fetch,
 * passes every element through a 1000 ms `delay`, logs it, and drains the
 * stream.
 *
 * @returns The result of piping the stream through
 *          `T.effect.unsafeRunPromiseExit`.
 */
export function main() {
    // One sample per tick; the write result is ignored through `noop`.
    const recordHeapSample = () => {
        const heapMb = Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
        return fs.appendFile(LOG_FILE, heapMb + os.EOL, noop);
    };
    const sampler = setInterval(recordHeapSample, 200);

    const delay = T.effect.delay(millis(1000));

    // Resolves to element 0 of the parsed JSON response body.
    async function getRandomData() {
        const response = await fetch(
            'https://baconipsum.com/api/?type=meat-and-filler&paras=1',
        );
        const body = await response.json();
        return body[0];
    }

    return pipeEffect(
        T.stream.repeatEffect(() => T.effect.promise(getRandomData)),
        T.stream.mapEffect(paragraph =>
            delay(T.effect.succeed(() => paragraph)),
        ),
        T.stream.tap(paragraph => {
            console.log(paragraph);
            return T.effect.succeed(() => undefined);
        }),
        T.stream.runDrain,
        // Step placed after the drain: print the heap usage and stop sampling.
        T.effect.tap(() => {
            console.log(formatUsage(process.memoryUsage().heapUsed));
            clearInterval(sampler);
            return T.effect.succeed(() => undefined);
        }),
        T.effect.unsafeRunPromiseExit,
    );
}

main();
import * as T from '@effect/core';
import {pipeEffect} from '@effect/core/io/Effect';
import {millis} from '@tsplus/stdlib/data/Duration';
import events from 'events';
import fs from 'fs';
import os from 'os';
import path from 'path';
import readline from 'readline';
import {fileURLToPath} from 'url';
/**
 * Format a size as a count of 2^20-sized units with two decimals and an
 * " MB" suffix.
 *
 * @param n - Raw size; in this file it is fed `process.memoryUsage().heapUsed`.
 * @returns The formatted text, e.g. `1048576` becomes `"1.00 MB"`.
 */
const formatUsage = (n: number): string => {
    const unitsPerMb = 1024 * 1024;
    const scaled = n / unitsPerMb;
    return scaled.toFixed(2) + ' MB';
};
// Rebuild `__dirname` from this module's URL.
const moduleFile = fileURLToPath(import.meta.url);
const __dirname = path.dirname(moduleFile);
// File that receives the heap samples, resolved relative to this module's directory.
const LOG_FILE = path.join(__dirname, '../memory-usage.csv');
// Synchronously (re)write the file so it holds only the header line before sampling starts.
fs.writeFileSync(LOG_FILE, `heap usage in mb${os.EOL}`);
/**
 * Do-nothing callback. It is handed to `fs.appendFile` in this file, so the
 * outcome of each log write is ignored.
 *
 * @returns Always `undefined`.
 */
function noop(): undefined {
    return void 0;
}
// Built from a 10 ms duration; applied to every stream element inside `main`.
const delay = T.effect.delay(millis(10));
// Module-level line reader over `file-200MB.txt`; it is created as soon as
// this module is evaluated, before `main` attaches its listeners.
const rl = readline.createInterface({
    crlfDelay: Infinity,
    input: fs.createReadStream('file-200MB.txt'),
});
/**
 * Memory experiment that feeds the lines read by the module-level `rl`
 * into an async stream, passes every line through `delay`, and drains it.
 *
 * The current heap usage (rounded to whole MB) is appended to `LOG_FILE`
 * every 200 ms until the reader emits `'close'`.
 *
 * @returns The result of piping the stream through
 *          `T.effect.unsafeRunPromiseExit`.
 */
export function main() {
    // One sample per tick; the write result is ignored through `noop`.
    const recordHeapSample = () => {
        const heapMb = Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
        return fs.appendFile(LOG_FILE, heapMb + os.EOL, noop);
    };
    const sampler = setInterval(recordHeapSample, 200);

    return pipeEffect(
        T.stream.async<unknown, never, string>(emit => {
            console.log('start');

            // Runs once the reader has closed: end the stream, report the
            // heap usage, and stop sampling.
            const onReaderClosed = () => {
                emit.end();
                console.log(formatUsage(process.memoryUsage().heapUsed));
                clearInterval(sampler);
                console.log('end');
            };

            rl.on('line', text => emit(T.effect.succeed(() => text)));
            events.once(rl, 'close').then(onReaderClosed);
        }),
        T.stream.mapEffect(text => delay(T.effect.succeed(() => text))),
        T.stream.runDrain,
        T.effect.unsafeRunPromiseExit,
    );
}

main();
import * as T from '@effect/core';
import {pipeEffect} from '@effect/core/io/Effect';
import events from 'events';
import fs from 'fs';
import os from 'os';
import path from 'path';
import readline from 'readline';
import {fileURLToPath} from 'url';
/**
 * Format a size as a count of 2^20-sized units with two decimals and an
 * " MB" suffix.
 *
 * @param n - Raw size; in this file it is fed `process.memoryUsage().heapUsed`.
 * @returns The formatted text, e.g. `1048576` becomes `"1.00 MB"`.
 */
const formatUsage = (n: number): string => {
    const unitsPerMb = 1024 * 1024;
    const scaled = n / unitsPerMb;
    return scaled.toFixed(2) + ' MB';
};
// Rebuild `__dirname` from this module's URL.
const moduleFile = fileURLToPath(import.meta.url);
const __dirname = path.dirname(moduleFile);
// File that receives the heap samples, resolved relative to this module's directory.
const LOG_FILE = path.join(__dirname, '../memory-usage.csv');
// Synchronously (re)write the file so it holds only the header line before sampling starts.
fs.writeFileSync(LOG_FILE, `heap usage in mb${os.EOL}`);
/**
 * Do-nothing callback. It is handed to `fs.appendFile` in this file, so the
 * outcome of each log write is ignored.
 *
 * @returns Always `undefined`.
 */
function noop(): undefined {
    return void 0;
}
/**
 * Memory experiment that reads `file-50MB.txt` line by line, emits every
 * line into an async stream, and drains that stream without any further
 * processing.
 *
 * The current heap usage (rounded to whole MB) is appended to `LOG_FILE`
 * every 10 ms; the step placed after the drain prints the heap usage and
 * stops the sampling timer.
 *
 * @returns The result of piping the stream through
 *          `T.effect.unsafeRunPromiseExit`.
 */
export function main() {
    // One sample per tick; the write result is ignored through `noop`.
    const recordHeapSample = () => {
        const heapMb = Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
        return fs.appendFile(LOG_FILE, heapMb + os.EOL, noop);
    };
    const sampler = setInterval(recordHeapSample, 10);

    return pipeEffect(
        T.stream.async<unknown, never, string>(emit => {
            console.log('start');

            const reader = readline.createInterface({
                input: fs.createReadStream('file-50MB.txt'),
                crlfDelay: Infinity,
            });

            // Runs once the reader has closed: end the stream.
            const onReaderClosed = () => {
                emit.end();
                console.log('end');
            };

            reader.on('line', text => emit(T.effect.succeed(() => text)));
            events.once(reader, 'close').then(onReaderClosed);
        }),
        T.stream.runDrain,
        T.effect.tap(() => {
            console.log(formatUsage(process.memoryUsage().heapUsed));
            clearInterval(sampler);
            return T.effect.succeed(() => undefined);
        }),
        T.effect.unsafeRunPromiseExit,
    );
}

main();
import events from 'events';
import fs from 'fs';
import os from 'os';
import path from 'path';
import readline from 'readline';
import {fileURLToPath} from 'url';
// Rebuild `__dirname` from this module's URL.
const moduleFile = fileURLToPath(import.meta.url);
const __dirname = path.dirname(moduleFile);
// File that receives the heap samples, resolved relative to this module's directory.
const LOG_FILE = path.join(__dirname, '../memory-usage.csv');
// Synchronously (re)write the file so it holds only the header line before sampling starts.
fs.writeFileSync(LOG_FILE, `heap usage in mb${os.EOL}`);
/**
 * Do-nothing callback. It is handed to `fs.appendFile` in this file, so the
 * outcome of each log write is ignored.
 *
 * @returns Always `undefined`.
 */
function noop(): undefined {
    return void 0;
}
/**
 * Baseline memory experiment without any effect library: read
 * `file-50MB.txt` line by line with `readline`, print every line, and wait
 * for the reader to close.
 *
 * While the file is being read, the current heap usage (rounded to whole MB)
 * is appended to `LOG_FILE` once per second.
 *
 * @returns A promise that resolves after the reader has emitted `'close'`
 *          and `'end'` has been logged.
 * @throws Rejects with whatever `events.once` rejects with, i.e. when the
 *         reader emits `'error'` before `'close'`.
 */
export async function main(): Promise<void> {
    const i = setInterval(
        () =>
            fs.appendFile(
                LOG_FILE,
                Math.round(process.memoryUsage().heapUsed / 1024 / 1024) +
                    os.EOL,
                // Logging is best-effort: write failures are ignored.
                noop,
            ),
        1000,
    );
    try {
        const rl = readline.createInterface({
            input: fs.createReadStream('file-50MB.txt'),
            crlfDelay: Infinity,
        });
        rl.on('line', line => {
            console.log(line);
        });
        await events.once(rl, 'close');
    } finally {
        // Always stop the sampler; a live interval would otherwise keep the
        // process running after a failure.
        clearInterval(i);
    }
    console.log('end');
}

// Report a failed run instead of leaving the promise rejection unhandled.
main().catch((error: unknown) => {
    console.error(error);
    process.exitCode = 1;
});
import * as C from '@effect-ts/core/Collections/Immutable/Chunk';
import * as N from '@effect-ts/node/Runtime';
import * as S from '@effect-ts/core/Effect/Experimental/Stream';
import * as T from '@effect-ts/core/Effect';
import {pipe} from '@effect-ts/core/Function';
import events from 'events';
import fs from 'fs';
import os from 'os';
import path from 'path';
import readline from 'readline';
import {fileURLToPath} from 'url';
// Rebuild `__dirname` from this module's URL.
const moduleFile = fileURLToPath(import.meta.url);
const __dirname = path.dirname(moduleFile);
// File that receives the heap samples, resolved relative to this module's directory.
const LOG_FILE = path.join(__dirname, '../memory-usage.csv');
// Synchronously (re)write the file so it holds only the header line before sampling starts.
fs.writeFileSync(LOG_FILE, `heap usage in mb${os.EOL}`);
/**
 * Do-nothing callback. It is handed to `fs.appendFile` in this file, so the
 * outcome of each log write is ignored.
 *
 * @returns Always `undefined`.
 */
function noop(): undefined {
    return void 0;
}
/**
 * Memory experiment using the `@effect-ts` experimental stream: read
 * `file-50MB.txt` line by line, emit every line as a one-element chunk,
 * and drain the stream.
 *
 * The current heap usage (rounded to whole MB) is appended to `LOG_FILE`
 * every 10 ms; the step placed after the drain stops the sampling timer.
 *
 * @returns The result of handing the piped effect to `N.runMain`.
 */
export function main() {
    // One sample per tick; the write result is ignored through `noop`.
    const recordHeapSample = () => {
        const heapMb = Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
        return fs.appendFile(LOG_FILE, heapMb + os.EOL, noop);
    };
    const sampler = setInterval(recordHeapSample, 10);

    return pipe(
        S.async<unknown, never, string>(emit => {
            console.log('start');

            const reader = readline.createInterface({
                input: fs.createReadStream('file-50MB.txt'),
                crlfDelay: Infinity,
            });

            // Runs once the reader has closed: end the stream.
            const onReaderClosed = () => {
                emit.end();
                console.log('end');
            };

            reader.on('line', text => emit(T.succeed(C.from([text]))));
            events.once(reader, 'close').then(onReaderClosed);
        }),
        S.runDrain,
        T.tap(() => {
            clearInterval(sampler);
            return T.succeed(undefined);
        }),
        N.runMain,
    );
}

main();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment