Neenad Ingole ninadingole

ninadingole / sbt.scala
Last active Mar 17, 2019
Organising Scala Tests In SBT
View sbt.scala
// project/E2E.scala — a custom sbt configuration for end-to-end tests
import sbt._
import sbt.Keys._

object E2E {
  // "e2e" is the id used on the sbt command line (e.g. e2e:test)
  final val E2ETest = Configuration.of("EndToEndTest", "e2e") extend Test

  // Wire the standard test tasks (test, testOnly, ...) into the new configuration
  val testSettings: Seq[Setting[_]] = inConfig(E2ETest)(Defaults.testSettings)
}
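A minimal sketch of how the configuration might be attached to a project in build.sbt; the project name is illustrative, and it assumes the testSettings value defined above:

// build.sbt (illustrative)
lazy val root = (project in file("."))
  .configs(E2E.E2ETest)
  .settings(E2E.testSettings: _*)

With this in place, something like sbt e2e:test runs only the end-to-end suites, keeping them out of the regular test run.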
View rkafka.prm
-- Replicat process name
REPLICAT rkafka
-- Load the Java adapter and hand it the Kafka Connect handler properties
TARGETDB LIBFILE libggjava.so SET property=dirprm/rkafka_handler.props
-- Log processing statistics, with rate, every minute
REPORTCOUNT EVERY 1 MINUTES, RATE
-- Group source operations into target transactions of around 1000 operations
GROUPTRANSOPS 1000
-- Replicate all catalogs/schemas/tables to identically named targets
MAP *.*.*, TARGET *.*.*;
View rkafka_handler.props
gg.handlerlist=kafkaconnect

# The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
# See http://docs.oracle.com/goldengate/bd123110/gg-bd/GADBD/using-kafka-connect-handler.htm#GUID-A87CAFFA-DACF-43A0-8C6C-5C64B578D606
# One topic per table, resolved from the schema and table name of each change record
gg.handler.kafkaconnect.topicMappingTemplate=ora-ogg-${schemaName}-${tableName}-avro
# Use the table's primary key columns as the Kafka message key
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
View kafkaconnect.properties
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

# Alternative: JSON converters, with no Schema Registry dependency
#value.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schemas.enable=false
#value.converter.schemas.enable=false
View event_processor.scala
// Consume the raw business-events topic
def buildEventStream = {
  import AppSerdes.movieBEventSerde.eventConsumed
  builder.stream[Int, BusinessEvent]("events")
}

private val eventStreams: KStreamS[Int, BusinessEvent] = buildEventStream

// Keep only events of the given type (case-insensitive match)
def filterEventsByType(eventType: String): KStreamS[Int, BusinessEvent] = {
  eventStreams.filter((_: Int, event: BusinessEvent) => event.eventType.equalsIgnoreCase(eventType))
}
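A short, hypothetical usage of filterEventsByType; the event type and output topic are illustrative, and an implicit Produced (from the app's serdes) is assumed to be in scope:

// Route one event type to its own topic (names are illustrative)
val saleEvents: KStreamS[Int, BusinessEvent] = filterEventsByType("sale")
saleEvents.to("sale-events")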
View cdc_processor.scala
// CDC stream of movie rows, wrapped in the Debezium change-event Envelope
private def buildMovieStream: KStreamS[String, movie.Envelope] = {
  import AppSerdes.movieSerde.consumed
  builder.stream[String, movie.Envelope](Utils.getTopic("movies"))
}

// CDC stream of movie sales rows
private def buildMovieSalesStream = {
  import AppSerdes.movieSalesSerde.consumed
  builder.stream[String, Envelope](Utils.getTopic("sales"))
}
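A minimal sketch of how the two streams might be materialized and the topology started; builder, props, and the shutdown hook are assumptions about the surrounding application, not part of the gist:

// Illustrative wiring (names assumed)
val movies = buildMovieStream
val sales  = buildMovieSalesStream
// ... join/transform the CDC streams here ...

// Build the topology from the shared builder and start the app
val streams = new org.apache.kafka.streams.KafkaStreams(builder.build(), props)
streams.start()
sys.addShutdownHook(streams.close())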
View list_topics.sh
$ ./bin/kafka-topics --zookeeper localhost:2181 --list
__confluent.support.metrics
__consumer_offsets
_confluent-ksql-default__command_topic
_schemas
connect-configs
connect-offsets
connect-status
connect-statuses
dbhistory.inventory
View movie_connector.json
{
  "name": "moviesdemo-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054"
  }
}
View give_permissions.sh
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
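Note that MySQL 8.0 removed the IDENTIFIED BY clause from GRANT, so on newer servers the equivalent is two statements (same user and password as in the connector config above):

CREATE USER 'debezium' IDENTIFIED BY 'dbz';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';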
View mysql_conf.ini
[mysqld]
server-id = 223344 # any ID that is unique within the replication topology
log_bin = mysql-bin
binlog_format = row # row-level logging, required for CDC (value is case-insensitive)
binlog_row_image = full # log full before/after row images
expire_logs_days = 10
gtid_mode = on # (optional)
enforce_gtid_consistency = on # (optional)
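To confirm the settings took effect after a restart, a standard check (not part of the original gist):

SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode');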