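// Assumed imports (an addition - the original gist omits them). Package paths
// follow DataVec/DL4J 0.9.x and may differ in other versions. The snippet also
// assumes an existing SparkContext `sc`, a `dataFile: java.io.File`, and the
// author's custom `Reductions.GeoAveragingReduction` class on the classpath.
import java.util.concurrent.TimeUnit
import org.joda.time.DateTimeZone
import org.datavec.api.records.reader.impl.csv.CSVRecordReader
import org.datavec.api.transform.{ReduceOp, TransformProcess}
import org.datavec.api.transform.reduce.Reducer
import org.datavec.api.transform.schema.Schema
import org.datavec.api.transform.sequence.comparator.NumericalColumnComparator
import org.datavec.api.transform.sequence.window.{ReduceSequenceByWindowTransform, TimeWindowFunction}
import org.datavec.api.transform.transform.string.ConcatenateStringColumns
import org.datavec.api.transform.transform.time.StringToTimeTransform
import org.datavec.api.writable.{DoubleWritable, Writable}
import org.datavec.spark.transform.SparkTransformExecutor
import org.datavec.spark.transform.misc.StringToWritablesFunction
import scala.collection.JavaConversions._ // for mapping over java.util.List below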
// Note: the column names don't exactly match the CSV header; we assign them arbitrarily.
val schema = new Schema.Builder()
  .addColumnString("Timestamp")
  .addColumnCategorical("VesselType")
  .addColumnString("MMSI")
  .addColumnsString("Lat", "Lon") // kept as strings; converted to Double later
  .addColumnCategorical("Status")
  .addColumnsDouble("ROT", "SOG", "COG")
  .addColumnInteger("Heading")
  .addColumnsString("IMO", "Callsign", "Name")
  // one call per categorical column: addColumnCategorical(name, states...)
  // treats extra arguments as state names, not additional columns
  .addColumnCategorical("ShipType")
  .addColumnCategorical("CargoType")
  .addColumnsInteger("Width", "Length")
  .addColumnCategorical("FixingDevice")
  .addColumnDouble("Draught")
  .addColumnsString("Destination", "ETA")
  .addColumnCategorical("SourceType")
  .addColumnString("end")
  .build()
val transform = new TransformProcess.Builder(schema)
  .removeAllColumnsExceptFor("Timestamp", "MMSI", "Lat", "Lon")
  // "yyyy" (calendar year), not "YYYY" (Joda-Time week-year, which gives
  // wrong dates around New Year)
  .transform(new StringToTimeTransform("Timestamp", "dd/MM/yyyy HH:mm:ss", DateTimeZone.UTC))
  // combine Lat and Lon into a single comma-separated string column;
  // the varargs form avoids a Scala-to-Java List conversion
  .transform(new ConcatenateStringColumns("LatLon", ",", "Lat", "Lon"))
  // group records into per-vessel sequences, ordered by ascending timestamp
  .convertToSequence("MMSI", new NumericalColumnComparator("Timestamp", true))
  .transform(
    new ReduceSequenceByWindowTransform(
      new Reducer.Builder(ReduceOp.Count).keyColumns("MMSI")
        .countColumns("Timestamp")
        .customReduction("LatLon", new Reductions.GeoAveragingReduction("LatLon"))
        .takeFirstColumns("Timestamp")
        .build(),
      // reduce each sequence over 10-minute time windows
      new TimeWindowFunction("Timestamp", 10L, TimeUnit.MINUTES)
    )
  )
  .removeAllColumnsExceptFor("LatLon")
  .build()
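// Optional check (an addition, not in the original gist): inspect the schema
// the pipeline produces; TransformProcess exposes it via getFinalSchema().
println(transform.getFinalSchema)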
// Note: we switch between the Java and Scala Spark APIs for convenience.
val rawData = sc
  .textFile(dataFile.getAbsolutePath)
  .filter(row => !row.startsWith("# Timestamp")) // filter out the CSV header
  .toJavaRDD // the DataVec Spark API is built on Spark's Java API
  .map(new StringToWritablesFunction(new CSVRecordReader())) // one List[Writable] per CSV row
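// Sanity check (an addition, not in the original gist): each parsed row should
// contain one Writable per schema column, otherwise the transform will fail.
require(rawData.first().size() == schema.numColumns())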
// Once the transform is applied, decombine lat/lon,
// then convert to doubles and split into train/test sets.
val records = SparkTransformExecutor
  .executeToSequence(rawData, transform)
  .rdd
  .map { row: java.util.List[java.util.List[Writable]] =>
    // each row is a sequence; each step holds a single "LatLon" string,
    // which we split back into two DoubleWritables (lat, lon)
    row.map { seq =>
      seq.map(_.toString)
        .map(_.split(",").toList.map(coord => new DoubleWritable(coord.toDouble)))
        .flatten
    }
  }
val split = records.randomSplit(Array[Double](0.8, 0.2))
val trainSequences = split(0)
val testSequences = split(1)
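// A minimal sketch (an addition, not in the original gist) showing how one
// resulting sequence could be packed into an ND4J array of shape
// [timeSteps, 2] (one lat/lon pair per 10-minute window) for downstream RNN use.
// Assumes an ND4J backend (e.g. nd4j-native) is on the classpath.
import org.nd4j.linalg.factory.Nd4j
val exampleSeq = trainSequences.first() // one sequence of (lat, lon) steps
val exampleArr = Nd4j.create(exampleSeq.map(step => step.map(_.toDouble).toArray).toArray)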