Skip to content

Instantly share code, notes, and snippets.

@huntc
Last active September 15, 2020 02:09
Show Gist options
  • Save huntc/a3f7f3c6ae9c9fe6f379564324af777e to your computer and use it in GitHub Desktop.
Save huntc/a3f7f3c6ae9c9fe6f379564324af777e to your computer and use it in GitHub Desktop.
Merge configuration
import akka.stream.scaladsl.Source
def serviceStreamWithConfigMap[CK, CV, E, M](
staticConfig: Source[(CK, CV), M],
dynamicConfig: Source[(CK, CV), M],
serviceStream: Source[E, M]) : Source[(E, Map[CK, CV]), M] =
staticConfig
.fold(Right[(CK, CV), Map[CK, CV]](Map.empty)) {
case (Right(config), (k, v)) => Right(config.updated(k, v))
}
.concat(dynamicConfig.map(Left.apply))
.scan(Right[E, Map[CK, CV]](Map.empty[CK, CV])) {
case (_, Right(config)) => Right(config)
case (Right(config), Left((k, v))) => Right(config.updated(k, v))
}
.merge(serviceStream.map(Left.apply))
.scan((Option.empty[E], Map.empty[CK, CV])) {
case (_, Right(config)) => None -> config
case ((_, config), Left(e)) => Some(e) -> config
}
.collect {
case (Some(e), config) => (e, config)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment