CDAP pipelines are saved and restored in JSON format. This file can be imported as a new pipeline using Studio.

Tony Duarte
Cask Training Specialist
package org;

import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
def transform(record, emitter, context):  # (1)(2)
    import sys  # (3)
    # Debug write of record components to the cdap.log file (4)
    sys.stdout.write("XXXoffset: %i\n" % record['offset'])  # (5)
    sys.stdout.write("XXXbody: %s\n" % record['body'])
    # Now write the unmodified record object to the next pipeline stage
    emitter.emit(record)  # (6)
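The transform above can be sanity-checked outside of CDAP by calling it directly with a stand-in for the emitter. The `StubEmitter` class and the sample record below are illustrative assumptions for testing only, not part of the CDAP API; inside a real pipeline, CDAP supplies the `emitter` and `context` objects.

```python
import sys

def transform(record, emitter, context):
    # Debug write of record components (goes to cdap.log when run inside CDAP)
    sys.stdout.write("XXXoffset: %i\n" % record['offset'])
    sys.stdout.write("XXXbody: %s\n" % record['body'])
    # Pass the unmodified record to the next pipeline stage
    emitter.emit(record)

# StubEmitter is a test double (assumption): it simply collects emitted records.
class StubEmitter(object):
    def __init__(self):
        self.records = []

    def emit(self, record):
        self.records.append(record)

# A record shaped like the output of a file/stream source: offset plus body (assumption).
sample = {'offset': 0, 'body': 'hello, pipeline'}
stub = StubEmitter()
transform(sample, stub, None)
```

Running this prints the two debug lines and leaves the unmodified record in `stub.records`, which mirrors what stage (6) does in the pipeline.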
public class CdapWorkflowApp extends AbstractApplication {
  @Override
  public void configure() {
    addSpark(new MySparkRunr());      // (1)
    addMapReduce(new MyMR());
    addWorkflow(new MyWF());          // (2)
    scheduleWorkflow(Schedules.builder("every5Min").createTimeSchedule("*/5 * * * *"), "MyWF"); // (3)
  }

  public static class MyWF extends AbstractWorkflow {  // (4)