Skip to content

Instantly share code, notes, and snippets.

View ptrdom's full-sized avatar

Domantas Petrauskas ptrdom

View GitHub Profile
@marwei
marwei / how_to_reset_kafka_consumer_group_offset.md
Created November 9, 2017 23:39
How to Reset Kafka Consumer Group Offset

Kafka 0.11.0.0 (Confluent 3.3.0) added support to manipulate offsets for a consumer group via cli kafka-consumer-groups command.

  1. List the topics to which the group is subscribed
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe

Note the values under "CURRENT-OFFSET" and "LOG-END-OFFSET". "CURRENT-OFFSET" is the offset where this consumer group is currently at in each of the partitions.

  1. Reset the consumer offset for a topic (preview)

Explaining Miles's Magic

Miles Sabin recently opened a pull request fixing the infamous SI-2712. First off, this is remarkable and, if merged, will make everyone's life enormously easier. This is a bug that a lot of people hit often without even realizing it, and they just assume that either they did something wrong or the compiler is broken in some weird way. It is especially common for users of scalaz or cats.

But that's not what I wanted to write about. What I want to write about is the exact semantics of Miles's fix, because it does impose some very specific assumptions about the way that type constructors work, and understanding those assumptions is the key to getting the most of it his fix.

For starters, here is the sort of thing that SI-2712 affects:

def foo[F[_], A](fa: F[A]): String = fa.toString
@acdcjunior
acdcjunior / Dummies.java
Created January 23, 2016 23:36
Mockito: create a mock object that throws exception at any method call (so... a Dummy object, not a mock)
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Mockito.mock;
public class Dummies {
private static class NoMethodShouldBeCalledAnswer implements Answer<Object> {
private String className;
@rrodseth
rrodseth / UnfoldPages.scala
Last active June 14, 2021 18:36
Create akka-stream Source from a pagination, using Source.unfoldAsync
// Inspired by a tweet from @trautonen 1/13/2016
// Use Source.unfoldAsync to turn paginated database results into an akka-streams Source
// unfold is the inverse of fold
case class Page[T](pageNumber:Long, totalPages:Long, contents:List[T])
case class Thing(id: Long, name: String = "foo")
val totalPages = 5 //
val pageSize = 3