Last active
September 5, 2024 02:39
-
-
Save DNFcode/7e713a5c4561e0f2c6f38237b73ecfa1 to your computer and use it in GitHub Desktop.
1 Billion Row Challenge Multi Thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as fs from "fs"; | |
import * as path from "path"; | |
import { fileURLToPath } from "url"; | |
import { Worker } from "worker_threads"; | |
import { checkResult } from "./utils.js"; | |
// Timestamp taken at module start; the total run time is reported relative to it.
const PERFORMANCE_START = performance.now();

// File and worker configuration.
const MEASUREMENTS_FILE_PATH = "./data/measurements.txt";
const WORKER_COUNT = 10;
const WORKER_SCRIPT_NAME = "worker.js";

// The measurements file is stat'ed once so its size can be split between workers.
const FILE_STATS = fs.statSync(MEASUREMENTS_FILE_PATH);

// The worker script is looked up next to this module, not relative to the
// current working directory.
const CURRENT_FILE_PATH = fileURLToPath(import.meta.url);
const CURRENT_DIRECTORY = path.dirname(CURRENT_FILE_PATH);
const WORKER_SCRIPT_PATH = path.resolve(CURRENT_DIRECTORY, WORKER_SCRIPT_NAME);

// Size of each worker's byte range, rounded up so the ranges cover the whole file.
const BYTES_PER_WORKER = Math.ceil(FILE_STATS.size / WORKER_COUNT);

// One entry per worker message, in arrival order.
const workerResults = [];
/**
 * Merges the per-worker results collected in `workerResults`, logs the total
 * execution time and passes the merged object to `checkResult`.
 *
 * Each worker result maps a city name to `{ max, min, totalSum, count }`.
 * Entries for the same city are combined by adding `totalSum` and `count` and
 * taking the overall `max` / `min`.
 */
function processFinalResults() {
  // A Map is used for accumulation so that city names such as "constructor"
  // or "toString" are never confused with members inherited from
  // Object.prototype (a plain `{}` lookup would report them as already present).
  const mergedByCity = new Map();

  for (const workerResult of workerResults) {
    for (const [cityName, cityData] of Object.entries(workerResult)) {
      if (cityName.length > 200) {
        console.warn(`Unusually long city name detected: ${cityName}`);
      }

      const existing = mergedByCity.get(cityName);
      if (existing === undefined) {
        // First time this city is seen: keep the worker's entry as is.
        mergedByCity.set(cityName, cityData);
        continue;
      }

      // City reported by more than one worker: combine the statistics.
      mergedByCity.set(cityName, {
        totalSum: existing.totalSum + cityData.totalSum,
        count: existing.count + cityData.count,
        max: Math.max(existing.max, cityData.max),
        min: Math.min(existing.min, cityData.min),
      });
    }
  }

  // Callers receive a plain object; fromEntries defines every key as an own
  // data property.
  const combinedResults = Object.fromEntries(mergedByCity);

  const totalExecutionTime = performance.now() - PERFORMANCE_START;
  console.log(`Total Execution Time: ${totalExecutionTime} ms`);
  checkResult(combinedResults);
}
// Start one worker thread per byte range. Worker i receives
// fromByte = i * BYTES_PER_WORKER and a toByte that is one byte before the
// next worker's fromByte, capped at the file size.
for (let workerIndex = 0; workerIndex < WORKER_COUNT; workerIndex++) {
  const startByte = workerIndex * BYTES_PER_WORKER;
  const endByte = Math.min(
    (workerIndex + 1) * BYTES_PER_WORKER - 1,
    FILE_STATS.size
  );

  const worker = new Worker(WORKER_SCRIPT_PATH, {
    workerData: {
      fromByte: startByte,
      toByte: endByte,
    },
  });

  // Every message is stored as one worker result; once as many results as
  // workers have arrived, they are merged.
  worker.on("message", (message) => {
    workerResults.push(message);
    if (workerResults.length === WORKER_COUNT) {
      processFinalResults();
    }
  });
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { open } from "node:fs/promises"; | |
import * as fs from "fs"; | |
import { parentPort, workerData } from "worker_threads"; | |
// Timestamp taken when the worker module starts; used for the elapsed-time log.
const performanceStart = performance.now();

// File and encoding constants.
const MEASUREMENTS_FILE_PATH = "./data/measurements.txt";
const FILE_STATS = fs.statSync(MEASUREMENTS_FILE_PATH);

// Byte values of the characters the parser looks for in the raw file buffer.
const ENCODING = {
  LINE_BREAK: new TextEncoder().encode("\n")[0],
  SEMICOLON: new TextEncoder().encode(";")[0],
  DOT: new TextEncoder().encode(".")[0],
  ZERO: new TextEncoder().encode("0")[0],
  MINUS: new TextEncoder().encode("-")[0],
};

// Byte range handed over by the parent thread. When no workerData is present
// the range defaults to the first tenth of the file.
const { fromByte = 0, toByte = Math.ceil(FILE_STATS.size / 10) } =
  workerData ?? {};

const file = await open(MEASUREMENTS_FILE_PATH, "r");

// Hash table configuration.
const HASH_TABLE_SIZE = 35317;
const MAX_CITY_NAME_LENGTH = 100;
const RESULT_ENTRY_SIZE = 5;

// City names: one fixed-size slot of MAX_CITY_NAME_LENGTH bytes per hash
// table entry. A slot whose first byte is 0 is treated as empty.
const cityNameHashTable = new Uint8Array(
  MAX_CITY_NAME_LENGTH * HASH_TABLE_SIZE
);

// Per-slot statistics, RESULT_ENTRY_SIZE numbers each:
// [max, min, sum, count, city name length]. Temperatures are kept as integer
// tenths. A Float64Array is used instead of a 32-bit integer array because the
// running sum can exceed the 32-bit range on large inputs and would wrap
// silently; doubles represent integers exactly up to 2^53.
const temperatureResultArray = new Float64Array(
  RESULT_ENTRY_SIZE * HASH_TABLE_SIZE
);

// Read buffer shared by the reading loop and the parsing functions.
const fileBuffer = new Uint8Array(1024 * 512);

// Number of valid bytes in fileBuffer for the chunk being processed.
let bytesToRead = 0;
// Length of the incomplete trailing line carried over to the next chunk.
let remainderLength = 0;
// File offset at which the next read starts.
let filePosition = fromByte;
// Index in fileBuffer at which parsing of the current chunk begins.
let bufferStart = 0;
// Main file reading loop.
//
// Ownership rule: this worker processes every line that starts after fromByte
// (or at offset 0 when fromByte is 0) and at or before toByte + 1. A worker
// with a non-zero fromByte therefore discards everything up to and including
// the first line break at or after fromByte, and every worker stops after the
// first line break at or after toByte + 1. Assuming the next worker's fromByte
// is this worker's toByte + 1, each line is processed exactly once.
//
// fileBuffer[0] always corresponds to file offset filePosition - remainderLength,
// because the carried-over remainder sits in front of the freshly read bytes.
let isRangeComplete = filePosition >= toByte;
while (!isRangeComplete) {
  const freeSpace = fileBuffer.length - remainderLength;
  if (freeSpace <= 0) {
    throw new Error("A single line does not fit into the read buffer");
  }

  // Read up to 200 bytes past toByte so the line crossing the range boundary
  // is normally available in the same chunk.
  const requestedLength = Math.min(
    freeSpace,
    Math.max(toByte - filePosition, 0) + 200
  );
  const { bytesRead } = await file.read(
    fileBuffer,
    remainderLength,
    requestedLength,
    filePosition
  );

  bytesToRead = remainderLength + bytesRead;
  bufferStart = 0;

  if (filePosition === fromByte && fromByte !== 0) {
    // First chunk of a range that does not start at the beginning of the
    // file: skip the (possibly partial) first line.
    while (
      bufferStart < bytesToRead &&
      fileBuffer[bufferStart] !== ENCODING.LINE_BREAK
    ) {
      bufferStart++;
    }
    bufferStart++;
  }

  // Buffer index of file offset toByte + 1.
  const boundaryIndex = toByte + 1 - filePosition + remainderLength;
  if (boundaryIndex < bytesToRead) {
    // The chunk reaches past the range: cut it after the first line break at
    // or after the boundary. If that line break is not in the buffer yet, the
    // loop simply reads on.
    let lineEnd = Math.max(boundaryIndex, 0);
    while (
      lineEnd < bytesToRead &&
      fileBuffer[lineEnd] !== ENCODING.LINE_BREAK
    ) {
      lineEnd++;
    }
    if (lineEnd < bytesToRead) {
      bytesToRead = lineEnd + 1;
      isRangeComplete = true;
    }
  }

  if (bytesRead === 0) {
    // End of file: nothing more can arrive.
    isRangeComplete = true;
  }

  processFileChunk();
  filePosition += bytesRead;
}
processEndOfFile();
/**
 * Parses the complete lines in `fileBuffer[bufferStart .. bytesToRead)` and
 * moves the trailing incomplete line to the front of the buffer.
 *
 * For every line break found, `processMeasurementLine` is called with the
 * line start, the index of the last semicolon in the line, the index after
 * that semicolon and the index of the line break. Lines that contain no
 * semicolon (for example blank lines) are skipped, so a stale separator index
 * from a previous line is never used.
 *
 * On return `remainderLength` holds the number of carried-over bytes.
 */
function processFileChunk() {
  let lineStart = bufferStart;
  let semicolonIndex = -1;

  for (let i = bufferStart; i < bytesToRead; i++) {
    const byte = fileBuffer[i];
    if (byte === ENCODING.SEMICOLON) {
      semicolonIndex = i;
    } else if (byte === ENCODING.LINE_BREAK) {
      // Only lines whose separator lies inside the line itself are valid.
      if (semicolonIndex >= lineStart) {
        processMeasurementLine(
          lineStart,
          semicolonIndex,
          semicolonIndex + 1,
          i
        );
      }
      lineStart = i + 1;
    }
  }

  // Carry the unfinished last line over to the start of the buffer.
  remainderLength = Math.max(bytesToRead - lineStart, 0);
  fileBuffer.copyWithin(0, lineStart, bytesToRead);
}
/**
 * Checks whether the city name in `fileBuffer[start .. end)` is exactly the
 * name stored in the hash table slot that begins at `indexInHashTable`.
 *
 * @param {number} start - Index of the first name byte in `fileBuffer`.
 * @param {number} end - Index one past the last name byte in `fileBuffer`.
 * @param {number} indexInHashTable - Start of the slot in `cityNameHashTable`.
 * @returns {boolean} `true` only when all bytes match and the stored name is
 *   not longer than the candidate.
 */
function isCityNameMatch(start, end, indexInHashTable) {
  const nameLength = end - start;
  for (let offset = 0; offset < nameLength; offset++) {
    if (
      fileBuffer[start + offset] !== cityNameHashTable[indexInHashTable + offset]
    ) {
      return false;
    }
  }
  // Comparing only nameLength bytes would accept "San" for a stored
  // "San Jose". Unused slot bytes are 0, so the stored name ends here only if
  // the next slot byte is 0. A name that fills the whole slot has no room for
  // a terminator and is complete by length.
  return (
    nameLength >= MAX_CITY_NAME_LENGTH ||
    cityNameHashTable[indexInHashTable + nameLength] === 0
  );
}
/**
 * Copies `source[sourceStart .. sourceEnd)` into `target`, starting at
 * `targetOffset`. Nothing is copied when `sourceEnd <= sourceStart`.
 *
 * @param {ArrayLike<number>} source - Buffer to read from.
 * @param {ArrayLike<number>} target - Buffer to write to.
 * @param {number} sourceStart - Index of the first element to copy.
 * @param {number} sourceEnd - Index one past the last element to copy.
 * @param {number} targetOffset - Index in `target` of the first written element.
 */
function copyBufferData(source, target, sourceStart, sourceEnd, targetOffset) {
  const length = sourceEnd - sourceStart;
  for (let offset = 0; offset < length; offset++) {
    target[targetOffset + offset] = source[sourceStart + offset];
  }
}
/**
 * Returns the hash table slot number for the city name in
 * `fileBuffer[cityNameStart .. cityNameEnd)`, claiming a free slot when the
 * name has not been seen before.
 *
 * The name is hashed with a polynomial rolling hash (base 97, modulo
 * HASH_TABLE_SIZE); collisions are resolved by probing the following slots,
 * wrapping around at the end of the table. A slot whose first byte is 0 is
 * treated as free.
 *
 * NOTE(review): names longer than MAX_CITY_NAME_LENGTH are copied past the end
 * of their slot; nothing here guards against that.
 *
 * @param {number} cityNameStart - Index of the first name byte in `fileBuffer`.
 * @param {number} cityNameEnd - Index one past the last name byte.
 * @returns {number} Slot number in the range [0, HASH_TABLE_SIZE).
 * @throws {Error} When every slot is occupied by a different name.
 */
function getHashTableIndex(cityNameStart, cityNameEnd) {
  let hash = 0;
  for (let i = cityNameStart; i < cityNameEnd; i++) {
    hash = (hash * 97 + fileBuffer[i]) % HASH_TABLE_SIZE;
  }

  // Visiting each slot once is enough; an unbounded probe loop would never
  // terminate once the table is full.
  for (let probes = 0; probes < HASH_TABLE_SIZE; probes++) {
    const indexInHashTable = hash * MAX_CITY_NAME_LENGTH;

    if (cityNameHashTable[indexInHashTable] === 0) {
      // Free slot: store the name and claim it.
      copyBufferData(
        fileBuffer,
        cityNameHashTable,
        cityNameStart,
        cityNameEnd,
        indexInHashTable
      );
      return hash;
    }

    if (isCityNameMatch(cityNameStart, cityNameEnd, indexInHashTable)) {
      return hash;
    }

    hash = (hash + 1) % HASH_TABLE_SIZE;
  }

  throw new Error(`City name hash table is full (${HASH_TABLE_SIZE} slots)`);
}
/**
 * Parses the temperature text in `fileBuffer[start .. end)` into an integer
 * with the decimal point dropped, e.g. "12.3" becomes 123 and "-4.5" becomes -45.
 *
 * Only digit bytes contribute to the value and a minus sign makes the result
 * negative. Every other byte (the decimal point, or a stray '\r' from CRLF
 * line endings) is ignored rather than being folded into the number.
 *
 * @param {number} start - Index of the first temperature byte in `fileBuffer`.
 * @param {number} end - Index one past the last temperature byte.
 * @returns {number} The parsed value; 0 for an empty range.
 */
function parseTemperature(start, end) {
  let temperature = 0;
  let isNegative = false;

  for (let i = start; i < end; i++) {
    const digit = fileBuffer[i] - ENCODING.ZERO;
    if (digit >= 0 && digit <= 9) {
      temperature = temperature * 10 + digit;
    } else if (fileBuffer[i] === ENCODING.MINUS) {
      isNegative = true;
    }
  }

  return isNegative ? -temperature : temperature;
}
/**
 * Folds one measurement line into the statistics of its city.
 *
 * The statistics for a slot occupy RESULT_ENTRY_SIZE consecutive entries of
 * `temperatureResultArray`: [max, min, sum, count, city name length]. A count
 * of 0 marks a slot that has not received a measurement yet.
 *
 * @param {number} cityNameStart - Index of the first city name byte in `fileBuffer`.
 * @param {number} cityNameEnd - Index one past the last city name byte.
 * @param {number} temperatureStart - Index of the first temperature byte.
 * @param {number} temperatureEnd - Index one past the last temperature byte.
 */
function processMeasurementLine(
  cityNameStart,
  cityNameEnd,
  temperatureStart,
  temperatureEnd
) {
  const hash = getHashTableIndex(cityNameStart, cityNameEnd);
  const temperature = parseTemperature(temperatureStart, temperatureEnd);
  const index = hash * RESULT_ENTRY_SIZE;

  if (temperatureResultArray[index + 3] === 0) {
    // First measurement for this city: it is max, min and sum at once.
    temperatureResultArray[index] = temperature; // max
    temperatureResultArray[index + 1] = temperature; // min
    temperatureResultArray[index + 2] = temperature; // sum
    temperatureResultArray[index + 3] = 1; // count
    temperatureResultArray[index + 4] = cityNameEnd - cityNameStart; // city name length
    return;
  }

  // Update the existing entry.
  temperatureResultArray[index] = Math.max(
    temperatureResultArray[index],
    temperature
  );
  temperatureResultArray[index + 1] = Math.min(
    temperatureResultArray[index + 1],
    temperature
  );
  temperatureResultArray[index + 2] += temperature;
  temperatureResultArray[index + 3]++;
}
/**
 * Builds the result object for this worker from the hash table.
 *
 * Every slot that has received at least one measurement yields an entry keyed
 * by the decoded city name. Stored values are integer tenths, so max, min and
 * the sum are divided by 10; the count is reported as is.
 *
 * @returns {Object<string, {max: number, min: number, totalSum: number, count: number}>}
 */
function extractFinalResults() {
  const textDecoder = new TextDecoder();
  const results = {};

  for (let slot = 0; slot < HASH_TABLE_SIZE; slot++) {
    const index = slot * RESULT_ENTRY_SIZE;
    const count = temperatureResultArray[index + 3];

    // Occupancy is decided by the count. Testing the max value instead would
    // drop every city whose maximum temperature is exactly 0.0.
    if (count === 0) {
      continue;
    }

    const cityNameStart = slot * MAX_CITY_NAME_LENGTH;
    const cityNameEnd = cityNameStart + temperatureResultArray[index + 4];
    // subarray creates a view, so the name bytes are decoded without copying.
    const cityName = textDecoder.decode(
      cityNameHashTable.subarray(cityNameStart, cityNameEnd)
    );

    results[cityName] = {
      max: temperatureResultArray[index] / 10,
      min: temperatureResultArray[index + 1] / 10,
      totalSum: temperatureResultArray[index + 2] / 10,
      count,
    };
  }

  return results;
}
/**
 * Finishes the worker's run: extracts the results, logs the elapsed time in
 * milliseconds, closes the measurements file and posts the results to the
 * parent thread (when there is one).
 */
function processEndOfFile() {
  const results = extractFinalResults();
  const performanceEnd = performance.now();
  console.log(performanceEnd - performanceStart);

  // close() returns a promise. The results are already complete, so a failure
  // to close is reported instead of surfacing as an unhandled rejection.
  file.close().catch((error) => {
    console.error("Failed to close measurements file:", error);
  });

  parentPort?.postMessage(results);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment