Skip to content

Instantly share code, notes, and snippets.

@isopropylcyanide
isopropylcyanide / connector-update-fixing-rolled-over-binlogs.sh
Last active September 3, 2021 17:27
connector-update-fixing-rolled-over-binlogs.sh
curl -X PUT '{host}/connectors/{connector-name}/config' \
-H 'Content-Type: application/json' \
--data-raw '{
"name": "{connector-name}",
....
"snapshot.mode": "initial",
"table.whitelist": "T1,T2,T3",
"snapshot.select.statement.overrides": "T1",
"snapshot.select.statement.overrides.myDatabase.T1": "select * from T1 where created_at >= \"2021-02-01\"",
@isopropylcyanide
isopropylcyanide / connector-lostevent-kafka-reset-offset.sh
Last active September 3, 2021 17:23
connector-lostevent-kafka-reset-offset.sh
# Find the partition of the offsets topic that stores the connector's latest offset (the record key contains the connector name and server). Call that partition <p>.
$ kafkacat -b localhost -C -t <connect.offsets.storage.topic> -f 'Partition(%p) %k \n %s\n'
# Sample output: the connector has stored offsets up to position 46242, but the events between 42000 and 45000 were missed
# Partition <p>
# ["my-db-connector",{"server":"server.key"}]
# {"ts_sec":,"file":"mysql-bin.000038","pos":46242, "row":1,"server_id":121,"event":3}
# Write a "poison pill" (tombstone) message, i.e. the same key with a NULL value, to partition <p>. This effectively clears the connector's stored offsets.
$ echo '["my-db-connector",{"server":"server.key"}]|' \|
@isopropylcyanide
isopropylcyanide / connector-update-fixing-rolled-over-binlogs-post.sh
Last active July 10, 2021 18:50
connector-update-fixing-rolled-over-binlogs-post.sh
# Current binlog = 1932
# Last binlog = 143
# Connector status: stored position 131 < 143, so the connector's offset is out of date.
{"ts_sec":1610704790,"file":"my-database-bin.000004","pos":131,"row":1,"server_id":2001186,"event":2}
# Once you update the connector, it takes a snapshot and moves its offset to the earliest available binlog position:
{"file":"my-database-bin.000004","pos":143}
# As it processes the events and catches up, it keeps updating this status.
@isopropylcyanide
isopropylcyanide / ConcurrentWorkExecutor.java
Last active April 12, 2021 21:16
Concurrently execute work and aggregate the results using a completion service. Tests are included in the same file for brevity.
package com.random.aman;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletionService;
/**
* A concurrent work executor that blocks for the final result once the individual execution
* results have been obtained. Results are fed into the queue backing the completion service as
* they complete, so they may arrive out of order. Note that this behavior can be changed by
* using a single-threaded executor.
* <p>
* If the execution of any individual work item results in an exception, an exception is raised.
*/
static class OutOfOrderConcurrentWorkExecutor implements ConcurrentWorkExecutor {
@Override
public List<Integer> processExpensiveFunctionElegant(List<Integer> inputs, int firstK) {
try {
return concurrentWorkExecutor.splitJoin(firstK,
inputs,
mathService::process,
Collectors.toList());
} catch (Exception ex) {
log.error("Error processing expensive function", ex);
return null;
public List<PaymentMethods> getTopKPaymentMethodDetails(List<Integer> paymentIds, int k) {
try {
return concurrentWorkExecutor.splitJoin(k,
paymentIds,
paymentService::fetch,
Collectors.toList());
} catch (Exception ex) {
log.error("Error processing top K payment methods", ex);
return null;
public List<Integer> processExpensiveFunctionElegant(List<Integer> inputs, int firstK) {
try {
return concurrentWorkExecutor.splitJoin(firstK,
inputs,
mathService::process,
Collectors.toList());
} catch (Exception ex) {
log.error("Error processing expensive function ", ex);
return null;
public interface ConcurrentWorkExecutor {
/**
* Splits a work iterable and submits each task into an executor where the task input is obtained by
* applying a mapper function. The intermediate non null results are joined until a {@code size} number
* of results are obtained post which they are collected using a custom user defined collector.
*
* @param size the number of results to wait for
* @param iterable the work represented as an iterable
* @param mapper the transformation of individual item U within the iterable to a result T
public class WorkExecutor {
private final MathService mathService;
public List<Integer> processExpensiveFunction(List<Integer> inputs, int firstK) {
try {
List<Integer> outputs = new ArrayList<>();
//our basic executor
ExecutorService executor = Executors.newCachedThreadPool();
ExecutorCompletionService<Integer> executorService = new ExecutorCompletionService<>(executor);