@erasmas
Last active March 29, 2017 05:47
Cascalog workflow to copy data from CSV to Parquet. How do I fix this so that the schema fields are not prefixed with '?'?
airport-frequencies.csv (sample input):

id,airport_ref,airport_ident,type,description,frequency_mhz
70518,6528,00CA,CTAF,CTAF,122.9
307581,6589,01FL,ARCAL,,122.9
75239,6589,01FL,CTAF,CEDAR KNOLL TRAFFIC,122.8
60191,6756,04CA,CTAF,CTAF,122.9
59287,6779,04MS,UNIC,UNICOM,122.8
60682,6784,04NV,UNIC,UNICOM,123
60091,6812,05CL,CTAF,CTAF,122.9
63835,6853,05UT,UNIC,UNICOM,122.8
70676,6868,06FA,APP,PALM BEACH APP,124.6
70677,6868,06FA,GND,GND,121.65
70678,6868,06FA,TWR,TWR,120.4
65868,6887,06MO,CTAF,CTAF,122.9
(ns cascalog-sandbox.core
  (:require [cascalog.cascading.tap :as tap]
            [cascalog.cascading.operations :refer [rename*]]
            [cascalog.more-taps :refer [lfs-delimited]]
            [cascalog.api :refer :all])
  (:import (parquet.cascading ParquetTupleScheme)
           (cascading.tuple Fields)
           (parquet.schema PrimitiveType MessageType PrimitiveType$PrimitiveTypeName Type$Repetition)))

(def repetitions
  {:optional Type$Repetition/OPTIONAL
   :repeated Type$Repetition/REPEATED
   :required Type$Repetition/REQUIRED})

(def types
  {:fixed-len-byte-array PrimitiveType$PrimitiveTypeName/FIXED_LEN_BYTE_ARRAY
   :float                PrimitiveType$PrimitiveTypeName/FLOAT
   :double               PrimitiveType$PrimitiveTypeName/DOUBLE
   :boolean              PrimitiveType$PrimitiveTypeName/BOOLEAN
   :binary               PrimitiveType$PrimitiveTypeName/BINARY
   :int32                PrimitiveType$PrimitiveTypeName/INT32
   :int64                PrimitiveType$PrimitiveTypeName/INT64
   :int96                PrimitiveType$PrimitiveTypeName/INT96})

(defn create-primitive-type
  "Builds a Parquet PrimitiveType from keyword repetition/type and a column name."
  [repetition type name]
  (PrimitiveType. (repetition repetitions) (type types) name))

(defn create-parquet-schema
  "Builds a Parquet message schema string from a schema name and a seq of
   {:repetition ... :type ... :name ...} column mappings."
  [name mappings]
  (let [columns      (map (fn [{:keys [repetition type name]}]
                            (create-primitive-type repetition type name))
                          mappings)
        message-type (MessageType. name columns)]
    (.toString message-type)))
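
;; A quick REPL sketch (illustrative; assumes this namespace is loaded) of the
;; schema string create-parquet-schema returns for a single optional int32 column:
;;
;;   (println (create-parquet-schema "schema"
;;                                   [{:repetition :optional :type :int32 :name "id"}]))
;;   ;; message schema {
;;   ;;   optional int32 id;
;;   ;; }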
;; Use cascalog.cascading.operations/rename* as suggested here:
;; https://github.com/nathanmarz/cascalog/issues/18
(let [fields         ["id" "airport_ref" "airport_ident" "type" "description" "frequency_mhz"]
      out-fields     (map #(str "?" %) fields)
      parquet-fields (Fields. (into-array String fields))
      schema         (create-parquet-schema "schema"
                                            [{:repetition :optional :type :int32  :name "id"}
                                             {:repetition :optional :type :int32  :name "airport_ref"}
                                             {:repetition :optional :type :binary :name "airport_ident"}
                                             {:repetition :optional :type :binary :name "type"}
                                             {:repetition :optional :type :binary :name "description"}
                                             {:repetition :optional :type :float  :name "frequency_mhz"}])
      scheme         (ParquetTupleScheme. parquet-fields parquet-fields schema)
      sink           (tap/lfs-tap scheme "/Users/dmi3y/Downloads/airport-frequencies.parquet" :sinkmode :replace)
      source         (lfs-delimited "/Users/dmi3y/Downloads/airport-frequencies.csv" :skip-header? true :delimiter ",")
      query          (<- out-fields (source :>> out-fields))]
  (?- sink
      (rename* query out-fields fields)))
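
Note that the `?-` call sits inside a top-level `let`, so simply loading the namespace runs the job; a minimal way to kick it off from a REPL (illustrative):

(require 'cascalog-sandbox.core)   ;; evaluating the namespace executes the flow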
# Here's what the schema of the Parquet file I get looks like:
$ parquet-schema /data/out/airport-frequencies.parquet/part-00000-m-00000.parquet
message schema {
optional int32 ?id;
optional int32 ?airport_ref;
optional binary ?airport_ident;
optional binary ?type;
optional binary ?description;
optional float ?frequency_mhz;
}
(defproject cascalog-sandbox "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :repositories [["conjars.org" "http://conjars.org/repo"]]
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [cascalog "2.1.1"]
                 [cascalog-more-taps "0.3.0"]
                 [com.twitter/parquet-cascading "1.5.0"]
                 [com.twitter/parquet-column "1.5.0"]]
  :profiles {:uberjar {:aot :all}
             :provided {:dependencies [[org.apache.hadoop/hadoop-client "2.2.0"]
                                       [org.apache.hadoop/hadoop-mapreduce-client-core "2.2.0"]]}
             :dev {:dependencies [[org.apache.hadoop/hadoop-minicluster "2.2.0"]]}})
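
The Hadoop artifacts live in the :provided profile, so they are on the classpath for development and compilation but, per Leiningen convention, are left out of the uberjar (the cluster supplies them at runtime). Building the job jar is then simply:

$ lein uberjar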
/**
 * Same workflow but using Cascading directly; the output fields in the Parquet
 * file come out fine, i.e. not prefixed with '?'.
 */
package cascading.sandbox;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Lfs;
import cascading.tuple.Fields;
import org.xml.sax.SAXException;
import parquet.cascading.ParquetTupleScheme;
import parquet.schema.MessageType;
import parquet.schema.PrimitiveType;

import javax.xml.parsers.ParserConfigurationException;
import java.io.IOException;
import java.util.Properties;

import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static parquet.schema.Type.Repetition.OPTIONAL;

public class WriteToParquetExample {

    public static void main(String[] args) throws ParserConfigurationException, SAXException, IOException {
        final String[] fieldNames = { "id", "airport_ref", "airport_ident", "type", "description", "frequency_mhz" };
        final Fields fields = new Fields(fieldNames);

        // Build the Parquet message schema from the same column names used for the Cascading fields.
        PrimitiveType[] types = new PrimitiveType[] {
            new PrimitiveType(OPTIONAL, INT32, "id"),
            new PrimitiveType(OPTIONAL, INT32, "airport_ref"),
            new PrimitiveType(OPTIONAL, BINARY, "airport_ident"),
            new PrimitiveType(OPTIONAL, BINARY, "type"),
            new PrimitiveType(OPTIONAL, BINARY, "description"),
            new PrimitiveType(OPTIONAL, FLOAT, "frequency_mhz")
        };
        final MessageType message = new MessageType("schema", types);
        final String schema = message.toString();

        ParquetTupleScheme scheme = new ParquetTupleScheme(fields, fields, schema);

        Tap source = new Lfs(new TextDelimited(true, ","), "/data/in/airport-frequencies.csv");
        Tap sink = new Lfs(scheme, "/data/out/airport-frequencies.parquet", SinkMode.REPLACE);

        // Straight copy: read the delimited source and write it to the Parquet sink.
        Pipe copyPipe = new Pipe("copy");
        FlowDef flowDef = FlowDef.flowDef()
            .addSource(copyPipe, source)
            .addTailSink(copyPipe, sink);

        final Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, WriteToParquetExample.class);

        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);
        Flow flow = flowConnector.connect(flowDef);
        flow.complete();
    }
}
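
For comparison, this Cascading-only flow is reported to write the bare column names; its schema should look like this (illustrative, derived from the MessageType built in main):

message schema {
  optional int32 id;
  optional int32 airport_ref;
  optional binary airport_ident;
  optional binary type;
  optional binary description;
  optional float frequency_mhz;
}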