Last active
April 18, 2017 18:58
-
-
Save samschlegel/cac3c948fee6f48c7c137d906485210b to your computer and use it in GitHub Desktop.
spotify/scio#546 - parseFn output type invariance
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class JImmutableMapWrapper[K, V](val self: JMap[K, V]) | |
extends Map[K, V] { | |
// scalastyle:off method.name | |
override def +[B1 >: V](kv: (K, B1)): Map[K, B1] = self.asScala.toMap + kv | |
override def -(key: K): Map[K, V] = self.asScala.toMap - key | |
// scalastyle:on method.name | |
override def get(key: K): Option[V] = Option(self.get(key)) | |
override def iterator: Iterator[(K, V)] = self.asScala.iterator | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private def pubsubInWithAttributes[T: ClassTag](isSubscription: Boolean, | |
name: String, | |
idLabel: String, | |
timestampLabel: String) | |
: SCollection[(T, JImmutableMapWrapper[String, String])] = requireNotClosed { | |
if (this.isTest) { | |
this.getTestInput(PubsubIO(name)) | |
} else { | |
val elementCoder = pipeline.getCoderRegistry.getScalaCoder[T] | |
val outputCoder = pipeline.getCoderRegistry.getScalaCoder[(T, JImmutableMapWrapper[String, String])] | |
val parseFn = Functions.simpleFn { msg: PubsubMessage => | |
val element = CoderUtils.decodeFromByteArray(elementCoder, msg.getMessage) | |
val attributes = new JImmutableMapWrapper[String, String](msg.getAttributeMap) | |
(element, attributes) | |
} | |
val input: gio.PubsubIO.Read[(T, JImmutableMapWrapper[String, String])] = | |
if (isSubscription) { | |
gio.PubsubIO.read().subscription(name) | |
} else { | |
gio.PubsubIO.read().topic(name) | |
} | |
var transform = input | |
.withAttributes(parseFn) | |
.withCoder(outputCoder) | |
if (idLabel != null) { | |
transform = transform.idLabel(idLabel) | |
} | |
if (timestampLabel != null) { | |
transform = transform.timestampLabel(timestampLabel) | |
} | |
wrap(this.applyInternal(transform)).setName(name) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment