@junajan
Created February 2, 2021 19:28
const _ = require('lodash')
const moment = require('moment')
const fse = require('fs-extra')
const csv = require('fast-csv')

const FILENAME = 'bitstampUSD.csv';

const outputStream = csv.createWriteStream();
const writeStream = fse.createWriteStream("outputfile.csv");
outputStream.pipe(writeStream);

let processed = 0;

// header row of the aggregated output
const finalData = [
  'unix', 'date', 'open', 'high', 'low', 'close', 'volume'
];
outputStream.write(finalData);

// parse one raw input row: [unix timestamp in seconds, price, volume]
function parseRow(row) {
  const unixTime = Number(row[0]);

  return {
    unix: unixTime,
    dateHours: moment(unixTime * 1000).format('YYYY-MM-DDTHH'),
    date: moment(unixTime * 1000).format('YYYY-MM-DDTHH:mm:ss'),
    price: parseFloat(row[1]),
    volume: parseFloat(row[2]),
  }
}

// aggregate all trades buffered for one hour into a single OHLCV candle
function processBuffer (buffer) {
  const date = moment(buffer[0].unix * 1000).format('YYYY-MM-DD HH:00:00Z');
  const unix = Number(moment(date).format('x')) / 1000;

  const prices = _.map(buffer, 'price');
  const volumes = _.map(buffer, 'volume');

  const aggregated = [
    unix,
    date,
    buffer[0].price,                  // open
    Math.max(...prices),              // high
    Math.min(...prices),              // low
    buffer[buffer.length - 1].price,  // close
    _.sum(volumes),                   // volume
  ];

  outputStream.write(aggregated)
}

let buffer = [];
let currentTime = null;

fse.createReadStream(FILENAME)
  .pipe(csv({
    objectMode: true,
    headers: false
  }))
  .on("data", (_row) => {
    processed++

    const row = parseRow(_row);

    if (!currentTime) {
      currentTime = row.dateHours;
    }

    // a new hour started - flush the buffered trades and start a new candle
    if (row.dateHours !== currentTime) {
      processBuffer(buffer);
      buffer = []
      currentTime = row.dateHours;
    }

    buffer.push(row);

    if (!(processed % 1000))
      console.log("Processed %d rows", processed)
  })
  .on("end", () => {
    // flush the last (possibly partial) candle
    if (buffer.length) {
      processBuffer(buffer);
    }

    outputStream.end();
    console.log("done")
  });
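
As a quick sanity check of the row format the parser above assumes (unix seconds, price, volume, no header row), here is a hedged sketch of what parseRow produces for one hypothetical input row; the formatted date fields depend on the machine's timezone because moment formats in local time:

// hypothetical raw row as delivered by fast-csv with headers: false
const raw = ['1612288800', '33500.00', '0.015']; // 2021-02-02 18:00:00 UTC

const parsed = parseRow(raw);
// parsed.unix      === 1612288800
// parsed.dateHours === '2021-02-02T18'        // when the machine runs in UTC
// parsed.date      === '2021-02-02T18:00:00'  // when the machine runs in UTC
// parsed.price     === 33500
// parsed.volume    === 0.015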

@junajan (Author) commented Feb 3, 2021

const _ = require('lodash')
const moment = require('moment')
const fse = require('fs-extra')
const csv = require('fast-csv')

const FILENAME = 'bitstampUSD.csv';

const SECONDS_PER_GROUP = 30 * 60; // count of seconds in 30 minutes

var outputStream = csv.createWriteStream();
var writeStream = fse.createWriteStream("outputfile-30m.csv");
outputStream.pipe(writeStream);


let processed = 0;

const finalData = [
  'unix', 'date', 'open', 'high', 'low', 'close', 'volume'
];
outputStream.write(finalData);

// parse one raw input row and floor its timestamp to the start of its 30-minute group
function parseRow(row) {
  const unixTime = Number(row[0]);
  const secondsAfterGroupStarted = unixTime % SECONDS_PER_GROUP;
  const groupStartTime = unixTime - secondsAfterGroupStarted;

  return {
    unix: unixTime,
    groupStartTime,
    date: moment(unixTime * 1000).format('YYYY-MM-DDTHH:mm:ss'),
    price: parseFloat(row[1]),
    volume: parseFloat(row[2]),
  }
}

// aggregate all trades buffered for one 30-minute group into a single OHLCV candle
function processBuffer (buffer) {
  const date = moment(buffer[0].groupStartTime * 1000).format('YYYY-MM-DD HH:mm:ssZ');
  const unix = Number(moment(date).format('x')) / 1000;

  const prices = _.map(buffer, 'price');
  const volumes = _.map(buffer, 'volume');

  const aggregated = [
    unix,
    date,
    buffer[0].price,                  // open
    Math.max(...prices),              // high
    Math.min(...prices),              // low
    buffer[buffer.length - 1].price,  // close
    _.sum(volumes),                   // volume
  ];

  outputStream.write(aggregated)
}


let buffer = [];
let currentTime = null;

fse.createReadStream(FILENAME)
  .pipe(csv({
    objectMode: true,
    headers: false
  }))
  .on("data", (_row) => {
    processed++

    const row = parseRow(_row);

    if (!currentTime) {
      currentTime = row.groupStartTime;
    }

    // a new 30-minute group started - flush the buffered trades and start a new candle
    if (row.groupStartTime !== currentTime) {
      processBuffer(buffer);
      buffer = []
      currentTime = row.groupStartTime;
    }

    buffer.push(row);

    if (!(processed % 1000))
      console.log("Processed %d rows", processed)
  })
  .on("end", () => {
    outputStream.end();
    
    console.log("done")
  });
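
The grouping in this 30-minute variant is plain integer arithmetic: each trade's timestamp is floored to the start of its window by subtracting the remainder modulo SECONDS_PER_GROUP. A small hedged sketch with a hypothetical timestamp:

const SECONDS_PER_GROUP = 30 * 60; // 1800 seconds

// hypothetical trade timestamp
const unixTime = 1612345678;                                    // 2021-02-03 09:47:58 UTC
const secondsAfterGroupStarted = unixTime % SECONDS_PER_GROUP;  // 1078
const groupStartTime = unixTime - secondsAfterGroupStarted;     // 1612344600 -> 09:30:00 UTC

// every trade with the same groupStartTime ends up in the same candle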
