Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
CDAP Workflow
This is run using the CDAP SDK, so you will need to have that installed (along with maven) to compile and run this code.
The Cask web site has documentation with full instructions for installation.
I have included the shell script that I use to compile and then run the code on my Mac - and it has hard-coded references to the SDK zip
file that you will want to update. The script assumes that it is co-located with the maven POM file and that the sdk was also
installed there - you will probably need to adjust those paths for your environment if you want to use the script.
$ chmod u+x ./compileRun.sh
$ ./compileRun.sh
I've also included my pom.xml - which is mostly just the standard reference POM for CDAP, but I put it in for completeness.
Tony Duarte
Cask Training Specialist
package org;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.schedule.Schedules;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.workflow.AbstractWorkflow;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.util.Arrays;
/**
 * CDAP application that registers a Spark program, a MapReduce program and a
 * workflow that refers to both, and attaches a time schedule to the workflow.
 *
 * <p>The Spark program writes text files to {@code /tmp/SparkOut}; the
 * MapReduce program reads that directory and writes to {@code /tmp/HadoopOut}.
 */
public class CdapWorkflowApp extends AbstractApplication {

  /** Directory written by the Spark program and read by the MapReduce program. */
  private static final String SPARK_OUTPUT_DIR = "/tmp/SparkOut";

  /** Directory the MapReduce program writes its output to. */
  private static final String HADOOP_OUTPUT_DIR = "/tmp/HadoopOut";

  /** Name under which the workflow is scheduled. */
  private static final String WORKFLOW_NAME = "MyWF";

  /** Name of the time schedule attached to the workflow. */
  private static final String SCHEDULE_NAME = "every5Min";

  /** Cron expression of the time schedule: every fifth minute. */
  private static final String SCHEDULE_CRON = "*/5 * * * *";

  /**
   * Registers the Spark program, the MapReduce program and the workflow with
   * the application, then schedules the workflow.
   */
  @Override
  public void configure() {
    addSpark(new MySparkRunr());
    addMapReduce(new MyMR());
    addWorkflow(new MyWF());

    scheduleWorkflow(
        Schedules.builder(SCHEDULE_NAME).createTimeSchedule(SCHEDULE_CRON),
        WORKFLOW_NAME);
  }

  /** Workflow made of the Spark program followed by the MapReduce program. */
  public static class MyWF extends AbstractWorkflow {

    /** Adds both programs to the workflow, referring to them by simple class name. */
    @Override
    public void configure() {
      final String sparkProgram = MySparkRunr.class.getSimpleName();
      final String mapReduceProgram = MyMR.class.getSimpleName();

      addSpark(sparkProgram);
      addMapReduce(mapReduceProgram);
    }
  }

  /** Spark program definition; the actual work is done by {@link MySparkMain}. */
  public static class MySparkRunr extends AbstractSpark {

    /** Declares {@link MySparkMain} as the class to run. */
    @Override
    public void configure() {
      setMainClass(MySparkMain.class);
    }
  }

  /** Spark entry point that writes a three-element RDD of strings as text files. */
  public static class MySparkMain implements JavaSparkMain {

    /**
     * Builds a two-partition RDD holding "1", "2" and "3" and saves it as text.
     *
     * @param cdapJsec CDAP execution context (not used by this program)
     * @throws Exception if creating the context or saving the RDD fails
     */
    @Override
    public void run(JavaSparkExecutionContext cdapJsec) throws Exception {
      JavaSparkContext sparkContext = new JavaSparkContext();

      // Two partitions -> /tmp/SparkOut/part-00000 and /tmp/SparkOut/part-00001
      JavaRDD<String> digits = sparkContext.parallelize(Arrays.asList("1", "2", "3"), 2);
      digits.saveAsTextFile(SPARK_OUTPUT_DIR);

      sparkContext.stop();
    }
  }

  /** MapReduce program that passes the Spark output through unchanged. */
  public static class MyMR extends AbstractMapReduce {

    /**
     * Configures the underlying Hadoop job: mapper/reducer classes, map output
     * types, and the input and output paths.
     *
     * @throws Exception if the Hadoop job cannot be configured
     */
    @Override
    public void initialize() throws Exception {
      MapReduceContext context = getContext();
      Job hadoopJob = context.getHadoopJob();

      // The Hadoop 2.x base Mapper and Reducer act as identity implementations.
      hadoopJob.setMapperClass(Mapper.class);
      hadoopJob.setReducerClass(Reducer.class);

      hadoopJob.setMapOutputKeyClass(LongWritable.class);
      hadoopJob.setMapOutputValueClass(Text.class);

      // Input is whatever the Spark program saved.
      FileInputFormat.addInputPath(hadoopJob, new Path(SPARK_OUTPUT_DIR));

      // Output ends up in /tmp/HadoopOut/part-r-00000
      FileOutputFormat.setOutputPath(hadoopJob, new Path(HADOOP_OUTPUT_DIR));
    }
  }
}
#!/bin/bash
# Build CdapWorkflowApp with Maven, deploy it to a local CDAP SDK, start the
# workflow once, resume its schedule, and print program status at intervals.
# Uses relative paths (cdap-sdk-4.0.0.zip, target/), so run it from the
# directory that holds the POM and the SDK zip.
set -x # echo commands as they are executed
###############
# Remove SDK and Unzip the SDK to unclutter the logs
# unzip in bg while running compilation
# NOTE(review): the unzip is still running when "cdap sdk status/start" execute
# below - confirm the cdap command on PATH does not come from this directory.
cdap sdk stop &>/dev/null
rm -r cdap-sdk-4.0.0 &>/dev/null
unzip cdap-sdk-4.0.0.zip &>/dev/null &
# Start CDAP standalone if not started
if cdap sdk status | grep -q -e "Connection refused" -e "not running" -e "does not appear to be running" ; then
cdap sdk stop
cdap sdk start
fi
###############
# Build maven project; abort on failure so a stale or missing jar is never
# deployed and the long status waits below are not wasted.
mvn package -DskipTests || { echo "Maven build failed - aborting" >&2; exit 1; }
# Clean up any data in /tmp (-f: do not complain when the directories are absent)
rm -rf /tmp/SparkOut
rm -rf /tmp/HadoopOut
# Clean up any old jars (quietly)
cdap cli stop app CdapWorkflowApp programs &>/dev/null
cdap cli delete app CdapWorkflowApp &>/dev/null
cdap cli delete artifact CdapWorkflowApp 1.0-SNAPSHOT &>/dev/null
# Now load new jar
cdap cli deploy app target/CdapWorkflowApp-1.0-SNAPSHOT.jar
cdap cli list artifacts
# start everything
cdap cli start workflow CdapWorkflowApp.MyWF # run once outside schedule
cdap cli resume schedule CdapWorkflowApp.every5Min # now start running on a schedule
# get info about the Spark and MapReduce programs shortly after the manual start
sleep 5; cdap cli get spark runs CdapWorkflowApp.MySparkRunr
cdap cli get spark status CdapWorkflowApp.MySparkRunr
cdap cli get mapreduce status CdapWorkflowApp.MyMR
cdap cli get mapreduce runs CdapWorkflowApp.MyMR
# ... and again about a minute later
sleep 65; cdap cli get spark runs CdapWorkflowApp.MySparkRunr
cdap cli get spark status CdapWorkflowApp.MySparkRunr
cdap cli get mapreduce status CdapWorkflowApp.MyMR
cdap cli get mapreduce runs CdapWorkflowApp.MyMR
# display lots of status - most completely superfluous
cdap cli list apps # should show app id
cdap cli list mapreduce # should show mapreduce id
cdap cli list spark # to see spark id
cdap cli list workflows #
cdap cli get workflow schedules CdapWorkflowApp.MyWF # to see workflow id
cdap cli get schedule status CdapWorkflowApp.every5Min
cdap cli get mapreduce status CdapWorkflowApp.MyMR
# Program ids are <app>.<program>, matching every other status call in this script
cdap cli get spark status CdapWorkflowApp.MySparkRunr
# get spark & mr status one last time before exiting
# (wait slightly longer than the 5-minute schedule interval)
sleep 305; cdap cli get spark runs CdapWorkflowApp.MySparkRunr
cdap cli get spark status CdapWorkflowApp.MySparkRunr
cdap cli get mapreduce status CdapWorkflowApp.MyMR
cdap cli get mapreduce runs CdapWorkflowApp.MyMR
head /tmp/SparkOut/*
head /tmp/HadoopOut/*
# Stop
cdap cli suspend schedule CdapWorkflowApp.every5Min
echo "Read /opt/cdap/sdk-4.x/logs/cdap.log to see logging from your Application"
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2014-2015 Cask Data, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Maven coordinates of the application; artifactId and version make up the jar file name -->
<groupId>org</groupId>
<artifactId>CdapWorkflowApp</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>CDAP WorkflowApp Java/MapReduce/Spark App Demo</name>
<properties>
<!-- Application class written into the jar manifest by the bundle plugin configuration below -->
<app.main.class>org.CdapWorkflowApp</app.main.class>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Versions referenced by the dependency declarations below -->
<cdap.version>4.0.0</cdap.version>
<spark.core.version>1.6.1</spark.core.version>
<slf4j.version>1.7.5</slf4j.version>
<guava.version>13.0.1</guava.version>
<gson.version>2.2.4</gson.version>
<hadoop.version>2.3.0</hadoop.version>
<!-- Don't compile any tests for tutorial code -->
<maven.test.skip>true</maven.test.skip>
</properties>
<!-- Most dependencies use the "provided" scope: available at compile time, not embedded in the jar -->
<dependencies>
<!-- CDAP application API -->
<dependency>
<groupId>co.cask.cdap</groupId>
<artifactId>cdap-api</artifactId>
<version>${cdap.version}</version>
<scope>provided</scope>
</dependency>
<!-- CDAP Spark API -->
<dependency>
<groupId>co.cask.cdap</groupId>
<artifactId>cdap-api-spark</artifactId>
<version>${cdap.version}</version>
<scope>provided</scope>
</dependency>
<!-- CDAP test framework, test scope only -->
<dependency>
<groupId>co.cask.cdap</groupId>
<artifactId>cdap-unit-test</artifactId>
<version>${cdap.version}</version>
<scope>test</scope>
</dependency>
<!-- Spark core built for Scala 2.10, without its slf4j-log4j12 binding -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.core.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<!-- No scope is given here, so Maven's default scope (compile) applies; this is the only
declared dependency that matches the scope=compile filter of Embed-Dependency below -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
<scope>provided</scope>
</dependency>
<!-- Hadoop MapReduce client API, with asm, netty and the slf4j-log4j12 binding excluded -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Hadoop common, with a long list of transitive logging, web-container and utility
libraries excluded -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>jersey-core</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>jersey-json</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>jersey-server</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<artifactId>jasper-compiler</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-runtime</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jsp-api</artifactId>
<groupId>javax.servlet.jsp</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<!-- pluginManagement holds the versions and configuration for the plugins used by this build -->
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.14.1</version>
</plugin>
<!-- Compile with Java 1.7 as both source and target level -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<!-- Bundle plugin: sets the manifest main class and embeds compile-scope dependencies -->
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>2.3.7</version>
<extensions>true</extensions>
<configuration>
<archive>
<manifest>
<mainClass>${app.main.class}</mainClass>
</manifest>
</archive>
<instructions>
<!-- Embed compile-scope dependencies (transitively) as jar files under lib/ -->
<Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
<Embed-Directory>lib</Embed-Directory>
</instructions>
</configuration>
<executions>
<!-- Bind the bundle goal to the package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>bundle</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
<!-- Plugins declared for this build; same coordinates and versions as in pluginManagement above -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.14.1</version>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>2.3.7</version>
</plugin>
</plugins>
</build>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment