@cwensel
Last active September 15, 2017 21:23
Two Cascading flows: the first reads from S3 into a Kafka queue, the second reads from the queue and writes to a partitioned directory structure.
import java.net.URI;
import java.util.TimeZone;

import cascading.flow.Flow;
import cascading.flow.local.LocalFlowConnector;
// NOTE: S3Tap, KafkaTap, and TextKafkaScheme ship with the cascading-local
// AWS and Kafka extension modules; the package paths below are assumed and
// may need adjusting to match your build.
import cascading.local.tap.aws.s3.S3Tap;
import cascading.local.tap.kafka.KafkaTap;
import cascading.local.tap.kafka.TextKafkaScheme;
import cascading.operation.Debug;
import cascading.operation.regex.RegexParser;
import cascading.operation.text.DateFormatter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextDelimited;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.DirTap;
import cascading.tap.local.PartitionTap;
import cascading.tap.partition.DelimitedPartition;
import cascading.tuple.Fields;
import cascading.tuple.type.DateType;

import static cascading.flow.FlowDef.flowDef;
// REGEX, TOPIC_FIELDS, and OFFSET_FIELDS are assumed to be static imports:
// REGEX from the S3Logs helper (referenced but not shown in this gist),
// TOPIC_FIELDS and OFFSET_FIELDS from the Kafka scheme.

public class S3LogsViaKafka
{
  public static final String DD_MMM_YYYY = "dd-MMM-yyyy";
  public static final TimeZone UTC = TimeZone.getTimeZone( "UTC" );
  public static final DateType DMY = new DateType( DD_MMM_YYYY, UTC );

  // the queue key is the log date, coerced to a timestamp via DateType
  public static final Fields KEY = new Fields( "date", DMY );
  public static final Fields LINE = new Fields( "line", String.class );
  public static final Fields KEY_LINE = KEY.append( LINE );
  public static void main( String[] args )
  {
    if( args.length != 3 )
    {
      System.err.println( "usage: S3LogsViaKafka <source s3 uri> <kafka host> <sink file path>" );
      return;
    }

    System.out.println( "source s3 uri = " + args[ 0 ] );
    System.out.println( "kafka host = " + args[ 1 ] );
    System.out.println( "sink file path = " + args[ 2 ] );
    // read the raw log lines from an S3 bucket
    Tap inputTap = new S3Tap( new TextLine(), URI.create( args[ 0 ] ) );

    // write to and read from a Kafka topic named "logs"
    Tap queueTap = new KafkaTap<>( new TextKafkaScheme( TOPIC_FIELDS.append( OFFSET_FIELDS ).append( KEY_LINE ) ), args[ 1 ], "logs" );

    // write to disk, using the log data itself to create the directory structure
    DelimitedPartition partitioner = new DelimitedPartition( KEY.append( S3Logs.operation ), "/", "logs.csv" );
    Tap outputTap = new PartitionTap(
      new DirTap( new TextDelimited( true, ",", "\"" ), args[ 2 ], SinkMode.REPLACE ), partitioner
    );
    Pipe ingress = new Pipe( "head" );

    // extract the log timestamp, then reduce it to day/month/year for use as the queue key
    ingress = new Each( ingress, new Fields( "line" ), new RegexParser( S3Logs.time, REGEX, 3 ), new Fields( "time", "line" ) );
    ingress = new Each( ingress, S3Logs.time, new DateFormatter( KEY, DD_MMM_YYYY, UTC ), KEY_LINE );

    // watch the progress on the console
    ingress = new Each( ingress, new Debug( true ) );

    Flow ingressFlow = new LocalFlowConnector().connect( flowDef()
      .setName( "ingress" )
      .addSource( ingress, inputTap )
      .addSink( ingress, queueTap )
      .addTail( ingress )
    );

    // begin reading from S3 and writing to the Kafka queue; start() does not block
    ingressFlow.start();
    Pipe egress = new Pipe( "head" );

    // parse the full log line into its fields and primitive values -- S3Logs.FIELDS declares the field names and types
    egress = new Each( egress, new Fields( "line" ), new RegexParser( S3Logs.FIELDS, REGEX ), KEY.append( S3Logs.FIELDS ) );

    // watch the progress on the console
    egress = new Each( egress, new Debug( true ) );
    Flow egressFlow = new LocalFlowConnector().connect( flowDef()
      .setName( "egress" )
      .addSource( egress, queueTap )
      .addSink( egress, outputTap )
      .addTail( egress )
    );

    // begin reading from the Kafka queue and writing to the directory tree as
    // [sink file path]/[dd-MMM-yyyy]/[S3 operation]/logs.csv
    egressFlow.start();

    // block until each flow completes
    egressFlow.complete();
    System.out.println( "completed egress" );

    ingressFlow.complete();
    System.out.println( "completed ingress" );
  }
}
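
The S3Logs class and its REGEX constant are referenced above but not included in this gist. For orientation only, below is a minimal hypothetical sketch of what such a helper might look like, assuming the standard S3 server access log layout; the field names, the abbreviated pattern, and the capture-group ordering are illustrative guesses, not the author's actual definitions. Group 3 captures the bracketed timestamp, which is what the RegexParser( S3Logs.time, REGEX, 3 ) call above depends on.

import java.util.TimeZone;

import cascading.tuple.Fields;
import cascading.tuple.type.DateType;

/** Hypothetical sketch of the S3Logs helper referenced above; not the gist author's actual class. */
public class S3Logs
{
  // one capture group per leading field of an S3 server access log line
  // (abbreviated; real access logs carry more trailing fields)
  public static final String REGEX =
    "^(\\S+) (\\S+) \\[([^\\]]+)\\] (\\S+) (\\S+) (\\S+) (\\S+) (\\S+).*$";

  public static final Fields owner = new Fields( "owner", String.class );
  public static final Fields bucket = new Fields( "bucket", String.class );

  // DateType coerces the captured timestamp string into epoch millis so that
  // DateFormatter in the ingress pipe can reformat it as dd-MMM-yyyy
  public static final Fields time =
    new Fields( "time", new DateType( "dd/MMM/yyyy:HH:mm:ss Z", TimeZone.getTimeZone( "UTC" ) ) );

  public static final Fields remoteIp = new Fields( "remoteIp", String.class );
  public static final Fields requester = new Fields( "requester", String.class );
  public static final Fields requestId = new Fields( "requestId", String.class );
  public static final Fields operation = new Fields( "operation", String.class );
  public static final Fields key = new Fields( "key", String.class );

  // the declared field set, in capture-group order, for RegexParser( FIELDS, REGEX )
  public static final Fields FIELDS = owner
    .append( bucket ).append( time ).append( remoteIp )
    .append( requester ).append( requestId )
    .append( operation ).append( key );
}

Keeping the format constants in one helper lets both flows share the same parse definitions: the ingress flow uses only the time group, while the egress flow parses every declared field.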