Skip to content

Instantly share code, notes, and snippets.

@laanwj
Last active March 10, 2023 07:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save laanwj/b3d7b01ef61ce07c2eff0a72a6b90183 to your computer and use it in GitHub Desktop.
Save laanwj/b3d7b01ef61ce07c2eff0a72a6b90183 to your computer and use it in GitHub Desktop.
Get TorV3 addresses from a node and check if they are connectable
#!/usr/bin/env python3
import easyrpc
import socks
import sys
out = sys.stdout
log = sys.stderr
log.write(f"Getting addresses from local node…\n")
p = easyrpc.get_rpc_proxy(network='main')
data = p.getnodeaddresses(0)
onions = [x for x in data if x['address'].endswith('.onion') and len(x['address']) > 22]
success = []
for rec in onions:
log.write(f"Checking {rec['address']}:{rec['port']} services {rec['services']:016x}… ")
log.flush()
sock = socks.socksocket()
sock.set_proxy(socks.SOCKS5, '127.0.0.1', 9050)
try:
sock.connect((rec['address'], rec['port']))
except IOError:
log.write("[fail]")
else:
log.write("[ok]")
sock.close()
success.append(rec)
log.write("\n")
success.sort(key=lambda rec: rec['address'])
for rec in success:
out.write(f"{rec['address']}:{rec['port']}\n")
# Copyright (c) 2011 Jeff Garzik
#
# Previous copyright, from python-jsonrpc/jsonrpc/proxy.py:
#
# Copyright (c) 2007 Jan-Klaas Kollhof
#
# This file is part of jsonrpc.
#
# jsonrpc is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this software; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""HTTP proxy for opening RPC connection to bitcoind.
AuthServiceProxy has the following improvements over python-jsonrpc's
ServiceProxy class:
- HTTP connections persist for the life of the AuthServiceProxy object
(if server supports HTTP/1.1)
- sends protocol 'version', per JSON-RPC 1.1
- sends proper, incrementing 'id'
- sends Basic HTTP authentication headers
- parses all JSON numbers that look like floats as Decimal
- uses standard Python json lib
"""
import base64
import decimal
from http import HTTPStatus
import http.client
import json
import logging
import os
import socket
import time
import urllib.parse
HTTP_TIMEOUT = 30
USER_AGENT = "AuthServiceProxy/0.1"
log = logging.getLogger("BitcoinRPC")
class JSONRPCException(Exception):
def __init__(self, rpc_error, http_status=None):
try:
errmsg = '%(message)s (%(code)i)' % rpc_error
except (KeyError, TypeError):
errmsg = ''
super().__init__(errmsg)
self.error = rpc_error
self.http_status = http_status
def EncodeDecimal(o):
if isinstance(o, decimal.Decimal):
return str(o)
raise TypeError(repr(o) + " is not JSON serializable")
class AuthServiceProxy():
__id_count = 0
# ensure_ascii: escape unicode as \uXXXX, passed to json.dumps
def __init__(self, service_url, service_name=None, timeout=HTTP_TIMEOUT, connection=None, ensure_ascii=True):
self.__service_url = service_url
self._service_name = service_name
self.ensure_ascii = ensure_ascii # can be toggled on the fly by tests
self.__url = urllib.parse.urlparse(service_url)
user = None if self.__url.username is None else self.__url.username.encode('utf8')
passwd = None if self.__url.password is None else self.__url.password.encode('utf8')
authpair = user + b':' + passwd
self.__auth_header = b'Basic ' + base64.b64encode(authpair)
self.timeout = timeout
self._set_conn(connection)
def __getattr__(self, name):
if name.startswith('__') and name.endswith('__'):
# Python internal stuff
raise AttributeError
if self._service_name is not None:
name = "%s.%s" % (self._service_name, name)
return AuthServiceProxy(self.__service_url, name, connection=self.__conn)
def _request(self, method, path, postdata):
'''
Do a HTTP request, with retry if we get disconnected (e.g. due to a timeout).
This is a workaround for https://bugs.python.org/issue3566 which is fixed in Python 3.5.
'''
headers = {'Host': self.__url.hostname,
'User-Agent': USER_AGENT,
'Authorization': self.__auth_header,
'Content-type': 'application/json'}
if os.name == 'nt':
# Windows somehow does not like to re-use connections
# TODO: Find out why the connection would disconnect occasionally and make it reusable on Windows
# Avoid "ConnectionAbortedError: [WinError 10053] An established connection was aborted by the software in your host machine"
self._set_conn()
try:
self.__conn.request(method, path, postdata, headers)
return self._get_response()
except (BrokenPipeError, ConnectionResetError):
# Python 3.5+ raises BrokenPipeError when the connection was reset
# ConnectionResetError happens on FreeBSD
self.__conn.close()
self.__conn.request(method, path, postdata, headers)
return self._get_response()
except OSError as e:
retry = (
'[WinError 10053] An established connection was aborted by the software in your host machine' in str(e))
# Workaround for a bug on macOS. See https://bugs.python.org/issue33450
retry = retry or ('[Errno 41] Protocol wrong type for socket' in str(e))
if retry:
self.__conn.close()
self.__conn.request(method, path, postdata, headers)
return self._get_response()
else:
raise
def get_request(self, *args, **argsn):
AuthServiceProxy.__id_count += 1
log.debug("-{}-> {} {}".format(
AuthServiceProxy.__id_count,
self._service_name,
json.dumps(args or argsn, default=EncodeDecimal, ensure_ascii=self.ensure_ascii),
))
if args and argsn:
raise ValueError('Cannot handle both named and positional arguments')
return {'version': '1.1',
'method': self._service_name,
'params': args or argsn,
'id': AuthServiceProxy.__id_count}
def __call__(self, *args, **argsn):
postdata = json.dumps(self.get_request(*args, **argsn), default=EncodeDecimal, ensure_ascii=self.ensure_ascii)
response, status = self._request('POST', self.__url.path, postdata.encode('utf-8'))
if response['error'] is not None:
raise JSONRPCException(response['error'], status)
elif 'result' not in response:
raise JSONRPCException({
'code': -343, 'message': 'missing JSON-RPC result'}, status)
elif status != HTTPStatus.OK:
raise JSONRPCException({
'code': -342, 'message': 'non-200 HTTP status code but no JSON-RPC error'}, status)
else:
return response['result']
def batch(self, rpc_call_list):
postdata = json.dumps(list(rpc_call_list), default=EncodeDecimal, ensure_ascii=self.ensure_ascii)
log.debug("--> " + postdata)
response, status = self._request('POST', self.__url.path, postdata.encode('utf-8'))
if status != HTTPStatus.OK:
raise JSONRPCException({
'code': -342, 'message': 'non-200 HTTP status code but no JSON-RPC error'}, status)
return response
def _get_response(self):
req_start_time = time.time()
try:
http_response = self.__conn.getresponse()
except socket.timeout:
raise JSONRPCException({
'code': -344,
'message': '%r RPC took longer than %f seconds. Consider '
'using larger timeout for calls that take '
'longer to return.' % (self._service_name,
self.__conn.timeout)})
if http_response is None:
raise JSONRPCException({
'code': -342, 'message': 'missing HTTP response from server'})
content_type = http_response.getheader('Content-Type')
if content_type != 'application/json':
raise JSONRPCException(
{'code': -342, 'message': 'non-JSON HTTP response with \'%i %s\' from server' % (http_response.status, http_response.reason)},
http_response.status)
responsedata = http_response.read().decode('utf8')
response = json.loads(responsedata, parse_float=decimal.Decimal)
elapsed = time.time() - req_start_time
if "error" in response and response["error"] is None:
log.debug("<-%s- [%.6f] %s" % (response["id"], elapsed, json.dumps(response["result"], default=EncodeDecimal, ensure_ascii=self.ensure_ascii)))
else:
log.debug("<-- [%.6f] %s" % (elapsed, responsedata))
return response, http_response.status
def __truediv__(self, relative_uri):
return AuthServiceProxy("{}/{}".format(self.__service_url, relative_uri), self._service_name, connection=self.__conn)
def _set_conn(self, connection=None):
port = 80 if self.__url.port is None else self.__url.port
if connection:
self.__conn = connection
self.timeout = connection.timeout
elif self.__url.scheme == 'https':
self.__conn = http.client.HTTPSConnection(self.__url.hostname, port, timeout=self.timeout)
else:
self.__conn = http.client.HTTPConnection(self.__url.hostname, port, timeout=self.timeout)
'''
Convenience utility for connecting to a bitcoind instance through RPC.
'''
# SPDX-License-Identifier: MIT
import os
import urllib
from authproxy import AuthServiceProxy
# Datadir suffix and RPC port per bitcoind chain.
CHAIN_PARAMS = {
'main': ("", 8332),
'test': ("testnet3", 18332),
'signet': ("signet", 38332),
'regtest': ("regtest", 18443),
}
def get_rpc_proxy(*, network=None, host=None, port=None, user=None, passwd=None, wallet=None, datadir=None):
'''
Create a bitcoind RPC proxy.
If you specify `network` (main, test, signet, regtest) it will try to establish cookie authentication
to a local bitcoind based on the `datadir` (defaults to `~/.bitcoin`).
It is also possible to specify `host`, `port`, `user` and `passwd` (or a subset of them) manually.
A specific `wallet` can be specified as well (defaults to no wallet).
'''
if port is None:
if network is None:
raise ValueError('A network must be provided if no RPC port is provided')
port = CHAIN_PARAMS[network][1]
if host is None:
host = '127.0.0.1'
if user is None or passwd is None: # Cookie auth
if network is None:
raise ValueError('A network must be provided for cookie authentication')
if datadir is None:
datadir = os.getenv("DATADIR", os.path.join(os.getenv('HOME'), '.bitcoin'))
datadir = os.path.join(datadir, CHAIN_PARAMS[network][0])
with open(os.path.join(datadir, '.cookie'),'r') as f:
auth = f.read()
else: # Explicit username and password
auth = urllib.parse.quote(user) + ':' + urllib.parse.quote(passwd)
tgt = f"http://{auth}@{host}:{port}/"
if wallet is not None:
tgt += 'wallet/' + urllib.parse.quote(wallet)
return AuthServiceProxy(tgt)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment