Rajesh Kumar Dash (Rajesh2015)
from sklearn.model_selection import train_test_split

# Keep only the first row per image, since the same image can appear multiple times
# when it contains several instances of the same food item.
classification_df = annotation_df_model.groupby('image').first().reset_index()

# Create a new column by prefixing the class name to the file name; this lets the
# dataframe be used directly with an image data generator.
classification_df['image_name'] = classification_df.apply(
    lambda row: f"{row['class_name']}/{row['image']}", axis=1)

# Train/test split that keeps the class proportions the same in both parts.
train_df, temp_df = train_test_split(
    classification_df,
    test_size=0.2,
    stratify=classification_df['class_name'],  # ensures class distribution is preserved
    random_state=42)
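The remaining 20% (temp_df) is typically split again into validation and test halves, and the dataframe is then handed to a Keras generator. A minimal sketch of that step, assuming equal validation/test halves and a local images/ root that holds <class_name>/<image> files (both assumptions, not taken from the gist):

from tensorflow.keras.preprocessing.image import ImageDataGenerator

val_df, test_df = train_test_split(
    temp_df,
    test_size=0.5,
    stratify=temp_df['class_name'],   # keep class proportions in both halves
    random_state=42)

datagen = ImageDataGenerator(rescale=1.0 / 255)
train_gen = datagen.flow_from_dataframe(
    train_df,
    directory='images/',       # assumed root folder containing <class_name>/<image> files
    x_col='image_name',        # the column built above
    y_col='class_name',
    target_size=(224, 224),
    class_mode='categorical')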
{
  "events": [
    {
      "eventType": "someevent",
      "number": 1,
      "startTime": 1659343873429,
      "endTime": 1659343874329,
      "duration": 900,
      "inspiratoryTime": 424,
      "Rate": 50
    }
  ]
}
import json
import boto3
from datetime import datetime

s3 = boto3.resource('s3')
session = boto3.Session()
firehose_client = session.client("firehose")

def lambda_handler(event, context):
    # Name of the bucket that triggered this S3 event notification
    bucket_name_from_event = event['Records'][0]['s3']['bucket']['name']
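    # Sketch of a possible continuation of the handler (the object handling below and
    # the delivery stream name "events-stream" are assumptions, not taken from the gist).
    object_key = event['Records'][0]['s3']['object']['key']
    payload = json.loads(s3.Object(bucket_name_from_event, object_key).get()['Body'].read())

    # Forward each event record to Kinesis Data Firehose, one record per event.
    for record in payload.get('events', []):
        firehose_client.put_record(
            DeliveryStreamName='events-stream',           # assumed stream name
            Record={'Data': json.dumps(record) + '\n'})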
import com.typesafe.config.ConfigFactory
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions._

object KafkaConsumerWithDeltaTable {
  def main(args: Array[String]): Unit = {
    // Load the "conf" block of application.conf from the classpath
    val rootConfig = ConfigFactory.load("application.conf").getConfig("conf")
    val s3Config = rootConfig.getConfig("s3_conf")
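    // Sketch of a possible continuation (the broker address, topic name and the
    // S3/Delta paths below are assumptions, not taken from the gist).
    val spark = SparkSession.builder()
      .appName("KafkaConsumerWithDeltaTable")
      .master("local[*]")
      .getOrCreate()

    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")   // assumed broker
      .option("subscribe", "kafka-topic")                     // assumed topic
      .load()

    // Kafka delivers the payload as bytes; cast the value column to a string
    val valueDf = kafkaDf.selectExpr("CAST(value AS STRING) AS value")

    valueDf.writeStream
      .format("delta")
      .option("checkpointLocation", "s3a://my-bucket/checkpoints/")  // assumed path
      .start("s3a://my-bucket/delta/events/")                        // assumed path
      .awaitTermination()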
name := "streaming-examples"
version := "0.1"
scalaVersion := "2.11.8"
// Spark SQL, Spark Core and the Kafka source for Spark Structured Streaming
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.4"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.4"
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.io.StdIn.readLine

class TxnKafkaProducer {
  // Arguments: number of events to produce, target topic, broker bootstrap address
  val newArgs: Array[String] = Array("20", "kafka-topic", "ec2-54-194-95-122.eu-west-1.compute.amazonaws.com:9092")
  val events: Int = newArgs(0).toInt
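  // Sketch of a possible producer body (the serializer settings and message format
  // are assumptions, not taken from the gist).
  val topic: String = newArgs(1)
  val brokers: String = newArgs(2)

  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  def send(): Unit = {
    // Produce the requested number of simple string messages, then close the producer
    for (i <- 1 to events) {
      producer.send(new ProducerRecord[String, String](topic, i.toString, s"message-$i"))
    }
    producer.close()
  }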
import java.util.concurrent._
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConversions._

class Consumer(val brokers: String,
               val groupId: String,
               val topic: String) {
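  // Sketch of a typical body for this consumer class (the deserializer settings and
  // the poll interval are assumptions, not taken from the gist).
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList(topic))

  def run(): Unit = {
    // Poll in a loop and print every record received on the subscribed topic
    while (true) {
      val records = consumer.poll(1000)
      for (record <- records) {
        println(s"offset=${record.offset()} value=${record.value()}")
      }
    }
  }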
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.0.0"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic cricket