Skip to content

Instantly share code, notes, and snippets.

@allenday
Last active November 11, 2022 21:39
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save allenday/92f9874c2912d3b19afeb00d681ac950 to your computer and use it in GitHub Desktop.
Save allenday/92f9874c2912d3b19afeb00d681ac950 to your computer and use it in GitHub Desktop.
-- KECCAK256(data): hashes the UTF-8 bytes of `data` with ethers.utils.keccak256
-- (from the ethers.js library loaded in OPTIONS) and returns the result as a STRING.
-- NOTE(review): the result is compared directly against log topics below, so it is
-- presumably a 0x-prefixed lowercase hex string -- confirm against ethers.js.
-- Returns NULL when `data` is NULL.
CREATE TEMP FUNCTION
KECCAK256(data STRING)
RETURNS STRING
LANGUAGE js AS """
// A SQL NULL arrives here as JavaScript null. Without this guard
// encodeURIComponent would stringify it and the hash of the text 'null'
// would be returned instead of NULL.
if (data === null || data === undefined) {
  return null;
}
// unescape(encodeURIComponent(s)) produces a string holding one character
// per UTF-8 byte of s, so charCodeAt yields the raw byte values.
var utf8 = unescape(encodeURIComponent(data));
var arr = [];
for (var i = 0; i < utf8.length; i++) {
  arr.push(utf8.charCodeAt(i));
}
return ethers.utils.keccak256(arr);
"""
OPTIONS
( library="gs://blockchain-etl-bigquery/ethers.js" );
-- PARSE_LOG(data, topics): decodes a raw event log against the LOG_NEW_POOL
-- event ABI using abi.decodeEvent (from the ethjs-abi library loaded in OPTIONS).
--   data   : the log's data field
--   topics : the log's topics array
-- Returns a STRUCT with the two indexed address inputs, `caller` and `pool`.
CREATE TEMP FUNCTION
PARSE_LOG(data STRING, topics ARRAY<STRING>)
RETURNS STRUCT<`caller` STRING, `pool` STRING>
LANGUAGE js AS """
// ABI description of the event: LOG_NEW_POOL(address indexed caller, address indexed pool).
var eventAbi = {
  "anonymous": false,
  "inputs": [
    {"indexed": true, "internalType": "address", "name": "caller", "type": "address"},
    {"indexed": true, "internalType": "address", "name": "pool", "type": "address"}
  ],
  "name": "LOG_NEW_POOL",
  "type": "event"
};
return abi.decodeEvent(eventAbi, data, topics, false);
"""
OPTIONS
( library="gs://blockchain-etl-bigquery/ethjs-abi.js" );
-- parsed_logs: every LOG_NEW_POOL event emitted by the factory contract at the
-- address below, with the raw log decoded into a (caller, pool) struct.
WITH parsed_logs AS (
    SELECT
        logs.block_timestamp,
        logs.block_number,
        logs.transaction_hash,
        logs.log_index,
        PARSE_LOG(logs.data, logs.topics) AS parsed
    FROM `bigquery-public-data.crypto_ethereum.logs` AS logs
    WHERE logs.address = '0x9424b1412450d0f8fc2255faf6046b98213b76bd' -- can be removed to match clones
        -- topics[0] is matched against the hash of the event signature.
        AND logs.topics[SAFE_OFFSET(0)] = KECCAK256("LOG_NEW_POOL(address,address)")
),
-- BFactory_evt_LOG_NEW_POOL: flattens the decoded struct from parsed_logs into
-- plain `caller` and `pool` columns alongside the log's position metadata.
BFactory_evt_LOG_NEW_POOL AS (
    SELECT
        pl.block_timestamp,
        pl.block_number,
        pl.transaction_hash,
        pl.log_index,
        pl.parsed.caller AS `caller`,
        pl.parsed.pool AS `pool`
    FROM parsed_logs AS pl
),
-- ERC20_evt_Transfer: token transfers joined to token metadata, with the raw
-- transfer amount scaled down by 10^decimals.
-- Transfers whose token has no row in `tokens` are dropped by the inner join;
-- `value` is NULL when the amount or the decimals cannot be parsed.
ERC20_evt_Transfer AS (
    SELECT
        x.block_timestamp AS evt_block_time,
        x.token_address AS contract_address,
        x.from_address AS `from`,
        x.to_address AS `to`,
        -- The raw amount is cast to FLOAT64 rather than INT64: SAFE_CAST to INT64
        -- returns NULL for anything above ~9.22e18 (about 9.2 units of an
        -- 18-decimal token), which silently discarded larger transfers.
        -- The division result was already FLOAT64 because POW returns FLOAT64.
        SAFE_DIVIDE(
            SAFE_CAST(x.value AS FLOAT64),
            POW(10, SAFE_CAST(t.decimals AS FLOAT64))
        ) AS value
    FROM `bigquery-public-data.crypto_ethereum.token_transfers` AS x
    INNER JOIN `bigquery-public-data.crypto_ethereum.tokens` AS t
        ON x.token_address = t.address
),
-- BEGIN;
-- DROP MATERIALIZED VIEW IF EXISTS balancer.view_balances;
-- CREATE MATERIALIZED VIEW balancer.view_balances AS (
-- pools: the address of every pool announced by a LOG_NEW_POOL event.
pools AS (
    SELECT evt.pool AS pools
    FROM BFactory_evt_LOG_NEW_POOL AS evt
),
-- joins: daily inflow per (pool, token) -- the sum of scaled transfer amounts
-- received by each pool address, truncated to the day of the transfer.
joins AS (
    -- Transfers into pools created by the factory.
    SELECT
        p.pools AS pool,
        DATE_TRUNC(e.evt_block_time, DAY) AS day,
        e.contract_address AS token,
        SUM(e.value) AS amount
    FROM ERC20_evt_Transfer AS e
    INNER JOIN pools AS p
        ON e.`to` = p.pools
    GROUP BY pool, day, token

    UNION ALL

    -- Transfers into one hard-coded address.
    -- NOTE(review): presumably the Balancer V2 Vault -- confirm.
    -- The literal is written as a lowercase 0x-prefixed string, matching the
    -- factory address filter used earlier in this query. The previous
    -- backslash-x form is a hex escape sequence in a BigQuery string literal,
    -- so it never equalled any address and this branch returned no rows.
    SELECT
        e.`to` AS pool,
        DATE_TRUNC(e.evt_block_time, DAY) AS day,
        e.contract_address AS token,
        SUM(e.value) AS amount
    FROM ERC20_evt_Transfer AS e
    WHERE e.`to` = '0xba12222222228d8ba445958a75a0704d566bf2c8'
    GROUP BY pool, day, token
),
-- exits: daily outflow per (pool, token) -- the negated sum of scaled transfer
-- amounts sent from each pool address, truncated to the day of the transfer.
exits AS (
    -- Transfers out of pools created by the factory.
    SELECT
        p.pools AS pool,
        DATE_TRUNC(e.evt_block_time, DAY) AS day,
        e.contract_address AS token,
        -SUM(e.value) AS amount
    FROM ERC20_evt_Transfer AS e
    INNER JOIN pools AS p
        ON e.`from` = p.pools
    GROUP BY pool, day, token

    UNION ALL

    -- Transfers out of one hard-coded address.
    -- NOTE(review): presumably the Balancer V2 Vault -- confirm.
    -- The literal is written as a lowercase 0x-prefixed string, matching the
    -- factory address filter used earlier in this query. The previous
    -- backslash-x form is a hex escape sequence in a BigQuery string literal,
    -- so it never equalled any address and this branch returned no rows.
    SELECT
        e.`from` AS pool,
        DATE_TRUNC(e.evt_block_time, DAY) AS day,
        e.contract_address AS token,
        -SUM(e.value) AS amount
    FROM ERC20_evt_Transfer AS e
    WHERE e.`from` = '0xba12222222228d8ba445958a75a0704d566bf2c8'
    GROUP BY pool, day, token
),
-- daily_delta_balance_by_token: net change per (pool, day, token), obtained by
-- stacking inflows (positive) and outflows (negative) and summing them.
-- NULL amounts count as zero.
daily_delta_balance_by_token AS (
    SELECT
        flows.pool,
        DATE(flows.day) AS day,
        flows.token,
        SUM(COALESCE(flows.amount, 0)) AS amount
    FROM (
        SELECT j.pool, j.day, j.token, j.amount
        FROM joins AS j
        UNION ALL
        SELECT e.pool, e.day, e.token, e.amount
        FROM exits AS e
    ) AS flows
    -- Positional references are kept on purpose: the output alias `day`
    -- shadows the input column of the same name.
    GROUP BY 1, 2, 3
),
-- cumulative_balance_by_token: running balance per (pool, token) as of each day
-- on which the balance changed, together with the day the balance next changes.
-- [day, day_of_next_change) is the half-open range of days on which
-- cumulative_amount is the balance in effect.
cumulative_balance_by_token AS (
    SELECT
        pool,
        token,
        day,
        -- For the most recent change there is no following row, so the range is
        -- closed off at tomorrow. Consumers test `calendar_day < day_of_next_change`,
        -- so a fallback of CURRENT_DATE would exclude today itself and leave
        -- today's calendar row without any balance.
        COALESCE(
            LEAD(day) OVER (PARTITION BY pool, token ORDER BY day),
            DATE_ADD(CURRENT_DATE(), INTERVAL 1 DAY)
        ) AS day_of_next_change,
        SUM(amount) OVER (
            PARTITION BY pool, token
            ORDER BY day
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS cumulative_amount
    FROM daily_delta_balance_by_token
),
-- calendar: one row per calendar day from 2020-01-01 through the current date.
calendar AS (
    SELECT day
    FROM UNNEST(
        GENERATE_DATE_ARRAY(DATE '2020-01-01', CURRENT_DATE(), INTERVAL 1 DAY)
    ) AS day
),
-- running_cumulative_balance_by_token: expands the change-day balances to one
-- row per (calendar day, pool, token) by attaching each balance to every
-- calendar day in its half-open range [day, day_of_next_change).
-- Calendar days with no matching balance still appear once, with NULL
-- pool/token/cumulative_amount, because of the LEFT JOIN.
running_cumulative_balance_by_token AS (
    SELECT
        c.day,
        b.pool,
        b.token,
        b.cumulative_amount
    FROM calendar AS c
    LEFT JOIN cumulative_balance_by_token AS b
        ON b.day <= c.day
        AND c.day < b.day_of_next_change
)
SELECT
    day,
    pool,
    token,
    cumulative_amount
FROM running_cumulative_balance_by_token
-- );
-- CREATE UNIQUE INDEX IF NOT EXISTS balancer_view_balances_day_idx ON balancer.view_balances (day, token, pool);
-- INSERT INTO cron.job(schedule, command)
-- VALUES ('*/12 * * * *', $$REFRESH MATERIALIZED VIEW CONCURRENTLY balancer.view_balances$$)
-- ON CONFLICT (command) DO UPDATE SET schedule=EXCLUDED.schedule;
-- COMMIT;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment