Last active
August 29, 2015 13:56
-
-
Save prodeezy/9000271 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example: run a Lingual SQL statement over an HBase table through a custom Cascading Tap
// that is driven by a caller-supplied, pre-filtered Scan, then print the result rows.

// ***** Construct a custom Scan object based on custom filters *****
// NOTE(review): the meaning of the positional arguments (including the -1 values) is defined
// by MessageOperations.getFilter, which is not visible here -- confirm against that helper.
Filter filterList = MessageOperations.getFilter("app1",
        "foo",
        "123",
        2014, 2, 12,
        -1, -1, -1, -1);

// Single column family name, shared by the Scan and by the HBaseScheme below so the two
// cannot drift apart.
String family = "c";

Scan tableScan = new Scan();
tableScan.setFilter(filterList);
// Encode with an explicit charset; the no-arg getBytes() depends on the platform default.
tableScan.addFamily(family.getBytes(java.nio.charset.Charset.forName("UTF-8")));

//******* Configure custom Tap that takes the Scan object ********
Fields keyFields = new Fields("MTU_Rowkey", String.class);

//********* Setup column fields for HbaseScheme ********
// A TreeMap iterates in sorted key order, so the column fields are appended in sorted
// field-name order.
TreeMap<String, Type> messageFieldsMap = new TreeMap<String, Type>();
messageFieldsMap.put(APP_FIELD, String.class);
messageFieldsMap.put(USER_FIELD, String.class);
messageFieldsMap.put(ARRIVAL_TIMESTAMP_FIELD, Long.class);
messageFieldsMap.put(TIMESTAMP_FIELD, Long.class);
messageFieldsMap.put(REVENUE_FIELD, Integer.class);

Fields messageColumnFields = new Fields();
for (java.util.Map.Entry<String, Type> column : messageFieldsMap.entrySet()) {
    // Fields.append returns a NEW Fields instance instead of mutating the receiver, so the
    // result must be reassigned; otherwise messageColumnFields stays empty.
    messageColumnFields = messageColumnFields.append(new Fields(column.getKey(), column.getValue()));
}

String statement = "select * from \"example\".\"messages-table\" as RevenueMessages\n";

//*********Custom Tap that accepts my Scan object***********
Tap myTap = new CustomHbaseTap(tableScan,
        "message_table",
        new HBaseScheme(keyFields, family, messageColumnFields));
Tap resultsTap = new Hfs(new TextLine(), "build/test/output/results.txt");

// The source name registered here matches the "example"."messages-table" reference in the
// SQL statement above.
FlowDef flowDef = FlowDef.flowDef()
        .setName("sql flow")
        .addSource("example.messages-table", myTap)
        .addSink("results", resultsTap);

SQLPlanner sqlPlanner = new SQLPlanner().setSql(statement);
flowDef.addAssemblyPlanner(sqlPlanner);

FlowConnector flowConnector = createHadoopFlowConnector();
Flow flow = flowConnector.connect(flowDef);
flow.complete();

// Read the sink back and print every row; close the iterator even if reading or printing
// throws part-way through.
TupleEntryIterator iterator = resultsTap.openForRead(flow.getFlowProcess());
try {
    while (iterator.hasNext()) {
        System.out.println("[ResultRow] => " + iterator.next());
    }
} finally {
    iterator.close();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment