Skip to content

Instantly share code, notes, and snippets.

/post.scala

Created Jun 29, 2017
Embed
What would you like to do?
the description for this gist
class QueueObservableReactivePublisherSpec
extends PublisherVerification[AckableGetResponse](new TestEnvironment())
with TestNGSuiteLike
with TestSetup {
/** auxiliary, setup and cleanup code omitted */
implicit val s: Scheduler = Scheduler.io()
// Indicates that QueueObservable never completes, can only signal error.
// Causes that cases requiring `onComplete` are ignored.
override val maxElementsFromPublisher = Long.MaxValue
//Creates publisher emitting `n` elements that will be tested
override def createPublisher(n: Long) = {
val queue = declareQueue()
Future {
val props = new AMQP.BasicProperties()
val body = Array.fill(1000)(0.toByte)
1L.to(n).foreach(_ => channel.basicPublish("", queue, props, body))
}
QueueObservable(queue, connection).toReactivePublisher(s)
}
//Fails during execution because queue doesn't exist.
override def createFailedPublisher(): Publisher[AckableGetResponse] =
QueueObservable("queueThatShouldNotBe", connection).toReactivePublisher(s)
override def required_spec109_mustIssueOnSubscribeForNonNullSubscriber() =
throw new SkipException(
"original spec checks more than written in textual specification"
)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.