@DNFcode
Last active September 5, 2024 02:39
1 Billion Row Challenge Multi Thread
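The gist contains two scripts: the main thread script below, which splits measurements.txt into byte ranges and spawns the workers, and worker.js, which parses its assigned range and posts per-city aggregates back. The input is assumed to follow the 1 Billion Row Challenge format of one station;temperature pair per line with a single decimal digit; the station names below are only illustrative.

// Example input lines (illustrative):
//   Hamburg;12.0
//   Bulawayo;8.9
//   Palembang;38.8

// --- main thread script ---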
import * as fs from "fs";
import * as path from "path";
import { fileURLToPath } from "url";
import { Worker } from "worker_threads";
import { checkResult } from "./utils.js";
const PERFORMANCE_START = performance.now();
// File and worker configuration
const MEASUREMENTS_FILE_PATH = "./data/measurements.txt";
const WORKER_COUNT = 10;
const WORKER_SCRIPT_NAME = "worker.js";
// Get file stats and set up paths
const FILE_STATS = fs.statSync(MEASUREMENTS_FILE_PATH);
const CURRENT_FILE_PATH = fileURLToPath(import.meta.url);
const CURRENT_DIRECTORY = path.dirname(CURRENT_FILE_PATH);
const WORKER_SCRIPT_PATH = path.resolve(CURRENT_DIRECTORY, WORKER_SCRIPT_NAME);
// Calculate bytes per worker
const BYTES_PER_WORKER = Math.ceil(FILE_STATS.size / WORKER_COUNT);
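// Each worker gets a contiguous byte range of roughly FILE_STATS.size / WORKER_COUNT
// bytes. Lines that straddle a boundary are handled by letting a worker read a
// little past the end of its range to finish its last line, while the next worker
// skips the partial first line of its own range (see worker.js below).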
const workerResults = [];
// Process final results from all workers
function processFinalResults() {
  const combinedResults = {};

  for (const workerResult of workerResults) {
    for (const [cityName, cityData] of Object.entries(workerResult)) {
      if (combinedResults[cityName]) {
        // Combine data for cities found in multiple workers
        combinedResults[cityName] = {
          totalSum: combinedResults[cityName].totalSum + cityData.totalSum,
          count: combinedResults[cityName].count + cityData.count,
          max: Math.max(combinedResults[cityName].max, cityData.max),
          min: Math.min(combinedResults[cityName].min, cityData.min),
        };
      } else {
        // Add new city data
        combinedResults[cityName] = cityData;
      }
    }
  }

  const totalExecutionTime = performance.now() - PERFORMANCE_START;
  console.log(`Total Execution Time: ${totalExecutionTime} ms`);
  checkResult(combinedResults);
}
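// combinedResults maps each city to { totalSum, count, max, min }; the worker
// accumulates temperatures as integer tenths of a degree and divides by 10
// before posting, so the values here are already in degrees. The per-station
// mean would be totalSum / count. checkResult comes from ./utils.js, which is
// not included in this gist.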
// Start worker threads
for (let i = 0; i < WORKER_COUNT; i++) {
  const startByte = i * BYTES_PER_WORKER;
  const endByte = Math.min((i + 1) * BYTES_PER_WORKER, FILE_STATS.size);
  const worker = new Worker(WORKER_SCRIPT_PATH, {
    workerData: {
      fromByte: startByte,
      toByte: endByte,
    },
  });

  worker.on("message", (message) => {
    workerResults.push(message);
    if (workerResults.length === WORKER_COUNT) {
      processFinalResults();
    }
  });
}
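
// --- worker.js ---
// The worker reads its byte range in 512 KiB chunks, scans the raw bytes for
// ';' and '\n' without decoding strings, and accumulates max/min/sum/count per
// station in a flat open-addressing hash table backed by typed arrays.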
import { open } from "node:fs/promises";
import * as fs from "fs";
import { parentPort, workerData } from "worker_threads";
const performanceStart = performance.now();
// File and encoding constants
const MEASUREMENTS_FILE_PATH = "./data/measurements.txt";
const FILE_STATS = fs.statSync(MEASUREMENTS_FILE_PATH);
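// Single-byte UTF-8 code units for the delimiters the parser scans for;
// comparing raw bytes avoids decoding each line into a string.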
const ENCODING = {
  LINE_BREAK: new TextEncoder().encode("\n")[0],
  SEMICOLON: new TextEncoder().encode(";")[0],
  DOT: new TextEncoder().encode(".")[0],
  ZERO: new TextEncoder().encode("0")[0],
  MINUS: new TextEncoder().encode("-")[0],
};
// Byte range assigned by the main thread; the defaults let the worker run
// standalone on roughly the first tenth of the file.
const { fromByte = 0, toByte = Math.ceil(FILE_STATS.size / 10) } =
  workerData ?? {};
const file = await open(MEASUREMENTS_FILE_PATH, "r");
// Hash table configuration
const HASH_TABLE_SIZE = 35317;
const MAX_CITY_NAME_LENGTH = 100;
const RESULT_ENTRY_SIZE = 5;
// Initialize hash table and result arrays
const cityNameHashTable = new Uint8Array(
  MAX_CITY_NAME_LENGTH * HASH_TABLE_SIZE
);
const temperatureResultArray = new Int32Array(
  RESULT_ENTRY_SIZE * HASH_TABLE_SIZE
);
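// Memory layout: slot i of cityNameHashTable holds up to MAX_CITY_NAME_LENGTH
// name bytes starting at i * MAX_CITY_NAME_LENGTH; slot i of
// temperatureResultArray holds five int32 values starting at
// i * RESULT_ENTRY_SIZE: [max, min, sum, count, cityNameLength].
// Temperatures are stored as integers in tenths of a degree.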
const fileBuffer = new Uint8Array(1024 * 512);
let bytesToRead = 0;
let remainderLength = 0;
let filePosition = fromByte;
let bufferStart = 0;
// Main file reading loop
while (true) {
  const bytesLeft = toByte - filePosition;
  if (bytesLeft <= 0) {
    processEndOfFile();
    break;
  }

  const readLength = Math.min(fileBuffer.length, bytesLeft + 200);
  const { bytesRead } = await file.read(
    fileBuffer,
    remainderLength,
    readLength - remainderLength,
    filePosition
  );

  bufferStart = 0;
  if (filePosition === fromByte && fromByte !== 0) {
    // Skip first line if not starting from the beginning
    while (fileBuffer[bufferStart] !== ENCODING.LINE_BREAK) {
      bufferStart++;
    }
    bufferStart++;
  }

  bytesToRead = bytesRead + remainderLength;
  if (readLength === bytesRead && filePosition + bytesToRead > toByte) {
    // Ensure that the whole last line is read
    bytesToRead = toByte - filePosition;
    while (fileBuffer[bytesToRead - 1] !== ENCODING.LINE_BREAK) {
      bytesToRead++;
    }
  }

  processFileChunk();
  filePosition += bytesRead;
}
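// Notes on the loop above: a line cut off at the end of a chunk is carried over
// to the front of fileBuffer (remainderLength bytes) and completed by the next
// read; readLength allows up to 200 extra bytes past toByte so the last line of
// the assigned range can be finished; and the partial first line of the range
// is skipped because the previous worker already parsed it.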
// Process a chunk of the file
function processFileChunk() {
  let cityNameStart = bufferStart;
  let semicolonIndex = -1;

  for (let i = bufferStart; i < bytesToRead; i++) {
    if (fileBuffer[i] === ENCODING.SEMICOLON) {
      semicolonIndex = i;
    } else if (fileBuffer[i] === ENCODING.LINE_BREAK) {
      processMeasurementLine(
        cityNameStart,
        semicolonIndex,
        semicolonIndex + 1,
        i
      );
      cityNameStart = i + 1;
    }
  }

  // Handle leftover data
  remainderLength = 0;
  for (let i = cityNameStart; i < bytesToRead; i++) {
    fileBuffer[remainderLength++] = fileBuffer[i];
  }
}
// Check if city name matches in hash table
function isCityNameMatch(start, end, indexInHashTable) {
  for (let i = 0; i < end - start; i++) {
    if (fileBuffer[start + i] !== cityNameHashTable[indexInHashTable + i]) {
      return false;
    }
  }
  return true;
}
// Copy data between buffers
function copyBufferData(source, target, sourceStart, sourceEnd, targetOffset) {
  for (let i = 0; i < sourceEnd - sourceStart; i++) {
    target[targetOffset + i] = source[sourceStart + i];
  }
}
// Get or create index in hash table for city name
function getHashTableIndex(cityNameStart, cityNameEnd) {
  let hash = 0;
  for (let i = cityNameStart; i < cityNameEnd; i++) {
    hash = (hash * 97 + fileBuffer[i]) % HASH_TABLE_SIZE;
  }

  let indexInHashTable = hash * MAX_CITY_NAME_LENGTH;
  do {
    if (cityNameHashTable[indexInHashTable] === 0) {
      copyBufferData(
        fileBuffer,
        cityNameHashTable,
        cityNameStart,
        cityNameEnd,
        indexInHashTable
      );
      return hash;
    }
    if (isCityNameMatch(cityNameStart, cityNameEnd, indexInHashTable)) {
      return hash;
    }
    hash = (hash + 1) % HASH_TABLE_SIZE;
    indexInHashTable = hash * MAX_CITY_NAME_LENGTH;
  } while (true);
}
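// Collisions are resolved with open addressing and linear probing: if the slot
// for the computed hash is taken by a different name, the probe moves to the
// next slot until it finds either a matching name or an empty slot (first name
// byte === 0). The returned value is the slot index, not a byte offset.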
// Convert temperature bytes to number
function parseTemperature(start, end) {
  let temperature = 0;
  let isNegative = false;
  for (let i = start; i < end; i++) {
    if (fileBuffer[i] === ENCODING.DOT) continue;
    if (fileBuffer[i] === ENCODING.MINUS) {
      isNegative = true;
      continue;
    }
    temperature = temperature * 10 + fileBuffer[i] - ENCODING.ZERO;
  }
  return isNegative ? -temperature : temperature;
}
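// The decimal point is skipped, so the result is a fixed-point integer in
// tenths of a degree: "12.3" parses to 123 and "-0.5" to -5. Sums, minima and
// maxima stay in integer tenths until extractFinalResults divides by 10.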
// Process a single measurement line
function processMeasurementLine(
  cityNameStart,
  cityNameEnd,
  temperatureStart,
  temperatureEnd
) {
  const hash = getHashTableIndex(cityNameStart, cityNameEnd);
  const temperature = parseTemperature(temperatureStart, temperatureEnd);
  const index = hash * RESULT_ENTRY_SIZE;

  if (temperatureResultArray[index + 3] === 0) {
    // First entry for this city
    temperatureResultArray[index] = temperature; // max
    temperatureResultArray[index + 1] = temperature; // min
    temperatureResultArray[index + 2] = temperature; // sum
    temperatureResultArray[index + 3] = 1; // count
    temperatureResultArray[index + 4] = cityNameEnd - cityNameStart; // city name length
  } else {
    // Update existing entry
    temperatureResultArray[index] = Math.max(
      temperatureResultArray[index],
      temperature
    );
    temperatureResultArray[index + 1] = Math.min(
      temperatureResultArray[index + 1],
      temperature
    );
    temperatureResultArray[index + 2] += temperature;
    temperatureResultArray[index + 3]++;
  }
}
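// The count at index + 3 doubles as the occupancy flag for a slot: it is zero
// only for slots that have never been written, which is also the test used when
// extracting the final results below.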
// Extract final results from the hash table
function extractFinalResults() {
  const textDecoder = new TextDecoder();
  const results = {};

  for (let i = 0; i < HASH_TABLE_SIZE; i++) {
    const index = i * RESULT_ENTRY_SIZE;
    // A slot is occupied if its count is non-zero
    if (temperatureResultArray[index + 3] !== 0) {
      const cityNameStart = i * MAX_CITY_NAME_LENGTH;
      const cityNameEnd = cityNameStart + temperatureResultArray[index + 4];
      const cityName = textDecoder.decode(
        cityNameHashTable.slice(cityNameStart, cityNameEnd)
      );
      results[cityName] = {
        max: temperatureResultArray[index] / 10,
        min: temperatureResultArray[index + 1] / 10,
        totalSum: temperatureResultArray[index + 2] / 10,
        count: temperatureResultArray[index + 3],
      };
    }
  }
  return results;
}
// Handle end of file processing
function processEndOfFile() {
  const results = extractFinalResults();
  const performanceEnd = performance.now();
  console.log(performanceEnd - performanceStart);
  file.close();
  parentPort?.postMessage(results);
}