Skip to content

Instantly share code, notes, and snippets.

@andrewstevenson
Created September 11, 2016 17:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrewstevenson/04ec177d28df1e63ea49b7bd5ff0ccba to your computer and use it in GitHub Desktop.
Save andrewstevenson/04ec177d28df1e63ea49b7bd5ff0ccba to your computer and use it in GitHub Desktop.
/**
 * Load a directory of CSV text files into a dataset by driving the `csv-import` command.
 *
 * The CSV header handed to the import is built from the Avro schema's field names, in schema
 * order, joined with commas. NOTE(review): this presumes the CSV columns appear in that same
 * order — confirm against whatever produces the files.
 *
 * If the directory does not exist, a warning is logged and nothing is imported.
 *
 * @param input_schema An Avro schema describing the source; its field names form the CSV header
 * @param input_path   The directory containing the CSV files. It is qualified against the
 *                     filesystem that `input_path` itself resolves to under `conf`.
 * @param dataset      The dataset to load into; its name and namespace select the import target
 * @param conf         Configuration used to resolve the filesystem and passed to the command
 * @return the result of the import command's `run()` when the directory exists, otherwise the
 *         `Unit` result of the warning. Because the branches differ, the inferred result type is
 *         widened. NOTE(review): callers presumably ignore it.
 */
def load_csv(input_schema: Schema, input_path: String, dataset : Dataset[GenericData.Record], conf: Configuration ) = {
  // Explicit Java -> Scala conversion instead of relying on the deprecated implicit JavaConversions.
  import scala.collection.JavaConverters._

  val sourcePath = new Path(input_path)
  val fs = sourcePath.getFileSystem(conf)
  // Qualify the very path `fs` was derived from. The previous code concatenated "hdfs:" onto the
  // raw string. That yielded a malformed "hdfs:hdfs://..." for already-qualified inputs, and it
  // could name a different filesystem than `fs`.
  val qualifiedPath: Path = fs.makeQualified(sourcePath)

  if (fs.exists(qualifiedPath)) {
    val header = input_schema.getFields.asScala.map(_.name).mkString(",")

    // Keep a handle on the registered command object. JCommander parses the arguments into this
    // same instance, so there is no need to fetch it back via getObjects and down-cast it.
    val importCommand = new CSVImportCommand(log)
    val jc = new JCommander()
    jc.addCommand("csv-import", importCommand)
    jc.parse("csv-import", qualifiedPath.toString, dataset.getName,
      "--namespace", dataset.getNamespace,
      "--header", header)
      // Optional, previously disabled:
      // "--transform", "com.datamountaineer.ingestor.transformations.TransformNULL"

    importCommand match {
      case configurable: Configurable => configurable.setConf(conf)
      case _ =>
        throw new IllegalStateException(
          "csv-import command is not Configurable; cannot apply the Hadoop configuration")
    }
    importCommand.run()
  } else {
    log.warn(s"No input directory found: $qualifiedPath.")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment