This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
curl -X PUT '{host}/connectors/{connector-name}/config' \ | |
-H 'Content-Type: application/json' \ | |
--data-raw '{ | |
"name": "{connector-name}", | |
.... | |
"snapshot.mode": "initial", | |
"table.whitelist": "T1,T2,T3", | |
"snapshot.select.statement.overrides": "T1", | |
"snapshot.select.statement.overrides.myDatabase.T1": "select * from T1 where created_at >= \"2021-02-01\"", |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Find the partition for the connector (server name is the key) that stores its latest offset. Let that be p | |
$ kafkacat -b localhost -C -t <connect.offsets.storage.topic> -f 'Partition(%p) %k \n %s\n' | |
# Sample output where the connector has stored offsets till 46242 and we've missed from 42000-45000 | |
# Partition <p> | |
# ["my-db-connector",{"server":"server.key"}] | |
# {"ts_sec":,"file":"mysql-bin.000038","pos":46242, "row":1,"server_id":121,"event":3} | |
# Write a poison pill message: NULL to that partition. This essentially means to clear offsets | |
$ echo '["my-db-connector",{"server":"server.key"}]|' \| |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Current binlog = 1932 | |
# Last binlog = 143 | |
# Connector status: 131 < 143. So it is out of date | |
{"ts_sec":1610704790,"file":"my-database-bin.000004","pos":131,"row":1,"server_id":2001186,"event":2} | |
# Once you update the connector, it will take a snapshot and update to the earliest binlog | |
{"file":"my-database-bin.000004","pos":143} | |
# As it is processing the events and catches up, it will update the status. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.random.aman; | |
import com.google.common.collect.Lists; | |
import lombok.AllArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.concurrent.CompletionService; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* A concurrent work executor that blocks for the final result after individual execution results | |
* are obtained. The results are fed into the queue represented by completion service as they are | |
* getting completed. Note that this behavior can be changed by using a single threaded executor | |
* <p> | |
* If execution of any individual work results in an exception, an exception is raised | |
*/ | |
static class OutOfOrderConcurrentWorkExecutor implements ConcurrentWorkExecutor { | |
@Override |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public List<Integer> processExpensiveFunctionElegant(List<Integer> inputs, int firstK) { | |
try { | |
return concurrentWorkExecutor.splitJoin(firstK, | |
inputs, | |
mathService::process, | |
Collectors.toList()); | |
} catch (Exception ex) { | |
log.error("Error processing expensive function", ex); | |
return null; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public List<PaymentMethods> getTopKPaymentMethodDetails(List<Integer> paymentIds, int k) { | |
try { | |
return concurrentWorkExecutor.splitJoin(k, | |
paymentIds, | |
paymentService::fetch, | |
Collectors.toList()); | |
} catch (Exception ex) { | |
log.error("Error processing top K payment methods", ex); | |
return null; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public List<Integer> processExpensiveFunctionElegant(List<Integer> inputs, int firstK) { | |
try { | |
return concurrentWorkExecutor.splitJoin(firstK, | |
inputs, | |
mathService::process, | |
Collectors.toList()); | |
} catch (Exception ex) { | |
log.error("Error processing expensive function ", ex); | |
return null; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface ConcurrentWorkExecutor { | |
/** | |
* Splits a work iterable and submits each task into an executor where the task input is obtained by | |
* applying a mapper function. The intermediate non null results are joined until a {@code size} number | |
* of results are obtained post which they are collected using a custom user defined collector. | |
* | |
* @param size the number of results to wait for | |
* @param iterable the work represented as an iterable | |
* @param mapper the transformation of individual item U within the iterable to a result T |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class WorkExecutor { | |
private final MathService mathService; | |
public List<Integer> processExpensiveFunction(List<Integer> inputs, int firstK) { | |
try { | |
List<Integer> outputs = new ArrayList<>(); | |
//our basic executor | |
ExecutorService executor = Executors.newCachedThreadPool(); | |
ExecutorCompletionService<Integer> executorService = new ExecutorCompletionService<>(executor); |