Skip to content

Instantly share code, notes, and snippets.

@MtkN1
Last active May 30, 2021 06:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MtkN1/4b511c8328bf19ec378b3b7f3591a586 to your computer and use it in GitHub Desktop.
Save MtkN1/4b511c8328bf19ec378b3b7f3591a586 to your computer and use it in GitHub Desktop.
import asyncio
import logging
from typing import Any, Dict

import motor.motor_asyncio
import pybotters
class BybitMongoDB:
    """Queue Bybit trade messages and persist them to MongoDB.

    ``onmessage`` filters incoming WebSocket messages and enqueues trade
    payloads; a background task running ``consumer`` drains the queue and
    writes each payload to the ``inverse`` collection of the ``bybit``
    database.

    Instances must be created while an asyncio event loop is running,
    because the constructor schedules the consumer with
    ``asyncio.create_task``.
    """

    def __init__(self):
        # Client is created with the driver's default connection settings.
        client = motor.motor_asyncio.AsyncIOMotorClient()
        db = client['bybit']
        self.collection = db['inverse']
        self.queue = asyncio.Queue()
        # Keep a strong reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._consumer_task = asyncio.create_task(self.consumer())

    def onmessage(self, msg: Dict[str, Any], ws):
        """Enqueue the ``data`` payload of a trade message.

        Args:
            msg: Decoded JSON message received from the WebSocket.
            ws: The WebSocket connection object (unused).

        Messages without a ``topic`` starting with ``'trade'`` are ignored,
        as are trade messages whose ``data`` is missing or empty, because
        ``insert_many`` rejects an empty document list.
        """
        topic = msg.get('topic')
        if not isinstance(topic, str) or not topic.startswith('trade'):
            return
        data = msg.get('data')
        if data:
            self.queue.put_nowait(data)

    async def consumer(self):
        """Drain the queue forever, inserting each payload into MongoDB.

        A failed insert is logged and the loop continues, so a single bad
        batch does not permanently stop persistence. Cancellation still
        propagates because ``asyncio.CancelledError`` is not an ``Exception``
        subclass.
        """
        while True:
            data = await self.queue.get()
            try:
                await self.collection.insert_many(data)
            except Exception:
                logging.getLogger(__name__).exception(
                    'Failed to insert trade batch into MongoDB'
                )
            finally:
                # Lets callers use ``queue.join()`` to wait for a full drain.
                self.queue.task_done()
async def main():
    """Subscribe to the Bybit BTCUSD trade stream and store trades in MongoDB.

    Opens a pybotters client, creates the ``BybitMongoDB`` sink inside the
    running event loop, connects to the realtime WebSocket endpoint with a
    subscription for ``trade.BTCUSD`` and then waits on the task returned by
    ``ws_connect``.
    """
    subscribe_request = {'op': 'subscribe', 'args': ['trade.BTCUSD']}
    async with pybotters.Client() as client:
        # Created here (not at import time) because the sink schedules a
        # background task and therefore needs the running event loop.
        sink = BybitMongoDB()
        ws_task = await client.ws_connect(
            'wss://stream.bybit.com/realtime',
            send_json=subscribe_request,
            hdlr_json=sink.onmessage,
        )
        await ws_task
# Script entry point: run the collector until it is interrupted.
try:
    asyncio.run(main())
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop the collector; exit without a
    # traceback.
    pass
# Install the dependencies first: pip install pybotters motor
import pymongo
# Open the same database/collection the collector writes to.
client = pymongo.MongoClient()
db = client['bybit']
collection = db['inverse']

# Fetch 5 documents whose timestamp is 2021-05-30T05:59:18.000Z or later.
recent_trades = collection.find(
    {'timestamp': {'$gte': '2021-05-30T05:59:18.000Z'}},
    projection={'_id': False},
).limit(5)
for doc in recent_trades:
    print(doc)

print('-' * 80)

# Fetch the single latest record by sorting on _id in descending order.
doc = collection.find_one(
    sort=[('_id', pymongo.DESCENDING)],
    projection={'_id': False},
)
print(doc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment