Skip to content

Instantly share code, notes, and snippets.

@huseinzol05
Created May 10, 2022 04:25
Show Gist options
  • Save huseinzol05/883c894e258c827c231f18fe04d37cc7 to your computer and use it in GitHub Desktop.
Save huseinzol05/883c894e258c827c231f18fe04d37cc7 to your computer and use it in GitHub Desktop.
Reset offsets for Kafka MirrorMaker2 using Kowl API
import requests
from datetime import datetime, timedelta, date
import json
td = int((datetime.today() - timedelta(days = 2)).timestamp() * 1000)
r = requests.get('http://localhost:8080/api/consumer-groups').json()
for g in r['consumerGroups']:
group_id = g['groupId']
if 'connect-' in group_id:
continue
for t in g['topicOffsets']:
topic = t['topic']
ts_partitions = requests.get(f'http://localhost:8080/api/topics-offsets?topicNames={topic}&timestamp={td}').json()
ts_partitions = {v['partitionId']: v['offset'] for v in ts_partitions['topicOffsets'][0]['partitions']}
neg_lags = []
for p in t['partitionOffsets']:
if p['groupOffset'] > p['highWaterMark']:
o = ts_partitions[p['partitionId']]
if o == -1:
o = p['highWaterMark']
neg_lags.append({'partitionId': p['partitionId'], 'offset': o})
if len(neg_lags):
print(ts_partitions, t['partitionOffsets'])
patch = {'groupId': group_id,
'topics': [{
'topicName': topic,
'partitions': neg_lags
}]}
print(patch)
requests.patch(f'http://localhost:8080/api/consumer-groups/{group_id}',
headers = {'Content-Type': 'application/json'},
data = json.dumps(patch)).json()
print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment