Skip to content

Instantly share code, notes, and snippets.

@isopropylcyanide
isopropylcyanide / connector-hostname-death-view-topic.json
Created February 5, 2021 19:57
connector-hostname-death-view-topic.json
.....
{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException....","worker_id":"10.33.10.196:8083","generation":121}
{"state":"UNASSIGNED","trace":null,"worker_id":"10.33.10.196:8083","generation":121}
... Issue update
{"state":"RUNNING","trace":null,"worker_id":"10.33.10.196:8083","generation":121}
@isopropylcyanide
isopropylcyanide / connector-hostname-death-view-offset.json
Created February 5, 2021 19:59
connector-hostname-death-view-offset.json
-- Current binlog = my-database-bin.000004. In sync but the host has changed.
{"ts_sec":1610704790,"file":"my-database-bin.000004","pos":1932,"row":1,"server_id":2001186,"event":2}
-- Once you update the connector, it will take a snapshot and update to the earliest binlog
{"file":"new-host-bin.000001","pos":2000}
-- As it is processing the events, it will update the status. Notice the new server id
{"ts_sec":1610708790,"file":"new-host-bin.000001","pos":2001,"row":1,"server_id":1973403,"event":2}
...
@isopropylcyanide
isopropylcyanide / connector-lostevent-mysql-binlog-check.sh
Created February 5, 2021 20:05
connector-lostevent-mysql-binlog-check.sh
# Manual check for whether a lost event is still present in the MySQL binary logs.
# Replace the <> placeholders with the MySQL user (-u) and host (-h) before running.
# Note the oldest binlog file position
mysql -u <> -h <> -e "show binary logs";
# Know the offset in the oldest binlog and its timestamp T
# (--base64-output=DECODE-ROWS is a mysqlbinlog option; the pager takes no such flag)
mysqlbinlog -vv /usr/local/var/mysql/mysql-bin.000038 --base64-output=DECODE-ROWS | less
# If timestamp of lost event <= T, events have not rolled over (yet).
@isopropylcyanide
isopropylcyanide / generateRandomInRangeAroundNormal.py
Created February 21, 2021 07:34
Generate N random numbers in range(A, B) with a mean of X
import numpy as np
from math import ceil, floor
"""
The idea is to assume a normal (Gaussian) distribution around the given
average for a given range. Random numbers will be generated on either
side of the mean.
"""
def randomCeilOrFloor(X):
#include <bits/stdc++.h>
// The four directions a robot can be oriented in; presumably north, south,
// west and east (TODO confirm against the rest of the file).
enum direction {
N, S, W, E
};
// Precomputed lookup: for each direction the robot starts from, the direction
// it moves in when it goes left (N -> W, W -> S, S -> E, E -> N).
map<direction, direction> leftDir = {
{N, W}, {W, S}, {S, E}, {E, N}};
public class WorkExecutor {
private final MathService mathService;
public List<Integer> processExpensiveFunction(List<Integer> inputs, int firstK) {
try {
List<Integer> outputs = new ArrayList<>();
//our basic executor
ExecutorService executor = Executors.newCachedThreadPool();
ExecutorCompletionService<Integer> executorService = new ExecutorCompletionService<>(executor);
public interface ConcurrentWorkExecutor {
/**
* Splits a work iterable and submits each task into an executor where the task input is obtained by
* applying a mapper function. The intermediate non-null results are joined until {@code size}
* results are obtained, after which they are collected using a custom user-defined collector.
*
* @param size the number of results to wait for
* @param iterable the work represented as an iterable
* @param mapper the transformation of individual item U within the iterable to a result T
public List<Integer> processExpensiveFunctionElegant(List<Integer> inputs, int firstK) {
try {
return concurrentWorkExecutor.splitJoin(firstK,
inputs,
mathService::process,
Collectors.toList());
} catch (Exception ex) {
log.error("Error processing expensive function ", ex);
return null;
public List<PaymentMethods> getTopKPaymentMethodDetails(List<Integer> paymentIds, int k) {
try {
return concurrentWorkExecutor.splitJoin(k,
paymentIds,
paymentService::fetch,
Collectors.toList());
} catch (Exception ex) {
log.error("Error processing top K payment methods", ex);
return null;
public List<Integer> processExpensiveFunctionElegant(List<Integer> inputs, int firstK) {
try {
return concurrentWorkExecutor.splitJoin(firstK,
inputs,
mathService::process,
Collectors.toList());
} catch (Exception ex) {
log.error("Error processing expensive function", ex);
return null;