Created August 18, 2016 17:50
-
-
Save patriknw/5f9eebb86410d281741b3f4734b96d34 to your computer and use it in GitHub Desktop.
Remove producer factory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
---
core/src/main/scala/akka/kafka/javadsl/Producer.scala | 12 ++++++------
core/src/main/scala/akka/kafka/scaladsl/Producer.scala | 18 +++++++++---------
2 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/akka/kafka/javadsl/Producer.scala b/core/src/main/scala/akka/kafka/javadsl/Producer.scala
index 86005b7..6bae123 100644
--- a/core/src/main/scala/akka/kafka/javadsl/Producer.scala
+++ b/core/src/main/scala/akka/kafka/javadsl/Producer.scala
@@ -38,11 +38,11 @@ object Producer {
* partition number, and an optional key and value.
*/
def plainSink[K, V](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Sink[ProducerRecord[K, V], CompletionStage[Done]] =
- scaladsl.Producer.plainSink(producerProvider, closeTimeout, parallelism)
+ scaladsl.Producer.plainSink(producer, closeTimeout, parallelism)
.mapMaterializedValue(_.toJava)
.asJava
@@ -67,11 +67,11 @@ object Producer {
* committing, so it is "at-least once delivery" semantics.
*/
def commitableSink[K, V](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Sink[Message[K, V, ConsumerMessage.Committable], CompletionStage[Done]] =
- scaladsl.Producer.commitableSink(producerProvider, closeTimeout, parallelism)
+ scaladsl.Producer.commitableSink(producer, closeTimeout, parallelism)
.mapMaterializedValue(_.toJava)
.asJava
@@ -89,9 +89,9 @@ object Producer {
* be committed later in the flow.
*/
def flow[K, V, PassThrough](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed] =
- scaladsl.Producer.flow(producerProvider, closeTimeout, parallelism).asJava
+ scaladsl.Producer.flow(producer, closeTimeout, parallelism).asJava
}
diff --git a/core/src/main/scala/akka/kafka/scaladsl/Producer.scala b/core/src/main/scala/akka/kafka/scaladsl/Producer.scala
index 24b6f41..10167f7 100644
--- a/core/src/main/scala/akka/kafka/scaladsl/Producer.scala
+++ b/core/src/main/scala/akka/kafka/scaladsl/Producer.scala
@@ -25,7 +25,7 @@ object Producer {
* partition number, and an optional key and value.
*/
def plainSink[K, V](settings: ProducerSettings[K, V]): Sink[ProducerRecord[K, V], Future[Done]] =
- plainSink(() => settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
+ plainSink(settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
/**
* The `plainSink` can be used for publishing records to Kafka topics.
@@ -33,12 +33,12 @@ object Producer {
* partition number, and an optional key and value.
*/
def plainSink[K, V](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Sink[ProducerRecord[K, V], Future[Done]] =
Flow[ProducerRecord[K, V]].map(record => Message(record, NotUsed))
- .via(flow(producerProvider, closeTimeout, parallelism))
+ .via(flow(producer, closeTimeout, parallelism))
.toMat(Sink.ignore)(Keep.right)
/**
@@ -50,7 +50,7 @@ object Producer {
* committing, so it is "at-least once delivery" semantics.
*/
def commitableSink[K, V](settings: ProducerSettings[K, V]): Sink[Message[K, V, ConsumerMessage.Committable], Future[Done]] =
- commitableSink(() => settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
+ commitableSink(settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
/**
* Sink that is aware of the [[ConsumerMessage#CommittableOffset committable offset]]
@@ -61,11 +61,11 @@ object Producer {
* committing, so it is "at-least once delivery" semantics.
*/
def commitableSink[K, V](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Sink[Message[K, V, ConsumerMessage.Committable], Future[Done]] =
- flow[K, V, ConsumerMessage.Committable](producerProvider, closeTimeout, parallelism)
+ flow[K, V, ConsumerMessage.Committable](producer, closeTimeout, parallelism)
.mapAsync(parallelism)(_.message.passThrough.commitScaladsl())
.toMat(Sink.ignore)(Keep.right)
@@ -75,7 +75,7 @@ object Producer {
* be committed later in the flow.
*/
def flow[K, V, PassThrough](settings: ProducerSettings[K, V]): Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed] = {
- flow(() => settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
+ flow(settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
}
/**
@@ -84,13 +84,13 @@ object Producer {
* be committed later in the flow.
*/
def flow[K, V, PassThrough](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed] = {
Flow.fromGraph(new ProducerStage[K, V, PassThrough](
closeTimeout = closeTimeout,
- producerProvider = producerProvider
+ producerProvider = () => producer
)).mapAsync(parallelism)(identity)
}
--
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.