Skip to content

Instantly share code, notes, and snippets.

@patriknw
Created August 18, 2016 17:50
Show Gist options
  • Save patriknw/5f9eebb86410d281741b3f4734b96d34 to your computer and use it in GitHub Desktop.
Save patriknw/5f9eebb86410d281741b3f4734b96d34 to your computer and use it in GitHub Desktop.
Remove producer factory
---
core/src/main/scala/akka/kafka/javadsl/Producer.scala | 12 ++++++------
core/src/main/scala/akka/kafka/scaladsl/Producer.scala | 18 +++++++++---------
2 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/akka/kafka/javadsl/Producer.scala b/core/src/main/scala/akka/kafka/javadsl/Producer.scala
index 86005b7..6bae123 100644
--- a/core/src/main/scala/akka/kafka/javadsl/Producer.scala
+++ b/core/src/main/scala/akka/kafka/javadsl/Producer.scala
@@ -38,11 +38,11 @@ object Producer {
* partition number, and an optional key and value.
*/
def plainSink[K, V](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Sink[ProducerRecord[K, V], CompletionStage[Done]] =
- scaladsl.Producer.plainSink(producerProvider, closeTimeout, parallelism)
+ scaladsl.Producer.plainSink(producer, closeTimeout, parallelism)
.mapMaterializedValue(_.toJava)
.asJava
@@ -67,11 +67,11 @@ object Producer {
* committing, so it is "at-least once delivery" semantics.
*/
def commitableSink[K, V](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Sink[Message[K, V, ConsumerMessage.Committable], CompletionStage[Done]] =
- scaladsl.Producer.commitableSink(producerProvider, closeTimeout, parallelism)
+ scaladsl.Producer.commitableSink(producer, closeTimeout, parallelism)
.mapMaterializedValue(_.toJava)
.asJava
@@ -89,9 +89,9 @@ object Producer {
* be committed later in the flow.
*/
def flow[K, V, PassThrough](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed] =
- scaladsl.Producer.flow(producerProvider, closeTimeout, parallelism).asJava
+ scaladsl.Producer.flow(producer, closeTimeout, parallelism).asJava
}
diff --git a/core/src/main/scala/akka/kafka/scaladsl/Producer.scala b/core/src/main/scala/akka/kafka/scaladsl/Producer.scala
index 24b6f41..10167f7 100644
--- a/core/src/main/scala/akka/kafka/scaladsl/Producer.scala
+++ b/core/src/main/scala/akka/kafka/scaladsl/Producer.scala
@@ -25,7 +25,7 @@ object Producer {
* partition number, and an optional key and value.
*/
def plainSink[K, V](settings: ProducerSettings[K, V]): Sink[ProducerRecord[K, V], Future[Done]] =
- plainSink(() => settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
+ plainSink(settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
/**
* The `plainSink` can be used for publishing records to Kafka topics.
@@ -33,12 +33,12 @@ object Producer {
* partition number, and an optional key and value.
*/
def plainSink[K, V](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Sink[ProducerRecord[K, V], Future[Done]] =
Flow[ProducerRecord[K, V]].map(record => Message(record, NotUsed))
- .via(flow(producerProvider, closeTimeout, parallelism))
+ .via(flow(producer, closeTimeout, parallelism))
.toMat(Sink.ignore)(Keep.right)
/**
@@ -50,7 +50,7 @@ object Producer {
* committing, so it is "at-least once delivery" semantics.
*/
def commitableSink[K, V](settings: ProducerSettings[K, V]): Sink[Message[K, V, ConsumerMessage.Committable], Future[Done]] =
- commitableSink(() => settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
+ commitableSink(settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
/**
* Sink that is aware of the [[ConsumerMessage#CommittableOffset committable offset]]
@@ -61,11 +61,11 @@ object Producer {
* committing, so it is "at-least once delivery" semantics.
*/
def commitableSink[K, V](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Sink[Message[K, V, ConsumerMessage.Committable], Future[Done]] =
- flow[K, V, ConsumerMessage.Committable](producerProvider, closeTimeout, parallelism)
+ flow[K, V, ConsumerMessage.Committable](producer, closeTimeout, parallelism)
.mapAsync(parallelism)(_.message.passThrough.commitScaladsl())
.toMat(Sink.ignore)(Keep.right)
@@ -75,7 +75,7 @@ object Producer {
* be committed later in the flow.
*/
def flow[K, V, PassThrough](settings: ProducerSettings[K, V]): Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed] = {
- flow(() => settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
+ flow(settings.createKafkaProducer(), settings.closeTimeout, settings.parallelism)
}
/**
@@ -84,13 +84,13 @@ object Producer {
* be committed later in the flow.
*/
def flow[K, V, PassThrough](
- producerProvider: () => KafkaProducer[K, V],
+ producer: KafkaProducer[K, V],
closeTimeout: FiniteDuration,
parallelism: Int
): Flow[Message[K, V, PassThrough], Result[K, V, PassThrough], NotUsed] = {
Flow.fromGraph(new ProducerStage[K, V, PassThrough](
closeTimeout = closeTimeout,
- producerProvider = producerProvider
+ producerProvider = () => producer
)).mapAsync(parallelism)(identity)
}
--
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment