Skip to content

Instantly share code, notes, and snippets.

@cdodd
Created July 25, 2018 20:44
Show Gist options
  • Save cdodd/3afb17d7743be8acecdcf43a01f855a5 to your computer and use it in GitHub Desktop.
Save cdodd/3afb17d7743be8acecdcf43a01f855a5 to your computer and use it in GitHub Desktop.
import json
import requests
from requests.auth import HTTPBasicAuth
class BitcoinRpc(object):
def __init__(self, username, password, host, port=8332):
self.username = username
self.password = password
self.host = host
self.port = port
def get_blockchain_info(self):
return self._generic_request('getblockchaininfo', [])
def get_mempool_info(self):
return self._generic_request('getmempoolinfo', [])
def get_raw_tx(self, tx_id):
raw_tx = self._generic_request('getrawtransaction', [tx_id])
return bytearray.fromhex(raw_tx)
def get_block_from_hash(self, block_hash):
return self._generic_request('getblock', [block_hash])
def get_block_from_height(self, height):
return self._generic_request('getblock', [self.get_block_hash(height)])
def get_block_hash(self, height):
return self._generic_request('getblockhash', [height])
def _generic_request(self, method, args=[]):
resp = requests.post(
'http://%s:%s' % (self.host, self.port),
auth=HTTPBasicAuth(self.username, self.password),
data=json.dumps({
'jsonrpc': '1.0',
'method': method,
'params': args,
}),
)
return resp.json()['result']
#!/usr/bin/env python3
import datetime
import json
import multiprocessing
import queue
import threading
import tqdm
import requests
import btc
def produce(work_queue, start, end):
for i in range(start, end + 1):
work_queue.put(i)
def consume(thread_id, work, api, mutex, pbar):
while not work.empty():
block_height = work.get()
data = api.get_block_from_height(block_height)
index_data = {
'@timestamp': datetime.datetime.fromtimestamp(data['time']).isoformat() + 'Z',
'difficulty': data['difficulty'],
'weight': data['weight'],
'tx': len(data['tx']),
'height': block_height,
}
# Count segwit transactions
# for tx in data['tx']:
# raw_tx = api.get_raw_tx(tx)
# if raw_tx[4:6] == bytearray(b'\x00\x01'):
# try:
# index_data['segwit_count'] += 1
# except KeyError:
# index_data['segwit_count'] = 1
requests.post(
'http://localhost:9200/block/stat',
data=json.dumps(index_data),
headers={'Content-Type': 'application/json'},
)
with mutex:
pbar.update(1)
def main():
range_s = 30000
range_e = 40000
thread_count = 15
work_queue = queue.Queue()
pbar = tqdm.tqdm(total=(range_e - range_s))
pgbar_mutex = multiprocessing.Lock()
api = btc.BitcoinRpc(
username='rpc-user',
password='password',
host='10.0.0.1',
)
threading.Thread(target=produce, args=[work_queue, range_s, range_e]).start()
consumer_list = []
for i in range(thread_count):
consumer = threading.Thread(target=consume, args=[
i,
work_queue,
api,
pgbar_mutex,
pbar,
])
consumer.start()
consumer_list.append(consumer)
for consumer in consumer_list:
consumer.join()
pbar.close()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment