Created
June 29, 2017 06:17
-
-
Save anonymous/46975caf0e8a2c1d67f85f07d1ad1c14 to your computer and use it in GitHub Desktop.
the description for this gist
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class QueueObservableReactivePublisherSpec | |
extends PublisherVerification[AckableGetResponse](new TestEnvironment()) | |
with TestNGSuiteLike | |
with TestSetup { | |
/** auxiliary, setup and cleanup code omitted */ | |
implicit val s: Scheduler = Scheduler.io() | |
// Indicates that QueueObservable never completes, can only signal error. | |
// Causes that cases requiring `onComplete` are ignored. | |
override val maxElementsFromPublisher = Long.MaxValue | |
//Creates publisher emitting `n` elements that will be tested | |
override def createPublisher(n: Long) = { | |
val queue = declareQueue() | |
Future { | |
val props = new AMQP.BasicProperties() | |
val body = Array.fill(1000)(0.toByte) | |
1L.to(n).foreach(_ => channel.basicPublish("", queue, props, body)) | |
} | |
QueueObservable(queue, connection).toReactivePublisher(s) | |
} | |
//Fails during execution because queue doesn't exist. | |
override def createFailedPublisher(): Publisher[AckableGetResponse] = | |
QueueObservable("queueThatShouldNotBe", connection).toReactivePublisher(s) | |
override def required_spec109_mustIssueOnSubscribeForNonNullSubscriber() = | |
throw new SkipException( | |
"original spec checks more than written in textual specification" | |
) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment