-
-
Save emres/ec18ee264e4eb0dd8f1a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class SchemaValidatorDriver implements Driver, Serializable { | |
private String inDirectory; | |
private String avroSchemaName; | |
private String validatedJSONoutputDir = ""; | |
private String rejectedJSONoutputDir = "";be | |
private String checkpointDirectory = ""; | |
private int checkpointInterval = 0; | |
private static JavaStreamingContext createContext(String inDirectory, String avroSchemaName, | |
String validatedJSONoutputDir, String rejectedJSONoutputDir, | |
String checkpointDirectory, int checkpointInterval) { | |
SparkConf sparkConf = new SparkConf().setAppName("schemavalidator"); | |
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(2000)); | |
// Set checkpoint directory | |
ssc.checkpoint(checkpointDirectory); | |
//scala interop requires this | |
ClassTag<LongWritable> longWritableTag = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class); | |
ClassTag<Text> textTag = scala.reflect.ClassTag$.MODULE$.apply(Text.class); | |
// We can alternatively use JsonInputFormat or AvroInputFormat | |
ClassTag<MultiLineJsonInputFormat> jsonInputTag = scala.reflect.ClassTag$.MODULE$.apply(MultiLineJsonInputFormat.class); | |
ClassTag<Tuple2<LongWritable, Text>> tuple2Tag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class); | |
// Create a raw InputDStream - can't directly use this from Java | |
InputDStream<Tuple2<LongWritable, Text>> inStream = | |
ssc.ssc().fileStream(inDirectory, new FileFilter(), false, longWritableTag, textTag, jsonInputTag); | |
// Wrap in a JavaInputDStream to make it usable | |
JavaInputDStream<Tuple2<LongWritable, Text>> jinStream = | |
JavaInputDStream.fromInputDStream(inStream, tuple2Tag); | |
JavaDStream<String> cleanStream = jinStream.map(new HadoopInputToStringWorker()); | |
cleanStream.checkpoint(new Duration(10000)); | |
SchemaValidatorDriver schemaValidatorApp = new SchemaValidatorDriver(inDirectory, avroSchemaName, | |
validatedJSONoutputDir, rejectedJSONoutputDir, | |
checkpointDirectory, checkpointInterval); | |
schemaValidatorApp.process(cleanStream); | |
return ssc; | |
} | |
public static void main(String[] args) throws ConfigurationException { | |
Logger logger = LoggerFactory.getLogger(SchemaValidatorDriver.class); | |
PropertiesConfiguration config = new PropertiesConfiguration(); | |
if (System.getProperty("propertiesFile") != null) { | |
config.load(System.getProperty("propertiesFile")); | |
} else { | |
config.load("schemavalidator.properties"); | |
} | |
final String validatedJSONoutputDir = config.getString("job.output.dir"); | |
final String checkpointDirectory = config.getString("job.checkpoint.dir"); | |
final String rejectedJSONoutputDir = config.getString("module.sv.rejected.dir"); | |
final String avroSchemaName = config.getString("module.sv.schema.name"); | |
final String appName = config.getString("spark.app.name"); | |
int batchInterval = config.getInt("job.batch.interval"); | |
int checkpointInterval = config.getInt("job.checkpoint.interval"); | |
final String inDirectory; | |
if (args.length == 0) { | |
inDirectory = config.getString("job.input.dir"); | |
} else { | |
inDirectory = args[1]; | |
} | |
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() { | |
@Override | |
public JavaStreamingContext create() { | |
return createContext(inDirectory, avroSchemaName, | |
validatedJSONoutputDir, rejectedJSONoutputDir, | |
checkpointDirectory, 10000); | |
} | |
}; | |
JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory); | |
ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet"); | |
logger.warn("\n * * * hadoopConfiguration: " + | |
ssc.ssc().sc().hadoopConfiguration().get("multilinejsoninputformat.member")); | |
ssc.start(); | |
ssc.awaitTermination(); | |
} | |
public SchemaValidatorDriver(String inDirectory, String avroSchemaName, String validatedJSONoutputDir, String rejectedJSONoutputDir, | |
String checkpointDirectory, int checkpointInterval) { | |
this.inDirectory = inDirectory; | |
this.avroSchemaName = avroSchemaName; | |
this.validatedJSONoutputDir = validatedJSONoutputDir; | |
this.rejectedJSONoutputDir = rejectedJSONoutputDir; | |
this.checkpointDirectory = checkpointDirectory; | |
this.checkpointInterval = checkpointInterval; | |
} | |
@Override | |
public JavaDStream<String> initializeJob(JavaStreamingContext ssc) { | |
return JobInitializer.getStream(ssc, inDirectory, checkpointDirectory, checkpointInterval); | |
} | |
@Override public void process(JavaDStream<String> inStream) { | |
JavaDStream<String> json = inStream.map(new SchemaValidatorWorker(avroSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir)); | |
json.persist(StorageLevel.MEMORY_AND_DISK_SER()); | |
BigContentUtil.forceOutput(json); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment