This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public enum ResponseCondition implements Predicate<Integer> { | |
ON_RESPONSE_4XX_5XX { | |
@Override | |
public boolean test(Integer responseCode) { | |
return responseCode >= 400 && responseCode <= 599; | |
} | |
}, | |
ON_RESPONSE_4XX_5XX_NON_NOT_FOUND { | |
@Override |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"steps": [ | |
{ | |
"join_preparation": { | |
"select#": 1, | |
"steps": [ | |
{ | |
"IN_uses_bisection": true | |
}, | |
{ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
curl -X PUT '{host}/connectors/{connector-name}/config' \ | |
-H 'Content-Type: application/json' \ | |
--data-raw '{ | |
"name": "{connector-name}", | |
.... | |
"snapshot.mode": "initial", | |
"table.whitelist": "T1,T2,T3", | |
"snapshot.select.statement.overrides": "T1", | |
"snapshot.select.statement.overrides.myDatabase.T1": "select * from T1 where created_at >= \"2021-02-01\"", |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Current binlog = 1932 | |
# Last binlog = 143 | |
# Connector status: 131 < 143. So it is out of date | |
{"ts_sec":1610704790,"file":"my-database-bin.000004","pos":131,"row":1,"server_id":2001186,"event":2} | |
# Once you update the connector, it will take a snapshot and update to the earliest binlog | |
{"file":"my-database-bin.000004","pos":143} | |
# As it is processing the events and catches up, it will update the status. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"state": "FAILED", | |
"trace": "org.apache.kafka.connect.errors.ConnectException: The connector is trying to read | |
binlog starting at binlog file 'my-database-bin.000004', pos=654822673, skipping 20 events | |
plus 1 rows, but this is no longer available on the server. Reconfigure the connector to | |
use a snapshot when needed.", | |
"worker_id": "10.33.10.196:8083", | |
"generation": 121 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
..... | |
{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException....","worker_id":"10.33.10.196:8083","generation":121} | |
{"state":"UNASSIGNED","trace":null,"worker_id":"10.33.10.196:8083","generation":121} | |
... Issue update | |
{"state":"RUNNING","trace":null,"worker_id":"10.33.10.196:8083","generation":121} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Current binlog = my-database-bin.000004. In sync but the host has changed. | |
{"ts_sec":1610704790,"file":"my-database-bin.000004","pos":1932,"row":1,"server_id":2001186,"event":2} | |
-- Once you update the connector, it will take a snapshot and update to the earliest binlog | |
{"file":"new-host-bin.000001","pos":2000} | |
-- As it is processing the events, it will update the status. Notice the new server id | |
{"ts_sec":1610708790,"file":"new-host-bin.000001","pos":2001,"row":1,"server_id":1973403,"event":2} | |
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Note the oldest binlog file position | |
mysql -u <> -h <> -e "show binary logs"; | |
# Know the offset in the oldest binlog and its timestamp T | |
mysqlbinlog -vv /usr/local/var/mysql/mysql-bin.000038 --base64-output=DECODE-ROWS | less --base64-output=DECODE-ROWS | |
# If timestamp of lost event <= T, events have not rolled over (yet). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Find the partition for the connector (server name is the key) that stores its latest offset. Let that be p | |
$ kafkacat -b localhost -C -t <connect.offsets.storage.topic> -f 'Partition(%p) %k \n %s\n' | |
# Sample output where the connector has stored offsets till 46242 and we've missed from 42000-45000 | |
# Partition <p> | |
# ["my-db-connector",{"server":"server.key"}] | |
# {"ts_sec":,"file":"mysql-bin.000038","pos":46242, "row":1,"server_id":121,"event":3} | |
# Write a poison pill message: NULL to that partition. This essentially means to clear offsets | |
$ echo '["my-db-connector",{"server":"server.key"}]|' \| |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- myTable has over millions of rows | |
-- create index on a column first | |
create index idx_updated_at on `myDatabase.myTable`(updated_at); | |
-- now the column can be safely used as an override | |
-- "snapshot.select.statement.overrides.myDatabase.myTable": | |
-- "select * from `myDatabase.myTable` where updated_at > \"2021-02-01 18:00:00\"", |