## Bank Mandiri KSQL POT
### Download the MySQL Driver
`curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.17.tar.gz" | tar -xzf - -C /Users/victor/desktop/mandiri-ksql-pot/mysql-driver --strip-components=1 mysql-connector-java-8.0.17/mysql-connector-java-8.0.17-bin.jar`
### Deploy Confluent Platform using Docker
`docker-compose up --build`
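Before deploying connectors it's worth confirming the stack is up; a quick check, assuming the Connect worker is the one exposed on port 8083 as used below:

```bash
# List the compose services and their state/ports
docker-compose ps

# The Kafka Connect REST API should answer before any connector config is posted
curl --silent http://localhost:8083/
```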
### Deploy Customer Source Kafka Connector
`curl --silent --request POST http://localhost:8083/connectors/ --header 'Content-Type: application/json' --data @source-connector-configs/mysql_customer_source_connector_config.json`
### Deploy Transaction Source Kafka Connector
`curl --silent --request POST http://localhost:8083/connectors/ --header 'Content-Type: application/json' --data @source-connector-configs/mysql_transaction_source_connector_config.json`
### Deploy Response Source Kafka Connector
`curl --silent --request POST http://localhost:8083/connectors/ --header 'Content-Type: application/json' --data @source-connector-configs/mysql_response_source_connector_config.json`
### Deploy Customer Sink Kafka Connector
`curl --silent --request POST http://localhost:8083/connectors/ --header 'Content-Type: application/json' --data @sink-connector-configs/mysql_customer_sink_connector.json`
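The Connect REST API can confirm that each connector and its task are RUNNING; the customer connector name below comes from the config in the comment at the end, and the other connectors can be checked the same way:

```bash
# List all deployed connectors
curl --silent http://localhost:8083/connectors

# Check the status of the customer source connector and its tasks
curl --silent http://localhost:8083/connectors/mysql-customer-source-connector/status
```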
### KSQL: Register the Customer table
`CREATE TABLE customer WITH (KAFKA_TOPIC='mysql_customer', VALUE_FORMAT ='AVRO', KEY='account_number');`
`CREATE TABLE customer_table_rekey WITH (KAFKA_TOPIC='mysql_customer', VALUE_FORMAT='AVRO', KEY='phone_number');`
`SELECT ROWKEY, account_number FROM customer;`
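To confirm the table picked up its Avro schema from Schema Registry, it can be inspected from the KSQL CLI:

```sql
-- Columns inferred from the Avro schema
DESCRIBE customer;

-- Underlying topic, key field, serde format and runtime stats
DESCRIBE EXTENDED customer;
```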
### KSQL: Create Response Stream
`CREATE STREAM response WITH (KAFKA_TOPIC='mysql_response', VALUE_FORMAT ='AVRO');`
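The join in the next step selects from `response_rekey`, which is not created anywhere above; a minimal sketch of how it could be derived, assuming the response stream carries a `phone_number` column to repartition on:

```sql
-- Hypothetical: repartition the response stream on phone_number so the
-- join against customer_table_rekey is co-partitioned (not in the original notes)
CREATE STREAM response_rekey AS
  SELECT * FROM response
  PARTITION BY phone_number;
```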
### KSQL: Increase Credit Limit Stream
`SELECT c.account_number, c.name, c.attended_last_trade_show, c.phone_number, c.credit_card_limit, c.updated_at, r.text FROM response_rekey r LEFT JOIN customer_table_rekey c ON r.phone_number = c.phone_number WHERE r.text = 'INCREASEMYLIMIT';`
### KSQL: Create Transaction Stream
`CREATE STREAM trans WITH (KAFKA_TOPIC='mysql_transaction',VALUE_FORMAT='AVRO');`
### KSQL: Create Enriched Transaction Stream
`CREATE STREAM trans_enriched WITH (PARTITIONS=1) AS SELECT t.id, t.account_number, t.merchant, t.amount, c.name, c.credit_card_limit FROM trans t LEFT JOIN customer c ON t.ACCOUNT_NUMBER = c.ACCOUNT_NUMBER;`
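A quick sanity check that the join is emitting rows (KSQL 5.3 syntax, so no EMIT CHANGES; run after `SET 'auto.offset.reset'='earliest';`):

```sql
-- Sample a few enriched records; join output columns carry the alias prefixes
-- (e.g. T_ACCOUNT_NUMBER, C_NAME)
SELECT * FROM trans_enriched LIMIT 5;
```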
### KSQL: Create Table for Total Transactions over 30 Days per Account
`CREATE TABLE month_trans_by_account AS SELECT t.account_number, COUNT(t.id) AS transaction_count, SUM(t.amount) AS sum_amount FROM trans t WHERE t.time BETWEEN STRINGTOTIMESTAMP('2019-08-01','yyyy-MM-dd') and STRINGTOTIMESTAMP('2019-08-31','yyyy-MM-dd') GROUP BY t.account_number;`
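The table above pins the 30-day range to hard-coded dates; if a rolling 30-day total is wanted instead, a hopping window is one alternative sketch (assumes the records' timestamps follow `t.time`):

```sql
-- Hypothetical rolling 30-day totals per account, advancing one day at a time
CREATE TABLE month_trans_by_account_windowed AS
  SELECT t.account_number,
         COUNT(t.id)   AS transaction_count,
         SUM(t.amount) AS sum_amount
  FROM trans t
  WINDOW HOPPING (SIZE 30 DAYS, ADVANCE BY 1 DAY)
  GROUP BY t.account_number;
```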
### KSQL: SPBU and Hypermart Transactions by Account
`CREATE TABLE spbu_trans_by_account AS SELECT t.account_number, COUNT(t.id) AS transaction_count, SUM(t.amount) AS sum_amount FROM trans t WHERE t.time BETWEEN STRINGTOTIMESTAMP('2019-01-01','yyyy-MM-dd') and STRINGTOTIMESTAMP('2019-12-31','yyyy-MM-dd') and t.merchant = 'SPBU' GROUP BY t.account_number;`
`CREATE TABLE hypermart_trans_by_account AS SELECT t.account_number, COUNT(t.id) AS transaction_count, SUM(t.amount) AS sum_amount FROM trans t WHERE t.time BETWEEN STRINGTOTIMESTAMP('2019-06-01','yyyy-MM-dd') and STRINGTOTIMESTAMP('2019-08-31','yyyy-MM-dd') and t.merchant = 'Hypermart' GROUP BY t.account_number HAVING SUM(t.amount) > 2000;`
`create table voucher_customers as select`
`create table hypermart_eligible_customers as select * from spbu_trans_by_account s left join large_limit_customer l on s.account_number = l.account_number;`
`create table large_credit_usage_keyed as select *, cast(account_number as key) from large_credit_usage partition by key;`
`create table hypermart_promo_customers as select * from hypermart_eligible_customers c left join LARGE_CREDIT_USAGE_BY_ACCOUNT_NUMBER h on c.rowkey = h.rowkey;`
`CREATE STREAM LARGE_CREDIT_USAGE_STREAM WITH (KAFKA_TOPIC='LARGE_CREDIT_USAGE', VALUE_FORMAT='AVRO');`
`CREATE STREAM LARGE_CREDIT_USAGE_STREAM_BY_ACCOUNT_NUMBER AS SELECT * FROM LARGE_CREDIT_USAGE_STREAM PARTITION BY ACCOUNT_NUMBER;`
`CREATE TABLE LARGE_CREDIT_USAGE_BY_ACCOUNT_NUMBER WITH (KAFKA_TOPIC='LARGE_CREDIT_USAGE_STREAM_BY_ACCOUNT_NUMBER', VALUE_FORMAT='AVRO');`
`CREATE TABLE HYPERMART_VOUCHER_MESSAGE WITH (REPLICAS = 1, PARTITIONS = 1, KAFKA_TOPIC = 'HYPERMART_VOUCHER_MESSAGE') AS SELECT ('Anda mendapatkan voucher hypermart sebesar 1 juta.')"MESSAGE", hypermart_promo_customers.PHONE_NUMBER "PHONE_NUMBER" FROM hypermart_promo_customers hypermart_promo_customers;`
`CREATE TABLE HYPERMART_VOUCHER_MESSAGE WITH (REPLICAS = 1, PARTITIONS = 1, KAFKA_TOPIC = 'HYPERMART_VOUCHER_MESSAGE') AS SELECT ('Anda mendapatkan voucher hypermart sebesar 1 juta.')"MESSAGE", hypermart_promo_customers.H_PHONE_NUMBER "PHONE_NUMBER" FROM hypermart_promo_customers hypermart_promo_customers;`
### KSQL: Create Table for Credit Usage over 30 Days per Account
`CREATE TABLE credit_usage AS SELECT t.account_number, COUNT(t.id) AS transaction_count, SUM(t.amount) AS sum_amount, c.credit_card_limit AS cc_limit, c.phone_number, (SUM(CAST(t.amount as double)) / (CAST(c.credit_card_limit as double)) * 100.00) AS credit_usage FROM trans t LEFT JOIN customer c ON t.account_number = c.account_number WHERE t.time BETWEEN STRINGTOTIMESTAMP('2019-08-01','yyyy-MM-dd') and STRINGTOTIMESTAMP('2019-08-31','yyyy-MM-dd') GROUP BY t.account_number, c.credit_card_limit, c.phone_number;`
### KSQL: Create Table for Credit Line Usage above 80%
`CREATE TABLE large_credit_usage as SELECT * from credit_usage where credit_usage > 80;`
### KSQL: Send Message for Large Credit Usage
`create table increase_credit_message as select 'Pemakaian kartu kredit untuk account ' + cast(T_ACCOUNT_NUMBER as string) + ' telah mencapai ' + cast(credit_usage as string) + ' persen. Untuk menaikkan limit kartu kredit anda ke ' + cast(cc_limit + 100000 as string) + ', reply dengan INCREASEMYLIMIT.' as message, phone_number from large_credit_usage;`
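To eyeball the generated SMS payloads before wiring them to any downstream sink:

```sql
-- Inspect a few outgoing messages and their target phone numbers
SELECT * FROM increase_credit_message LIMIT 5;
```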
### To use the KSQL CLI from the current Docker setup
`docker exec -it ksql-cli /bin/bash`
`ksql http://ksql-server:8088`
To detach from the container: Ctrl-P, Ctrl-Q
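Once in the CLI, the usual discovery commands show what the steps above have created:

```sql
-- List topics, registered streams/tables and running persistent queries
SHOW TOPICS;
SHOW STREAMS;
SHOW TABLES;
SHOW QUERIES;
```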
### KSQL: To process data from the beginning of each Kafka topic
`SET 'auto.offset.reset'='earliest';`
### Prune Docker resources once they are no longer in use
`docker container prune && docker network prune && docker volume prune`
### Kafka: Delete Topic
`/Users/victor/Downloads/from_desktop/drc-demo/confluent-5.3.0/bin/kafka-topics --delete --zookeeper localhost:2181 --topic mysql_transaction`
### Kafka: Consume a Topic from the Beginning
`/Users/victor/Downloads/from_desktop/drc-demo/confluent-5.3.0/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic mysql_customer --from-beginning`
### Customer Source Connector Config (source-connector-configs/mysql_customer_source_connector_config.json)
```json
{
  "name": "mysql-customer-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://mysql:3306/transaction",
    "connection.user": "root",
    "connection.password": "example",
    "table.whitelist": "customer",
    "mode": "timestamp",
    "timestamp.column.name": "updated_at",
    "validate.non.null": true,
    "poll.interval.ms": "500",
    "topic.prefix": "mysql_",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true",
    "db.timezone": "Asia/Jakarta",
    "transforms": "createKey,extractInt",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "account_number",
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field": "account_number"
  }
}
```
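If the config needs changing, the connector can be deleted and re-posted through the same Connect REST API:

```bash
# Remove the existing connector, then re-deploy with the updated config file
curl --silent --request DELETE http://localhost:8083/connectors/mysql-customer-source-connector
curl --silent --request POST http://localhost:8083/connectors/ \
  --header 'Content-Type: application/json' \
  --data @source-connector-configs/mysql_customer_source_connector_config.json
```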
