@abhisheyke
Last active July 19, 2018 02:15
// Brief version of code...
import org.apache.spark.sql.functions.{col, count, current_timestamp, from_json, last, max, min, sum, window}

val netflow = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers) // kafka:9092
  // Note: the Structured Streaming Kafka source ignores Kafka deserializer
  // settings (e.g. "value.deserializer"); the value column is always binary
  // and is cast to string explicitly below.
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", false)
  .option(subscribeType, "topic1")
  .load()
  .select(from_json(col("value").cast("string"), NetflowStructType.netflowMsg, jsonOptions).alias("nftable"))
  .select("nftable.*")
  .na.fill(defaultValues)
  .withWatermark("timestamp", "3 minutes")
  .groupBy(
    window(col("timestamp"), "15 minutes", "15 minutes"), // tumbling 15-minute windows
    col("src_host_name"))
  .agg(
    sum("netflow.in_bytes").alias("total_in_bytes"),
    sum("netflow.in_pkts").alias("total_in_pkts"),
    last("src_network_name").alias("src_network_name"),
    max("netflow.last_switched").alias("last_flow_time"),
    min("authorized").alias("authorized"),
    count("*").alias("total_count"),
    current_timestamp().alias("interval_start_time"))
  .selectExpr("CAST(src_host_name AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers) // kafka:9092
  .option("topic", "topic2")
  // A checkpoint location is required when writing a stream to Kafka;
  // the path here is illustrative.
  .option("checkpointLocation", "/tmp/netflow-aggr-checkpoint")
  .start()
  .awaitTermination()
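One detail the brief version leaves implicit is the output mode. Because the query combines `withWatermark` with a windowed aggregation, the default append mode emits each 15-minute window only once the 3-minute watermark has passed the window's end. A sketch making this explicit (`aggregated` stands in for the aggregated DataFrame built above, and the checkpoint path is illustrative):

```
// Explicit output mode for the windowed aggregation (sketch).
// In append mode with withWatermark("timestamp", "3 minutes"), a finalized
// window row is emitted roughly 3 minutes after the window closes.
aggregated.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("topic", "topic2")
  .option("checkpointLocation", "/tmp/netflow-aggr-checkpoint") // illustrative path
  .outputMode("append")
  .start()
```

Update mode would instead emit partial results for still-open windows on every trigger, which produces duplicate keys downstream; append is usually the right choice when the sink is another Kafka topic.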
Spark job submission to Kubernetes:
bin/spark-submit --master k8s://$K8S_API_SERVER --deploy-mode cluster --name $SPARK_SUBMIT_CLIENT_NAME \
--class com.aggr.Aggregator \
--conf spark.streaming.blockInterval=1000 \
--conf spark.locality.wait=1s \
--conf spark.streaming.kafka.maxRatePerPartition=$KAFKA_MAX_RATE_PER_PARTITION \
--conf spark.executor.memoryOverhead=$EXECUTOR_MEMORY_OVERHEAD \
--conf spark.kubernetes.driver.label.app=$SPARK_SUBMIT_CLIENT_NAME \
--conf spark.kubernetes.executor.label.app=$SPARK_SUBMIT_CLIENT_NAME \
--conf spark.kubernetes.driver.pod.name=sape-$SPARK_SUBMIT_CLIENT_NAME-driver \
--conf spark.kubernetes.submission.waitAppCompletion=true \
--conf spark.executor.memory=$EXECUTOR_HEAP_SPACE \
--conf spark.driver.memory=$DRIVER_HEAP_SPACE \
--conf spark.executor.instances=$EXECUTOR_INSTANCES \
--conf spark.kubernetes.container.image=$DOCKER_IMAGE_URL \
--conf spark.kubernetes.namespace=$K8S_NAMESPACE \
--conf spark.kubernetes.driverEnv.TZ=$TZ \
--conf spark.kubernetes.container.image.pullPolicy=$IMAGE_PULLPOLICY \
--conf spark.kubernetes.driver.limit.cores=$DRIVER_CPU_LIMIT \
--conf spark.kubernetes.executor.limit.cores=$EXECUTOR_CPU_LIMIT \
--conf spark.driver.cores=$DRIVER_CPU_REQUEST \
--conf spark.executor.cores=$EXECUTOR_CPU_REQUEST \
local:///usr/local/uber-aggregation.jar
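Two of the settings above, `spark.streaming.blockInterval` and `spark.streaming.kafka.maxRatePerPartition`, belong to the older DStream-based Kafka integration and have no effect on a Structured Streaming query like the one in this gist. If rate limiting is the goal, the Structured Streaming Kafka source takes it as a per-source option instead. A sketch (the option name is from the Kafka source; the value 10000 is illustrative):

```
// Structured Streaming rate limiting: cap the number of Kafka offsets
// consumed per micro-batch, set on the source rather than via spark-submit.
val limited = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("maxOffsetsPerTrigger", 10000) // illustrative upper bound per trigger
  .option(subscribeType, "topic1")
  .load()
```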