Skip to content

Instantly share code, notes, and snippets.

@mattsilv7384
Created September 17, 2019 17:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattsilv7384/877b57f8b68b06ea87e3f93075185d26 to your computer and use it in GitHub Desktop.
Save mattsilv7384/877b57f8b68b06ea87e3f93075185d26 to your computer and use it in GitHub Desktop.
/**
 * Reads stock records from a CSV file, keeps only the rows for the requested symbol,
 * min-max normalizes the price columns and maps every remaining row to a {@code StockData}.
 *
 * <p>The un-normalized OPEN/CLOSE/LOW/HIGH values are preserved in the corresponding
 * {@code *_DENORMALIZED} columns; DATE and VOLUME are dropped before mapping.
 *
 * @param dataPath path of the CSV file to read
 * @param symbol   symbol whose rows are kept; rows for every other symbol are filtered out
 * @return the transformed rows converted with {@code writableToStockMapper}
 * @throws IOException          if the record reader cannot be initialized, read or closed
 * @throws InterruptedException if initializing the record reader is interrupted
 */
List<StockData> readData(Path dataPath, Symbol symbol) throws IOException, InterruptedException {
    // Build schema: column order must match the column order of the CSV file.
    Schema schema = new Schema.Builder()
            .addColumnString(ColumnNames.DATE)
            .addColumnString(ColumnNames.SYMBOL)
            .addColumnDouble(ColumnNames.OPEN)
            .addColumnDouble(ColumnNames.CLOSE)
            .addColumnDouble(ColumnNames.LOW)
            .addColumnDouble(ColumnNames.HIGH)
            .addColumnDouble(ColumnNames.VOLUME)
            .build();

    final DataAnalysis analysis;
    List<List<Writable>> originalData = new ArrayList<>();

    // Load data. try-with-resources guarantees the reader (and the file behind it) is
    // released even when analysis or reading fails; previously it was never closed.
    // The constructor argument skips the first line of the file (presumably the header row).
    try (RecordReader csvRecordReader = new CSVRecordReader(1)) {
        csvRecordReader.initialize(new FileSplit(dataPath.toFile()));

        // Local analysis consumes the reader, so it has to be rewound before the rows are read.
        analysis = AnalyzeLocal.analyze(schema, csvRecordReader);
        csvRecordReader.reset();

        while (csvRecordReader.hasNext()) {
            originalData.add(csvRecordReader.next());
        }
    }

    // Transform data.
    // NOTE(review): the analysis above covers every row in the file, i.e. all symbols, while the
    // symbol filter only runs inside this transform. The min/max used for normalization are
    // therefore file-wide rather than per-symbol -- confirm this is intended.
    TransformProcess tp = new TransformProcess.Builder(schema)
            // Keep a copy of the raw prices before the originals are normalized below.
            .duplicateColumn(ColumnNames.OPEN, ColumnNames.OPEN_DENORMALIZED)
            .duplicateColumn(ColumnNames.CLOSE, ColumnNames.CLOSE_DENORMALIZED)
            .duplicateColumn(ColumnNames.LOW, ColumnNames.LOW_DENORMALIZED)
            .duplicateColumn(ColumnNames.HIGH, ColumnNames.HIGH_DENORMALIZED)
            .removeColumns(ColumnNames.DATE)
            .removeColumns(ColumnNames.VOLUME)
            // A filter removes the rows that match its condition: drop rows of other symbols...
            .filter(new StringColumnCondition(ColumnNames.SYMBOL, ConditionOp.NotEqual, symbol.getValue()))
            // ...and rows whose price values are invalid for their column type.
            .filter(new InvalidValueColumnCondition(ColumnNames.OPEN))
            .filter(new InvalidValueColumnCondition(ColumnNames.CLOSE))
            .filter(new InvalidValueColumnCondition(ColumnNames.LOW))
            .filter(new InvalidValueColumnCondition(ColumnNames.HIGH))
            .normalize(ColumnNames.OPEN, Normalize.MinMax, analysis)
            .normalize(ColumnNames.CLOSE, Normalize.MinMax, analysis)
            .normalize(ColumnNames.LOW, Normalize.MinMax, analysis)
            .normalize(ColumnNames.HIGH, Normalize.MinMax, analysis)
            .build();

    List<List<Writable>> processedData = LocalTransformExecutor.execute(originalData, tp);
    return processedData.stream()
            .map(writableToStockMapper)
            .collect(Collectors.toList());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment