How to aggregate gazillion geo location data using Hadoop and Cassandra

How to Aggregate Gazillion Geo Data for Instant Display (WIP)

(Cleaning up for CQL3 interface using DataStax driver)

Let's say you have a large amount of geo data. Maybe it's GPS data from thousands of autonomous drones mapping areas around the country. Or it could be something boring, like positional data from a truck fleet.

Either way, it's lots and lots of data in the form of coordinates associated with some kind of value. And now that you have a gazillion geo data points, you want to visualize them. A heatmap, perhaps.

If the data is relatively small, as in up to a few hundred thousand points, you can simply feed it into a heatmap and it'll probably work.

Well, if you have a fast Mac like mine, that is.

The problem is, when you have huge amounts of data - data so large that you can't get a clear picture without visualizing it - that's when this approach starts to fail. Heatmaps just can't take in millions of data points and still let you scroll and zoom around at will.

GeoHex to the Rescue

Just as you would with any raw big data, we need to aggregate this geo data. In geo terms, this means that instead of putting every lat/lon point on the map, we aggregate points into areas, then show the aggregate count of each area as heat on the map.

There are a couple of ways to do this. One is to use something like a GeoHash, which lets you do greater-than/less-than queries. This, however, is a sucky way to do it, because ge/le on two axes gives you a rectangle. Heat concentrates in circular shapes, not rectangles. You don't want to show a heatmap of big rectangles; that's just silly. You want roughly circular geo aggregation instead.

This is where GeoHex comes in. GeoHex divides the entire world into equally shaped hexagons. Instead of 500 million points on a map, hexes with a 5-10 mile radius let you aggregate the data into a couple of hundred thousand aggregated data points, which is very manageable.
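
To make that concrete, here's a minimal sketch of the bucketing idea using the same net.geohex.GeoHex library the mapper below relies on. The sample coordinates, the class name, and the in-memory HashMap counting are purely illustrative; only the GeoHex.encode(lat, lon, level) call comes from the actual code further down.

import net.geohex.GeoHex;

import java.util.HashMap;
import java.util.Map;

public class GeoHexBucketDemo {
    public static void main(String[] args) {
        // A few "lat,lon" samples; in reality these come from the raw GPS feed.
        String[] points = {"35.6580,139.7016", "35.6585,139.7020", "40.7128,-74.0060"};

        // Count how many points fall into each level-6 hex.
        Map<String, Long> counts = new HashMap<>();
        for (String p : points) {
            String[] ll = p.split(",");
            String hex = "6_" + GeoHex.encode(Float.parseFloat(ll[0]), Float.parseFloat(ll[1]), 6);
            counts.merge(hex, 1L, Long::sum);
        }

        // The two nearby points collapse into the same hex; the far-away one gets its own.
        counts.forEach((hex, count) -> System.out.println(hex + " -> " + count));
    }
}

That's the whole trick: the heatmap only ever sees one weighted point per hex, no matter how many raw points landed inside it.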

Real Men Do MapReduce

You can generate a geohex from lat/lon fairly easily. You can generate multiple geohexes (of varying sizes) from the same lat/lon fairly easily as well.

OTOH, processing millions of geohexes can take a very long time, especially if you use something that crawls. Like Ruby. Well, Ruby isn't the only offender either. Do it in Go or whatever; you can write it in any language, and it will still crawl as long as you process the data serially.

Now, code that translates lat/lon into various geohexes is dirt easy to drop into raw MapReduce in Hadoop (or better yet, Spark) and get an amazing speedup.

GeoHexApp.java, GeoHexMapper.java and GeoHexFileReducer.java below do this. Close to 50 million records (47.5M to be exact) were processed within a couple of minutes with this code, running under Hadoop 2.5.0 on a Retina MacBook Pro 15 with YARN vcores set to 8.

The mapper uses a concatenated key to generate multiple geohexes in one run. Each GeoHex is prefixed with a size code, so the results can be separated into different size groups or queried by size prefix.
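
For illustration, here's what consuming such a prefixed key might look like downstream; the "4_..." value is a made-up placeholder, and split/startsWith is just one way to peel the size code back off.

public class PrefixedKeyDemo {
    public static void main(String[] args) {
        // A size-prefixed key as emitted by the mapper: "<level>_<geohex code>".
        // The code after the underscore is a placeholder, not a real GeoHex value.
        String prefixedKey = "4_XM488548";

        // Split on the first underscore only, in case the GeoHex code itself contains one.
        String[] parts = prefixedKey.split("_", 2);
        int level = Integer.parseInt(parts[0]);
        String geohex = parts[1];
        System.out.println("level " + level + ", hex " + geohex);

        // Or filter by prefix directly, e.g. to pull out one zoom level for the heatmap.
        System.out.println("level 5? " + prefixedKey.startsWith("5_"));
    }
}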

(I initially tried this with a Pig script using a UDF, but gave up after realizing it's much easier with raw MapReduce.)

Cassandra Rocks

So now we have millions of lat/lon points aggregated into a few hundred thousand geohex counts. We want to store this data for easy retrieval.

For this, I now put data directly into Cassandra from the reducer, as seen in GeoHexCassandraReducer.java. Each aggregate count is written under a composite key of (primary, geohex), which essentially makes it a super wide row: in this case, a row with almost 700K columns.

This means I have a day's worth of aggregated geo data, separated into ~700K different geohexes in one row. Tomorrow, I can add the next day's data as another row. And another row the day after, and so on.
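
The gist doesn't include the table definition or the read side, so here's a minimal sketch of both using the DataStax Java driver mentioned at the top (assuming the 2.x com.datastax.driver.core API). The schema, with text columns key, geohex and value and PRIMARY KEY (key, geohex), is my assumption, inferred from the column names in GeoHexCassandraReducer.java; the 'snapshot' row key is the one the reducer hard-codes.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class GeoHexReadDemo {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
        Session session = cluster.connect();

        // Assumed schema: one partition per snapshot, one clustering column per prefixed GeoHex.
        session.execute("CREATE KEYSPACE IF NOT EXISTS ghkey "
                + "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        session.execute("CREATE TABLE IF NOT EXISTS ghkey.geohex ("
                + "key text, geohex text, value text, PRIMARY KEY (key, geohex))");

        // Pull one zoom level out of the wide row by slicing on the size prefix.
        ResultSet rs = session.execute(
                "SELECT geohex, value FROM ghkey.geohex "
                + "WHERE key = 'snapshot' AND geohex >= '5_' AND geohex < '6_'");
        for (Row row : rs) {
            System.out.println(row.getString("geohex") + " -> " + row.getString("value"));
        }

        cluster.close();
    }
}

Because geohex is the clustering column, slicing on the size prefix stays inside a single partition, so pulling one zoom level out of a day's wide row is a cheap range read.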

Writing to Cassandra is a little slower than writing to a file, of course. It took a whopping 12 minutes to aggregate 47 million rows into 700K aggregates, which were then written to Cassandra. Note that I did not do any tuning on Cassandra or Hadoop for this.

(wip)

GeoHexApp.java

package aki.atoji;

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.List;
import java.util.Map;

/**
 * Created by Aki
 *
 * Job driver: reads lat/lon text files, maps them to size-prefixed GeoHex keys,
 * and writes the aggregated counts to Cassandra via CqlOutputFormat.
 */
public class GeoHexApp extends Configured implements Tool {

    static final String KEYSPACE = "ghkey";
    static final String OUTPUT_COLUMN_FAMILY = "geohex";
    private static final String PRIMARY_KEY = "row_key";

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new GeoHexApp(), args);
        System.exit(0);
    }

    public int run(String[] args) throws Exception {
        final Job job = Job.getInstance(getConf(), "GeoHexApp");
        final Configuration conf = job.getConfiguration();

        job.setJarByClass(GeoHexApp.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(GeoHexMapper.class);
        job.setReducerClass(GeoHexCassandraReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // The reducer emits (Map of key columns, List of bound variables).
        job.setOutputFormatClass(CqlOutputFormat.class);
        job.setOutputKeyClass(Map.class);
        job.setOutputValueClass(List.class);

        ConfigHelper.setOutputInitialAddress(conf, "localhost");
        ConfigHelper.setOutputRpcPort(conf, "9160");
        ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");
        ConfigHelper.setOutputKeyspace(conf, KEYSPACE);
        ConfigHelper.setOutputColumnFamily(conf, KEYSPACE, OUTPUT_COLUMN_FAMILY);

        // Primary key columns of the output table; must match the key map built in the reducer.
        conf.set(PRIMARY_KEY, "key,geohex");

        // CqlOutputFormat supplies the key columns; we only bind the aggregate value here.
        String query = "UPDATE " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY + " SET value = ? ";
        CqlConfigHelper.setOutputCql(conf, query);

        return (job.waitForCompletion(true) ? 0 : 1);
    }
}
GeoHexCassandraReducer.java

package aki.atoji;

import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simple aggregating reducer that writes directly to Cassandra.
 * The key map supplies the primary key columns (key, geohex); the bound
 * variable list supplies the aggregate count for the CQL UPDATE.
 */
public class GeoHexCassandraReducer extends Reducer<Text, LongWritable, Map<String, ByteBuffer>, List<ByteBuffer>> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }

        // Primary key columns: one row per snapshot, one column per size-prefixed GeoHex.
        Map<String, ByteBuffer> ckey = new HashMap<>();
        ckey.put("key", ByteBufferUtil.bytes("snapshot"));
        ckey.put("geohex", ByteBufferUtil.bytes(key.toString()));

        // Bound variables for "SET value = ?".
        List<ByteBuffer> variables = new ArrayList<>();
        variables.add(ByteBufferUtil.bytes(String.valueOf(count)));

        context.write(ckey, variables);
    }
}
GeoHexFileReducer.java

package aki.atoji;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Simple aggregating reducer with file output.
 * Uses LongWritable to match the counts emitted by GeoHexMapper.
 */
public class GeoHexFileReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
GeoHexMapper.java

package aki.atoji;

import net.geohex.GeoHex;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * GeoHex mapper. Takes "lat,lon" lines and generates multiple GeoHexes per point.
 * We use the key prefix technique here ("4_", "5_", "6_") so all sizes can be
 * aggregated in one MR pass and separated later by prefix.
 */
public class GeoHexMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private static final LongWritable ONE = new LongWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] latlons = line.split("\\s+");
        String[] vals = latlons[0].split(",");
        if (vals.length != 2) {
            System.err.println("Bad lat lon entry: " + line);
            return;
        }

        String lat = vals[0];
        String lon = vals[1];
        if (lat.isEmpty() || lon.isEmpty()) {
            System.err.println("Bad lat lon values: " + line);
            return;
        }

        float latF = Float.parseFloat(lat);
        float lonF = Float.parseFloat(lon);

        // One output record per GeoHex level, each key prefixed with its size code.
        context.write(new Text("4_" + GeoHex.encode(latF, lonF, 4)), ONE);
        context.write(new Text("5_" + GeoHex.encode(latF, lonF, 5)), ONE);
        context.write(new Text("6_" + GeoHex.encode(latF, lonF, 6)), ONE);
    }
}