Skip to content

Instantly share code, notes, and snippets.

@nickebbutt
Last active August 29, 2015 14:12
Show Gist options
  • Save nickebbutt/62942e2ac5f124fdeb26 to your computer and use it in GitHub Desktop.
Save nickebbutt/62942e2ac5f124fdeb26 to your computer and use it in GitHub Desktop.
Spread Calculation from independent Bid and Offer streams
package org.od.maprecord;
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
/**
 * Demonstrates deriving a spread (offer minus bid) from two independent observable streams:
 * one carrying bid prices and one carrying offer prices.
 */
public class TestSpreadCalc {

    /** Every spread emitted by the combined stream, in emission order. */
    private final List<Double> resultantSpreads = new ArrayList<>();

    private Subject<Double, Double> bidStream;
    private Subject<Double, Double> offerStream;

    @Before
    public void doSetUp() {
        resultantSpreads.clear();
        bidStream = BehaviorSubject.create();
        offerStream = BehaviorSubject.create();
    }

    @Test
    public void testCombineBidAndOfferStreamsToSpreadStream() {
        captureSpreads();

        // Ignored by the spread stream: there is no offer to combine it with yet.
        newBid(99);

        // Spread is 1.
        newBidAndOffer(99, 100);

        // Spread is 2.
        newOffer(101);

        // Spread is 3.
        newBid(98d);

        // Three spreads are expected here, and that is what we want: the second bid and the
        // second offer arrived independently of each other.
        List<Double> expectedSpreads = Arrays.asList(1d, 2d, 3d);
        assertEquals(expectedSpreads, resultantSpreads);
    }

    @Test
    public void testCombineDoesNotPreserveAtomicityWhenFieldDeltasReceivedTogether() {
        captureSpreads();

        // Spread is 1.
        newBidAndOffer(99, 100);

        // Spread ends up as 3.
        newBidAndOffer(98, 101);

        // We still observe 1, 2, 3, which is not really what we want: the atomicity of
        // newBidAndOffer has been lost. Ideally the output would be 1, 3 without the
        // transitional 2. This test exists to demonstrate the problem of preserving atomicity
        // when a single update is split into separate bid and offer streams.
        List<Double> expectedSpreads = Arrays.asList(1d, 2d, 3d);
        assertEquals(expectedSpreads, resultantSpreads);
    }

    /**
     * Combines the latest bid and offer into a spread stream and records every emitted spread
     * in {@link #resultantSpreads}.
     */
    private void captureSpreads() {
        Observable<Double> spreadStream =
                Observable.combineLatest(bidStream, offerStream, (bid, offer) -> offer - bid);
        spreadStream.subscribe(resultantSpreads::add);
    }

    /** Simulates a bid and an offer arriving together: the bid is published first, then the offer. */
    private void newBidAndOffer(double newBid, double newOffer) {
        newBid(newBid);
        newOffer(newOffer);
    }

    /** Publishes a new bid price on the bid stream. */
    private void newBid(double newBid) {
        bidStream.onNext(newBid);
    }

    /** Publishes a new offer price on the offer stream. */
    private void newOffer(double newOffer) {
        offerStream.onNext(newOffer);
    }
}
package org.od.maprecord;
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.subjects.BehaviorSubject;
import rx.subjects.Subject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
/**
 * Addresses the loss of atomicity that occurs when one atomic update is split into separate
 * bid and offer streams: the spread stream is sampled only when an update has been fully applied.
 */
public class TestSpreadCalcPreservingAtomicity {

    /** Every spread that made it through the sampling step, in emission order. */
    private final List<Double> resultantSpreads = new ArrayList<>();

    private Subject<Double, Double> bidStream;
    private Subject<Double, Double> offerStream;

    /** Receives one value each time an update has been completely published. */
    private Subject<Long, Long> completionStream;

    /** Number of completed updates so far; its current value is what gets published on completion. */
    private long updateCount;

    @Before
    public void doSetUp() {
        resultantSpreads.clear();
        updateCount = 0;
        bidStream = BehaviorSubject.create();
        offerStream = BehaviorSubject.create();
        completionStream = BehaviorSubject.create();
    }

    @Test
    public void testCombineLatestWithSamplingOnCompletionOfUpdateToPreserveAtomicity() {
        Observable<Double> spreadStream =
                Observable.combineLatest(bidStream, offerStream, (bid, offer) -> offer - bid);

        // Sample the spread stream so that a spread is taken only once each atomic set of
        // deltas has been completely processed.
        Observable<Double> spreadPerCompletedUpdate = spreadStream.sample(completionStream);
        spreadPerCompletedUpdate.subscribe(resultantSpreads::add);

        // Spread is 1.
        newBidAndOffer(99, 100);

        // Spread is 3.
        newBidAndOffer(98, 101);

        // Only two spreads come out: the atomicity problem is solved by sampling the latest
        // spread value only when the newBidAndOffer message is fully complete.
        List<Double> expectedSpreads = Arrays.asList(1d, 3d);
        assertEquals(expectedSpreads, resultantSpreads);
    }

    /** Simulates receiving a new bid and offer atomically, then signals that the update is complete. */
    private void newBidAndOffer(double newBid, double newOffer) {
        bidStream.onNext(newBid);
        offerStream.onNext(newOffer);
        notifyUpdateCompleted();
    }

    /** Publishes a new bid on its own and signals that the update is complete. */
    private void newBid(double newBid) {
        bidStream.onNext(newBid);
        notifyUpdateCompleted();
    }

    /** Publishes a new offer on its own and signals that the update is complete. */
    private void newOffer(double newOffer) {
        offerStream.onNext(newOffer);
        notifyUpdateCompleted();
    }

    /**
     * Signals that the current update is complete by publishing the running update counter on
     * the completion stream; the counter is incremented before the value is published.
     */
    private void notifyUpdateCompleted() {
        long completedUpdate = updateCount;
        updateCount = completedUpdate + 1;
        completionStream.onNext(completedUpdate);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment