Skip to content

Instantly share code, notes, and snippets.

@baunov
Created June 12, 2022 11:45
Show Gist options
  • Save baunov/9394a4d3d7794b5279e23d0064dd8c4f to your computer and use it in GitHub Desktop.
Save baunov/9394a4d3d7794b5279e23d0064dd8c4f to your computer and use it in GitHub Desktop.
import {
combineLatest,
distinctUntilChanged,
filter,
map,
Observable,
scan,
shareReplay,
switchMap,
withLatestFrom
} from 'rxjs';
import {AggregatedTrade, Candle} from 'binance-api-node';
import {BookTickerData} from '../types';
import Big from 'big.js';
/**
 * A value paired with the time it was observed.
 *
 * In this file it wraps `Big` numbers: the latest price and the running
 * trade volume.
 */
interface UpdateData<T> {
// Time of the update. Filled from `Date.now()` for prices and from the
// exchange-provided trade/candle timestamps for volume
// (presumably epoch milliseconds in both cases — confirm against the API types).
timestamp: number;
// The observed value itself.
value: T;
}
/**
 * Turns a book-ticker stream into a stream of price changes.
 *
 * Each ticker is converted to its best ask as a `Big`, stamped with the local
 * wall-clock time at which it was received. Consecutive tickers carrying the
 * same best ask are collapsed, so the timestamp reflects the first time a given
 * price was seen. The latest price is replayed to late subscribers.
 *
 * @param bookStream Stream of book-ticker updates.
 * @returns Shared stream of distinct best-ask prices with their receive time.
 */
function getRealtimePrice(bookStream: Observable<BookTickerData>): Observable<UpdateData<Big>> {
  // Projects a raw ticker onto the timestamped best-ask price.
  const toPriceUpdate = (ticker: BookTickerData): UpdateData<Big> => ({
    timestamp: Date.now(),
    value: Big(ticker.bestAsk),
  });

  // Two updates are considered equal when their prices are numerically equal.
  const hasSamePrice = (previous: UpdateData<Big>, current: UpdateData<Big>): boolean =>
    previous.value.eq(current.value);

  return bookStream.pipe(
    map((ticker) => toPriceUpdate(ticker)),
    distinctUntilChanged(hasSamePrice),
    shareReplay(1)
  );
}
/**
 * Produces a running volume for the current candle, updated on every trade
 * that happens after the candle snapshot.
 *
 * For each candle that is newer than the most recent trade, the candle's
 * reported volume is used as the starting point and the quantity of every
 * later trade (trade timestamp strictly greater than the candle's event time)
 * is added on top. A new qualifying candle discards the previous running total
 * and starts over from its own volume.
 *
 * Notes on behaviour:
 * - Candles are ignored until `tradesStream` has emitted at least once
 *   (a consequence of `withLatestFrom`).
 * - Nothing is emitted for a candle until the first trade after it arrives.
 *
 * @param klineStream Stream of candle updates.
 * @param tradesStream Stream of aggregated trades.
 * @returns Shared stream of the running volume with the timestamp of the trade
 *          that produced it; the latest value is replayed to late subscribers.
 */
function getRealtimeVolume(klineStream: Observable<Candle>, tradesStream: Observable<AggregatedTrade>): Observable<UpdateData<Big>> {
  return klineStream.pipe(
    withLatestFrom(tradesStream),
    // Only restart accumulation from candles that are newer than the last seen trade.
    filter(([candle, trade]) => candle.eventTime > trade.timestamp),
    switchMap(([candle]) => {
      const seed: UpdateData<Big> = {timestamp: candle.eventTime, value: Big(candle.volume)};
      return tradesStream.pipe(
        // Trades at or before the candle snapshot are already part of candle.volume.
        filter((tradeData) => tradeData.timestamp > candle.eventTime),
        // Return a NEW object on every step. Mutating and re-emitting the same
        // accumulator would make the distinctUntilChanged below compare an object
        // with itself and swallow every update after the first one.
        scan(
          (acc: UpdateData<Big>, data: AggregatedTrade): UpdateData<Big> => ({
            timestamp: data.timestamp,
            value: acc.value.add(data.quantity),
          }),
          seed,
        ),
      );
    }),
    distinctUntilChanged((a, b) => {
      return a.value.eq(b.value);
    }),
    shareReplay(1),
  );
}
/**
 * Builds a candle stream that keeps updating between kline events.
 *
 * Every candle from `klineStream` becomes the base for a series of derived
 * candles whose `close` is the latest best-ask price and whose `volume` is the
 * running trade volume. `eventTime` is set to the most recent of the candle,
 * price and volume timestamps. When a new candle arrives, the previous base
 * candle is dropped and derivation continues from the new one.
 *
 * The combined stream emits only once both a price and a volume are available.
 *
 * @param klineStream Stream of candle updates.
 * @param tradesStream Stream of aggregated trades, used for the running volume.
 * @param bookStream Stream of book-ticker updates, used for the live price.
 * @returns Stream of candles patched with the live close price and volume.
 */
export function getRealtimeKline(klineStream: Observable<Candle>,
tradesStream: Observable<AggregatedTrade>,
bookStream: Observable<BookTickerData>): Observable<Candle> {
  const realtimeVolume = getRealtimeVolume(klineStream, tradesStream);
  const realtimePrice = getRealtimePrice(bookStream);
  return klineStream.pipe(
    switchMap((candle) => {
      // No shareReplay on this inner stream: switchMap is its only subscriber,
      // and shareReplay(1) (refCount: false) would keep the combineLatest
      // subscription alive after switchMap moves on, leaking one per candle.
      return combineLatest([
        realtimePrice,
        realtimeVolume,
      ]).pipe(
        map(([price, volume]) => {
          const timestamp = Math.max(candle.eventTime, price.timestamp, volume.timestamp);
          return {
            ...candle,
            close: price.value.toString(),
            volume: volume.value.toString(),
            eventTime: timestamp,
          };
        }),
      );
    }),
  );
}
// USAGE example.
// This is a class property declaration: it belongs inside a class whose
// instance exposes the `btc$` (candles), `btcTrade$` (aggregated trades) and
// `btcBidAsk$` (book ticker) observables passed below.
readonly realtimeBtc$: Observable<Candle> = getRealtimeKline(
this.btc$,
this.btcTrade$,
this.btcBidAsk$,
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment