Last active
May 2, 2019 01:35
-
-
Save dluc/b72010e8eb622fbb96239b3418ba1f07 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
:100644 100644 e163037... 0000000... M build.sbt | |
:100644 100644 97381e7... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala | |
:100644 100644 7d3fc76... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala | |
:100644 100644 997a9c9... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala | |
:100644 100644 db3d580... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala | |
:100644 100644 96bf8a3... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala | |
:100644 100644 d17aed3... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CheckpointBackend.scala | |
:100644 100644 91bfb71... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala | |
:100644 100644 c954630... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/javadsl/IoTHub.scala | |
:100644 100644 8e54e26... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHub.scala | |
:100644 100644 f12c1f8... 0000000... M src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala | |
:100644 100644 a056d62... 0000000... M src/test/scala/api/API.scala | |
:100644 100644 57b70d7... 0000000... M src/test/scala/it/IoTHubReactHasAnAwesomeAPI.scala | |
diff --git a/build.sbt b/build.sbt | |
index e163037..5430880 100644 | |
--- a/build.sbt | |
+++ b/build.sbt | |
@@ -19,6 +19,7 @@ libraryDependencies <++= (scalaVersion) { | |
val datastaxDriverVersion = "3.1.1" | |
val json4sVersion = "3.5.0" | |
val akkaStreamVersion = "2.4.16" | |
+ val mockitoVersion = "1.10.19" | |
Seq( | |
// Library dependencies | |
@@ -32,7 +33,8 @@ libraryDependencies <++= (scalaVersion) { | |
// Tests dependencies | |
"org.scalatest" %% "scalatest" % scalaTestVersion % "test", | |
- "com.microsoft.azure.iothub-java-client" % "iothub-java-device-client" % iothubDeviceClientVersion % "test" | |
+ "com.microsoft.azure.iothub-java-client" % "iothub-java-device-client" % iothubDeviceClientVersion % "test", | |
+ "org.mockito" % "mockito-all" % mockitoVersion % "test" | |
) | |
} | |
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala | |
index 97381e7..3b1326b 100644 | |
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala | |
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointActorSystem.scala | |
@@ -13,6 +13,7 @@ private[iothubreact] object CheckpointActorSystem { | |
implicit private[this] val actorSystem = ActorSystem("IoTHubReact") | |
implicit private[this] val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem)) | |
+ implicit private[this] val cpconfig = new CPConfiguration | |
var localRegistry: Map[String, ActorRef] = Map[String, ActorRef]() | |
/** Create an actor to read/write offset checkpoints from the storage | |
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala | |
index 7d3fc76..b77864a 100644 | |
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala | |
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/CheckpointService.scala | |
@@ -31,7 +31,7 @@ private[iothubreact] object CheckpointService { | |
* | |
* @param partition IoT hub partition number [0..N] | |
*/ | |
-private[iothubreact] class CheckpointService(partition: Int) | |
+private[iothubreact] class CheckpointService(partition: Int)(implicit config: ICPConfiguration) | |
extends Actor | |
with Stash | |
with Logger { | |
@@ -97,8 +97,9 @@ private[iothubreact] class CheckpointService(partition: Int) | |
var offsetToStore: String = "" | |
val now = Instant.now.getEpochSecond | |
- val timeThreshold = Configuration.checkpointTimeThreshold.toSeconds | |
- val countThreshold = Configuration.checkpointCountThreshold | |
+ | |
+ val timeThreshold = config.checkpointTimeThreshold.toSeconds | |
+ val countThreshold = config.checkpointCountThreshold | |
// Check if the queue contains old offsets to flush (time threshold) | |
// Check if the queue contains data of too many messages (count threshold) | |
@@ -111,7 +112,7 @@ private[iothubreact] class CheckpointService(partition: Int) | |
} | |
if (offsetToStore == "") { | |
- log.debug(s"Checkpoint skipped: partition=${partition}, count ${queuedOffsets} < threshold ${Configuration.checkpointCountThreshold}") | |
+ log.debug(s"Checkpoint skipped: partition=${partition}, count ${queuedOffsets} < threshold ${config.checkpointCountThreshold}") | |
} else { | |
log.info(s"Writing checkpoint: partition=${partition}, storing ${offsetToStore} (current offset=${currentOffset})") | |
storage.writeOffset(partition, offsetToStore) | |
@@ -140,7 +141,7 @@ private[iothubreact] class CheckpointService(partition: Int) | |
def updateOffsetAction(offset: String) = { | |
if (!schedulerStarted) { | |
- val time = Configuration.checkpointFrequency | |
+ val time = config.checkpointFrequency | |
schedulerStarted = true | |
context.system.scheduler.schedule(time, time, self, StoreOffset) | |
log.info(s"Scheduled checkpoint for partition ${partition} every ${time.toMillis} ms") | |
@@ -167,7 +168,7 @@ private[iothubreact] class CheckpointService(partition: Int) | |
// TODO: Support plugins | |
def getCheckpointBackend: CheckpointBackend = { | |
- val conf = Configuration.checkpointBackendType | |
+ val conf = config.checkpointBackendType | |
conf.toUpperCase match { | |
case "AZUREBLOB" ⇒ new AzureBlob | |
case "CASSANDRA" ⇒ new CassandraTable | |
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala | |
index 997a9c9..d137fd5 100644 | |
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala | |
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/Configuration.scala | |
@@ -4,22 +4,38 @@ package com.microsoft.azure.iot.iothubreact.checkpointing | |
import java.util.concurrent.TimeUnit | |
+import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Auth | |
import com.typesafe.config.{Config, ConfigFactory} | |
import scala.concurrent.duration._ | |
import scala.language.postfixOps | |
+import scala.util.Try | |
+ | |
+trait ICPConfiguration { | |
+ val isEnabled : Boolean | |
+ val storageNamespace : String | |
+ val checkpointBackendType : String | |
+ val checkpointFrequency : FiniteDuration | |
+ val checkpointRWTimeout : FiniteDuration | |
+ val checkpointCountThreshold : Int | |
+ val checkpointTimeThreshold : FiniteDuration | |
+ val azureBlobEmulator : Boolean | |
+ val azureBlobConnectionString : String | |
+ val azureBlobLeaseDuration : FiniteDuration | |
+ val cassandraCluster : String | |
+ val cassandraReplicationFactor: Int | |
+ val cassandraAuth : Option[Auth] | |
+} | |
/** Hold IoT Hub stream checkpointing configuration settings | |
*/ | |
-private[iothubreact] object Configuration { | |
+private[iothubreact] case class CPConfiguration(implicit conf: Config = ConfigFactory.load) extends ICPConfiguration { | |
// TODO: Allow to use multiple configurations, e.g. while processing multiple streams | |
// a client will need a dedicated checkpoint container for each stream | |
private[this] val confPath = "iothub-react.checkpointing." | |
- private[this] val conf: Config = ConfigFactory.load() | |
- | |
// Default time between checkpoint writes to the storage | |
private[this] val DefaultFrequency = 1 second | |
@@ -57,7 +73,10 @@ private[iothubreact] object Configuration { | |
private[this] val MaxTimeThreshold = 1 hour | |
// Default name of the container used to store checkpoint data | |
- private[this] val DefaultContainer = "iothub-react-checkpoints" | |
+ private[this] lazy val DefaultContainer = checkpointBackendType.toUpperCase match { | |
+ case "CASSANDRA" ⇒ "iothub_react_checkpoints" | |
+ case _ ⇒ "iothub-react-checkpoints" | |
+ } | |
// Whether checkpointing is enabled or not | |
lazy val isEnabled: Boolean = conf.getBoolean(confPath + "enabled") | |
@@ -70,11 +89,11 @@ private[iothubreact] object Configuration { | |
MaxFrequency) | |
// How many messages to replay after a restart, for each IoT hub partition | |
- lazy val checkpointCountThreshold = Math.max(1, conf.getInt(confPath + "countThreshold")) | |
+ lazy val checkpointCountThreshold: Int = Math.max(1, conf.getInt(confPath + "countThreshold")) | |
// Store a position if its value is older than this amount of time, rounded to seconds | |
// Min: 1 second, Max: 1 hour | |
- lazy val checkpointTimeThreshold = getDuration( | |
+ lazy val checkpointTimeThreshold: FiniteDuration = getDuration( | |
confPath + "timeThreshold", | |
DefaultTimeThreshold, | |
MinTimeThreshold, | |
@@ -107,8 +126,14 @@ private[iothubreact] object Configuration { | |
60 seconds) | |
// Cassandra cluster address | |
- lazy val cassandraCluster : String = conf.getString(confPath + "storage.cassandra.cluster") | |
- lazy val cassandraReplicationFactor: Int = conf.getInt(confPath + "storage.cassandra.replicationFactor") | |
+ lazy val cassandraCluster : String = conf.getString(confPath + "storage.cassandra.cluster") | |
+ lazy val cassandraReplicationFactor: Int = conf.getInt(confPath + "storage.cassandra.replicationFactor") | |
+ lazy val cassandraAuth : Option[Auth] = (for { | |
+ u <- Try(conf.getString(confPath + "storage.cassandra.username")) | |
+ p <- Try(conf.getString(confPath + "storage.cassandra.password")) | |
+ } yield { | |
+ Auth(u, p) | |
+ }).toOption | |
/** Load Azure blob connection string, taking care of the Azure storage emulator case | |
* | |
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala | |
index db3d580..82a2958 100644 | |
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala | |
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/AzureBlob.scala | |
@@ -6,7 +6,7 @@ import java.io.IOException | |
import java.net.URISyntaxException | |
import java.util.UUID | |
-import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration | |
+import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration | |
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHubPartition | |
import com.microsoft.azure.iot.iothubreact.{Logger, Retry} | |
import com.microsoft.azure.storage.blob.CloudBlockBlob | |
@@ -17,13 +17,13 @@ import scala.language.{implicitConversions, postfixOps} | |
/** Storage logic to write checkpoints to Azure blobs | |
*/ | |
-private[iothubreact] class AzureBlob extends CheckpointBackend with Logger { | |
+private[iothubreact] class AzureBlob(implicit config: ICPConfiguration) extends CheckpointBackend with Logger { | |
// Set the account to point either to Azure or the emulator | |
- val account: CloudStorageAccount = if (Configuration.azureBlobEmulator) | |
- CloudStorageAccount.getDevelopmentStorageAccount() | |
- else | |
- CloudStorageAccount.parse(Configuration.azureBlobConnectionString) | |
+ val account: CloudStorageAccount = if (config.azureBlobEmulator) | |
+ CloudStorageAccount.getDevelopmentStorageAccount() | |
+ else | |
+ CloudStorageAccount.parse(config.azureBlobConnectionString) | |
val client = account.createCloudBlobClient() | |
@@ -121,7 +121,7 @@ private[iothubreact] class AzureBlob extends CheckpointBackend with Logger { | |
// Note: the lease ID must be a Guid otherwise the service returs 400 | |
var leaseId = UUID.randomUUID().toString | |
try { | |
- file.acquireLease(Configuration.azureBlobLeaseDuration.toSeconds.toInt, leaseId) | |
+ file.acquireLease(config.azureBlobLeaseDuration.toSeconds.toInt, leaseId) | |
} catch { | |
case e: StorageException ⇒ { | |
@@ -146,9 +146,9 @@ private[iothubreact] class AzureBlob extends CheckpointBackend with Logger { | |
// The access condition depends on the file existing | |
val accessCondition = if (leaseId == "") | |
- AccessCondition.generateEmptyCondition() | |
- else | |
- AccessCondition.generateLeaseCondition(leaseId) | |
+ AccessCondition.generateEmptyCondition() | |
+ else | |
+ AccessCondition.generateLeaseCondition(leaseId) | |
try { | |
file.uploadText(content, "UTF-8", accessCondition, null, new OperationContext) | |
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala | |
index 96bf8a3..1d41c66 100644 | |
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala | |
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CassandraTable.scala | |
@@ -3,7 +3,7 @@ | |
package com.microsoft.azure.iot.iothubreact.checkpointing.backends | |
import com.microsoft.azure.iot.iothubreact.Logger | |
-import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration | |
+import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration | |
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.lib.Connection | |
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.cassandra.{CheckpointRecord, CheckpointsTableSchema} | |
import com.microsoft.azure.iot.iothubreact.scaladsl.IoTHubPartition | |
@@ -11,10 +11,10 @@ import org.json4s.JsonAST | |
/** Storage logic to write checkpoints to a Cassandra table | |
*/ | |
-private[iothubreact] class CassandraTable extends CheckpointBackend with Logger { | |
+private[iothubreact] class CassandraTable(implicit config: ICPConfiguration) extends CheckpointBackend with Logger { | |
val schema = new CheckpointsTableSchema(checkpointNamespace, "checkpoints") | |
- val connection = Connection(Configuration.cassandraCluster, Configuration.cassandraReplicationFactor, schema) | |
+ val connection = Connection(config.cassandraCluster, config.cassandraReplicationFactor, config.cassandraAuth, schema) | |
val table = connection.getTable[CheckpointRecord]() | |
connection.createKeyspaceIfNotExists() | |
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CheckpointBackend.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CheckpointBackend.scala | |
index d17aed3..f3b5dce 100644 | |
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CheckpointBackend.scala | |
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/CheckpointBackend.scala | |
@@ -2,11 +2,11 @@ | |
package com.microsoft.azure.iot.iothubreact.checkpointing.backends | |
-import com.microsoft.azure.iot.iothubreact.checkpointing.Configuration | |
+import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration | |
trait CheckpointBackend { | |
- def checkpointNamespace: String = Configuration.storageNamespace | |
+ def checkpointNamespace(implicit config: ICPConfiguration): String = config.storageNamespace | |
/** Read the offset of the last record processed for the given partition | |
* | |
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala | |
index 91bfb71..c99a13a 100644 | |
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala | |
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/checkpointing/backends/cassandra/lib/Connection.scala | |
@@ -13,11 +13,18 @@ import com.datastax.driver.core.Cluster | |
private[iothubreact] case class Connection( | |
contactPoint: String, | |
replicationFactor: Int, | |
+ auth: Option[Auth], | |
table: TableSchema) { | |
- private lazy val hostPort = extractHostPort() | |
- private lazy val cluster = Cluster.builder().addContactPoint(hostPort._1).withPort(hostPort._2).build() | |
- implicit lazy val session = cluster.connect() | |
+ private lazy val hostPort = extractHostPort() | |
+ private lazy val cluster = { | |
+ val builder = Cluster.builder().addContactPoint(hostPort._1).withPort(hostPort._2) | |
+ auth map { | |
+ creds ⇒ builder.withCredentials(creds.username, creds.password) | |
+ } getOrElse (builder) build() | |
+ } | |
+ | |
+ implicit lazy val session = cluster.connect() | |
/** Create the key space if not present | |
*/ | |
@@ -51,9 +58,9 @@ private[iothubreact] case class Connection( | |
val tokens = contactPoint.split(":") | |
val addr = tokens(0) | |
val port = if (tokens.size == 2) | |
- tokens(1).toInt | |
- else | |
- 9042 | |
+ tokens(1).toInt | |
+ else | |
+ 9042 | |
(addr, port) | |
} | |
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/javadsl/IoTHub.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/javadsl/IoTHub.scala | |
index c954630..89c0076 100644 | |
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/javadsl/IoTHub.scala | |
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/javadsl/IoTHub.scala | |
@@ -8,6 +8,7 @@ import java.util.concurrent.CompletionStage | |
import akka.stream.javadsl.{Sink, Source ⇒ JavaSource} | |
import akka.{Done, NotUsed} | |
import com.microsoft.azure.iot.iothubreact._ | |
+import com.microsoft.azure.iot.iothubreact.checkpointing.{CPConfiguration, ICPConfiguration} | |
import com.microsoft.azure.iot.iothubreact.scaladsl.{IoTHub ⇒ IoTHubScalaDSL, OffsetList ⇒ OffsetListScalaDSL, PartitionList ⇒ PartitionListScalaDSL} | |
import com.microsoft.azure.iot.iothubreact.sinks.{DevicePropertiesSink, MessageToDeviceSink, MethodOnDeviceSink} | |
@@ -19,6 +20,8 @@ class IoTHub() { | |
private lazy val iotHub = new IoTHubScalaDSL() | |
+ private implicit val cpconfig: ICPConfiguration = new CPConfiguration | |
+ | |
/** Stop the stream | |
*/ | |
def close(): Unit = { | |
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHub.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHub.scala | |
index 8e54e26..4371c39 100644 | |
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHub.scala | |
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHub.scala | |
@@ -8,7 +8,7 @@ import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.{Done, NotUsed} | |
import com.microsoft.azure.iot.iothubreact._ | |
-import com.microsoft.azure.iot.iothubreact.checkpointing.{Configuration ⇒ CPConfiguration} | |
+import com.microsoft.azure.iot.iothubreact.checkpointing.{CPConfiguration, ICPConfiguration} | |
import com.microsoft.azure.iot.iothubreact.sinks.{DevicePropertiesSink, MessageToDeviceSink, MethodOnDeviceSink} | |
import scala.concurrent.Future | |
@@ -16,7 +16,7 @@ import scala.language.postfixOps | |
/** Provides a streaming source to retrieve messages from Azure IoT Hub | |
*/ | |
-case class IoTHub() extends Logger { | |
+case class IoTHub(implicit cpconfig: ICPConfiguration = new CPConfiguration) extends Logger { | |
// TODO: Provide ClearCheckpoints() method to clear the state | |
@@ -137,7 +137,7 @@ case class IoTHub() extends Logger { | |
withTimeOffset = false, | |
partitions = allPartitions, | |
offsets = fromStart, | |
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled) | |
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled) | |
} | |
/** Stream returning all the messages from all the configured partitions. | |
@@ -154,7 +154,7 @@ case class IoTHub() extends Logger { | |
withTimeOffset = false, | |
partitions = Some(partitions), | |
offsets = fromStart, | |
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled) | |
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled) | |
} | |
/** Stream returning all the messages starting from the given offset, from all | |
@@ -201,7 +201,7 @@ case class IoTHub() extends Logger { | |
withTimeOffset = true, | |
partitions = allPartitions, | |
startTime = startTime, | |
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled) | |
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled) | |
} | |
/** Stream returning all the messages starting from the given time, from all | |
@@ -218,7 +218,7 @@ case class IoTHub() extends Logger { | |
withTimeOffset = true, | |
partitions = Some(partitions), | |
startTime = startTime, | |
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled) | |
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled) | |
} | |
/** Stream returning all the messages starting from the given offset, from all | |
@@ -234,7 +234,7 @@ case class IoTHub() extends Logger { | |
withTimeOffset = false, | |
partitions = allPartitions, | |
offsets = Some(offsets), | |
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled) | |
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled) | |
} | |
/** Stream returning all the messages starting from the given offset, from all | |
@@ -251,7 +251,7 @@ case class IoTHub() extends Logger { | |
withTimeOffset = false, | |
partitions = Some(partitions), | |
offsets = Some(offsets), | |
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled) | |
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled) | |
} | |
/** Stream returning all the messages, from the given starting point, optionally with | |
@@ -280,9 +280,9 @@ case class IoTHub() extends Logger { | |
for (partition ← partitions.get.values) { | |
val graph = if (withTimeOffset) | |
- IoTHubPartition(partition).source(startTime, withCheckpoints).via(streamManager) | |
- else | |
- IoTHubPartition(partition).source(offsets.get.values(partition), withCheckpoints).via(streamManager) | |
+ IoTHubPartition(partition).source(startTime, withCheckpoints).via(streamManager) | |
+ else | |
+ IoTHubPartition(partition).source(offsets.get.values(partition), withCheckpoints).via(streamManager) | |
val source = Source.fromGraph(graph).async | |
source ~> merge | |
diff --git a/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala b/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala | |
index f12c1f8..46d56e7 100644 | |
--- a/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala | |
+++ b/src/main/scala/com/microsoft/azure/iot/iothubreact/scaladsl/IoTHubPartition.scala | |
@@ -11,7 +11,7 @@ import akka.util.Timeout | |
import com.microsoft.azure.eventhubs.PartitionReceiver | |
import com.microsoft.azure.iot.iothubreact._ | |
import com.microsoft.azure.iot.iothubreact.checkpointing.CheckpointService.GetOffset | |
-import com.microsoft.azure.iot.iothubreact.checkpointing.{CheckpointActorSystem, SavePositionOnPull, Configuration ⇒ CPConfiguration} | |
+import com.microsoft.azure.iot.iothubreact.checkpointing.{CheckpointActorSystem, ICPConfiguration, SavePositionOnPull} | |
import com.microsoft.azure.iot.iothubreact.filters.Ignore | |
import scala.concurrent.Await | |
@@ -33,7 +33,7 @@ object IoTHubPartition extends Logger { | |
* @param partition IoT hub partition number (0-based). The number of | |
* partitions is set during the deployment. | |
*/ | |
-private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logger { | |
+private[iothubreact] case class IoTHubPartition(val partition: Int)(implicit cpconfig: ICPConfiguration) extends Logger { | |
/** Stream returning all the messages from the given offset | |
* | |
@@ -46,7 +46,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg | |
getSource( | |
withTimeOffset = true, | |
startTime = startTime, | |
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled) | |
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled) | |
} | |
/** Stream returning all the messages from the given offset | |
@@ -60,7 +60,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg | |
getSource( | |
withTimeOffset = false, | |
offset = offset, | |
- withCheckpoints = withCheckpoints && CPConfiguration.isEnabled) | |
+ withCheckpoints = withCheckpoints && cpconfig.isEnabled) | |
} | |
/** Create a stream returning all the messages for the defined partition, from the given start | |
@@ -77,13 +77,14 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg | |
withTimeOffset: Boolean, | |
offset: String = "", | |
startTime: Instant = Instant.MIN, | |
- withCheckpoints: Boolean = true): Source[MessageFromDevice, NotUsed] = { | |
+ withCheckpoints: Boolean = true, | |
+ startFromSavedCheckpoint: Boolean = false): Source[MessageFromDevice, NotUsed] = { | |
// Load the offset from the storage (if needed) | |
var _offset = offset | |
var _withTimeOffset = withTimeOffset | |
- if (withCheckpoints) { | |
- val savedOffset = GetSavedOffset() | |
+ if (withCheckpoints || startFromSavedCheckpoint) { | |
+ val savedOffset = GetSavedOffset | |
if (savedOffset != IoTHubPartition.OffsetCheckpointNotFound) { | |
_offset = savedOffset | |
_withTimeOffset = false | |
@@ -93,9 +94,9 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg | |
// Build the source starting by time or by offset | |
val source: Source[MessageFromDevice, NotUsed] = if (_withTimeOffset) | |
- MessageFromDeviceSource(partition, startTime, withCheckpoints).filter(Ignore.keepAlive) | |
- else | |
- MessageFromDeviceSource(partition, _offset, withCheckpoints).filter(Ignore.keepAlive) | |
+ MessageFromDeviceSource(partition, startTime, withCheckpoints).filter(Ignore.keepAlive) | |
+ else | |
+ MessageFromDeviceSource(partition, _offset, withCheckpoints).filter(Ignore.keepAlive) | |
// Inject a flow to store the stream position after each pull | |
if (withCheckpoints) { | |
@@ -112,7 +113,7 @@ private[iothubreact] case class IoTHubPartition(val partition: Int) extends Logg | |
*/ | |
private[this] def GetSavedOffset(): String = { | |
val partitionCp = CheckpointActorSystem.getCheckpointService(partition) | |
- implicit val rwTimeout = Timeout(CPConfiguration.checkpointRWTimeout) | |
+ implicit val rwTimeout = Timeout(cpconfig.checkpointRWTimeout) | |
try { | |
Retry(3, 5 seconds) { | |
log.debug(s"Loading the stream position for partition ${partition}") | |
diff --git a/src/test/scala/api/API.scala b/src/test/scala/api/API.scala | |
index a056d62..b5ca37b 100644 | |
--- a/src/test/scala/api/API.scala | |
+++ b/src/test/scala/api/API.scala | |
@@ -3,14 +3,22 @@ | |
// Namespace chosen to avoid access to internal classes | |
package api | |
+import java.util.UUID | |
+ | |
+import com.microsoft.azure.iot.iothubreact.checkpointing.ICPConfiguration | |
+import org.mockito.Mockito.when | |
+import org.scalatest.mockito.MockitoSugar | |
+ | |
// No global imports to make easier detecting breaking changes | |
-class APIIsBackwardCompatible extends org.scalatest.FeatureSpec { | |
+class APIIsBackwardCompatible extends org.scalatest.FeatureSpec with MockitoSugar { | |
info("As a developer using Azure IoT hub React") | |
info("I want to be able to upgrade to new minor versions without changing my code") | |
info("So I can benefit from improvements without excessive development costs") | |
+ implicit val cpconfig = mock[ICPConfiguration] | |
+ | |
feature("Version 0.x is backward compatible") { | |
scenario("Using MessageFromDevice") { | |
@@ -148,6 +156,7 @@ class APIIsBackwardCompatible extends org.scalatest.FeatureSpec { | |
import com.microsoft.azure.iot.iothubreact.checkpointing.backends.CheckpointBackend | |
class CustomBackend extends CheckpointBackend { | |
+ | |
override def readOffset(partition: Int): String = { | |
return "" | |
} | |
@@ -156,7 +165,10 @@ class APIIsBackwardCompatible extends org.scalatest.FeatureSpec { | |
} | |
val backend: CustomBackend = new CustomBackend() | |
- assert(backend.checkpointNamespace == "iothub-react-checkpoints") | |
+ | |
+ val anyname = UUID.randomUUID.toString | |
+ when(cpconfig.storageNamespace).thenReturn(anyname) | |
+ assert(backend.checkpointNamespace == anyname) | |
} | |
scenario("Using Message Type") { | |
diff --git a/src/test/scala/it/IoTHubReactHasAnAwesomeAPI.scala b/src/test/scala/it/IoTHubReactHasAnAwesomeAPI.scala | |
index 57b70d7..646b43d 100644 | |
--- a/src/test/scala/it/IoTHubReactHasAnAwesomeAPI.scala | |
+++ b/src/test/scala/it/IoTHubReactHasAnAwesomeAPI.scala | |
@@ -22,7 +22,6 @@ class IoTHubReactHasAnAwesomeAPI extends FeatureSpec with GivenWhenThen { | |
Given("An IoT hub is configured") | |
val hub = IoTHub() | |
- val hubPartition = IoTHubPartition(1) | |
When("A developer wants to fetch messages from Azure IoT hub") | |
val messagesFromAllPartitions: Source[MessageFromDevice, NotUsed] = hub.source(false) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment