Skip to content

Instantly share code, notes, and snippets.

@samschlegel
Last active April 18, 2017 18:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samschlegel/cac3c948fee6f48c7c137d906485210b to your computer and use it in GitHub Desktop.
Save samschlegel/cac3c948fee6f48c7c137d906485210b to your computer and use it in GitHub Desktop.
spotify/scio#546 - parseFn output type invariance
class JImmutableMapWrapper[K, V](val self: JMap[K, V])
extends Map[K, V] {
// scalastyle:off method.name
override def +[B1 >: V](kv: (K, B1)): Map[K, B1] = self.asScala.toMap + kv
override def -(key: K): Map[K, V] = self.asScala.toMap - key
// scalastyle:on method.name
override def get(key: K): Option[V] = Option(self.get(key))
override def iterator: Iterator[(K, V)] = self.asScala.iterator
}
private def pubsubInWithAttributes[T: ClassTag](isSubscription: Boolean,
name: String,
idLabel: String,
timestampLabel: String)
: SCollection[(T, JImmutableMapWrapper[String, String])] = requireNotClosed {
if (this.isTest) {
this.getTestInput(PubsubIO(name))
} else {
val elementCoder = pipeline.getCoderRegistry.getScalaCoder[T]
val outputCoder = pipeline.getCoderRegistry.getScalaCoder[(T, JImmutableMapWrapper[String, String])]
val parseFn = Functions.simpleFn { msg: PubsubMessage =>
val element = CoderUtils.decodeFromByteArray(elementCoder, msg.getMessage)
val attributes = new JImmutableMapWrapper[String, String](msg.getAttributeMap)
(element, attributes)
}
val input: gio.PubsubIO.Read[(T, JImmutableMapWrapper[String, String])] =
if (isSubscription) {
gio.PubsubIO.read().subscription(name)
} else {
gio.PubsubIO.read().topic(name)
}
var transform = input
.withAttributes(parseFn)
.withCoder(outputCoder)
if (idLabel != null) {
transform = transform.idLabel(idLabel)
}
if (timestampLabel != null) {
transform = transform.timestampLabel(timestampLabel)
}
wrap(this.applyInternal(transform)).setName(name)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment