Skip to content

Instantly share code, notes, and snippets.

@charlesdarkwind
Created November 29, 2018 19:45
Show Gist options
  • Save charlesdarkwind/66c8c87cd302d6c4e605f4316c66f006 to your computer and use it in GitHub Desktop.
Save charlesdarkwind/66c8c87cd302d6c4e605f4316c66f006 to your computer and use it in GitHub Desktop.
JAVA Hadoop MapReduce Job. Calculates mean, standard deviation and relative stdDev for crypto prices or any financial assets.
import java.io.File;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
public class Crypto extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Crypto(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
FileUtils.deleteQuietly(new File(args[1]));
Job job = Job.getInstance(getConf(), "Crypto");
job.setJarByClass(this.getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable> {
public void map(LongWritable offset, Text csv, Context context)
throws IOException, InterruptedException {
String[] tokens = csv.toString().split(",");
if (tokens[1].equals("\"1h\"")) {
Text coin = new Text(tokens[0]);
DoubleWritable prix = new DoubleWritable(Double.parseDouble(tokens[6]));
context.write(coin, prix);
}
}
}
public static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
DecimalFormat df = new DecimalFormat("####0.00");
public static final int period = 24;
public void reduce(Text coin, Iterable<DoubleWritable> prixs, Context context)
throws IOException, InterruptedException {
Double somme = 0.0, moyenne = 0.0, ecartType = 0.0;
int length = 0;
List<DoubleWritable> cache = new ArrayList<DoubleWritable>();
for (DoubleWritable prix : prixs) {
if (length >= period) continue;
length++;
somme += prix.get();
cache.add(prix);
}
moyenne = somme / length;
for(DoubleWritable prix : cache)
ecartType += Math.pow(prix.get() - moyenne, 2);
ecartType = Math.sqrt(ecartType / (length - 1));
Double rounded = Double.valueOf(df.format(ecartType * 100 / moyenne));
context.write(coin, new DoubleWritable(rounded));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment