Last active
July 19, 2018 02:15
-
-
Save abhisheyke/6f838adf6651491bd4f263956f403c74 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Brief version of the streaming aggregation job.
// Reads netflow JSON records from Kafka ("topic1"), aggregates them in
// 15-minute tumbling windows per source host, and publishes the rolled-up
// rows back to Kafka ("topic2") as JSON.
//
// NOTE(review): `netflow` binds Unit — awaitTermination() is chained onto
// start() and blocks the driver until the query stops. If the query handle
// is needed, bind the result of start() instead.
val netflow = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers) // e.g. kafka:9092
  // The former "value.deserializer" option was removed: the Structured
  // Streaming Kafka source ignores it — values always arrive as binary and
  // are cast explicitly below with cast("string").
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", false) // tolerate deleted/expired offsets instead of failing
  .option(subscribeType, "topic1") // subscribeType is e.g. "subscribe" or "subscribePattern"
  .load()
  // Parse the Kafka value as JSON into the netflow schema.
  .select(from_json(functions.col("value").cast("string"), NetflowStructType.netflowMsg, jsonOptions).alias("nftable"))
  .select("nftable.*")
  .na.fill(defaultValues) // replace nulls with configured per-column defaults
  .withWatermark("timestamp", "3 minutes") // accept events up to 3 min late
  // Tumbling 15-minute windows (slide == size) keyed by source host.
  // NOTE(review): "netflow.in_bytes" below presumes the record exposes a
  // nested "netflow" struct after nftable.* is flattened — verify schema.
  .groupBy(
    window(col("timestamp"), "15 minutes", "15 minutes"),
    col("src_host_name")
  )
  .agg(
    sum("netflow.in_bytes").alias("total_in_bytes"),
    sum("netflow.in_pkts").alias("total_in_pkts"),
    last("src_network_name").alias("src_network_name"),
    max("netflow.last_switched").alias("last_flow_time"),
    min("authorized").alias("authorized"),
    count("*").alias("total_count"),
    current_timestamp().alias("interval_start_time") // wall-clock time the batch was emitted
  )
  // The Kafka sink requires string/binary "key" and "value" columns.
  .selectExpr("CAST(src_host_name AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers) // e.g. kafka:9092
  .option("topic", "topic2")
  .start()
  .awaitTermination() // block until the streaming query terminates
Spark job submission command for Kubernetes:
# Submit the aggregation job to Kubernetes in cluster mode.
#
# NOTE(review): the original command set spark.streaming.blockInterval and
# spark.streaming.kafka.maxRatePerPartition. Those belong to the legacy
# DStream API and are no-ops for a Structured Streaming job, so they were
# dropped; rate-limit instead with the "maxOffsetsPerTrigger" source option
# in the application code.
bin/spark-submit --master "k8s://$K8S_API_SERVER" --deploy-mode cluster --name "$SPARK_SUBMIT_CLIENT_NAME" \
  --class com.aggr.Aggregator \
  --conf spark.locality.wait=1s \
  --conf spark.executor.memoryOverhead="$EXECUTOR_MEMORY_OVERHEAD" \
  --conf spark.kubernetes.driver.label.app="$SPARK_SUBMIT_CLIENT_NAME" \
  --conf spark.kubernetes.executor.label.app="$SPARK_SUBMIT_CLIENT_NAME" \
  --conf spark.kubernetes.driver.pod.name="sape-$SPARK_SUBMIT_CLIENT_NAME-driver" \
  --conf spark.kubernetes.submission.waitAppCompletion=true \
  --conf spark.executor.memory="$EXECUTOR_HEAP_SPACE" \
  --conf spark.driver.memory="$DRIVER_HEAP_SPACE" \
  --conf spark.executor.instances="$EXECUTOR_INSTANCES" \
  --conf spark.kubernetes.container.image="$DOCKER_IMAGE_URL" \
  --conf spark.kubernetes.namespace="$K8S_NAMESPACE" \
  --conf spark.kubernetes.driverEnv.TZ="$TZ" \
  --conf spark.kubernetes.container.image.pullPolicy="$IMAGE_PULLPOLICY" \
  --conf spark.kubernetes.driver.limit.cores="$DRIVER_CPU_LIMIT" \
  --conf spark.kubernetes.executor.limit.cores="$EXECUTOR_CPU_LIMIT" \
  --conf spark.driver.cores="$DRIVER_CPU_REQUEST" \
  --conf spark.executor.cores="$EXECUTOR_CPU_REQUEST" \
  local:///usr/local/uber-aggregation.jar
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment