Skip to content

Instantly share code, notes, and snippets.

Created June 29, 2017 06:17
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save anonymous/46975caf0e8a2c1d67f85f07d1ad1c14 to your computer and use it in GitHub Desktop.
the description for this gist
class QueueObservableReactivePublisherSpec
extends PublisherVerification[AckableGetResponse](new TestEnvironment())
with TestNGSuiteLike
with TestSetup {
/** auxiliary, setup and cleanup code omitted */
implicit val s: Scheduler = Scheduler.io()
// Indicates that QueueObservable never completes, can only signal error.
// Causes that cases requiring `onComplete` are ignored.
override val maxElementsFromPublisher = Long.MaxValue
//Creates publisher emitting `n` elements that will be tested
override def createPublisher(n: Long) = {
val queue = declareQueue()
Future {
val props = new AMQP.BasicProperties()
val body = Array.fill(1000)(0.toByte)
1L.to(n).foreach(_ => channel.basicPublish("", queue, props, body))
}
QueueObservable(queue, connection).toReactivePublisher(s)
}
//Fails during execution because queue doesn't exist.
override def createFailedPublisher(): Publisher[AckableGetResponse] =
QueueObservable("queueThatShouldNotBe", connection).toReactivePublisher(s)
override def required_spec109_mustIssueOnSubscribeForNonNullSubscriber() =
throw new SkipException(
"original spec checks more than written in textual specification"
)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment