Skip to content

Instantly share code, notes, and snippets.

public interface ConcurrentWorkExecutor {
/**
* Splits a work iterable and submits each task into an executor where the task input is obtained by
* applying a mapper function. The intermediate non null results are joined until a {@code size} number
* of results are obtained post which they are collected using a custom user defined collector.
*
* @param size the number of results to wait for
* @param iterable the work represented as an iterable
* @param mapper the transformation of individual item U within the iterable to a result T
public class WorkExecutor {
private final MathService mathService;
public List<Integer> processExpensiveFunction(List<Integer> inputs, int firstK) {
try {
List<Integer> outputs = new ArrayList<>();
//our basic executor
ExecutorService executor = Executors.newCachedThreadPool();
ExecutorCompletionService<Integer> executorService = new ExecutorCompletionService<>(executor);
#include <bits/stdc++.h>
// The four cardinal directions the robot can face.
enum direction {
    N, S, W, E
};

// Pre-computed lookup: the direction the robot faces after turning left
// (90 degrees counter-clockwise) from each fixed direction.
// Fully qualified as std::map so this compiles without relying on a
// `using namespace std;` directive.
std::map<direction, direction> leftDir = {
    {N, W}, {W, S}, {S, E}, {E, N}};
@isopropylcyanide
isopropylcyanide / generateRandomInRangeAroundNormal.py
Created February 21, 2021 07:34
Generate N random numbers in range(A, B) whose mean is X
import numpy as np
from math import ceil, floor
"""
The idea is to assume a normal Gaussian distribution around the given
average for a given range. Random numbers will be generated on either
side of the normal.
"""
def randomCeilOrFloor(X):
# Find the partition that contains messages for this connector, usually by grepping
# the offsets topic for the connector's server name.
kafkacat -C -Z -b localhost -t <connect.offsets.topics> -o <number> | grep "servername"
# Set a null offset: produce the connector's key with an empty value, which -Z sends
# as NULL, into that partition. `-K \|` makes '|' the key/value delimiter.
echo '["<connector-name>",{"server":"<servername>"}]|' | \
kafkacat -P -Z -b localhost -t <connect.offsets.topics> -K \| \
  -p <partition>
@isopropylcyanide
isopropylcyanide / connector-snapshot-override-index.sql
Last active February 25, 2022 19:30
connector-snapshot-override-index.sql
-- myTable has millions of rows.
-- Create an index on the column first so the filtered snapshot query below can use it.
-- Database and table names are quoted separately: `myDatabase.myTable` would name a
-- single table literally called "myDatabase.myTable".
create index idx_updated_at on `myDatabase`.`myTable`(updated_at);
-- Now the column can safely be used in a snapshot select override:
-- "snapshot.select.statement.overrides.myDatabase.myTable":
-- "select * from `myDatabase`.`myTable` where updated_at > \"2021-02-01 18:00:00\"",
@isopropylcyanide
isopropylcyanide / connector-lostevent-kafka-reset-offset.sh
Last active September 3, 2021 17:23
connector-lostevent-kafka-reset-offset.sh
# Find the partition that stores the connector's latest offset (the server name is part of the key). Call that partition p.
$ kafkacat -b localhost -C -t <connect.offsets.storage.topic> -f 'Partition(%p) %k \n %s\n'
# Sample output: the connector has stored offsets up to position 46242, but events 42000-45000 were missed
# Partition <p>
# ["my-db-connector",{"server":"server.key"}]
# {"ts_sec":,"file":"mysql-bin.000038","pos":46242, "row":1,"server_id":121,"event":3}
# Write a poison-pill (NULL) message to that partition. This effectively clears the stored offsets.
$ echo '["my-db-connector",{"server":"server.key"}]|' \|
@isopropylcyanide
isopropylcyanide / connector-lostevent-mysql-binlog-check.sh
Created February 5, 2021 20:05
connector-lostevent-mysql-binlog-check.sh
# Note the oldest binlog file and position.
mysql -u <> -h <> -e "show binary logs";
# Find the offset in the oldest binlog and its timestamp T.
# --base64-output is a mysqlbinlog option; the pager (less) takes no such flag.
mysqlbinlog -vv /usr/local/var/mysql/mysql-bin.000038 --base64-output=DECODE-ROWS | less
# If the timestamp of the lost event is <= T, the events have not rolled over (yet).
@isopropylcyanide
isopropylcyanide / connector-hostname-death-view-offset.json
Created February 5, 2021 19:59
connector-hostname-death-view-offset.json
-- Current binlog = my-database-bin.000004. In sync but the host has changed.
{"ts_sec":1610704790,"file":"my-database-bin.000004","pos":1932,"row":1,"server_id":2001186,"event":2}
-- Once you update the connector, it takes a snapshot and moves to the earliest binlog of the new host
{"file":"new-host-bin.000001","pos":2000}
-- As it processes the events, it updates the status. Notice the new server_id
{"ts_sec":1610708790,"file":"new-host-bin.000001","pos":2001,"row":1,"server_id":1973403,"event":2}
...
@isopropylcyanide
isopropylcyanide / connector-hostname-death-view-topic.json
Created February 5, 2021 19:57
connector-hostname-death-view-topic.json
.....
{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException....","worker_id":"10.33.10.196:8083","generation":121}
{"state":"UNASSIGNED","trace":null,"worker_id":"10.33.10.196:8083","generation":121}
... Issue update
{"state":"RUNNING","trace":null,"worker_id":"10.33.10.196:8083","generation":121}