Skip to content

Instantly share code, notes, and snippets.

@haoch
Last active November 8, 2015 14:50
Show Gist options
  • Save haoch/ea86d4d76a49648d2c34 to your computer and use it in GitHub Desktop.
Save haoch/ea86d4d76a49648d2c34 to your computer and use it in GitHub Desktop.
Eagle DSL
/**
 * Overview sample of the stream DSL: schema definition, configuration access
 * and a processing pipeline, all declared in the application body.
 *
 * NOTE(review): the header has no `object`/`class` keyword — confirm whether
 * this DSL is meant to be plain Scala or is pre-processed.
 */
MyApp extends StreamApp {
  /**
   * Schema Definition
   *
   * A stream is declared either inline (datasource + attributes) or by
   * pointing at a schema resource file.
   *
   * NOTE(review): `attributes = { a, b }` is not a valid Scala expression —
   * confirm the intended collection type against the `stream` signature.
   */
  stream("hdfsAuditLogStream",
    datasource = "HDFS",
    attributes = {
      "path" -> string("Path","HDFS Path"),
      "size" -> long("Size","HDFS Path Size")
    }
  )
  stream("someOtherStream", resource = "schema.json")

  /**
   * Configuration
   *
   * Typed reads through `get[T](key)`, writes through `set(key, value)`.
   */
  val strtConfVal = get[String]("config.key")
  // Scala's boolean type is `Boolean`; `Bool` is not a standard Scala type.
  val boolConfVal = get[Boolean]("config.key")
  set("config.key","config.value")

  /**
   * Schema Processing DAG
   *
   * No need to execute manually.
   */
  stream("hdfsAuditLogStream")
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    .alert()
}
/**
 * Stream schema declared inline.
 *
 * The stream's datasource and attributes are given directly in the `stream(...)`
 * call, and the pipeline (source -> flatMap -> alert) is chained onto it.
 */
SampleApp1 extends StreamApp {
  stream(
    "hdfsAuditLogStream",
    datasource = "HDFS",
    attributes = {"path" -> string("Path","HDFS Path"),"size" -> long("Size","HDFS Path Size")}
  )
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    .alert()
}
/**
 * Stream schema from db.
 *
 * The stream is referenced by name only; no datasource or attributes are
 * declared inline in this application.
 */
SampleApp2 extends StreamApp {
  stream("hdfsAuditLogStream")
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    .alert()
}
/**
 * Inline policy in alert.
 *
 * The alert policy query is passed directly to `alert(...)` together with the
 * sender and recipient read from the `eagle.alert.*` configuration keys.
 */
SampleApp3 extends StreamApp {
  val sender = get[String]("eagle.alert.sender")
  val recipient = get[String]("eagle.alert.recipient")

  stream("hdfsAuditLogStream")
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    // The query literal is closed with exactly three quotes and the call stays
    // open until after the named arguments.
    .alert(
      """from hdfsAuditLogEventStream[(src == '/tmp/')]#window.externalTime(timestamp,10 min) select count(timestamp) as aggValue having aggValue >= 2 insert into outputStream;""",
      sender = sender,
      recipient = recipient
    )
}
/**
 * Use Siddhi for general processing such as transformation, filtering,
 * aggregation and so on.
 *
 * Two `process(...)` stages are chained: the first inserts into
 * `hdfsAlertStream`, the second reads from it and inserts into
 * `finalAlertStream`, which is declared from a schema resource.
 */
SampleApp4 extends StreamApp {
  // Read from configuration; not passed to `alert()` in this sample.
  val sender = get[String]("eagle.alert.sender")
  val recipient = get[String]("eagle.alert.recipient")

  stream("finalAlertStream", resource = "finalAlertStream.json")

  stream("hdfsAuditLogStream")
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    .process(
      "from hdfsAuditLogEventStream[(user == 'somebody')]#window.externalTime(timestamp,10 min) select count(timestamp) as aggValue having aggValue >= 2 insert into hdfsAlertStream;"
    )
    .process(
      "from hdfsAlertStream[(user == 'somebody')]#window.externalTime(timestamp,10 min) select count(timestamp) as aggValue having aggValue >= 2 insert into finalAlertStream;"
    )
    .alert()
}
/**
 * Multi Alert Join.
 *
 * Builds two alert pipelines (one on `hdfsAuditLogStream`, one on
 * `hiveLogStream`) and inner-joins their results on `user`.
 */
SampleApp5 extends StreamApp {
  val sender = get[String]("eagle.alert.sender")
  val recipient = get[String]("eagle.alert.recipient")

  // Each query literal is closed with exactly three quotes, and each call
  // stays open until after the named arguments.
  val hdfsAlert = stream("hdfsAuditLogStream")
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    .alert(
      """from hdfsAuditLogEventStream[(user == 'somebody')]#window.externalTime(timestamp,10 min) select count(timestamp) as aggValue having aggValue >= 2 insert into hdfsAlertStream;""",
      sender = sender,
      recipient = recipient
    )

  val hiveAlert = stream("hiveLogStream")
    .from(KafakaSourceProvider())
    .flatMap(parserFunc)
    .alert(
      """from hiveLogStream[(user == 'somebody')]#window.externalTime(timestamp,10 min) select count(timestamp) as aggValue having aggValue >= 2 insert into hiveAlertStream;""",
      sender = sender,
      recipient = recipient
    )

  // NOTE(review): `window` is not defined anywhere in this snippet — confirm
  // where the join window is supposed to come from.
  hdfsAlert.innerJoinByWindow(hiveAlert, "hdfsAlertStream.user == hiveAlertStream.user", window)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment