Skip to content

Instantly share code, notes, and snippets.

@vaibhavgeek
Created July 14, 2022 13:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vaibhavgeek/48220557ffdcce303024ce662a7d4255 to your computer and use it in GitHub Desktop.
Save vaibhavgeek/48220557ffdcce303024ce662a7d4255 to your computer and use it in GitHub Desktop.
Zesty Indexer by @JSeam
import os
import asyncio
import json
import logging
import traceback
import requests
from datetime import datetime
from tortoise import Tortoise
from web3 import Web3, HTTPProvider, exceptions
from backend.auth.models import Campaign, Contract, SellerAuction, Space, User
from backend.config import Config
class Indexer(object):
    """Backfill on-chain marketplace data into the database.

    Token, auction, campaign and contract records are read page by page
    from the GraphQL endpoint named by the ``GRAPH_POLYGON`` environment
    variable, enriched with JSON metadata fetched from the IPFS gateway,
    and upserted through the Tortoise ORM models.
    """

    # Number of ``tokenDatas`` records requested per page.
    PAGE_SIZE = 50
    # Consecutive failed fetches of one page tolerated before giving up.
    MAX_PAGE_RETRIES = 5
    # Pause between two attempts at fetching the same page, in seconds.
    RETRY_DELAY_SECONDS = 2
    # Timeout applied to every outgoing HTTP request, in seconds.
    REQUEST_TIMEOUT = 30
    # Gateway URL template used to resolve IPFS hashes to JSON metadata.
    IPFS_GATEWAY_URL = "https://ipfs.zesty.market/ipfs/{}"
    IPFS_SCHEME = "ipfs://"

    @classmethod
    async def backfill_async(cls):
        """Backfill onchain data to the database.

        Pages through ``tokenDatas`` until an empty page is returned.
        A page that cannot be fetched is retried up to
        ``MAX_PAGE_RETRIES`` times and then the backfill stops; a token
        that cannot be indexed is logged and skipped so one bad record
        does not block the rest of the run.

        Raises:
            RuntimeError: if ``GRAPH_POLYGON`` is not set.
        """
        indexer_logger = logging.getLogger('bob.indexer')

        # Fail fast on missing configuration instead of erroring on every
        # loop iteration.
        graph_polygon = os.getenv("GRAPH_POLYGON")
        if not graph_polygon:
            raise RuntimeError(
                "GRAPH_POLYGON environment variable is not set")

        # set up connection to db
        await Tortoise.init(
            db_url=Config.DB_URI,
            modules={'models': [
                'backend.auth.models',
                'aerich.models'
            ]}
        )

        skip = 0  # Starting skip
        failed_attempts = 0
        while True:
            try:
                token_datas = cls._fetch_token_datas(graph_polygon, skip)
            except Exception as e:
                failed_attempts += 1
                indexer_logger.error(
                    "[Backfill] Error: %s \nTraceback: %s",
                    e, traceback.format_exc())
                if failed_attempts >= cls.MAX_PAGE_RETRIES:
                    indexer_logger.error(
                        "[Backfill] Giving up at skip=%s after %s "
                        "failed attempts", skip, failed_attempts)
                    break
                await asyncio.sleep(cls.RETRY_DELAY_SECONDS)
                continue

            failed_attempts = 0
            if not token_datas:
                break

            for token in token_datas:
                try:
                    await cls._index_token(token)
                except Exception as e:
                    token_id = (
                        token.get("id") if isinstance(token, dict) else token)
                    indexer_logger.error(
                        "[Backfill] Error indexing token %s: %s "
                        "\nTraceback: %s",
                        token_id, e, traceback.format_exc())
            skip += cls.PAGE_SIZE

    @classmethod
    def backfill(cls):
        """Run :meth:`backfill_async` to completion from synchronous code.

        Reuses the thread's current event loop when one is available and
        open; otherwise runs on a private loop that is closed afterwards.

        Raises:
            RuntimeError: if called while an event loop is already running
                in this thread (await ``backfill_async()`` there instead).
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No loop is running, so it is safe to block on one.
        else:
            raise RuntimeError(
                "Indexer.backfill() cannot be called from a running event "
                "loop; await Indexer.backfill_async() instead")

        # Only the loop lookup is guarded, so a RuntimeError raised by the
        # backfill itself propagates instead of triggering a second run.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is not None and not loop.is_closed():
            loop.run_until_complete(cls.backfill_async())
            return

        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(cls.backfill_async())
        finally:
            loop.close()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @classmethod
    def _fetch_token_datas(cls, endpoint, skip):
        """Return the page of ``tokenDatas`` records at offset *skip*."""
        # NOTE(review): requests is synchronous, so this blocks the event
        # loop while the request is in flight.
        query = cls.construct_token_datas_query(cls.PAGE_SIZE, skip)
        response = requests.post(
            endpoint, json={"query": query}, timeout=cls.REQUEST_TIMEOUT)
        data = json.loads(response.text)
        return data['data']['tokenDatas']

    @classmethod
    def _fetch_ipfs_json(cls, ipfs_hash):
        """Fetch and decode the JSON document stored under *ipfs_hash*."""
        ipfs_url = cls.IPFS_GATEWAY_URL.format(ipfs_hash)
        return requests.get(ipfs_url, timeout=cls.REQUEST_TIMEOUT).json()

    @classmethod
    def _split_ipfs_uri(cls, uri):
        """Return ``(normalized_uri, ipfs_hash)`` for *uri*.

        A value already starting with ``ipfs://`` is kept as the URI and
        the remainder is the hash; any other value is used as the hash
        verbatim and prefixed with ``ipfs://`` to form the URI.
        """
        # we don't consider ar://
        if uri.startswith(cls.IPFS_SCHEME):
            return uri, uri[len(cls.IPFS_SCHEME):]
        return cls.IPFS_SCHEME + uri, uri

    @classmethod
    def _with_ipfs_scheme(cls, value):
        """Return *value* prefixed with ``ipfs://`` unless already present."""
        if value.startswith(cls.IPFS_SCHEME):
            return value
        return cls.IPFS_SCHEME + value

    @staticmethod
    def _from_timestamp(value):
        """Convert a numeric timestamp (int or numeric string) to datetime."""
        # NOTE(review): this yields a naive datetime in the local timezone;
        # confirm that is what the models expect.
        return datetime.fromtimestamp(int(value))

    @classmethod
    async def _index_token(cls, token):
        """Upsert the Space for one token record and all of its auctions."""
        token_id = int(token["id"])
        seller_nft_setting = token["sellerNFTSetting"]
        # Occasionally seller_nft_setting is none, we'll just skip that
        if seller_nft_setting is None:
            return

        # get_or_create returns the object and a bool indicating creation
        space_obj, _ = await Space.get_or_create(id=token_id)

        creator_obj, _ = await User.get_or_create(
            eth_address=token["creator"].lower())
        await creator_obj.save()
        owner_obj, _ = await User.get_or_create(
            eth_address=token["owner"].lower())
        await owner_obj.save()

        space_obj.uri, ipfs_hash = cls._split_ipfs_uri(token["uri"])
        space_obj.burned = token["burned"]
        space_obj.creator = creator_obj
        space_obj.owner = owner_obj

        # get ipfs data
        space_uri_data = cls._fetch_ipfs_json(ipfs_hash)
        space_obj.name = space_uri_data['name']
        space_obj.description = space_uri_data['description']
        space_obj.location = space_uri_data['location']
        space_obj.image = cls._with_ipfs_scheme(space_uri_data['image'])
        space_obj.space_format = space_uri_data['format']
        await space_obj.save()

        for auction in seller_nft_setting["sellerAuctions"]:
            await cls._index_auction(auction, space_obj)

    @classmethod
    async def _index_auction(cls, auction, space_obj):
        """Upsert one SellerAuction of *space_obj* and its campaigns."""
        seller_auction_obj, _ = await SellerAuction.get_or_create(
            id=auction["id"])
        seller_auction_obj.space = space_obj

        # create a campaign link with the last campaign id, if there is one
        campaign_ids = auction["buyerCampaignsIdList"]
        if campaign_ids:
            latest_campaign, _ = await Campaign.get_or_create(
                id=campaign_ids[-1])
        else:
            latest_campaign = None

        # The campaign counts as approved only when a latest campaign
        # exists and it is no longer pending.
        pending = auction["buyerCampaignsPending"]
        seller_auction_obj.campaign_approved = (
            bool(pending) and not pending[-1])

        seller_auction_obj.cancelled = auction["cancelled"]
        seller_auction_obj.price_start = int(auction["priceStart"])
        seller_auction_obj.price_pending = int(auction["pricePending"])
        seller_auction_obj.price_end = int(auction["priceEnd"])
        seller_auction_obj.auction_time_start = cls._from_timestamp(
            auction["auctionTimeStart"])
        seller_auction_obj.auction_time_end = cls._from_timestamp(
            auction["auctionTimeEnd"])
        seller_auction_obj.contract_time_start = cls._from_timestamp(
            auction["contractTimeStart"])
        seller_auction_obj.contract_time_end = cls._from_timestamp(
            auction["contractTimeEnd"])
        seller_auction_obj.campaign = latest_campaign
        await seller_auction_obj.save()

        for campaign in auction["buyerCampaigns"]:
            await cls._index_campaign(campaign, seller_auction_obj)

    @classmethod
    async def _index_campaign(cls, campaign, seller_auction_obj):
        """Upsert one Campaign, its buyer and its contracts."""
        campaign_obj, _ = await Campaign.get_or_create(id=campaign["id"])
        campaign_obj.uri, ipfs_hash = cls._split_ipfs_uri(campaign["uri"])
        campaign_uri_data = cls._fetch_ipfs_json(ipfs_hash)

        # Lower-cased like creator/owner addresses so the same account
        # always maps to a single User row.
        user_obj, _ = await User.get_or_create(
            eth_address=campaign["buyer"].lower())
        await user_obj.save()

        campaign_obj.user = user_obj
        campaign_obj.name = campaign_uri_data['name']
        campaign_obj.description = campaign_uri_data['description']
        campaign_obj.url = campaign_uri_data['url']
        campaign_obj.image = cls._with_ipfs_scheme(campaign_uri_data['image'])
        campaign_obj.campaign_format = campaign_uri_data['format']
        await campaign_obj.save()

        for contract in campaign["contracts"]:
            await cls._index_contract(
                contract, campaign_obj, seller_auction_obj)

    @classmethod
    async def _index_contract(cls, contract, campaign_obj, seller_auction_obj):
        """Upsert one Contract linking *campaign_obj* and the auction."""
        contract_obj, _ = await Contract.get_or_create(id=contract["id"])
        contract_obj.withdrawn = contract["withdrawn"]
        contract_obj.campaign = campaign_obj
        contract_obj.seller_auction = seller_auction_obj
        contract_obj.contract_time_start = cls._from_timestamp(
            contract["contractTimeStart"])
        contract_obj.contract_time_end = cls._from_timestamp(
            contract["contractTimeEnd"])
        contract_obj.contract_value = int(contract["contractValue"])
        await contract_obj.save()

    # ------------------------------------------------------------------
    # GraphQL query builders
    # ------------------------------------------------------------------
    # Literal braces are doubled because the templates go through
    # str.format; only the ``{}`` placeholders are substituted.

    @staticmethod
    def construct_buyer_campaigns_query(first, skip) -> str:
        """Return the GraphQL query for one page of ``buyerCampaigns``.

        Args:
            first: page size.
            skip: number of records to skip.
        """
        return """
        {{
            buyerCampaigns(first: {}, skip: {}) {{
                id
                buyer
                uri
                sellerAuctions {{
                    id
                }}
                contracts {{
                    id
                }}
                cumulativeVolumeUSDC
            }}
        }}
        """.format(first, skip)

    @staticmethod
    def seller_auction_query(first, skip) -> str:
        """Return the GraphQL query for one page of ``sellerAuctions``.

        Args:
            first: page size.
            skip: number of records to skip.
        """
        return """
        {{
            sellerAuctions(first: {}, skip: {}) {{
                id
                currency
                sellerNFTSetting {{
                    id
                    tokenData {{
                        id
                    }}
                }}
                seller
                auctionTimeStart
                auctionTimeEnd
                auctionTimeApprove
                contractTimeStart
                contractTimeEnd
                priceStart
                pricePending
                priceEnd
                buyerCampaigns {{
                    id
                }}
                buyerCampaignsIdList
                buyerCampaignsPending
                contract {{
                    id
                }}
                cancelled
            }}
        }}""".format(first, skip)

    @staticmethod
    def construct_token_datas_query(first, skip) -> str:
        """Return the GraphQL query for one page of ``tokenDatas``.

        The page includes each token's nested auctions, campaigns and
        contracts.

        Args:
            first: page size.
            skip: number of records to skip.
        """
        return """
        {{
            tokenDatas(first: {}, skip: {}) {{
                id
                creator
                owner
                uri
                burned
                sellerNFTSetting {{
                    sellerAuctions(first: 1000) {{
                        id
                        auctionTimeStart
                        auctionTimeEnd
                        contractTimeStart
                        contractTimeEnd
                        priceStart
                        pricePending
                        priceEnd
                        buyerCampaignsIdList
                        buyerCampaignsPending
                        buyerCampaignsApproved
                        cancelled
                        buyerCampaigns(first: 1000, where: {{ uri_not: null }}) {{
                            id
                            uri
                            buyer
                            contracts {{
                                id
                                withdrawn
                                contractTimeStart
                                contractTimeEnd
                                contractValue
                            }}
                        }}
                    }}
                }}
            }}
        }}
        """.format(first, skip)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment