Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@ogibayashi
Created July 8, 2016 10:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ogibayashi/e5e72e7d47046cbf46cdc897e491a135 to your computer and use it in GitHub Desktop.
Save ogibayashi/e5e72e7d47046cbf46cdc897e491a135 to your computer and use it in GitHub Desktop.
Using ReentrantLock in FlinkKafkaConsumer09
@@ -114,6 +116,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 	/** If the consumer doesn't have a Kafka partition assigned at runtime, it'll block on this waitThread **/
 	private transient Thread waitThread;
 
+        private final ReentrantLock lock = new ReentrantLock(true); // fair (true) lock guarding this.consumer; taken by commitOffsets() and by the poll loop around consumer.poll()
 
 	// ------------------------------------------------------------------------
 
@@ -388,9 +391,12 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 	@Override
 	protected void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
 		Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = convertToCommitMap(checkpointOffsets);
-		synchronized (this.consumer) {
+                lock.lock(); // same lock the poll loop holds around consumer.poll(), so commitSync() and poll() do not run concurrently
+		try {
 			this.consumer.commitSync(kafkaCheckpointOffsets);
-		}
+		} finally {
+                    lock.unlock(); // released in finally so an exception from commitSync() cannot leave the lock held
+                }
 	}
 
 	public static Map<TopicPartition, OffsetAndMetadata> convertToCommitMap(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
@@ -444,17 +450,19 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 				pollLoop: while (running) {
 					ConsumerRecords<byte[], byte[]> records;
 					//noinspection SynchronizeOnNonFinalField
-					synchronized (flinkKafkaConsumer.consumer) {
-						try {
-							records = flinkKafkaConsumer.consumer.poll(pollTimeout);
-						} catch (WakeupException we) {
-							if (running) {
-								throw we;
-							}
-							// leave loop
-							continue;
-						}
-					}
+                                        flinkKafkaConsumer.lock.lock();
+                                        try {
+                                            records = flinkKafkaConsumer.consumer.poll(pollTimeout);
+                                        } catch (WakeupException we) {
+                                            if (running) {
+                                                throw we;
+                                            }
+                                            // leave loop
+                                            continue;
+                                        } finally {
+                                            flinkKafkaConsumer.lock.unlock();
+                                        }
+                                        
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment