Skip to content

Instantly share code, notes, and snippets.

<!-- NOTE(review): fragment is truncated in this view: the workflow-app and action
     elements are never closed, and the "java_B" action and "kill" node are defined
     elsewhere. -->
<workflow-app name="Variable list of sequential jobs" xmlns="uri:oozie:workflow:0.2">
<start to="java_A"/>
<!-- First job of the generated sequential chain; runs main class com.job.A. -->
<action name="java_A">
<java>
<!-- jobTracker/nameNode are resolved from the workflow's job properties. -->
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.job.A</main-class>
</java>
<!-- On success continue to the next generated action; on failure go to the kill node. -->
<ok to="java_B"/>
<error to="kill"/>
test("Test generate sequential jobs") {
  // Build a workflow for three sequential jobs. The comma-separated class list
  // is substituted for ${JobsList} in the template -- a deliberately naive
  // interpolation; Scala offers many more elegant ways to do it.
  val workflow = new RunSequentialJobsWorkflow()
  val pipeline = workflow.createPipeline("com.job.A,com.job.B,com.job.C")
  val xml      = RunWorkflow.getXMLString(pipeline)
  println(xml)
  // Smoke check only: the rendered workflow XML must not be empty.
  xml.length should be > 0
}
// NOTE(review): fragment is truncated in this view -- the loop bodies and the
// closing braces of this test are not visible.
@Test
public void testRunAndDumpMetricsToConsole() throws Exception{
// Fixture users loaded by a helper defined elsewhere in this file.
List<Users> users = readUsers();
// Dump accumulated metrics to the console every 10 seconds while the test runs.
reporter.start(10, TimeUnit.SECONDS);
// 19 iterations (range end is exclusive) over every user/trigger pair.
IntStream.range(1, 20).forEach(i-> {
for(User user : users){
for(Trigger trigger: triggers){
// Guarded by -Dplay=play so the load loop only runs when explicitly enabled.
if(System.getProperty("play", "notSet").equals("play")) {
# Airflow DAG bootstrap: start_date is midnight of the previous day so the
# scheduler considers the first run immediately.
one_day_ago = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'start_date': one_day_ago
}
dag = DAG(dag_id='my_pipeline', default_args=default_args)
# Task that stamps the run with a build id.
# NOTE(review): BuildIdOperator is defined elsewhere; its task_id is presumably
# set internally -- confirm against the operator's definition.
generate_build_id = BuildIdOperator(dag=dag)
// Factory that compiles dynamic Groovy expressions into Script instances.
// NOTE(review): fragment is truncated -- the method this Javadoc documents and
// the closing brace of the class are not visible here.
public class ScriptFactory{
// Compiled scripts keyed by source expression so each expression is compiled
// at most once.
private final Map<String, Script> cache = new HashMap<>()
// Shared loader used to compile Groovy source into Script classes.
private static final GroovyClassLoader CLASS_LOADER = new GroovyClassLoader()
// Template the dynamic expression is presumably spliced into before
// compilation -- the code that uses it is outside this view.
private String template
/**
Creates (or returns a cached) script for the given expression.
@param source is a dynamic expression to be evaluated: user.attr1 == 'x' || user.attr2 == 'y'
@return instance of script
*/
# Base Airflow operator for build-id handling.
# NOTE(review): indentation was lost in this paste and execute() has no visible
# body -- confirm structure against the original source.
class MyBaseOperator(BaseOperator):
# Fixed task id under which this operator is registered in a DAG.
TASK_ID = 'BUILD_ID_TASK'
# Key under which the build id is stored.
# NOTE(review): presumed from the name -- the code that writes it is not visible.
PIPELINE_BUILD_ID_KEY = 'BUILD_ID'
@apply_defaults
def __init__(self, *args, **kwargs):
super(MyBaseOperator, self).__init__(*args, **kwargs)
def execute(self, context):
class UntarOperator(BashOperator):
    """
    Download and unpack an artifact produced by a previous step.

    :param artifact_tar_gz_name: name of the artifact tarball from the previous
        download step
    :type artifact_tar_gz_name: string
    :param lookup_task_id: id of the task that downloaded the artifact
    :type lookup_task_id: string
    """
    # Fixes in this revision: the 'class' keyword was mangled to 'lass', the
    # ':type' fields documented the wrong names ('url', 'user'), and the
    # docstring was never closed.
    # NOTE(review): the original fragment was truncated here at a dangling
    # ':param password' line with no description; the operator body itself is
    # not visible in this view.
// A builder with a single spout and bolt. The spout emits tuples on two streams:
// - user events, streamId UserEvent.class.getSimpleName(), fields-grouped on userId
// - game events, streamId GameEvent.class.getSimpleName(), broadcast to all bolts
// Fix in this revision: the setSpout(...) statement was missing its terminating
// semicolon.
builder.setSpout(GenericEventSpout.class.getSimpleName(), new GenericEventSpout(), 2);
builder.setBolt(UserContextBolt.class.getSimpleName(), new UserContextBolt(), 2)
        .addConfigurations(env)
        // Route user events with the same userId to the same bolt instance.
        .fieldsGrouping(GenericEventSpout.class.getSimpleName(), UserEvent.class.getSimpleName(), new Fields(Const.userId.name()))
        // Broadcast game events to every bolt instance.
        .allGrouping(GenericEventSpout.class.getSimpleName(), GameEvent.class.getSimpleName());
// Here are stream declarations in Spout:
@seregasheypak
seregasheypak / group.scala
Created August 31, 2015 09:19
group and iterate over group values
/** Assigns a freshly generated surrogate key to every row of each 'naturalKey group.
  *
  * 'someValueField passes through unchanged; 'newSurrogateKey carries the
  * per-group generated key. The result is projected to those two fields.
  */
def groupAndGenerateNewSurrogateKey: Pipe = {
  pipe.groupBy('naturalKey){ group =>
    group.mapStream[Long, (Long, Long)]('someValueField -> ('someValueField, 'newSurrogateKey)) { items: Iterator[Long] =>
      val newSurrogateKey = KeyGen.generate()
      // Materialize the iterator exactly once. The original code called
      // items.toList inside a debug println and then mapped the already
      // exhausted iterator, so every group emitted ZERO rows.
      val materialized = items.toList
      println(s"new group key:[$newSurrogateKey]") // outputs generated key
      println(s"items: $materialized")             // grouped items
      materialized.map((_, newSurrogateKey))
    }
  }.project('someValueField, 'newSurrogateKey)
}
// Typical put implementation.
// Atomically inserts the row only if column CF_B:CQ_B does not yet exist
// (checkAndPut with a null expected value); returns true when the put was applied.
// NOTE(review): fragment is truncated -- the catch/finally is past this view;
// confirm hTable.close() happens in a finally block, otherwise the table handle
// leaks on every call.
public boolean put(SomeBean bean){
HTableInterface hTable = null;
try {
//Controller holds reference to HConnection.
//It's created once during servlet.init in servlet and shared across several controllers, just an object ref
HConnection hConnection = getConnection();
// Lightweight per-call table handle obtained from the shared connection.
hTable = hConnection.getTable(getName());
return hTable.checkAndPut(createKey(bean), CF_B, CQ_B, null, createPut(bean));
}