@tomaszezula
Created November 9, 2015 14:29
// App configuration, provides access to Spark context, Kafka brokers, topics etc.
val conf = Map(...)
// ETLEngine represents a custom API layer
val etl = new ETLEngine(conf)
// A stream of new proxy logs
val proxyStream: DStream[Row] = etl.extract(new ProxyStreamExtractor)
  .transform(new ProxyRowTransformer)
// Split into two distinct sets
// DStream.filter applies the predicate to each element, here a single Row
val successFilter = proxyStream.filter(row => ...)
val failedFilter = proxyStream.filter(row => ...)
// Trigger a new workflow
// Hive context needed for SQL operations
val hiveCtx: HiveContext = ...
// Start with successfully parsed lines
etl.process(successFilter)
.analyse(new WindowAnalyzer(hiveCtx))
.analyse(new ThreatIntelSweeper(hiveCtx))
.load(new SuccessLoader(hiveCtx))
// Now process the errors
.process(failedFilter)
.load(new ErrorLoader(hiveCtx))
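
// ---------------------------------------------------------------------------
// A minimal sketch of how the fluent process/analyse/load chain above might be
// wired. Everything below is hypothetical: the snippet does not show the
// internals of ETLEngine, so Seq[String] stands in for DStream[Row], and the
// names FluentEtlSketch, Engine, Workflow, Analyzer and Loader are assumptions
// made purely for illustration. No Spark dependency is required to run it.
// ---------------------------------------------------------------------------
object FluentEtlSketch {

  // One analysis step: takes rows in, hands (possibly enriched) rows on.
  trait Analyzer { def analyse(rows: Seq[String]): Seq[String] }

  // Terminal step of a workflow: persists the rows somewhere.
  trait Loader { def load(rows: Seq[String]): Unit }

  // Carries the rows of a single workflow between steps.
  final class Workflow(engine: Engine, rows: Seq[String]) {
    def analyse(a: Analyzer): Workflow = new Workflow(engine, a.analyse(rows))

    // Returning the engine is what allows another .process(...) to follow .load(...).
    def load(l: Loader): Engine = { l.load(rows); engine }
  }

  final class Engine {
    def process(rows: Seq[String]): Workflow = new Workflow(this, rows)
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("200 GET /index", "garbled", "404 GET /missing")

    // Stand-in for the two filters: lines starting with a digit count as parsed.
    val (ok, failed) = lines.partition(_.headOption.exists(_.isDigit))

    val upper = new Analyzer {
      def analyse(rows: Seq[String]): Seq[String] = rows.map(_.toUpperCase)
    }
    val printer = new Loader {
      def load(rows: Seq[String]): Unit = rows.foreach(println)
    }

    // Same shape as the chain above: the success path first, then the errors.
    new Engine()
      .process(ok).analyse(upper).load(printer)
      .process(failed).load(printer)
  }
}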