Skip to content

Instantly share code, notes, and snippets.

@DmitryBe
Created July 28, 2017 09:02
Show Gist options
  • Save DmitryBe/a5d5dc169888fa39d3d47852a773fa6d to your computer and use it in GitHub Desktop.
Read Avro from S3
// Path to the Avro schema (.avsc) on the local file system.
val avroSchemaPath = "path-to/avro-schema.avsc"
// Read the schema text and always release the file handle: Source.fromFile keeps
// the underlying file open until close() is called, even after mkString consumes it.
val avroSchemaStr = {
  val schemaSource = scala.io.Source.fromFile(avroSchemaPath)
  try schemaSource.mkString
  finally schemaSource.close()
}
val avroSchemaParser = new Schema.Parser
val avroSchema = avroSchemaParser.parse(avroSchemaStr)
// Datum reader that decodes records into GenericRecord using avroSchema.
val avroGenericRecordReader = new GenericDatumReader[GenericRecord](avroSchema)
// JVM system properties consulted by Hadoop; they are set here, before the
// FileSystem below is created.
// NOTE(review): "hadoop.home.dir" -> "/" looks like a workaround for Hadoop's
// HADOOP_HOME lookup when running without a local Hadoop install — confirm.
Seq(
  "HADOOP_USER_NAME" -> "user_name",
  "hadoop.home.dir" -> "/"
).foreach { case (key, value) => System.setProperty(key, value) }

// Hadoop configuration for the s3a:// scheme: static credentials plus the
// FileSystem implementation class.
// NOTE(review): the keys below are placeholders; never commit real credentials —
// prefer a credentials provider or the environment.
val hadoopConfig = new Configuration()
Seq(
  "fs.s3a.access.key" -> "access_key",
  "fs.s3a.secret.key" -> "secret_key",
  "fs.s3a.impl" -> classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName
).foreach { case (key, value) => hadoopConfig.set(key, value) }

// FileSystem handle rooted at the bucket.
// NOTE(review): replace <bucket-name>; '<' and '>' are not legal URI characters,
// so URI.create rejects the literal placeholder as written.
val fs = FileSystem.get(URI.create("s3a://<bucket-name>/"), hadoopConfig)
// Open the Avro container file on S3 and wrap it in a buffered stream.
val path = new Path("path-to-avro-message.avro")
val bufferedStream = new BufferedInputStream(fs.open(path))
// DataFileStream reads and validates the container header in its constructor.
// If that fails (not an Avro file, truncated header, ...), close the raw stream
// ourselves so it is not leaked, then rethrow the original error.
val avroReader =
  try new DataFileStream[GenericRecord](bufferedStream, avroGenericRecordReader)
  catch {
    case scala.util.control.NonFatal(e) =>
      bufferedStream.close()
      throw e
  }
// keep reading
// Read the first record. An empty file would otherwise surface as a bare
// NoSuchElementException with no context and leave the stream open, so close the
// reader (which closes bufferedStream) and report which path was empty.
// On success the reader stays open so further records can be read; call
// avroReader.close() when finished.
val rec = {
  if (!avroReader.hasNext) {
    avroReader.close()
    throw new NoSuchElementException(s"No Avro records found in $path")
  }
  avroReader.next()
}
// sbt libraryDependencies entries for the S3/Avro reading snippet (fragment of a Seq(...)).
// NOTE(review): hadoop-aws is built against a specific AWS SDK version; confirm that
// aws-java-sdk-s3 1.11.169 is compatible with hadoop-aws 2.8.1 before relying on it.
// NOTE(review): the snippet also uses org.apache.avro classes, which are not listed here;
// presumably they arrive transitively via hadoop-common/hadoop-client — verify, or add
// an explicit avro dependency.
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.169",
"org.apache.hadoop" % "hadoop-aws" % "2.8.1",
"org.apache.hadoop" % "hadoop-client" % "2.8.1",
"org.apache.hadoop" % "hadoop-common" % "2.8.1"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment