Given a JSON file with a structure like this:
{
  "topic": {
    "cleanup_policy": "delete",
    "partitions": [
      {
        "consumer_groups": [
          {
            "group_name": "activity-feed-service",
            "offset": 26764
          },
          {
            "group_name": "adjustment-report-service",
            "offset": 26764
          }
        ],
        "earliest_offset": 0,
        "isr": 3,
        "latest_offset": 26764,
        "partition": 0,
        "size": 10578495
      },
      {
        "consumer_groups": [
          {
            "group_name": "activity-feed-service",
            "offset": 26329
          },
          {
            "group_name": "adjustment-report-service",
            "offset": 26329
          }
        ],
        "earliest_offset": 0,
        "isr": 3,
        "latest_offset": 26329,
        "partition": 5,
        "size": 10418206
      }
    ],
    "replication": 3,
    "retention_bytes": -1,
    "retention_hours": -1,
    "state": "ACTIVE",
    "tags": [],
    "topic_name": "adjustments"
  }
}
we want to list the latest offset of each partition. The following command does that (Windows CMD syntax, hence the backslash-escaped inner quotes):
type adjustments.json | jq ".topic.partitions[] | \"\(.partition) \(.latest_offset)\"" | sort > latest-offset-per-partition.txt
Next, we want the latest offset of every partition that a given consumer group reads from:
type adjustments.json | jq ".topic.partitions[] | {name: .consumer_groups[].group_name, partition, latest_offset} | select(.name == \"adjustment-report-service\") | \"\(.partition) \(.latest_offset)\"" | sort > adjustment-offsets.txt
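Better still, we can merge each consumer group entry with the fields of its partition, so that a single command shows the group's own offset next to the partition details: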
type adjustments.json | jq ".topic.partitions[] | .consumer_groups[] + del(.consumer_groups) | select(.group_name == \"adjustment-report-service\")"
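The `.consumer_groups[] + del(.consumer_groups)` part is the least obvious step: it adds each consumer group object to its parent partition object (with the nested array removed), giving one flat record per group/partition pair. A rough Python equivalent may make this easier to follow. This is only a sketch: the function name `flatten_partitions` is made up for illustration, and `sample` is a trimmed copy of the JSON above.

```python
def flatten_partitions(doc):
    """Yield one flat dict per (consumer group, partition) pair."""
    for partition in doc["topic"]["partitions"]:
        # Partition fields without the nested array, like jq's del(.consumer_groups).
        partition_fields = {
            key: value for key, value in partition.items() if key != "consumer_groups"
        }
        for group in partition["consumer_groups"]:
            # Like jq's object addition: on a key clash the right-hand side wins.
            yield {**group, **partition_fields}


# Trimmed copy of the sample document (only the fields used here).
sample = {
    "topic": {
        "partitions": [
            {
                "consumer_groups": [
                    {"group_name": "activity-feed-service", "offset": 26764},
                    {"group_name": "adjustment-report-service", "offset": 26764},
                ],
                "latest_offset": 26764,
                "partition": 0,
            },
            {
                "consumer_groups": [
                    {"group_name": "activity-feed-service", "offset": 26329},
                    {"group_name": "adjustment-report-service", "offset": 26329},
                ],
                "latest_offset": 26329,
                "partition": 5,
            },
        ]
    }
}

# Same filter as select(.group_name == "adjustment-report-service").
records = [
    record
    for record in flatten_partitions(sample)
    if record["group_name"] == "adjustment-report-service"
]
for record in records:
    print(record)
```

Each resulting record carries both `offset` (the group's position) and `latest_offset` (the end of the partition), which is what makes a lag check possible.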
Or, if we just want to check which consumers lag behind:
type adjustments.json | jq ".topic.partitions[] | .consumer_groups[] + del(.consumer_groups) | select(.latest_offset > .offset) | { group_name, partition, lag: (.latest_offset - .offset) }"
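Note that in the sample above every group has caught up (`offset` equals `latest_offset`), so this filter prints nothing for it. To illustrate the lag calculation, here is a small Python sketch of the same logic. The function name `lagging_consumers` and the offsets in `lagging_sample` are invented for the example and do not come from the real topic.

```python
def lagging_consumers(doc):
    """Return group/partition pairs whose offset is behind latest_offset."""
    lagging = []
    for partition in doc["topic"]["partitions"]:
        for group in partition["consumer_groups"]:
            lag = partition["latest_offset"] - group["offset"]
            # Same condition as select(.latest_offset > .offset).
            if lag > 0:
                lagging.append(
                    {
                        "group_name": group["group_name"],
                        "partition": partition["partition"],
                        "lag": lag,
                    }
                )
    return lagging


# Hypothetical data: the second group is 64 messages behind on partition 0.
lagging_sample = {
    "topic": {
        "partitions": [
            {
                "consumer_groups": [
                    {"group_name": "activity-feed-service", "offset": 26764},
                    {"group_name": "adjustment-report-service", "offset": 26700},
                ],
                "latest_offset": 26764,
                "partition": 0,
            }
        ]
    }
}

lagging = lagging_consumers(lagging_sample)
for entry in lagging:
    print(entry)
```

Groups that are fully caught up are dropped, so an empty result means no consumer is behind.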