Skip to content

Instantly share code, notes, and snippets.

View mykidong's full-sized avatar

Kidong Lee mykidong

View GitHub Profile
// Classpath locations of the Avro schema files (.avsc) that define the
// coda protocol record types; all are registered with the schema loader below.
List<String> pathList = new ArrayList<>();
pathList.add("/META-INF/avro/request-header.avsc");
pathList.add("/META-INF/avro/record-header.avsc");
pathList.add("/META-INF/avro/record.avsc");
pathList.add("/META-INF/avro/records.avsc");
pathList.add("/META-INF/avro/produce-request.avsc");
// toArray(new String[0]) already returns String[], so the previous explicit
// (String[]) cast was redundant and has been dropped.
AvroSchemaLoader avroSchemaLoader = AvroSchemaLoader.singletonForSchemaPaths(pathList.toArray(new String[0]));
// Fully-qualified Avro record name used to look up the ProduceRequest schema.
String schemaKey = "io.shunters.coda.avro.api.ProduceRequest";
{
"namespace":"io.shunters.coda.avro.api",
"type":"record",
"doc":"Record Array",
"name":"Records",
"fields":[
{
"name":"firstOffset",
"type":"long"
},
{
"namespace":"io.shunters.coda.avro.api",
"type":"record",
"doc":"Request Header",
"name":"RequestHeader",
"fields":[
{
"name":"correlationId",
"type":"int"
},
{
"namespace":"io.shunters.coda.avro.api",
"type":"record",
"doc":"Produce Request",
"name":"ProduceRequest",
"fields":[
{
"name":"requestHeader",
"type":"io.shunters.coda.avro.api.RequestHeader"
},
package io.shunters.coda.protocol;
import org.apache.avro.Schema;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
<!-- Daily-rolling file appender for the event logger: writes raw messages
     (pattern "%m%n", no timestamp/level prefix) to collection-logger.log,
     rolling the file once per day with a ".yyyy-MM-dd" suffix. -->
<!-- NOTE(review): DailyRollingFileAppender in log4j 1.x does not support
     MaxBackupIndex — the parameter below is silently ignored, so old daily
     files are never pruned. Consider RollingFileAppender or an external
     cleanup job if retention matters. -->
<appender name="eventLoggerDailyRollingFileAppender" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="/any-temp/collection-logger.log" />
<param name="DatePattern" value="'.'yyyy-MM-dd" />
<param name="Encoding" value="UTF-8" />
<param name="MaxBackupIndex" value="3"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%m%n" />
</layout>
</appender>
private static Logger eventLogger = LoggerFactory.getLogger("event-logger");
...
@Override
public void onEvent(EventLog eventLog, long sequence, boolean endOfBatch)
throws Exception {
String eventType = eventLog.getEventType();
String json = eventLog.getValue();
if (this.eventLoggingEnabled) {
/**
 * Disruptor event handler: forwards each event log to Kafka, using the
 * event type as the destination topic. Does nothing when message sending
 * is disabled.
 */
@Override
public void onEvent(EventLog eventLog, long l, boolean b) throws Exception {
    // Guard clause: skip Kafka entirely when sending is turned off.
    if (!this.sendMessageEnabled) {
        return;
    }
    String destinationTopic = eventLog.getEventType();
    String payload = eventLog.getValue();
    // Ship the event log to Kafka on the topic named after its event type.
    producer.send(new ProducerRecord<Integer, String>(destinationTopic, payload));
}
this.putDisruptor = DisruptorCreator.singleton(DisruptorCreator.DISRUPTOR_NAME_PUT, EventLog.FACTORY, 1024, produceToKafkaHandler, loggerEventHandler);
this.putEventTranslator = new BaseTranslator.EventLogTranslator();
...
@Override
public void onEvent(final EventLog eventLog, final long sequence, final boolean endOfBatch) throws Exception {
String version = eventLog.getVersion();
String json = eventLog.getValue();
public class BaseTranslator {
public static class EventLogTranslator extends EventLog implements EventTranslator<EventLog>
{
/**
 * Copies the fields staged on this translator (version, event type, value)
 * onto the Disruptor ring-buffer slot {@code eventLog}. The sequence number
 * is unused.
 */
@Override
public void translateTo(EventLog eventLog, long sequence) {
    // Read the staged values first, then apply them to the target slot.
    String stagedVersion = this.getVersion();
    String stagedEventType = this.getEventType();
    String stagedValue = this.getValue();
    eventLog.setVersion(stagedVersion);
    eventLog.setEventType(stagedEventType);
    eventLog.setValue(stagedValue);
}