Skip to content

Instantly share code, notes, and snippets.

@PrabodDunuwila
Created February 7, 2020 06:03
Show Gist options
  • Save PrabodDunuwila/1a1d1f17c4a73b95926016d1af3c9fe6 to your computer and use it in GitHub Desktop.
Save PrabodDunuwila/1a1d1f17c4a73b95926016d1af3c9fe6 to your computer and use it in GitHub Desktop.
@App:name("CDCMongoDBApp")
@App:description("Change Data Capture on MongoDB collection")
@source(type = 'cdc', url = 'jdbc:mongodb://mongo01/127.0.0.1:27017/StocksDB', username = 'wso2sp', password = 'wso2', table.name = 'stocks', operation = 'insert',
@map(type = 'keyvalue', @attributes(id = 'id', name = 'name', volume = 'volume', stockDetails = 'stockDetails') ))
define stream InsertStream (id string, name string, volume int, stockDetails string);
@source(type = 'cdc', url = 'jdbc:mongodb://mongo01/127.0.0.1:27017/StocksDB', username = 'wso2sp', password = 'wso2', table.name = 'stocks', operation = 'update',
@map(type = 'keyvalue', @attributes(id = 'id', name = 'name', volume = 'volume') ))
define stream UpdateStream (id string, name string, volume int);
@source(type = 'cdc', url = 'jdbc:mongodb://mongo01/127.0.0.1:27017/StocksDB', username = 'wso2sp', password = 'wso2', table.name = 'stocks', operation = 'delete',
@map(type = 'keyvalue', @attributes(id = 'id') ))
define stream DeleteStream (id string);
@sink(type = 'log')
define stream LogInsertStream (id string, name string, volume int, stockDetails string);
@sink(type = 'log')
define stream LogUpdateStream (id string, name string, volume int);
@sink(type = 'log')
define stream LogDeleteStream (id string);
@info(name = 'query insert')
from InsertStream
select id, name, volume, stockDetails
insert into LogInsertStream;
@info(name = 'query update')
from UpdateStream
select id, name, volume
insert into LogUpdateStream;
@info(name = 'query delete')
from DeleteStream
select id
insert into LogDeleteStream;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment