Skip to content

Instantly share code, notes, and snippets.

@emres
Created March 3, 2015 14:12
Show Gist options
  • Save emres/ec18ee264e4eb0dd8f1a to your computer and use it in GitHub Desktop.
public class SchemaValidatorDriver implements Driver, Serializable {
private String inDirectory;
private String avroSchemaName;
private String validatedJSONoutputDir = "";
private String rejectedJSONoutputDir = "";be
private String checkpointDirectory = "";
private int checkpointInterval = 0;
private static JavaStreamingContext createContext(String inDirectory, String avroSchemaName,
String validatedJSONoutputDir, String rejectedJSONoutputDir,
String checkpointDirectory, int checkpointInterval) {
SparkConf sparkConf = new SparkConf().setAppName("schemavalidator");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(2000));
// Set checkpoint directory
ssc.checkpoint(checkpointDirectory);
//scala interop requires this
ClassTag<LongWritable> longWritableTag = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
ClassTag<Text> textTag = scala.reflect.ClassTag$.MODULE$.apply(Text.class);
// We can alternatively use JsonInputFormat or AvroInputFormat
ClassTag<MultiLineJsonInputFormat> jsonInputTag = scala.reflect.ClassTag$.MODULE$.apply(MultiLineJsonInputFormat.class);
ClassTag<Tuple2<LongWritable, Text>> tuple2Tag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);
// Create a raw InputDStream - can't directly use this from Java
InputDStream<Tuple2<LongWritable, Text>> inStream =
ssc.ssc().fileStream(inDirectory, new FileFilter(), false, longWritableTag, textTag, jsonInputTag);
// Wrap in a JavaInputDStream to make it usable
JavaInputDStream<Tuple2<LongWritable, Text>> jinStream =
JavaInputDStream.fromInputDStream(inStream, tuple2Tag);
JavaDStream<String> cleanStream = jinStream.map(new HadoopInputToStringWorker());
cleanStream.checkpoint(new Duration(10000));
SchemaValidatorDriver schemaValidatorApp = new SchemaValidatorDriver(inDirectory, avroSchemaName,
validatedJSONoutputDir, rejectedJSONoutputDir,
checkpointDirectory, checkpointInterval);
schemaValidatorApp.process(cleanStream);
return ssc;
}
public static void main(String[] args) throws ConfigurationException {
Logger logger = LoggerFactory.getLogger(SchemaValidatorDriver.class);
PropertiesConfiguration config = new PropertiesConfiguration();
if (System.getProperty("propertiesFile") != null) {
config.load(System.getProperty("propertiesFile"));
} else {
config.load("schemavalidator.properties");
}
final String validatedJSONoutputDir = config.getString("job.output.dir");
final String checkpointDirectory = config.getString("job.checkpoint.dir");
final String rejectedJSONoutputDir = config.getString("module.sv.rejected.dir");
final String avroSchemaName = config.getString("module.sv.schema.name");
final String appName = config.getString("spark.app.name");
int batchInterval = config.getInt("job.batch.interval");
int checkpointInterval = config.getInt("job.checkpoint.interval");
final String inDirectory;
if (args.length == 0) {
inDirectory = config.getString("job.input.dir");
} else {
inDirectory = args[1];
}
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
return createContext(inDirectory, avroSchemaName,
validatedJSONoutputDir, rejectedJSONoutputDir,
checkpointDirectory, 10000);
}
};
JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet");
logger.warn("\n * * * hadoopConfiguration: " +
ssc.ssc().sc().hadoopConfiguration().get("multilinejsoninputformat.member"));
ssc.start();
ssc.awaitTermination();
}
public SchemaValidatorDriver(String inDirectory, String avroSchemaName, String validatedJSONoutputDir, String rejectedJSONoutputDir,
String checkpointDirectory, int checkpointInterval) {
this.inDirectory = inDirectory;
this.avroSchemaName = avroSchemaName;
this.validatedJSONoutputDir = validatedJSONoutputDir;
this.rejectedJSONoutputDir = rejectedJSONoutputDir;
this.checkpointDirectory = checkpointDirectory;
this.checkpointInterval = checkpointInterval;
}
@Override
public JavaDStream<String> initializeJob(JavaStreamingContext ssc) {
return JobInitializer.getStream(ssc, inDirectory, checkpointDirectory, checkpointInterval);
}
@Override public void process(JavaDStream<String> inStream) {
JavaDStream<String> json = inStream.map(new SchemaValidatorWorker(avroSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir));
json.persist(StorageLevel.MEMORY_AND_DISK_SER());
BigContentUtil.forceOutput(json);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment