Skip to content

Instantly share code, notes, and snippets.

@markizano
Last active April 19, 2024 14:28
Show Gist options
  • Save markizano/79a0534d9ed2767d24decc09539fdc57 to your computer and use it in GitHub Desktop.
Save markizano/79a0534d9ed2767d24decc09539fdc57 to your computer and use it in GitHub Desktop.
Watch for bids against my NFT and store the price history so I can see how popular it gets.
#!/usr/bin/env python3
'''
Use case: I have an NFT for sale and I want to know when someone makes an offer on it.
I'm tired of refreshing the page at https://opensea.io/assets/ethereum/0x583f608324bce3569472d750b45ae5892d546a04/128
so I can have a custom dashboard if I wish to know the fluctuations of the offers against this item I'm watching until sold.
This script will subscribe to updates on OpenSea's API and will insert the offers into a MongoDB database.
In this way, I can track updates without having to refresh the page so intently and can build my own price graph.
Documentation:
- Get your own OpenSea API token: https://opensea.io/account/settings?tab=developer
- Learn about OpenSea API Streaming (this is where this API call lives): https://docs.opensea.io/reference/stream-api-overview
- PyMongo Docs: https://pymongo.readthedocs.io/en/stable/api/
pip3 install opensea-stream pymongo dateparser kizano
Saved this script as `~/bin/bids2db` and executed as such:
chmod 755 ~/bin/bids2db
bids2db go-fish
Configuration file saved in ~/.config/opensea/config.yml
Sample config file:
```yaml
api_key: <your-api-key>
contract_address: '<contract-address-of-NFT>'
collection_name: '<collection-name>'
token_id: <token-id-of-NFT>
mongo_uri: 'mongodb://<mongo-user>:<mongo-pass>@<mongo-host>:<mongo-port>/OpenSea?authSource=admin'
mongo_dbname: OpenSea
```
'''
import sys
import requests
import opensea_sdk as opensea
from pymongo import MongoClient
from dateparser import parse as dateparse
from datetime import datetime
import kizano
# Name this application for the kizano helper library before any config or
# logger is requested.
# NOTE(review): presumably this is what makes kizano.getConfig() read
# ~/.config/opensea/config.yml (see module docstring) -- confirm against kizano.
kizano.Config.APP_NAME = 'opensea'
# Module-level logger shared by every function and method in this script.
log = kizano.getLogger(__name__)
class OpenSeaFisher:
    '''
    Go fishing on OpenSea for offers against a specific NFT.

    Holds a MongoDB client and (once `start()` is called) an OpenSea stream
    client. Offer events that match the configured contract address and token
    id are condensed and stored in the `OpenSeaBids` collection, from which
    they can later be listed.
    '''

    # Lazily created shared instance, see getInstance().
    _instance = None

    @staticmethod
    def getInstance():
        '''Return the shared instance, creating it on first use.'''
        if OpenSeaFisher._instance is None:
            OpenSeaFisher._instance = OpenSeaFisher()
        return OpenSeaFisher._instance

    def __init__(self) -> None:
        '''
        Load the configuration, connect to MongoDB and make sure the indexes exist.

        Raises KeyError when a required configuration key is missing.
        '''
        # Define every handle the cleanup code touches before anything can
        # raise, so a half-built instance can still be torn down safely.
        self.mongo = None
        self.db = None
        self.openseastream = None
        self.unsubscription = None
        self.config = kizano.getConfig()
        self.contract_address = self.config['contract_address']
        self.token_id = self.config['token_id']
        log.info('> Connecting to DB ...')
        self.mongo = MongoClient(self.config['mongo_uri'])
        self.db = self.mongo[self.config['mongo_dbname']]
        # Creating an index that already exists is a no-op, so do it
        # unconditionally. Guarding on "no indexes yet" never fires for an
        # existing collection because the default _id index is always present.
        self.db.OpenSeaBids.create_index('order_hash', unique=True)
        self.db.OpenSeaBids.create_index('created_date')
        log.info('> Connected!')

    def _shutdown(self):
        '''Release the subscription, stream client and DB client. Safe to call repeatedly.'''
        unsubscribe = getattr(self, 'unsubscription', None)
        stream = getattr(self, 'openseastream', None)
        mongo = getattr(self, 'mongo', None)
        # Clear the handles first so a second call (e.g. from __del__ after
        # start() already cleaned up) has nothing left to release.
        self.unsubscription = self.openseastream = self.mongo = None
        if unsubscribe:
            unsubscribe()
        if stream:
            stream.disconnect()
        if mongo:
            mongo.close()

    def __del__(self):
        '''Release all connections when the instance is garbage collected.'''
        self._shutdown()

    def start(self):
        '''
        Connect to the OpenSea stream, subscribe to all events of the configured
        collection and hand each one to `payload()`.

        A KeyboardInterrupt (Ctrl-C) at any point stops listening and releases
        the subscription, the stream client and the DB client.
        '''
        log.info('Start listening for events...')
        try:
            log.info('> Connecting to OpenSea ...')
            self.openseastream = opensea.OpenseaStreamClient(self.config['api_key'], opensea.Network.MAINNET)
            log.info('> Connected!')
            log.info('> Subscribing to OpenSea events ...')
            self.unsubscription = self.openseastream.onEvents(
                [self.config['collection_name']],
                ['*'],
                lambda data: self.payload(data))
            log.info('> Subscribed!')
            self.openseastream.startListening()
        except KeyboardInterrupt:
            log.info('Stopping...')
            # _shutdown() only touches handles that were actually created, so
            # an interrupt before the subscription exists is handled too.
            self._shutdown()

    def payload(self, payload: dict):
        '''
        Handle one message from the streaming API by writing a condensed record to the DB.

        Only `item_received_offer`, `item_received_bid` and `collection_offer`
        events for the configured contract address / token id are stored; all
        other events are logged and ignored. Errors are logged, never raised,
        so one malformed message cannot stop the stream.
        '''
        # Imported here so the block does not depend on a file-level import.
        from pymongo.errors import DuplicateKeyError

        log.debug(payload)
        eventType = ''
        try:
            eventType = payload.get('event', payload.get('event_type', ''))
            if eventType in ('item_received_offer', 'item_received_bid'):
                # Item-level events are read from payload['payload']['payload'].
                body = payload['payload']['payload']
                log.info(f'Offer received for {body["nft_id"]}')
                # A condensed version of the event, since the full structure is large.
                event = {
                    'collection_slug': body['collection']['slug'],
                    'created_date': dateparse(body['created_date']),
                    'event_timestamp': dateparse(body['event_timestamp']),
                    'expiration_date': dateparse(body['expiration_date']),
                    'name': body['metadata']['name'],
                    'nft_id': body['nft_id'],
                    'permalink': body['permalink'],
                    'order_hash': body['order_hash'],
                    'payment_token': body['payment_token'],
                    'offer': body['protocol_data']['parameters']['offer'],
                }
            elif eventType == 'collection_offer':
                # Collection-level events are read one level up, from payload['payload'].
                body = payload['payload']
                log.info(f'Offer received for {body["collection"]["slug"]}')
                address = body['asset_contract_criteria']['address']
                # Hex addresses differ only by checksum casing, so compare case-insensitively.
                if str(address).lower() != str(self.contract_address).lower():
                    log.info(f'Ignoring event for {address} since it is not ours.')
                    return
                event = {
                    'collection_slug': body['collection']['slug'],
                    'created_date': dateparse(body['created_date']),
                    'event_timestamp': dateparse(body['event_timestamp']),
                    'expiration_date': dateparse(body['expiration_date']),
                    # A collection offer names no token, so it is recorded against ours.
                    'nft_id': f'{body["chain"]}/{address}/{self.token_id}',
                    'order_hash': body['order_hash'],
                    'payment_token': body['payment_token'],
                    'offer': body['protocol_data']['parameters']['offer'],
                }
            else:
                # Use the resolved type: the raw 'event' key may be absent.
                log.info(f'Ignoring event {eventType}')
                return

            wanted_id = f'ethereum/{self.contract_address}/{self.token_id}'
            if wanted_id.lower() != str(event['nft_id']).lower():
                log.info(f'Ignoring event for {event["nft_id"]} since it is not ours.')
                return

            try:
                self.db.OpenSeaBids.insert_one(event)
            except DuplicateKeyError:
                # The unique index on order_hash rejects offers we already stored.
                log.debug(f'Offer {event["order_hash"]} is already stored; skipping.')
        except Exception as e:
            # Top-level boundary of the stream callback: report and keep listening.
            log.error(f'Error handling {eventType!r} event: {e!r}')

    @staticmethod
    def _format_offer(offer: dict):
        '''
        Return the printable summary line for a stored offer.

        Returns None when the record lacks the data needed to compute the
        amount and price (no offer items, no payment token, missing or
        malformed fields), so callers can skip it.
        '''
        try:
            items = offer['offer']
            token = offer['payment_token']
            # startAmount is in the token's smallest unit; scale by its decimals.
            amount = int(items[0]['startAmount']) / pow(10, token['decimals'])
            price = amount * float(token['usd_price'])
            payment_str = f'{amount} {token["symbol"]} (${price}) '
            return f'C={offer["created_date"]}; E={offer["expiration_date"]} - {payment_str}'
        except (KeyError, IndexError, TypeError, ValueError):
            return None

    def _print_offers(self, offers) -> int:
        '''Log one line per printable offer in `offers` and return how many were logged.'''
        shown = 0
        for offer in offers:
            line = self._format_offer(offer)
            if line is None:
                # Skip records that do not contain all the offer data.
                continue
            log.info(line)
            shown += 1
        return shown

    def showOffers(self):
        '''
        Fetch the offers from the DB and print them to the console in a pretty format.
        Filter out expired offers.
        '''
        from datetime import timezone

        # PyMongo stores datetimes as UTC, so "now" must be UTC as well. A
        # naive local time would shift the cut-off by the local UTC offset.
        now = datetime.now(timezone.utc)
        active = self.db.OpenSeaBids.find({'expiration_date': {'$gt': now}}).sort('created_date', 1)
        self._print_offers(active)

    def showAllOffers(self):
        '''
        Fetch ALL offers from the DB, expired or not, and print them to the
        console in a pretty format, followed by the number of offers printed.
        '''
        count = self._print_offers(self.db.OpenSeaBids.find({}).sort('created_date', 1))
        log.info(f'Total offers found: {count}')

    def collectHistoricEvents(self, after: int = 1713306480):
        '''
        Use the OpenSea API to fetch the past offer events of the configured
        collection and return those belonging to the configured contract address.

        `after` is passed through as the `after` query parameter (a Unix
        timestamp). All pages are followed through the `next` cursor. Returns
        the list of matching events as received from the API; when a request
        fails, the error is logged and the events collected so far are returned.
        '''
        api_endpoint = f'https://api.opensea.io/api/v2/events/collection/{self.config["collection_name"]}'
        params = {
            'event_type': 'offer',
            'after': after,
        }
        # requests needs a mapping here; a list of tuples cannot be prepared.
        headers = {
            'Accept': 'application/json',
            'X-API-KEY': self.config['api_key'],
        }
        ours = str(self.contract_address).lower()
        matches = []
        while True:
            response = requests.get(api_endpoint, headers=headers, params=params, timeout=30)
            if response.status_code != 200:
                log.error(f'Error fetching events: {response.status_code}')
                break
            events = response.json()
            # NOTE(review): assumes each event carries a top-level
            # 'contract_address' field -- confirm against the API response.
            for event in events.get('asset_events', []):
                if str(event.get('contract_address', '')).lower() == ours:
                    matches.append(event)
            cursor = events.get('next')
            if not cursor:
                # The last page has no cursor; it has been processed above.
                break
            params['next'] = cursor
        log.info(f'Historic events found: {len(matches)}')
        return matches
def main():
    '''
    Entrypoint: Use OpenSea's streaming sdk api to find all offers against ${CONTRACT_ADDRESS} with ${TOKEN_ID}
    If there are any new offers we don't have in DB, insert them.

    The first command-line argument selects the action:
    - `go-fish`: listen for new offers and store them.
    - `show-offers`: print the stored offers that have not expired.
    - `show-all-offers`: print every stored offer.

    Returns 0 on success and 1 when the command is missing or unknown.
    '''
    log.info('Welcome!')
    # Maps each command to the name of the OpenSeaFisher method that runs it.
    commands = {
        'go-fish': 'start',
        'show-offers': 'showOffers',
        'show-all-offers': 'showAllOffers',
    }
    cmd = sys.argv[1] if len(sys.argv) > 1 else ''
    if cmd not in commands:
        # Validate before connecting, so a typo does not open a DB connection
        # only to do nothing.
        log.error(f'Unknown command {cmd!r}; expected one of: {", ".join(commands)}')
        return 1
    fisher = OpenSeaFisher.getInstance()
    try:
        getattr(fisher, commands[cmd])()
    finally:
        # Reset rather than delete the class attribute, so getInstance() keeps
        # working afterwards; dropping both references releases the connections.
        OpenSeaFisher._instance = None
        del fisher
    log.info('Goodbye!')
    return 0


if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment