@@ -114,6 +116,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
/** If the consumer doesn't have a Kafka partition assigned at runtime, it'll block on this waitThread **/
private transient Thread waitThread;
+ private final ReentrantLock lock = new ReentrantLock(true);
// ------------------------------------------------------------------------
@@ -388,9 +391,12 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
@Override
protected void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = convertToCommitMap(checkpointOffsets);
- synchronized (this.consumer) {
+ lock.lock();
+ try {
this.consumer.commitSync(kafkaCheckpointOffsets);
- }
+ } finally {
+ lock.unlock();
+ }
}
public static Map<TopicPartition, OffsetAndMetadata> convertToCommitMap(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
@@ -444,17 +450,19 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
pollLoop: while (running) {
ConsumerRecords<byte[], byte[]> records;
//noinspection SynchronizeOnNonFinalField
- synchronized (flinkKafkaConsumer.consumer) {
- try {
- records = flinkKafkaConsumer.consumer.poll(pollTimeout);
- } catch (WakeupException we) {
- if (running) {
- throw we;
- }
- // leave loop
- continue;
- }
- }
+ flinkKafkaConsumer.lock.lock();
+ try {
+ records = flinkKafkaConsumer.consumer.poll(pollTimeout);
+ } catch (WakeupException we) {
+ if (running) {
+ throw we;
+ }
+ // leave loop
+ continue;
+ } finally {
+ flinkKafkaConsumer.lock.unlock();
+ }
+
Created
July 8, 2016 10:08
-
-
Save ogibayashi/e5e72e7d47046cbf46cdc897e491a135 to your computer and use it in GitHub Desktop.
Using ReentrantLock in FlinkKafkaConsumer09
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment