Skip to content

Instantly share code, notes, and snippets.

@andrewstevenson
Last active September 11, 2016 18:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrewstevenson/f2af4e2774f86b468d93a12405c41f37 to your computer and use it in GitHub Desktop.
Save andrewstevenson/f2af4e2774f86b468d93a12405c41f37 to your computer and use it in GitHub Desktop.
// Encode the quote as Avro and forward it to Flume. A quote that cannot be
// encoded, or that arrives while the agent is unreachable, is reported and dropped.
serializeAvro(stock, schema) match {
  case None =>
    println("Couldn't send quote for " + symbol)
  case Some(payload) =>
    // The event is built before the liveness check, exactly as the headers/body need it.
    val flumeEvent: Event = buildFlumeEvent(stock, payload)
    if (!flumeClient.isActive) {
      println("Flume agent is down!!! Discarding quote for " + symbol)
      flumeClient.cleanUp()
      // NOTE(review): the value returned here is not assigned to anything in this
      // snippet; presumably getFlumeClient re-initialises shared state - confirm.
      getFlumeClient("localhost", 4141)
    } else {
      flumeClient.sendEvent(flumeEvent)
    }
}
/**
 * Encodes `datum` as Avro binary using a reflection-based writer for `schema`.
 *
 * @param datum  object to encode
 * @param schema Avro schema handed to the `ReflectDatumWriter`
 * @return `Some(bytes)` when encoding succeeds; `None` when writing or flushing
 *         fails with any non-fatal exception (the stack trace is printed)
 */
def serializeAvro(datum: Object, schema: Schema): Option[Array[Byte]] = {
  import scala.util.control.NonFatal

  val out = new ByteArrayOutputStream()
  val writer = new ReflectDatumWriter[Object](schema)
  // `null` means "no existing encoder to reuse".
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
  try {
    writer.write(datum, encoder)
    // The encoder buffers internally; flush before reading the bytes back.
    encoder.flush()
    Some(out.toByteArray)
  } catch {
    // Previously only NullPointerException was handled, so any other encoding
    // failure escaped to the caller despite the Option return type. NonFatal
    // still lets VM errors and interrupts propagate.
    case NonFatal(e) =>
      e.printStackTrace()
      None
  }
}
/**
 * Builds a Flume event whose body is the given Avro payload and whose headers
 * carry the stock symbol plus the schema-location entry.
 *
 * `avro_schema_url_header_key` and `schemaLocationInHDFS` are defined outside this
 * method; presumably they name the header and the HDFS path of the Avro schema
 * that downstream consumers use - confirm against their definitions.
 *
 * @param stock quote whose symbol is placed in the "symbol" header
 * @param avro  serialized bytes used as the event body
 * @return the event with body and headers set
 */
def buildFlumeEvent(stock: Stock, avro: Array[Byte]): Event = {
  val event: Event = EventBuilder.withBody(avro)
  val headers: Map[String, String] = Map(
    "symbol" -> stock.getSymbol,
    avro_schema_url_header_key -> schemaLocationInHDFS
  )
  // setHeaders takes a java.util.Map, hence the conversion.
  event.setHeaders(headers.asJava)
  event
}
public void sendEvents(List[Event] events) {
// Send the event
try {
client.appendBatch(events);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port, batchsize);
props.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "deflate");
// Use the following method to create a thrift client (instead of the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
}
/**
 * Sends a single event through the RPC client.
 *
 * A delivery failure is not propagated and the event is not re-sent by this
 * method; instead the current client is closed and a new default client is created.
 *
 * @param event the event passed to {@code client.append}
 */
public void sendEvent(Event event) {
  try {
    // Hand the event to the agent.
    client.append(event);
  } catch (EventDeliveryException deliveryFailure) {
    // Delivery failed: discard the current connection and build a replacement.
    client.close();
    client = null;
    client = RpcClientFactory.getDefaultInstance(hostname, port, batchsize);
    // A thrift client could be created instead of the default one above:
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment