# Gist kanzure/ee33ad55a98ad45283d3 by @kanzure, created July 6, 2014 21:18.
# python stratum proxy for cryptocurrency mining
"""
Simple python stratum proxy for connecting multiple miners to multiple pool
servers.
There are two steps for handling a miner client, corresponding to two classes
(AuthenticationStratumClient and ProxyClient). The first step is to handle
authentication from the miner client to the proxy server. This involves the
regular stratum protocol and using the "client.reconnect" message. The second
step is when the miner client connects to the second port, which is a
pass-through connection to the correct mining pool (using the ProxyClient class).
stratum:
https://github.com/slush0/stratum
http://mining.bitcoin.cz/stratum-mining
http://diyhpl.us/~bryan/papers2/bitcoin/Stratum-networkprotocolspec.pdf
relevant:
https://github.com/CaptEmulation/stratum-proxy
https://github.com/CaptEmulation/stratum-proxy/issues/1
mining:
https://github.com/zone117x/node-stratum-pool
https://github.com/slush0/stratum-mining
generic gevent stuff:
http://sdiehl.github.io/gevent-tutorial/
http://www.gevent.org/intro.html#example
http://www.gevent.org/servers.html
"""
# TODO: Make authentication use a real username/password. Each username should
# be associated with different mining pool configuration settings.
# TODO: Authenticate users and extract information against a database (probably
# postgresql).
# TODO: record all messages to database for replay and debugging
# TODO: store logs somewhere
# TODO: pool_read_loop has no way of knowing if the connection to the pool has
# been killed until the miner client writes. So a connection to a miner will
# stay open until the miner sends something, at which point the connection is
# determined to be broken. There may be a way to send intermediate messages to
# the pool over the connection, and then filter out the responses and don't
# show them to the miner client. This way, the server will know when the
# connection is closed.
# TODO: Make each class have its own type of logger. Every logging message
# should be from "self.log" or something, and it should include enough
# information to identify which miner (address, port), which bound address and
# port, which port is being used to connect to the pool server, which pool
# server is being connected to (address, port), and the internal IDs for those
# objects.
# TODO: ProxyClient should handle "client.reconnect" messages from the upstream
# stratum server. Don't pass these along to the miner client.
import functools
import time
import logging
import json
import socket
import gevent
import gevent.server
import gevent.queue
import gevent.socket
# Port and interface handed to the authentication StreamServer in the
# __main__ section.
PROXY_PORT = 3333
PROXY_ADDRESS = "0.0.0.0"

# TODO: don't use a global logger, put it in a function, etc.
logging.basicConfig(
    format="%(asctime)s %(message)s",
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
class StratumProxy(object):
    """
    Basic server for handling incoming stratum connections. Every incoming
    stratum connection is subjected to authentication and is then told to
    reconnect to another port. The authentication is handled by
    AuthenticationStratumClient and everything else is handled by ProxyClient.
    """

    def __init__(self):
        # connected AuthenticationStratumClient instances and the counter
        # used to hand out their ids
        self.clients = []
        self.client_counter = 0
        # connected ProxyClient instances and the counter for their ids
        self.proxy_clients = []
        self.proxy_client_counter = 0
        # listening sockets created for the per-miner proxy ports
        self.newsocks = []
        # place to put StreamServers
        self.proxy_servers = []

    def _run_client(self, client, registry, sock, label):
        """
        Run a client handler to completion, then do the global cleanup.

        :param client: object exposing run_forever()
        :param registry: list that currently holds the client
        :param sock: the miner's socket; closed once the handler is done
        :param label: description of the client used in the log message
        """
        try:
            client.run_forever()
        except Exception:
            logger.error("Unhandled exception!", exc_info=True)
        finally:
            logger.info("Server removing {} {}".format(label, client))
            # The client handler is responsible for shutting down itself in a
            # sane manner inside the run_forever method. Here is only a small
            # amount of global cleanup.
            if client in registry:
                registry.remove(client)
            try:
                sock.close()
            except socket.error:
                logger.debug("Exception encountered when trying to shut down a socket.")

    def handle_authentication_connection(self, sock, address):
        """
        Handle connections for the gevent.server.StreamServer instance. The
        incoming socket connection is from the miner, since pools don't
        initiate outgoing connections. This is only for
        AuthenticationStratumClient connections.

        :param sock: standard socket object
        :param address: (ip address (str), port (int))
        """
        logger.info("New connection from {0} port {1}".format(address[0], address[1]))

        # make the client handle the connection stuff
        client = AuthenticationStratumClient(
            sock=sock,
            address=address[0],
            port=address[1],
            server=self,
            id=self.client_counter,
        )
        self.clients.append(client)
        self.client_counter += 1

        self._run_client(client, self.clients, sock, "miner client")

    def handle_other_connection(self, data, sock, address):
        """
        Handle a miner client connecting to another port (the actual proxy
        port). Handle the connection with a ProxyClient instance.

        :param data: dict prepared by
            AuthenticationStratumClient.start_proxy_client_server and bound
            to this handler with functools.partial
        :param sock: standard socket object
        :param address: (ip address (str), port (int))
        """
        sock_port = sock.getsockname()[1]
        ip_address = address[0]
        port = address[1]
        logger.info("New proxy connection to port {0} from {1} port {2}".format(sock_port, ip_address, port))

        # NOTE(review): data["ip_address_restriction"] is recorded by the
        # authentication step but is not enforced here.
        proxy_client = ProxyClient(
            sock=sock,
            address=ip_address,
            port=port,
            server=self,
            id=self.proxy_client_counter,
            data=data,
        )
        # stop() finds live proxy clients through this list, so no reference
        # has to be attached to the socket object itself.
        self.proxy_clients.append(proxy_client)
        self.proxy_client_counter += 1

        self._run_client(proxy_client, self.proxy_clients, sock, "miner proxy client")
        # StreamServer isn't dead. A miner could connect back to this port
        # again and another pool connection will be established.

    def stop(self):
        """
        Kill the StreamServer objects.

        Every tracked proxy client is first flagged as disconnected so that
        its loops can exit on their own, then each StreamServer is stopped.
        """
        live_clients = list(self.proxy_clients)
        for proxy_client in live_clients:
            # try to get it to stop on its own
            proxy_client._disconnected = True
        if live_clients:
            # give the flagged clients a chance to notice the flag
            gevent.sleep(0)

        for proxy_server in list(self.proxy_servers):
            try:
                logger.debug("Attempting to stop proxy server {}".format(proxy_server))
                # Timeout is in seconds, default is 1 second and will wait for
                # currently running handlers in the pool to be killed. stop()
                # also closes the listening socket.
                proxy_server.stop(timeout=0)
            except Exception:
                logger.debug(
                    "Unhandled exception while attempting to kill proxy server {}".format(proxy_server),
                    exc_info=True,
                )

        # Forget the stopped servers and their sockets so that calling stop()
        # a second time is harmless.
        del self.proxy_servers[:]
        del self.newsocks[:]
class StratumClient(object):
    """
    Base class for a single miner connection.

    Owns the miner socket, a write queue drained by a writer greenlet, and a
    read loop that parses one JSON message per line and passes it to
    handle_stratum_message, which subclasses must implement.
    """

    ERROR_OTHER = 20
    ERROR_JOB_NOT_FOUND = 21
    ERROR_DUPLICATE_SHARE = 22
    ERROR_LOW_DIFFICULTY_SHARE = 23
    ERROR_UNAUTHORIZED = 24
    ERROR_NOT_SUBSCRIBED = 25

    errors = {
        ERROR_OTHER: "Other/Unknown",
        ERROR_UNAUTHORIZED: "Unauthorized worker",
        ERROR_NOT_SUBSCRIBED: "Not subscribed",
        # these stratum errors probably shouldn't be generated by this proxy
        ERROR_JOB_NOT_FOUND: "Job not found (stale)",
        ERROR_DUPLICATE_SHARE: "Duplicate share",
        ERROR_LOW_DIFFICULTY_SHARE: "Low difficulty share",
    }

    def __init__(self, sock, address, port, server, id, **kwarg):
        """
        :param sock: socket connected to the miner
        :param address: ip address of the miner (str)
        :param port: remote port of the miner (int)
        :param server: the StratumProxy that accepted the connection
        :param id: internal identifier of this client
        :param kwarg: extra keyword arguments, ignored by the base class
        """
        self.id = id
        self.sock = sock
        self.address = address
        self.port = port
        self.server = server
        self.connection_initiation_time = int(time.time())

        # Cheap trick for informing the read loop that the write loop
        # encountered a broken socket connection.
        self._disconnected = False

        # not all connecting miners will have a good username/password
        self.authenticated = False
        self.subscribed = False

        # result of socket getpeername
        self.peer_name = None

        # file object wrapping the socket goes here
        self.fp = None

        # for the write loop
        self.write_queue = gevent.queue.Queue()

        # just some greenlets...
        self.write_greenlet = None

    def configure_socket(self):
        """
        Configure the socket connection with some basic settings.
        """
        # The TCP_KEEP* values below only take effect once keepalive itself
        # is switched on for the socket.
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        keepalive_settings = (
            # seconds before sending keepalive probes
            ("TCP_KEEPIDLE", 120),
            # interval in seconds between keepalive probes
            ("TCP_KEEPINTVL", 1),
            # failed keepalive probes before declaring other end dead
            ("TCP_KEEPCNT", 5),
        )
        for option_name, value in keepalive_settings:
            # these constants are not defined on every platform
            option = getattr(socket, option_name, None)
            if option is not None:
                self.sock.setsockopt(socket.IPPROTO_TCP, option, value)

        self.peer_name = self.sock.getpeername()

        # "rw" because the same file object is used by both the read loop and
        # the write loop; the default mode is read-only on Python 3.
        self.fp = self.sock.makefile("rw")

    def before_looping(self):
        """
        Additional configuration and initialization to be executed prior to
        starting the read and write loops.
        """
        pass

    def after_looping(self):
        """
        Additional cleanup that might be required after ending the read/write
        loops with the locally connected miner client.
        """
        pass

    def handle_stratum_message(self, data):
        """
        Handle one decoded message from the miner. Subclasses must override.

        :param data: the decoded JSON message
        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError("subclasses must implement handle_stratum_message")

    def send_error(self, number=20, id=1):
        """
        Add error message to the write queue. The write loop will dump the
        information over the connection.

        :param number: error code; unknown codes get the ERROR_OTHER text
        :param id: id to put in the error response
        """
        description = self.errors.get(number, self.errors[self.ERROR_OTHER])
        error = {
            "id": id,
            "result": None,
            "error": (number, description, None),
        }

        # peer_name is only known after configure_socket has run
        peer_ip = self.peer_name[0] if self.peer_name else self.address
        logger.warning("Error number {} on ip {}".format(number, peer_ip))

        message = json.dumps(error, separators=(",", ":"))
        message += "\n"  # stratum reasons
        self.write_queue.put(message)

    def write_loop(self):
        """
        Write queued messages to the miner until the socket breaks. On any
        failure the _disconnected flag is raised for the read loop.
        """
        while True:
            try:
                # write messages to the miner
                for item in self.write_queue:
                    self.fp.write(item)
                    self.fp.flush()
            except socket.error:
                logger.debug("Write loop terminated. Informing the read loop.")
                # Inform the read loop, which needs to know about disconnection
                # as well.
                self._disconnected = True
                # exit the while loop
                break
            except Exception:
                logger.warning("Unhandled exception in write loop!", exc_info=True)
                self._disconnected = True
                # also exit the while loop
                break
            finally:
                # This still gets executed even in the case of
                # self._disconnected/break.
                gevent.sleep(0)

    def read_loop(self):
        """
        Read newline-terminated JSON messages from the miner and dispatch
        them to handle_stratum_message until the connection ends or the
        _disconnected flag is set.
        """
        timeout_seconds = 300
        # unique marker that can never be confused with data read from the
        # socket
        timed_out = object()

        while True:
            if self._disconnected:
                logger.debug("Read loop encountered flag from write loop, exiting.")
                break

            line = gevent.with_timeout(
                timeout_seconds,
                self.fp.readline,
                timeout_value=timed_out,
            )

            # be forgiving and continue if timeout was hit anyway
            if line is timed_out:
                # mining pools probably do vardiff adjustment stuff here
                continue

            # stratum protocol includes \n at the end of every line
            line = line.strip()

            # empty messages will end the connection
            if not line:
                logger.debug("Socket data was empty, throwing the connection.")
                break

            try:
                data = json.loads(line)
            except ValueError:
                logger.warning("not json: {}".format(line))
                # communicate the error to the client
                self.send_error()
                # be forgiving and wait for the next message
                continue

            logger.debug("Client {} received data: {}".format(self.id, data))
            self.handle_stratum_message(data)
            gevent.sleep(0)

    def run_forever(self):
        """
        gevent coroutine main loop stuff goes here
        """
        try:
            # setup the socket connection
            self.configure_socket()
            self.before_looping()

            # start writing down the socket
            self.write_greenlet = gevent.spawn(self.write_loop)

            # Inform the miner client that there was a problem connecting to
            # the pool.
            if getattr(self, "pool_connection_error", False):
                logger.info("Informing the miner client about the failure to connect to the pool.")
                message = {
                    "error": [82001, "The proxy couldn't establish a connection to the pool.", None],
                    "id": None,
                    "result": False,
                }
                self.write_queue.put(json.dumps(message) + "\n")
                self._disconnected = True
                # let the write loop run before the read loop bails out
                gevent.sleep(0)

            # start reading from the socket
            self.read_loop()
        except socket.error:
            logger.debug("Socket error closing connection", exc_info=True)
        except Exception:
            logger.error("Unhandled exception!", exc_info=True)
        finally:
            logger.info("Closing the connection.")

            if self.write_greenlet is not None:
                self.write_greenlet.kill()

            try:
                self.sock.shutdown(socket.SHUT_RDWR)
            except socket.error:
                logger.debug("Couldn't shutdown the socket with SHUT_RDWR.")

            # Close the file object and the socket independently so that a
            # failure on the first does not skip the second.
            for closable in (self.fp, self.sock):
                try:
                    closable.close()
                except (socket.error, AttributeError):
                    logger.debug("Couldn't close the socket.")

            try:
                self.after_looping()
            except Exception:
                logger.debug("Unhandled exception in after_looping (during shutdown).", exc_info=True)
class AuthenticationStratumClient(StratumClient):
    """
    Handle stratum miner authentication and tell the miner client to reconnect
    to another port automatically. Deploys the appropriate proxy server stuff
    to listen on the other port.
    """

    def start_proxy_client_server(self):
        """
        Start a new StreamServer for handling a connection from the miner
        client and a connection to a mining pool.

        :return: (address, port) that the miner should reconnect to. The
            address is the local address of the current miner connection; the
            port is the freshly bound listening port.
        """
        # bind a new socket to a new port
        newsock = gevent.socket.socket()
        newsock.bind(("0.0.0.0", 0))
        newsock.listen(0)

        # Application data for the new StreamServer. It is passed by wrapping
        # self.server.handle_other_connection with functools.partial, because
        # data attached to the listening socket was not preserved.
        data = {}

        # Tell the new StreamServer about which IP address should only be
        # allowed to connect to the new port. Note that a username/password
        # restriction should not be allowed because maybe a stratum client will
        # one day know to send one set of passwords before the
        # "client.reconnect" message and a second set after the
        # "client.reconnect" message. The only attack that I can think of is
        # one where you're on the same IP address and you submit lots of stale
        # shares and maybe this penalizes the real miner somehow?
        data["ip_address_restriction"] = self.sock.getpeername()[0]

        # Tell the new StreamServer about which pool to connect to and the
        # allowable username and password.
        # TODO: get this from a database, based on the currently authenticated
        # user.
        data["mining"] = {
            # the port must be an int to be usable with socket.connect
            "pool": ("0.0.0.0", 3334),
            "username": "username",
            "password": "password",
        }

        # build and start a StreamServer
        handler = functools.partial(self.server.handle_other_connection, data)
        newserver = gevent.server.StreamServer(newsock, handler)
        newserver.start()

        logger.debug(
            "Started a new server {} for miner at {}:{} to reconnect to.".format(
                newserver.address,
                self.address,
                self.port,
            )
        )

        # store the socket and StreamServer for later
        self.server.newsocks.append(newsock)
        self.server.proxy_servers.append(newserver)

        # TODO: call socket.close() eventually?
        # TODO: call stop() on newserver eventually (see StratumProxy.stop).

        # The listening socket is bound to the wildcard address, which is not
        # something a miner can connect to. Advertise the local address of
        # the connection the miner is already using instead.
        address = self.sock.getsockname()[0]
        port = newsock.getsockname()[1]
        return (address, port)

    def handle_stratum_message(self, data):
        """
        The read loop encounters stratum protocol messages from the
        miner client.

        The stratum proxy protocol must look exactly like stratum to the miner
        client. This means that the miner client must use the same
        authorization protocol as if this was a pool server. After
        authorization, and after connecting to the pool server, all messages
        should be transmitted straight to the pool.

        The miner client connects, sends the "mining.subscribe" command, then
        sends the "mining.authorize" command. The server then responds with
        "client.reconnect" to force the stratum miner client to reconnect to
        another port.

        The "client.reconnect" message is so that the miner client can connect
        to a different port and start a unique session where it sends
        "mining.subscribe" again and gets a separate "mining.subscribe"
        response. This can't be avoided because each pool server will give a
        separate "mining.subscribe" response, and you don't know which pool to
        hook up the miner client to until the "mining.authorize" message, which
        comes after the "mining.subscribe" message.

        :param data: decoded JSON message from the miner
        """
        peer_ip = self.peer_name[0]

        # valid JSON is not necessarily a JSON object
        if not isinstance(data, dict) or "method" not in data:
            logger.warning("Unparsed non-method message from {} says: {}".format(peer_ip, data))
            self.send_error()
            return

        # Responses must carry the id of the request they answer. Fall back to
        # 1 when the miner did not supply one.
        request_id = data.get("id", 1)

        method = data["method"]
        if not hasattr(method, "lower"):
            logger.warning("Unparsed method message from {} says: {}".format(peer_ip, data))
            self.send_error(id=request_id)
            return
        method = method.lower()

        if method == "mining.subscribe":
            # TODO: the values in this message might need to be tweaked,
            # how should the values be picked?

            # just send an expected string
            logger.info("Received mining.subscribe request from {}".format(peer_ip))
            message = {
                "error": None,
                "id": request_id,
                "result": [
                    ["mining.notify", "ae6812eb4cd7735a302a8a9dd95cf71f"],
                    "f800002e",
                    4,
                ],
            }
            self.write_queue.put(json.dumps(message) + "\n")
        elif method == "mining.authorize":
            logger.debug("Received authorization message.")

            # TODO: do authentication
            authenticated = True

            if authenticated:
                self.authenticated = True

                # new port to listen on for the mining client
                (address, port) = self.start_proxy_client_server()

                # Inform the miner client to reconnect on the port for that
                # new StreamServer instance.
                message = {
                    "method": "client.reconnect",
                    "id": None,
                    "params": [address, port],
                }
                self.write_queue.put(json.dumps(message) + "\n")
            else:
                self.authenticated = False
                self.send_error(self.ERROR_UNAUTHORIZED, id=request_id)

            # disconnect in either case
            self._disconnected = True
        elif method == "mining.submit":
            logger.warning("Share submitted by {} but not connected to pool server: {}".format(peer_ip, data))
            self.send_error(id=request_id)
            self._disconnected = True
        else:
            logger.warning("Unparsed method message from {} says: {}".format(peer_ip, data))
            self.send_error(id=request_id)
class ProxyClient(StratumClient):
    """
    Handle a connection from a miner client. Establish a connection with the
    upstream pool server. Proxy messages between the miner client and the pool
    server.
    """

    def __init__(self, *args, **kwargs):
        """
        Accepts the StratumClient arguments plus a required ``data`` keyword
        argument holding the settings stored during authentication.
        """
        super(ProxyClient, self).__init__(*args, **kwargs)

        # assume there's no problem connecting to the pool
        self.pool_connection_error = False

        # Socket and file object for the pool connection. They stay None
        # until before_looping creates them, so cleanup can always tell what
        # actually exists.
        self.poolsock = None
        self.poolfp = None

        self.pool_write_greenlet = None
        self.pool_read_greenlet = None
        self.pool_write_queue = gevent.queue.Queue()

        # information from the last connection, like username, password, IP
        # address restrictions, which mining pool to connect to, etc.
        self.data = kwargs["data"]

    def pool_write_loop(self):
        """
        Send things to the pool. Monitor the connection to the pool. When the
        connection is terminated, start shutting down the other connections.
        """
        while True:
            try:
                for item in self.pool_write_queue:
                    self.poolfp.write(item)
                    self.poolfp.flush()
            except socket.error:
                logger.debug("Pool write loop terminated. Informing the read loops.")
                # Inform the read loop, which needs to know about disconnection
                # as well.
                self._disconnected = True
                # exit the while loop
                break
            except Exception:
                logger.warning("Unhandled exception in pool write loop!", exc_info=True)
                self._disconnected = True
                # also exit the while loop
                break
            finally:
                # This still gets executed even in the case of
                # self._disconnected/break.
                gevent.sleep(0)

    def pool_read_loop(self):
        """
        Read from the pool. Dump messages into the miner client write queue.
        """
        timeout_seconds = 10
        # unique marker that can never be confused with data from the pool
        timed_out = object()

        while True:
            # when there's no way to read data, don't bother
            if self.poolfp.closed or getattr(self.poolsock, "closed", False):
                logger.debug("No way to read from the pool socket, exiting pool read loop.")
                self._disconnected = True
                break

            if self._disconnected:
                logger.debug("The pool read loop encountered flag from some write loop, exiting.")
                break

            line = gevent.with_timeout(
                timeout_seconds,
                self.poolfp.readline,
                timeout_value=timed_out,
            )

            # be forgiving and continue if timeout was hit anyway
            if line is timed_out:
                continue

            # nothing was read: the pool side of the connection is gone
            if not line:
                self._disconnected = True
                break

            # pass the message along to the miner client
            logger.debug("Proxy client {} received pool message: {}".format(self.id, line))
            self.write_queue.put(line)
            gevent.sleep(0)

    def before_looping(self):
        """
        Additional configuration and initialization to be executed prior to
        starting the read and write loops.

        Establish a connection to the upstream pool or stratum endpoint thing.
        Really it can be any stratum endpoint, but I like to pretend that it is
        probably a pool because that's easier to fit in my head.
        """
        # make a new socket for the pool connection
        self.poolsock = gevent.socket.socket()

        try:
            # read stored data from the last connection
            pooltarget = self.data["mining"]["pool"]
            logger.debug("Attempting to connect to pool: {}".format(pooltarget))

            # connect to the pool
            self.poolsock.connect(pooltarget)

            # One file object is used for both reading and writing, hence
            # "rw"; the default mode is read-only on Python 3.
            self.poolfp = self.poolsock.makefile("rw")
        except Exception:
            # Can't just immediately disconnect because the miner client needs
            # to be told that there was a problem.
            logger.info(
                "Couldn't connect to the remote mining server. Waiting to tell miner client.",
                exc_info=True,
            )
            self.pool_connection_error = True
        else:
            # Greenlets that move data to and from the pool. They set
            # self._disconnected when the pool connection goes away.
            self.pool_write_greenlet = gevent.spawn(self.pool_write_loop)
            self.pool_read_greenlet = gevent.spawn(self.pool_read_loop)

    def after_looping(self):
        """
        Additional cleanup that might be required after ending the read/write
        loops with the locally connected miner client.

        Close the connection to the upstream pool.
        """
        for greenlet in (self.pool_write_greenlet, self.pool_read_greenlet):
            if greenlet is not None:
                greenlet.kill()

        if self.poolsock is not None:
            try:
                self.poolsock.shutdown(socket.SHUT_RDWR)
            except socket.error:
                logger.debug("Couldn't shutdown the pool socket with SHUT_RDWR.")

        # Close each object on its own: the file object does not exist when
        # the connection attempt failed, and that must not prevent the socket
        # from being closed.
        for closable in (self.poolfp, self.poolsock):
            if closable is None:
                continue
            try:
                closable.close()
            except (socket.error, AttributeError):
                logger.debug("Couldn't close the pool socket.")

    def handle_stratum_message(self, data):
        """
        Replace username/password authorization details with details from
        memory or database. Otherwise pass-through all messages straight to the
        other side.

        :param data: decoded JSON message from the miner
        """
        # TODO: strip username/password and use values from memory (self.data)
        self.pool_write_queue.put(json.dumps(data) + "\n")
if __name__ == "__main__":
    stratum_proxy = StratumProxy()

    # Server that accepts the initial (authentication) miner connections.
    server = gevent.server.StreamServer(
        (PROXY_ADDRESS, PROXY_PORT),
        stratum_proxy.handle_authentication_connection,
    )
    server.start()

    try:
        gevent.wait()
    except KeyboardInterrupt:
        logger.info("Interrupted, shutting down.")
    finally:
        # Runs on an interrupt as well, so that the authentication server and
        # every per-miner StreamServer get stopped.
        server.stop()
        stratum_proxy.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment