Skip to content

Instantly share code, notes, and snippets.

@prodeezy
Last active August 29, 2015 13:56
Show Gist options
  • Save prodeezy/9000271 to your computer and use it in GitHub Desktop.
Save prodeezy/9000271 to your computer and use it in GitHub Desktop.
// Example: run a Lingual SQL statement over an HBase table that is read through a
// pre-filtered Scan, write the result to a text file, then print every result row.

// ***** Construct a custom Scan object based on custom filters *****
// NOTE(review): the meaning of the positional arguments (apparently app, user, id,
// a year/month/day, and four -1 placeholders) is defined by MessageOperations.getFilter,
// which is not visible here -- confirm against that method before changing them.
Filter filterList = MessageOperations.getFilter("app1",
    "foo",
    "123",
    2014, 2, 12,
    -1, -1, -1, -1);

// Single source of truth for the column family, shared by the Scan and the HBaseScheme.
String family = "c";

Scan tableScan = new Scan();
tableScan.setFilter(filterList);
// Encode with an explicit charset: the no-arg getBytes() uses the JVM's platform
// default, which would make the family bytes depend on the host environment.
tableScan.addFamily(family.getBytes(java.nio.charset.Charset.forName("UTF-8")));

//******* Configure custom Tap that takes the Scan object ********
Fields keyFields = new Fields("MTU_Rowkey", String.class);
Fields messageColumnFields = new Fields();

//********* Setup column fields for HbaseScheme ********
// A TreeMap iterates in sorted key order, so the column fields are appended
// in the natural ordering of the field names rather than in insertion order.
TreeMap<String, Type> messageFieldsMap = new TreeMap<String, Type>();
messageFieldsMap.put(APP_FIELD, String.class);
messageFieldsMap.put(USER_FIELD, String.class);
messageFieldsMap.put(ARRIVAL_TIMESTAMP_FIELD, Long.class);
messageFieldsMap.put(TIMESTAMP_FIELD, Long.class);
messageFieldsMap.put(REVENUE_FIELD, Integer.class);
for (String fieldName : messageFieldsMap.keySet()) {
    messageColumnFields.append(new Fields(fieldName, messageFieldsMap.get(fieldName)));
}

// The quoted schema/table pair must match the source name registered on the FlowDef below.
String statement = "select * from \"example\".\"messages-table\" as RevenueMessages\n";

//*********Custom Tap that accepts my Scan object***********
Tap myTap = new CustomHbaseTap( tableScan,
    "message_table",
    new HBaseScheme(keyFields, family, messageColumnFields));
Tap resultsTap = new Hfs(new TextLine(), "build/test/output/results.txt");

FlowDef flowDef = FlowDef.flowDef()
    .setName("sql flow")
    .addSource( "example.messages-table", myTap )
    .addSink( "results", resultsTap);

// The SQL planner builds the pipe assembly from the statement when the flow is connected.
SQLPlanner sqlPlanner = new SQLPlanner().setSql(statement);
flowDef.addAssemblyPlanner( sqlPlanner );

FlowConnector flowConnector = createHadoopFlowConnector();
Flow flow = flowConnector.connect(flowDef);
// Blocks until the flow has finished, so the sink can be read back afterwards.
flow.complete();

TupleEntryIterator iterator = resultsTap.openForRead( flow.getFlowProcess() );
try {
    while( iterator.hasNext() ) {
        System.out.println( "[ResultRow] => " + iterator.next() );
    }
} finally {
    // Always release the sink reader, even if iteration or printing fails.
    iterator.close();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment