Skip to content

Instantly share code, notes, and snippets.

@dluc
Last active May 2, 2019 01:35
Show Gist options
  • Save dluc/b72010e8eb622fbb96239b3418ba1f07 to your computer and use it in GitHub Desktop.
Save dluc/b72010e8eb622fbb96239b3418ba1f07 to your computer and use it in GitHub Desktop.
:100644 100644 e163037... 0000000... M build.sbt
:100644 100644 97381e7... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala
:100644 100644 7d3fc76... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala
:100644 100644 997a9c9... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala
:100644 100644 db3d580... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala
:100644 100644 96bf8a3... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala
:100644 100644 d17aed3... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CheckpointBackend.scala
:100644 100644 91bfb71... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala
:100644 100644 c954630... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/javadsl/IoTHub.scala
:100644 100644 8e54e26... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHub.scala
:100644 100644 f12c1f8... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala
:100644 100644 a056d62... 0000000... M src/test/scala/api/API.scala
:100644 100644 57b70d7... 0000000... M src/test/scala/it/IoTHubReactHasAnAwesomeAPI.scala
diff --git a/build.sbt b/build.sbt
index e163037..5430880 100644
--- a/build.sbt
+++ b/build.sbt
@@ -19,6 +19,7 @@ libraryDependencies <++= (scalaVersion) {
val datastaxDriverVersion = "3.1.1"
val json4sVersion = "3.5.0"
val akkaStreamVersion = "2.4.16"
+ val mockitoVersion = "1.10.19"
Seq(
// Library dependencies
@@ -32,7 +33,8 @@ libraryDependencies <++= (scalaVersion) {
// Tests dependencies
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
- "com.microsoft.azure.iothub-java-client" % "iothub-java-device-client" % iothubDeviceClientVersion % "test"
+ "com.microsoft.azure.iothub-java-client" % "iothub-java-device-client" % iothubDeviceClientVersion % "test",
+ "org.mockito" % "mockito-all" % mockitoVersion % "test"
)
}
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala
index 97381e7..3b1326b 100644
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala
@@ -13,6 +13,7 @@ private[iothubreact] object CheckpointActorSystem {
implicit private[this] val actorSystem = ActorSystem("IoTHubReact")
implicit private[this] val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem))
+ implicit private[this] val cpconfig = new CPConfiguration
var localRegistry: Map[String, ActorRef] = Map[String, ActorRef]()
/** Create an actor to read/write offset checkpoints from the storage
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala
index 7d3fc76..b77864a 100644
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala
@@ -31,7 +31,7 @@ private[iothubreact] object CheckpointService {
*
* @param partition IoT hub partition number [0..N]
*/
-private[iothubreact] class CheckpointService(partition: Int)
+private[iothubreact] class CheckpointService(partition: Int)(implicit config: ICPConfiguration)
extends Actor
with Stash
with Logger {
@@ -97,8 +97,9 @@ private[iothubreact] class CheckpointService(partition: Int)
var offsetToStore: String = ""
val now = Instant.now.getEpochSecond
- val timeThreshold = Configuration.checkpointTimeThreshold.toSeconds
- val countThreshold = Configuration.checkpointCountThreshold
+
+ val timeThreshold = config.checkpointTimeThreshold.toSeconds
+ val countThreshold = config.checkpointCountThreshold
// Check if the queue contains old offsets to flush (time threshold)
// Check if the queue contains data of too many messages (count threshold)
@@ -111,7 +112,7 @@ private[iothubreact] class CheckpointService(partition: Int)
}
if (offsetToStore == "") {
- log.debug(s"Checkpoint skipped: partition=${partition}, count ${queuedOffsets} < threshold ${Configuration.checkpointCountThreshold}")
+ log.debug(s"Checkpoint skipped: partition=${partition}, count ${queuedOffsets} < threshold ${config.checkpointCountThreshold}")
} else {
log.info(s"Writing checkpoint: partition=${partition}, storing ${offsetToStore} (current offset=${currentOffset})")
storage.writeOffset(partition, offsetToStore)
@@ -140,7 +141,7 @@ private[iothubreact] class CheckpointService(partition: Int)
def updateOffsetAction(offset: String) = {
if (!schedulerStarted) {
- val time = Configuration.checkpointFrequency
+ val time = config.checkpointFrequency
schedulerStarted = true
context.system.scheduler.schedule(time, time, self, StoreOffset)
log.info(s"Scheduled checkpoint for partition ${partition} every ${time.toMillis} ms")
@@ -167,7 +168,7 @@ private[iothubreact] class CheckpointService(partition: Int)
// TODO: Support plugins
def getCheckpointBackend: CheckpointBackend = {
- val conf = Configuration.checkpointBackendType
+ val conf = config.checkpointBackendType
conf.toUpperCase match {
case "AZUREBLOB" ⇒ new AzureBlob
case "CASSANDRA" ⇒ new CassandraTable
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala
index 997a9c9..d137fd5 100644
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala
@@ -4,22 +4,38 @@ package com.microsoft.azure.iot.iothubreact.checkpointing
import java.util.concurrent.TimeUnit
+import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Auth
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.duration._
import scala.language.postfixOps
+import scala.util.Try
+
+trait ICPConfiguration { // Checkpointing settings, passed implicitly to CheckpointService, the storage backends, IoTHub and IoTHubPartition
+ val isEnabled : Boolean // Whether checkpointing is enabled; ANDed with the caller's withCheckpoints flag
+ val storageNamespace : String // Value returned by CheckpointBackend.checkpointNamespace
+ val checkpointBackendType : String // Backend selector, upper-cased and matched against "AZUREBLOB" / "CASSANDRA"
+ val checkpointFrequency : FiniteDuration // Initial delay and interval of the scheduled StoreOffset message
+ val checkpointRWTimeout : FiniteDuration // Wrapped in the implicit Timeout declared by IoTHubPartition.GetSavedOffset
+ val checkpointCountThreshold : Int // How many messages to replay after a restart, for each IoT hub partition
+ val checkpointTimeThreshold : FiniteDuration // Store a position once it is older than this; read in whole seconds
+ val azureBlobEmulator : Boolean // When true, AzureBlob uses the development storage account
+ val azureBlobConnectionString : String // Parsed by CloudStorageAccount.parse when the emulator flag is false
+ val azureBlobLeaseDuration : FiniteDuration // Converted to whole seconds and passed to acquireLease
+ val cassandraCluster : String // Contact point handed to Connection: "host" or "host:port" (port falls back to 9042)
+ val cassandraReplicationFactor: Int // Replication factor handed to Connection
+ val cassandraAuth : Option[Auth] // Optional credentials; when defined, Connection applies them with withCredentials
+}
/** Hold IoT Hub stream checkpointing configuration settings
*/
-private[iothubreact] object Configuration {
+private[iothubreact] case class CPConfiguration(implicit conf: Config = ConfigFactory.load) extends ICPConfiguration {
// TODO: Allow to use multiple configurations, e.g. while processing multiple streams
// a client will need a dedicated checkpoint container for each stream
private[this] val confPath = "iothub-react.checkpointing."
- private[this] val conf: Config = ConfigFactory.load()
-
// Default time between checkpoint writes to the storage
private[this] val DefaultFrequency = 1 second
@@ -57,7 +73,10 @@ private[iothubreact] object Configuration {
private[this] val MaxTimeThreshold = 1 hour
// Default name of the container used to store checkpoint data
- private[this] val DefaultContainer = "iothub-react-checkpoints"
+ private[this] lazy val DefaultContainer = checkpointBackendType.toUpperCase match { // Default name depends on the configured backend (compared upper-cased)
+ case "CASSANDRA" ⇒ "iothub_react_checkpoints" // Underscores instead of hyphens; presumably required by Cassandra naming rules - TODO confirm
+ case _ ⇒ "iothub-react-checkpoints" // Every other backend gets the hyphenated name
+ }
// Whether checkpointing is enabled or not
lazy val isEnabled: Boolean = conf.getBoolean(confPath + "enabled")
@@ -70,11 +89,11 @@ private[iothubreact] object Configuration {
MaxFrequency)
// How many messages to replay after a restart, for each IoT hub partition
- lazy val checkpointCountThreshold = Math.max(1, conf.getInt(confPath + "countThreshold"))
+ lazy val checkpointCountThreshold: Int = Math.max(1, conf.getInt(confPath + "countThreshold"))
// Store a position if its value is older than this amount of time, rounded to seconds
// Min: 1 second, Max: 1 hour
- lazy val checkpointTimeThreshold = getDuration(
+ lazy val checkpointTimeThreshold: FiniteDuration = getDuration(
confPath + "timeThreshold",
DefaultTimeThreshold,
MinTimeThreshold,
@@ -107,8 +126,14 @@ private[iothubreact] object Configuration {
60 seconds)
// Cassandra cluster address
- lazy val cassandraCluster : String = conf.getString(confPath + "storage.cassandra.cluster")
- lazy val cassandraReplicationFactor: Int = conf.getInt(confPath + "storage.cassandra.replicationFactor")
+ lazy val cassandraCluster : String = conf.getString(confPath + "storage.cassandra.cluster")
+ lazy val cassandraReplicationFactor: Int = conf.getInt(confPath + "storage.cassandra.replicationFactor")
+ lazy val cassandraAuth : Option[Auth] = (for { // Optional Cassandra credentials; both lookups must succeed
+ u <- Try(conf.getString(confPath + "storage.cassandra.username")) // A failed lookup becomes a Failure instead of throwing
+ p <- Try(conf.getString(confPath + "storage.cassandra.password")) // Only evaluated when the username lookup succeeded
+ } yield {
+ Auth(u, p)
+ }).toOption // None as soon as either lookup fails. NOTE(review): Try hides why it failed (missing key vs. another config error) - confirm this is intended
/** Load Azure blob connection string, taking care of the Azure storage emulator case
*
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala
index db3d580..82a2958 100644
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala
@@ -6,7 +6,7 @@ import java.io.IOException
import java.net.URISyntaxException
import java.util.UUID
-import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration
+import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHubPartition
import com.microsoft.azure.iot.iothubreact.{Logger, Retry}
import com.microsoft.azure.storage.blob.CloudBlockBlob
@@ -17,13 +17,13 @@ import scala.language.{implicitConversions, postfixOps}
/** Storage logic to write checkpoints to Azure blobs
*/
-private[iothubreact] class AzureBlob extends CheckpointBackend with Logger {
+private[iothubreact] class AzureBlob(implicit config: ICPConfiguration) extends CheckpointBackend with Logger {
// Set the account to point either to Azure or the emulator
- val account: CloudStorageAccount = if (Configuration.azureBlobEmulator)
- CloudStorageAccount.getDevelopmentStorageAccount()
- else
- CloudStorageAccount.parse(Configuration.azureBlobConnectionString)
+ val account: CloudStorageAccount = if (config.azureBlobEmulator)
+ CloudStorageAccount.getDevelopmentStorageAccount()
+ else
+ CloudStorageAccount.parse(config.azureBlobConnectionString)
val client = account.createCloudBlobClient()
@@ -121,7 +121,7 @@ private[iothubreact] class AzureBlob extends CheckpointBackend with Logger {
// Note: the lease ID must be a Guid otherwise the service returs 400
var leaseId = UUID.randomUUID().toString
try {
- file.acquireLease(Configuration.azureBlobLeaseDuration.toSeconds.toInt, leaseId)
+ file.acquireLease(config.azureBlobLeaseDuration.toSeconds.toInt, leaseId)
} catch {
case e: StorageException ⇒ {
@@ -146,9 +146,9 @@ private[iothubreact] class AzureBlob extends CheckpointBackend with Logger {
// The access condition depends on the file existing
val accessCondition = if (leaseId == "")
- AccessCondition.generateEmptyCondition()
- else
- AccessCondition.generateLeaseCondition(leaseId)
+ AccessCondition.generateEmptyCondition()
+ else
+ AccessCondition.generateLeaseCondition(leaseId)
try {
file.uploadText(content, "UTF-8", accessCondition, null, new OperationContext)
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala
index 96bf8a3..1d41c66 100644
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala
@@ -3,7 +3,7 @@
package com.microsoft.azure.iot.iothubreact.checkpointing.backends
import com.microsoft.azure.iot.iothubreact.Logger
-import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration
+import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Connection
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.{CheckpointRecord, CheckpointsTableSchema}
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHubPartition
@@ -11,10 +11,10 @@ import org.json4s.JsonAST
/** Storage logic to write checkpoints to a Cassandra table
*/
-private[iothubreact] class CassandraTable extends CheckpointBackend with Logger {
+private[iothubreact] class CassandraTable(implicit config: ICPConfiguration) extends CheckpointBackend with Logger {
val schema = new CheckpointsTableSchema(checkpointNamespace, "checkpoints")
- val connection = Connection(Configuration.cassandraCluster, Configuration.cassandraReplicationFactor, schema)
+ val connection = Connection(config.cassandraCluster, config.cassandraReplicationFactor, config.cassandraAuth, schema)
val table = connection.getTable[CheckpointRecord]()
connection.createKeyspaceIfNotExists()
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CheckpointBackend.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CheckpointBackend.scala
index d17aed3..f3b5dce 100644
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CheckpointBackend.scala
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CheckpointBackend.scala
@@ -2,11 +2,11 @@
package com.microsoft.azure.iot.iothubreact.checkpointing.backends
-import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration
+import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration
trait CheckpointBackend {
- def checkpointNamespace: String = Configuration.storageNamespace
+ def checkpointNamespace(implicit config: ICPConfiguration): String = config.storageNamespace // Namespace comes from whichever ICPConfiguration is implicitly in scope at the call site
/** Read the offset of the last record processed for the given partition
*
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala
index 91bfb71..c99a13a 100644
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala
@@ -13,11 +13,18 @@ import com.datastax.driver.core.Cluster
private[iothubreact] case class Connection(
contactPoint: String,
replicationFactor: Int,
+ auth: Option[Auth],
table: TableSchema) {
- private lazy val hostPort = extractHostPort()
- private lazy val cluster = Cluster.builder().addContactPoint(hostPort._1).withPort(hostPort._2).build()
- implicit lazy val session = cluster.connect()
+ private lazy val hostPort = extractHostPort()
+ private lazy val cluster = { // Lazily built Cassandra cluster handle; session is obtained from it via connect()
+ val builder = Cluster.builder().addContactPoint(hostPort._1).withPort(hostPort._2) // hostPort is the (address, port) pair
+ auth map { // Credentials are applied only when auth is defined
+ creds ⇒ builder.withCredentials(creds.username, creds.password)
+ } getOrElse (builder) build() // Without credentials the plain builder is used; build() is called on whichever builder results
+ }
+
+ implicit lazy val session = cluster.connect()
/** Create the key space if not present
*/
@@ -51,9 +58,9 @@ private[iothubreact] case class Connection(
val tokens = contactPoint.split(":")
val addr = tokens(0)
val port = if (tokens.size == 2)
- tokens(1).toInt
- else
- 9042
+ tokens(1).toInt
+ else
+ 9042
(addr, port)
}
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/javadsl/IoTHub.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/javadsl/IoTHub.scala
index c954630..89c0076 100644
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/javadsl/IoTHub.scala
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/javadsl/IoTHub.scala
@@ -8,6 +8,7 @@ import java.util.concurrent.CompletionStage
import akka.stream.javadsl.{Sink, Source ⇒ JavaSource}
import akka.{Done, NotUsed}
import com.microsoft.azure.iot.iothubreact._
+import com.microsoft.azure.iot.iothubreact.checkpointing.{CPConfiguration, ICPConfiguration}
import com.microsoft.azure.iot.iothubreact.scaladsl.{IoTHub ⇒ IoTHubScalaDSL, OffsetList ⇒ OffsetListScalaDSL, PartitionList ⇒ PartitionListScalaDSL}
import com.microsoft.azure.iot.iothubreact.sinks.{DevicePropertiesSink, MessageToDeviceSink, MethodOnDeviceSink}
@@ -19,6 +20,8 @@ class IoTHub() {
private lazy val iotHub = new IoTHubScalaDSL()
+ private implicit val cpconfig: ICPConfiguration = new CPConfiguration
+
/** Stop the stream
*/
def close(): Unit = {
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHub.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHub.scala
index 8e54e26..4371c39 100644
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHub.scala
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHub.scala
@@ -8,7 +8,7 @@ import akka.stream._
import akka.stream.scaladsl._
import akka.{Done, NotUsed}
import com.microsoft.azure.iot.iothubreact._
-import com.microsoft.azure.iot.iothubreact.checkpointing.{Configuration ⇒ CPConfiguration}
+import com.microsoft.azure.iot.iothubreact.checkpointing.{CPConfiguration, ICPConfiguration}
import com.microsoft.azure.iot.iothubreact.sinks.{DevicePropertiesSink, MessageToDeviceSink, MethodOnDeviceSink}
import scala.concurrent.Future
@@ -16,7 +16,7 @@ import scala.language.postfixOps
/** Provides a streaming source to retrieve messages from Azure IoT Hub
*/
-case class IoTHub() extends Logger {
+case class IoTHub(implicit cpconfig: ICPConfiguration = new CPConfiguration) extends Logger {
// TODO: Provide ClearCheckpoints() method to clear the state
@@ -137,7 +137,7 @@ case class IoTHub() extends Logger {
withTimeOffset = false,
partitions = allPartitions,
offsets = fromStart,
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled)
}
/** Stream returning all the messages from all the configured partitions.
@@ -154,7 +154,7 @@ case class IoTHub() extends Logger {
withTimeOffset = false,
partitions = Some(partitions),
offsets = fromStart,
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled)
}
/** Stream returning all the messages starting from the given offset, from all
@@ -201,7 +201,7 @@ case class IoTHub() extends Logger {
withTimeOffset = true,
partitions = allPartitions,
startTime = startTime,
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled)
}
/** Stream returning all the messages starting from the given time, from all
@@ -218,7 +218,7 @@ case class IoTHub() extends Logger {
withTimeOffset = true,
partitions = Some(partitions),
startTime = startTime,
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled)
}
/** Stream returning all the messages starting from the given offset, from all
@@ -234,7 +234,7 @@ case class IoTHub() extends Logger {
withTimeOffset = false,
partitions = allPartitions,
offsets = Some(offsets),
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled)
}
/** Stream returning all the messages starting from the given offset, from all
@@ -251,7 +251,7 @@ case class IoTHub() extends Logger {
withTimeOffset = false,
partitions = Some(partitions),
offsets = Some(offsets),
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled)
}
/** Stream returning all the messages, from the given starting point, optionally with
@@ -280,9 +280,9 @@ case class IoTHub() extends Logger {
for (partition ← partitions.get.values) {
val graph = if (withTimeOffset)
- IoTHubPartition(partition).source(startTime, withCheckpoints).via(streamManager)
- else
- IoTHubPartition(partition).source(offsets.get.values(partition), withCheckpoints).via(streamManager)
+ IoTHubPartition(partition).source(startTime, withCheckpoints).via(streamManager)
+ else
+ IoTHubPartition(partition).source(offsets.get.values(partition), withCheckpoints).via(streamManager)
val source = Source.fromGraph(graph).async
source ~> merge
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala
index f12c1f8..46d56e7 100644
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala
@@ -11,7 +11,7 @@ import akka.util.Timeout
import com.microsoft.azure.eventhubs.PartitionReceiver
import com.microsoft.azure.iot.iothubreact._
import com.microsoft.azure.iot.iothubreact.checkpointing.CheckpointService.GetOffset
-import com.microsoft.azure.iot.iothubreact.checkpointing.{CheckpointActorSystem, SavePositionOnPull, Configuration ⇒ CPConfiguration}
+import com.microsoft.azure.iot.iothubreact.checkpointing.{CheckpointActorSystem, ICPConfiguration, SavePositionOnPull}
import com.microsoft.azure.iot.iothubreact.filters.Ignore
import scala.concurrent.Await
@@ -33,7 +33,7 @@ object IoTHubPartition extends Logger {
* @param partition IoT hub partition number (0-based). The number of
* partitions is set during the deployment.
*/
-private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logger {
+private[iothubreact] case class IoTHubPartition(val partition: Int)(implicit cpconfig: ICPConfiguration) extends Logger {
/** Stream returning all the messages from the given offset
*
@@ -46,7 +46,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg
getSource(
withTimeOffset = true,
startTime = startTime,
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled)
}
/** Stream returning all the messages from the given offset
@@ -60,7 +60,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg
getSource(
withTimeOffset = false,
offset = offset,
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled)
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled)
}
/** Create a stream returning all the messages for the defined partition, from the given start
@@ -77,13 +77,14 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg
withTimeOffset: Boolean,
offset: String = "",
startTime: Instant = Instant.MIN,
- withCheckpoints: Boolean = true): Source[MessageFromDevice, NotUsed] = {
+ withCheckpoints: Boolean = true,
+ startFromSavedCheckpoint: Boolean = false): Source[MessageFromDevice, NotUsed] = {
// Load the offset from the storage (if needed)
var _offset = offset
var _withTimeOffset = withTimeOffset
- if (withCheckpoints) {
- val savedOffset = GetSavedOffset()
+ if (withCheckpoints || startFromSavedCheckpoint) {
+ val savedOffset = GetSavedOffset
if (savedOffset != IoTHubPartition.OffsetCheckpointNotFound) {
_offset = savedOffset
_withTimeOffset = false
@@ -93,9 +94,9 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg
// Build the source starting by time or by offset
val source: Source[MessageFromDevice, NotUsed] = if (_withTimeOffset)
- MessageFromDeviceSource(partition, startTime, withCheckpoints).filter(Ignore.keepAlive)
- else
- MessageFromDeviceSource(partition, _offset, withCheckpoints).filter(Ignore.keepAlive)
+ MessageFromDeviceSource(partition, startTime, withCheckpoints).filter(Ignore.keepAlive)
+ else
+ MessageFromDeviceSource(partition, _offset, withCheckpoints).filter(Ignore.keepAlive)
// Inject a flow to store the stream position after each pull
if (withCheckpoints) {
@@ -112,7 +113,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg
*/
private[this] def GetSavedOffset(): String = {
val partitionCp = CheckpointActorSystem.getCheckpointService(partition)
- implicit val rwTimeout = Timeout(CPConfiguration.checkpointRWTimeout)
+ implicit val rwTimeout = Timeout(cpconfig.checkpointRWTimeout)
try {
Retry(3, 5 seconds) {
log.debug(s"Loading the stream position for partition ${partition}")
diff --git a/src/test/scala/api/API.scala b/src/test/scala/api/API.scala
index a056d62..b5ca37b 100644
--- a/src/test/scala/api/API.scala
+++ b/src/test/scala/api/API.scala
@@ -3,14 +3,22 @@
// Namespace chosen to avoid access to internal classes
package api
+import java.util.UUID
+
+import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
// No global imports to make easier detecting breaking changes
-class APIIsBackwardCompatible extends org.scalatest.FeatureSpec {
+class APIIsBackwardCompatible extends org.scalatest.FeatureSpec with MockitoSugar {
info("As a developer using Azure IoT hub React")
info("I want to be able to upgrade to new minor versions without changing my code")
info("So I can benefit from improvements without excessive development costs")
+ implicit val cpconfig = mock[ICPConfiguration]
+
feature("Version 0.x is backward compatible") {
scenario("Using MessageFromDevice") {
@@ -148,6 +156,7 @@ class APIIsBackwardCompatible extends org.scalatest.FeatureSpec {
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.CheckpointBackend
class CustomBackend extends CheckpointBackend {
+
override def readOffset(partition: Int): String = {
return ""
}
@@ -156,7 +165,10 @@ class APIIsBackwardCompatible extends org.scalatest.FeatureSpec {
}
val backend: CustomBackend = new CustomBackend()
- assert(backend.checkpointNamespace == "iothub-react-checkpoints")
+
+ val anyname = UUID.randomUUID.toString
+ when(cpconfig.storageNamespace).thenReturn(anyname)
+ assert(backend.checkpointNamespace == anyname)
}
scenario("Using Message Type") {
diff --git a/src/test/scala/it/IoTHubReactHasAnAwesomeAPI.scala b/src/test/scala/it/IoTHubReactHasAnAwesomeAPI.scala
index 57b70d7..646b43d 100644
--- a/src/test/scala/it/IoTHubReactHasAnAwesomeAPI.scala
+++ b/src/test/scala/it/IoTHubReactHasAnAwesomeAPI.scala
@@ -22,7 +22,6 @@ class IoTHubReactHasAnAwesomeAPI extends FeatureSpec with GivenWhenThen {
Given("An IoT hub is configured")
val hub = IoTHub()
- val hubPartition = IoTHubPartition(1)
When("A developer wants to fetch messages from Azure IoT hub")
val messagesFromAllPartitions: Source[MessageFromDevice, NotUsed] = hub.source(false)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment