Created
March 20, 2017 22:54
-
-
Save MrTonyDuarte/fc5e73dc9f243b40831c9f34648b8286 to your computer and use it in GitHub Desktop.
CDAP Workflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This is run using the CDAP SDK, so you will need to have that installed (along with maven) to compile and run this code. | |
The Cask web site has documentation with full instructions for installation. | |
I have included the shell script that I use to compile and then run the code on my Mac - and it has hard-coded references to the SDK zip | |
file that you will want to update. The script assumes that it is co-located with the maven POM file and that the sdk was also | |
installed there - your environment will probably need to adjust those paths if you want to use the script.
$ chmod u+x ./compileRun.sh | |
$ ./compileRun.sh | |
I've also included my pom.xml - which is mostly just the standard reference POM for CDAP, but I put it in for completeness. | |
Tony Duarte | |
Cask Training Specialist
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org; | |
import co.cask.cdap.api.spark.JavaSparkExecutionContext; | |
import co.cask.cdap.api.spark.JavaSparkMain; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.spark.api.java.JavaRDD; | |
import org.apache.spark.api.java.JavaSparkContext; | |
import co.cask.cdap.api.app.AbstractApplication; | |
import co.cask.cdap.api.mapreduce.AbstractMapReduce; | |
import co.cask.cdap.api.mapreduce.MapReduceContext; | |
import co.cask.cdap.api.schedule.Schedules; | |
import co.cask.cdap.api.spark.AbstractSpark; | |
import co.cask.cdap.api.workflow.AbstractWorkflow; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.hadoop.mapreduce.Reducer; | |
import java.util.Arrays; | |
public class CdapWorkflowApp extends AbstractApplication { | |
@Override | |
public void configure() { | |
addSpark(new MySparkRunr()); | |
addMapReduce(new MyMR()); | |
addWorkflow(new MyWF()); | |
scheduleWorkflow(Schedules.builder("every5Min").createTimeSchedule("*/5 * * * *"), "MyWF"); | |
} | |
public static class MyWF extends AbstractWorkflow { | |
@Override | |
public void configure() { | |
addSpark(MySparkRunr.class.getSimpleName()); | |
addMapReduce(MyMR.class.getSimpleName()); | |
} | |
} | |
public static class MySparkRunr extends AbstractSpark { | |
@Override | |
public void configure() { | |
setMainClass(MySparkMain.class); | |
} | |
} | |
public static class MySparkMain implements JavaSparkMain { | |
@Override | |
public void run(JavaSparkExecutionContext cdapJsec) throws Exception { | |
JavaSparkContext jsc = new JavaSparkContext(); | |
JavaRDD<String> myStrRdd = jsc.parallelize(Arrays.asList("1","2","3"),2); | |
// save data to a file - creates /tmp/SparkOut/part-00000 and /tmp/SparkOut/part-00001 | |
myStrRdd.saveAsTextFile("/tmp/SparkOut"); | |
jsc.stop (); | |
} | |
} | |
public static class MyMR extends AbstractMapReduce { | |
@Override | |
public void initialize() throws Exception { | |
MapReduceContext cdapContext = getContext(); | |
Job job = cdapContext.getHadoopJob(); | |
job.setMapperClass(Mapper.class); // Use Hadoop 2.x Identity Mapper and Reducer | |
job.setReducerClass(Reducer.class); | |
job.setMapOutputKeyClass(LongWritable.class); | |
job.setMapOutputValueClass(Text.class); | |
FileInputFormat.addInputPath(job, new Path("/tmp/SparkOut")); | |
// save data to a file - creates /tmp/HadoopOut/part-r-00000 | |
FileOutputFormat.setOutputPath(job, new Path("/tmp/HadoopOut")); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Build, deploy, and exercise CdapWorkflowApp against a local CDAP standalone SDK.
# Assumes this script is co-located with pom.xml and cdap-sdk-4.0.0.zip;
# adjust the hard-coded SDK paths for your environment.
set -x # echo commands as they are executed
###############
# Remove and re-unzip the SDK to unclutter the logs.
# Unzip in the background while the compilation runs.
cdap sdk stop &>/dev/null
rm -rf cdap-sdk-4.0.0 &>/dev/null
unzip cdap-sdk-4.0.0.zip &>/dev/null &
# Start CDAP standalone if not started.
if cdap sdk status | grep -q -e "Connection refused" -e "not running" -e "does not appear to be running" ; then
  cdap sdk stop
  cdap sdk start
fi
###############
mvn package -DskipTests # Build maven project
# Clean up any data in /tmp (-f: no error if the dirs don't exist yet)
rm -rf /tmp/SparkOut
rm -rf /tmp/HadoopOut
# Clean up any old jars (quietly)
cdap cli stop app CdapWorkflowApp programs &>/dev/null
cdap cli delete app CdapWorkflowApp &>/dev/null
cdap cli delete artifact CdapWorkflowApp 1.0-SNAPSHOT &>/dev/null
# Now load new jar
cdap cli deploy app target/CdapWorkflowApp-1.0-SNAPSHOT.jar
cdap cli list artifacts
# start everything
cdap cli start workflow CdapWorkflowApp.MyWF       # run once outside schedule
cdap cli resume schedule CdapWorkflowApp.every5Min # now start running on a schedule
# get info about the first (manual) run
sleep 5; cdap cli get spark runs CdapWorkflowApp.MySparkRunr
cdap cli get spark status CdapWorkflowApp.MySparkRunr
cdap cli get mapreduce status CdapWorkflowApp.MyMR
cdap cli get mapreduce runs CdapWorkflowApp.MyMR
# give the workflow time to finish, then check again
sleep 65; cdap cli get spark runs CdapWorkflowApp.MySparkRunr
cdap cli get spark status CdapWorkflowApp.MySparkRunr
cdap cli get mapreduce status CdapWorkflowApp.MyMR
cdap cli get mapreduce runs CdapWorkflowApp.MyMR
# display lots of status - most completely superfluous
cdap cli list apps              # should show app id
cdap cli list mapreduce         # should show mapreduce id
cdap cli list spark             # to see spark id
cdap cli list workflows
cdap cli get workflow schedules CdapWorkflowApp.MyWF # to see workflow id
cdap cli get schedule status CdapWorkflowApp.every5Min
cdap cli get mapreduce status CdapWorkflowApp.MyMR
cdap cli get spark status CdapWorkflowApp.MyWF.MySparkRunr
# wait for a scheduled run, then get spark & mr status one last time before exiting
sleep 305; cdap cli get spark runs CdapWorkflowApp.MySparkRunr
cdap cli get spark status CdapWorkflowApp.MySparkRunr
cdap cli get mapreduce status CdapWorkflowApp.MyMR
cdap cli get mapreduce runs CdapWorkflowApp.MyMR
head /tmp/SparkOut/*
head /tmp/HadoopOut/*
# Stop
cdap cli suspend schedule CdapWorkflowApp.every5Min
echo "Read /opt/cdap/sdk-4.x/logs/cdap.log to see logging from your Application"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Copyright © 2014-2015 Cask Data, Inc.

  Licensed under the Apache License, Version 2.0 (the "License"); you may not
  use this file except in compliance with the License. You may obtain a copy of
  the License at

  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  License for the specific language governing permissions and limitations under
  the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org</groupId>
  <artifactId>CdapWorkflowApp</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>CDAP WorkflowApp Java/MapReduce/Spark App Demo</name>

  <properties>
    <app.main.class>org.CdapWorkflowApp</app.main.class>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <cdap.version>4.0.0</cdap.version>
    <spark.core.version>1.6.1</spark.core.version>
    <slf4j.version>1.7.5</slf4j.version>
    <guava.version>13.0.1</guava.version>
    <gson.version>2.2.4</gson.version>
    <hadoop.version>2.3.0</hadoop.version>
    <!-- Don't compile any tests for tutorial code -->
    <maven.test.skip>true</maven.test.skip>
  </properties>

  <dependencies>
    <dependency>
      <groupId>co.cask.cdap</groupId>
      <artifactId>cdap-api</artifactId>
      <version>${cdap.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>co.cask.cdap</groupId>
      <artifactId>cdap-api-spark</artifactId>
      <version>${cdap.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>co.cask.cdap</groupId>
      <artifactId>cdap-unit-test</artifactId>
      <version>${cdap.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.core.version}</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>${gson.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>asm</groupId>
          <artifactId>asm</artifactId>
        </exclusion>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>commons-logging</groupId>
          <artifactId>commons-logging</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.avro</groupId>
          <artifactId>avro</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>guava</artifactId>
          <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jersey-core</artifactId>
          <groupId>com.sun.jersey</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jersey-json</artifactId>
          <groupId>com.sun.jersey</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jersey-server</artifactId>
          <groupId>com.sun.jersey</groupId>
        </exclusion>
        <exclusion>
          <artifactId>servlet-api</artifactId>
          <groupId>javax.servlet</groupId>
        </exclusion>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>jetty</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>jetty-util</artifactId>
        </exclusion>
        <exclusion>
          <artifactId>jasper-compiler</artifactId>
          <groupId>tomcat</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jasper-runtime</artifactId>
          <groupId>tomcat</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jsp-api</artifactId>
          <groupId>javax.servlet.jsp</groupId>
        </exclusion>
        <exclusion>
          <artifactId>slf4j-api</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.14.1</version>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.1</version>
          <configuration>
            <source>1.7</source>
            <target>1.7</target>
          </configuration>
        </plugin>
        <plugin>
          <groupId>org.apache.felix</groupId>
          <artifactId>maven-bundle-plugin</artifactId>
          <version>2.3.7</version>
          <extensions>true</extensions>
          <configuration>
            <archive>
              <manifest>
                <mainClass>${app.main.class}</mainClass>
              </manifest>
            </archive>
            <instructions>
              <Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
              <Embed-Transitive>true</Embed-Transitive>
              <Embed-Directory>lib</Embed-Directory>
            </instructions>
          </configuration>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>bundle</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.14.1</version>
      </plugin>
      <plugin>
        <groupId>org.apache.felix</groupId>
        <artifactId>maven-bundle-plugin</artifactId>
        <version>2.3.7</version>
      </plugin>
    </plugins>
  </build>
</project>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment