Skip to content

Instantly share code, notes, and snippets.

@JLDLaughlin
Last active February 26, 2021 15:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JLDLaughlin/653e4d3a98c7a2fef4b7ed7c26b20163 to your computer and use it in GitHub Desktop.
Save JLDLaughlin/653e4d3a98c7a2fef4b7ed7c26b20163 to your computer and use it in GitHub Desktop.
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java
index 15c2af574..a7b76aebd 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java
@@ -52,7 +52,7 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn,
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn;
this.lastCommitLsn = lastCommitLsn;
- sourceInfo.update(lsn, time, txId, null, sourceInfo.xmin());
+ sourceInfo.update(lsn, time, txId, null, sourceInfo.xmin(), lastCommitLsn);
sourceInfoSchema = sourceInfo.schema();
this.lastSnapshotRecord = lastSnapshotRecord;
@@ -141,7 +141,7 @@ public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant c
public void updateCommitPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, TableId tableId, Long xmin) {
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn;
this.lastCommitLsn = lastCompletelyProcessedLsn;
- sourceInfo.update(lsn, commitTime, txId, tableId, xmin);
+ sourceInfo.update(lsn, commitTime, txId, tableId, xmin, lastCompletelyProcessedLsn);
}
boolean hasLastKnownPosition() {
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSourceInfoStructMaker.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSourceInfoStructMaker.java
index 6f0d075f7..294e42039 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSourceInfoStructMaker.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSourceInfoStructMaker.java
@@ -10,6 +10,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfoStructMaker;
+import io.debezium.connector.postgresql.connection.Lsn;
public class PostgresSourceInfoStructMaker extends AbstractSourceInfoStructMaker<SourceInfo> {
@@ -23,6 +24,7 @@ public PostgresSourceInfoStructMaker(String connector, String version, CommonCon
.field(SourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.TXID_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.LSN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
+ .field(SourceInfo.SEQUENCE_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.XMIN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.build();
}
@@ -47,6 +49,17 @@ public Struct struct(SourceInfo sourceInfo) {
if (sourceInfo.lsn() != null) {
result.put(SourceInfo.LSN_KEY, sourceInfo.lsn().asLong());
}
+ if (sourceInfo.sequence() != null) {
+ // The schema declares this field as an optional string, so the LSNs are rendered as "[a,b]".
+ StringBuilder sequence = new StringBuilder("[");
+ for (Lsn lsn : sourceInfo.sequence()) {
+ // Null entries are still skipped, exactly as before.
+ if (lsn != null) {
+ // Anything beyond the opening "[" means an entry was already written, so separate with a comma.
+ // Emitting the comma before (not after) each entry avoids a trailing "," in front of "]".
+ if (sequence.length() > 1) {
+ sequence.append(",");
+ }
+ sequence.append(lsn.asLong());
+ }
+ }
+ sequence.append("]");
+ result.put(SourceInfo.SEQUENCE_KEY, sequence.toString());
+ }
if (sourceInfo.xmin() != null) {
result.put(SourceInfo.XMIN_KEY, sourceInfo.xmin());
}
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/SourceInfo.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/SourceInfo.java
index 30b19a50e..7e0d7da5f 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/SourceInfo.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/SourceInfo.java
@@ -75,11 +75,13 @@
public static final String TXID_KEY = "txId";
public static final String XMIN_KEY = "xmin";
public static final String LSN_KEY = "lsn";
+ public static final String SEQUENCE_KEY = "sequence";
public static final String LAST_SNAPSHOT_RECORD_KEY = "last_snapshot_record";
private final String dbName;
private Lsn lsn;
+ private Lsn[] sequence;
private Long txId;
private Long xmin;
private Instant timestamp;
@@ -103,6 +105,12 @@ protected SourceInfo(PostgresConnectorConfig connectorConfig) {
* @param xmin the xmin of the slot, may be null
* @return this instance
*/
+ protected SourceInfo update(Lsn lsn, Instant commitTime, Long txId, TableId tableId, Long xmin, Lsn lastCommitLsn) {
+ // Let the five-argument overload handle everything except the sequence.
+ update(lsn, commitTime, txId, tableId, xmin);
+ // Slot 0 carries the last commit LSN, slot 1 the LSN passed to this call.
+ final Lsn[] commitThenCurrent = { lastCommitLsn, lsn };
+ this.sequence = commitThenCurrent;
+ return this;
+ }
+
protected SourceInfo update(Lsn lsn, Instant commitTime, Long txId, TableId tableId, Long xmin) {
this.lsn = lsn;
if (commitTime != null) {
@@ -138,6 +146,10 @@ public Long xmin() {
return this.xmin;
}
+ /**
+ * Returns the LSN pair stored in the {@code sequence} field.
+ *
+ * @return the backing array itself (not a copy); may be {@code null}, which callers in this patch check for
+ */
+ public Lsn[] sequence() {
+ return sequence;
+ }
+
@Override
protected String database() {
return dbName;
@@ -179,6 +191,9 @@ public String toString() {
if (xmin != null) {
sb.append(", xmin=").append(xmin);
}
+ if (sequence != null) {
+ // Appending the array itself would print its type and identity hash rather than its contents,
+ // so render the elements explicitly. Fully qualified to avoid touching the import block.
+ sb.append(", sequence=").append(java.util.Arrays.toString(sequence));
+ }
if (timestamp != null) {
sb.append(", timestamp=").append(timestamp);
}
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
index 9b799e3ac..8b702b552 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
@@ -2466,6 +2466,71 @@ public void shouldProduceMessagesOnlyForConfiguredTables() throws Exception {
stopConnector();
}
+ /**
+ * Extracts the {@code sequence} string from a record's {@code source} struct and parses it into longs.
+ * Asserts that the string starts with '[' and ends with ']' before splitting its interior on commas.
+ */
+ private List<Long> getSequence(SourceRecord record) {
+ assertTrue(record.value() instanceof Struct);
+ final Struct value = (Struct) record.value();
+ final String raw = value.getStruct("source").getString("sequence");
+ final int lastIndex = raw.length() - 1;
+ // Pin the surrounding brackets so a change in the serialized format is caught here.
+ assertEquals('[', raw.charAt(0));
+ assertEquals(']', raw.charAt(lastIndex));
+ final String[] parts = raw.substring(1, lastIndex).split(",");
+ final List<Long> parsed = new ArrayList<>();
+ for (String part : parts) {
+ parsed.add(Long.parseLong(part.trim()));
+ }
+ return parsed;
+ }
+
+ /**
+ * Runs three INSERT_STMT executions with transaction metadata enabled and checks the
+ * {@code source.sequence} field of the streamed records: its first element must be the same
+ * for records 5 and 6, the same for records 9 and 10, and must increase from the former pair
+ * to the latter.
+ */
+ @Test
+ public void shouldHaveLastCommitLsn() throws InterruptedException {
+ TestHelper.execute(SETUP_TABLES_STMT);
+ start(PostgresConnector.class, TestHelper.defaultConfig()
+ .with(CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V2)
+ .with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
+ .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
+ .build());
+ assertConnectorIsRunning();
+
+ waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
+ assertNoRecordsToConsume();
+
+ final int insertCount = 3;
+ for (int i = 0; i < insertCount; ++i) {
+ TestHelper.execute(INSERT_STMT);
+ }
+
+ List<SourceRecord> records = new ArrayList<>();
+ Awaitility.await("Skip empty transactions and find the data").atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords() * 3)).until(() -> {
+ int transactionsSeen = 0;
+ while (transactionsSeen < insertCount) {
+ final List<SourceRecord> candidate = consumeRecordsByTopic(2).allRecordsInOrder();
+ if (candidate.get(1).topic().contains("transaction")) {
+ // empty transaction, should be skipped
+ continue;
+ }
+ // Keep this pair and the next one: four records are collected per counted transaction.
+ records.addAll(candidate);
+ records.addAll(consumeRecordsByTopic(2).allRecordsInOrder());
+ ++transactionsSeen;
+ }
+ return true;
+ });
+
+ assertEquals(4 * insertCount, records.size());
+ // NOTE(review): indices 5/6 and 9/10 are presumably the two change events inside the second
+ // and third groups of four (between the transaction markers) - confirm against INSERT_STMT.
+ List<Long> secondTransactionSequence = getSequence(records.get(5));
+ // Expected value goes first so a failure reports the sizes the right way round.
+ assertEquals(2, secondTransactionSequence.size());
+ long secondLastCommitLsn = secondTransactionSequence.get(0);
+ assertEquals(secondLastCommitLsn, getSequence(records.get(6)).get(0).longValue());
+
+ List<Long> thirdTransactionSequence = getSequence(records.get(9));
+ assertEquals(2, thirdTransactionSequence.size());
+ long thirdLastCommitLsn = thirdTransactionSequence.get(0);
+ assertEquals(thirdLastCommitLsn, getSequence(records.get(10)).get(0).longValue());
+
+ assertTrue(secondLastCommitLsn < thirdLastCommitLsn);
+
+ // now stop the connector
+ stopConnector();
+ assertNoRecordsToConsume();
+ }
+
private CompletableFuture<Void> batchInsertRecords(long recordsCount, int batchSize) {
String insertStmt = "INSERT INTO text_table(j, jb, x, u) " +
"VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, " +
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java
index b791bdf09..d956777b7 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java
@@ -72,6 +72,7 @@ public void schemaIsCorrect() {
.field("table", Schema.STRING_SCHEMA)
.field("txId", Schema.OPTIONAL_INT64_SCHEMA)
.field("lsn", Schema.OPTIONAL_INT64_SCHEMA)
+ .field("sequence", Schema.OPTIONAL_STRING_SCHEMA)
.field("xmin", Schema.OPTIONAL_INT64_SCHEMA)
.build();
@elindsey
Copy link

I couldn't figure out how to leave comments on gist line numbers, so leaving a few minor comments here instead. Looks pretty good! 🙂

+        if (sourceInfo.sequence() != null) {
+            List<Long> sequence = new ArrayList();
+            for (Lsn lsn : sourceInfo.sequence()) {
+                if (lsn != null) {
+                    sequence.add(lsn.asLong());
+                }
+            }
+            result.put(SourceInfo.SEQUENCE_KEY, sequence.toString());
+        }

^ This is an allocation or two for the ArrayList, two allocations for the Longs (since they autobox), then an allocation for the String. It likely isn't performance critical, but might as well write it tight if it's easy. 🙂
I think you can ditch the List and do a String.format() or append to a StringBuffer to get it in the exact output format you want with less overhead.

+    private Lsn[] sequence;

^ The downside to the array is that it loses all semantic meaning for what it contains, and it's easier to shoot yourself in the foot running off the end. Two other options are to expose lastCommitLsn directly or to make an explicit Sequence type. I see quirks with all three so don't have any strong opinion here, but maybe Brennan or upstream have better guidance.

+        for (String s : stringSequence.substring(1, stringSequence.length() - 1).split(",")) {

^ For a unit test like this, I would assert that character 0 is '[' and the final character is ']' instead of only skipping them. It'll catch regressions, but also communicates to test maintainers what the expected output looks like. In other languages I'd use a regex, but I've never found regexes in Java to be particularly legible...

@JLDLaughlin
Copy link
Author

Thank you, @elindsey!

I've updated the code per your first suggestion, but I used a StringBuilder instead of a StringBuffer (Debezium's checkstyle rules explicitly check that you aren't using StringBuffer). I've also updated the code per your third point!

To your second point, I totally agree! Gunnar had three suggestions for how this change could be implemented. I initially decided not to go with the struct because of the schema evolution issues @umanwizard pointed out in the thread. This decision worked out, because I later figured out that every field on a source object must be a scalar type. Debezium is working on support for CloudEvents, which requires source information to be sent as a CloudEvent. CloudEvents only supports scalar types, meaning that all source attributes must be scalar types, leaving me with the ambiguous string. (Alternatively, I could implement some sort of struct -> String translation in the CloudEvents code, but it would be the first time a source would have to do that.)

@elindsey
Copy link

Ahh, that makes sense! I missed the context on CloudEvents, thanks for the explanation. 🙂

@JLDLaughlin
Copy link
Author

No worries, sorry that I didn't include it with the gist!

@umanwizard
Copy link

+                if (lsn != null) {
+                    sequence.append(lsn.asLong());
+                    sequence.append(",");
+                }

I think we should actually write null in the output if one of the inputs is null, rather than just skipping it.

Other than that this looks fine, let's submit it to the DBZ folks and see what they think.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment