@hammer
Last active October 17, 2022 04:16
Concise example of how to write an Avro record out as JSON in Scala
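The snippet below assumes Avro, the pre-Apache parquet-avro library (which provides the parquet.avro package), and a Hadoop client on the classpath. A rough sbt sketch follows; the exact group IDs and versions are assumptions, not pinned by this gist:

libraryDependencies ++= Seq(
  "org.apache.avro"   % "avro"          % "1.7.7",
  "com.twitter"       % "parquet-avro"  % "1.6.0",  // pre-Apache Parquet, package parquet.avro
  "org.apache.hadoop" % "hadoop-client" % "2.6.0"   // provides org.apache.hadoop.fs.Path
)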
import java.io.{ByteArrayOutputStream, File}
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.avro.SchemaBuilder
import org.apache.hadoop.fs.Path
import parquet.avro.{AvroParquetReader, AvroParquetWriter}
import scala.util.control.Breaks.{break, breakable}
object HelloAvro {
  def main(args: Array[String]): Unit = {
    // Build a schema
    val schema = SchemaBuilder
      .record("person")
      .fields()
      .name("name").`type`().stringType().noDefault()
      .name("ID").`type`().intType().noDefault()
      .endRecord()
    // Build an object conforming to the schema
    val user1 = new GenericRecordBuilder(schema)
      .set("name", "Jeff")
      .set("ID", 1)
      .build()
    // JSON encoding of the object (a single record)
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val baos = new ByteArrayOutputStream
    val jsonEncoder = EncoderFactory.get().jsonEncoder(schema, baos)
    writer.write(user1, jsonEncoder)
    jsonEncoder.flush()
    println("JSON encoded record: " + baos)
    // Binary encoding of the object (a single record)
    baos.reset()
    val binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null)
    writer.write(user1, binaryEncoder)
    binaryEncoder.flush()
println("Binary encoded record: " + baos.toByteArray)
    // Build another object conforming to the schema
    val user2 = new GenericRecordBuilder(schema)
      .set("name", "Sam")
      .set("ID", 2)
      .build()
    // Write both records to an Avro object container file
    val file = new File("users.avro")
    file.deleteOnExit()
    val dataFileWriter = new DataFileWriter[GenericRecord](writer)
    dataFileWriter.create(schema, file)
    dataFileWriter.append(user1)
    dataFileWriter.append(user2)
    dataFileWriter.close()
    // Read the records back from the file
    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val dataFileReader = new DataFileReader[GenericRecord](file, datumReader)
    var user: GenericRecord = null
    while (dataFileReader.hasNext) {
      user = dataFileReader.next(user)
      println("Read user from Avro file: " + user)
    }
    // Write both records to a Parquet file
    val tmp = File.createTempFile(getClass.getSimpleName, ".tmp")
    tmp.deleteOnExit()
    tmp.delete()
    val tmpParquetFile = new Path(tmp.getPath)
    val parquetWriter = new AvroParquetWriter[GenericRecord](tmpParquetFile, schema)
    parquetWriter.write(user1)
    parquetWriter.write(user2)
    parquetWriter.close()
    // Read both records back from the Parquet file
    val parquetReader = new AvroParquetReader[GenericRecord](tmpParquetFile)
    // break is only legal inside a breakable block, so wrap the read loop in one
    breakable {
      while (true) {
        Option(parquetReader.read()) match {
          case Some(matchedUser) => println("Read user from Parquet file: " + matchedUser)
          case None => println("Finished reading Parquet file"); break()
        }
      }
    }
  }
}
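For completeness, the binary bytes produced above can be turned back into a record with a decoder. A minimal sketch (not part of the original gist) that would slot in right after the binary encoding step, assuming an extra import of org.apache.avro.io.DecoderFactory:

    // Decode the binary bytes back into a GenericRecord
    val binaryDecoder = DecoderFactory.get().binaryDecoder(baos.toByteArray, null)
    val decodedUser = new GenericDatumReader[GenericRecord](schema).read(null, binaryDecoder)
    println("Decoded from binary: " + decodedUser)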
@rockyadi

rockyadi commented Oct 16, 2016

Thanks for this piece of code! I'm new to Scala, so this is really useful. Could you please provide the Avro schema and the JSON output, that is, the Avro schema from which you created the schema val?
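For reference, printing the schema built with SchemaBuilder shows the JSON definition it corresponds to, and the JSON encoding of user1 is a single JSON object. The output below is a sketch of what to expect rather than a capture from an actual run:

println(schema.toString(true))
// {
//   "type" : "record",
//   "name" : "person",
//   "fields" : [ {
//     "name" : "name",
//     "type" : "string"
//   }, {
//     "name" : "ID",
//     "type" : "int"
//   } ]
// }
// JSON-encoded record printed by the gist:
// {"name":"Jeff","ID":1}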

@nemppu

nemppu commented Apr 7, 2017

Thanks for a great example! However, I noticed a small bug at line 39:
println("Binary encoded record: " + baos.toByteArray)

It prints the byte array's default toString (a type tag and hash code) rather than its contents. I suggest the following fix:
println("Binary encoded record (hex): " + baos.toByteArray.map(x => f"$x%02x").mkString(" "))
