Created Jun 29, 2017
class QueueObservableReactivePublisherSpec
    extends PublisherVerification[AckableGetResponse](new TestEnvironment())
    with TestNGSuiteLike
    with TestSetup {

  /** auxiliary, setup and cleanup code omitted */
  implicit val s: Scheduler =

  // Indicates that QueueObservable never completes; it can only signal an error.
  // As a result, test cases requiring `onComplete` are ignored.
  override val maxElementsFromPublisher = Long.MaxValue

  // Creates a publisher emitting `n` elements that will be tested.
  override def createPublisher(n: Long) = {
    val queue = declareQueue()
    // Fill the queue asynchronously with `n` messages of 1000 zero bytes each.
    Future {
      val props = new AMQP.BasicProperties()
      val body = Array.fill(1000)(0.toByte)
      (1L to n).foreach(_ => channel.basicPublish("", queue, props, body))
    }
    QueueObservable(queue, connection).toReactivePublisher(s)
  }

  // Fails during execution because the queue doesn't exist.
  override def createFailedPublisher(): Publisher[AckableGetResponse] =
    QueueObservable("queueThatShouldNotBe", connection).toReactivePublisher(s)

  override def required_spec109_mustIssueOnSubscribeForNonNullSubscriber() =
    throw new SkipException(
      "original spec checks more than written in textual specification"
    )
}
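For context on what `createFailedPublisher` is expected to return: as far as I understand the Reactive Streams TCK, a "failed" publisher should hand out a subscription and then signal `onError`. The sketch below illustrates that signal order only. It is not the `QueueObservable` implementation. To stay dependency-free it uses the JDK's `java.util.concurrent.Flow` interfaces instead of `org.reactivestreams`, and the names `FailedPublisher` and `SignalRecorder` are hypothetical.

```scala
import java.util.concurrent.Flow
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a publisher whose source (e.g. a queue) is missing:
// it hands out a no-op subscription and then fails immediately.
final class FailedPublisher[T](cause: Throwable) extends Flow.Publisher[T] {
  override def subscribe(subscriber: Flow.Subscriber[_ >: T]): Unit = {
    if (subscriber == null) throw new NullPointerException("subscriber must not be null")
    subscriber.onSubscribe(new Flow.Subscription {
      override def request(n: Long): Unit = () // nothing will ever be emitted
      override def cancel(): Unit = ()
    })
    subscriber.onError(cause)
  }
}

object SignalRecorder {
  // Subscribes to `publisher` and returns the signals in the order they arrived.
  def recordSignals[T](publisher: Flow.Publisher[T]): List[String] = {
    val events = ListBuffer.empty[String]
    publisher.subscribe(new Flow.Subscriber[T] {
      override def onSubscribe(s: Flow.Subscription): Unit = events += "onSubscribe"
      override def onNext(item: T): Unit = events += s"onNext($item)"
      override def onError(t: Throwable): Unit = events += s"onError(${t.getMessage})"
      override def onComplete(): Unit = events += "onComplete"
    })
    events.toList
  }

  def main(args: Array[String]): Unit = {
    val publisher = new FailedPublisher[String](new IllegalStateException("queue does not exist"))
    // prints: onSubscribe, onError(queue does not exist)
    println(recordSignals(publisher).mkString(", "))
  }
}
```

The recorder never sees `onNext` or `onComplete`, which matches the comment in the spec above that this kind of publisher can only signal an error.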