-
-
Save kanzure/ee33ad55a98ad45283d3 to your computer and use it in GitHub Desktop.
python stratum proxy for cryptocurrency mining
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Simple python stratum proxy for connecting multiple miners to multiple pool
servers.

There are two steps for handling a miner client, corresponding to two classes
(AuthenticationStratumClient and ProxyClient). The first step is to handle
authentication from the miner client to the proxy server. This involves the
regular stratum protocol and using the "client.reconnect" message. The second
step is when the miner client connects to the second port, which is a
pass-through connection to the correct mining pool (using the ProxyClient class).

stratum:
    https://github.com/slush0/stratum
    http://mining.bitcoin.cz/stratum-mining
    http://diyhpl.us/~bryan/papers2/bitcoin/Stratum-networkprotocolspec.pdf

relevant:
    https://github.com/CaptEmulation/stratum-proxy
    https://github.com/CaptEmulation/stratum-proxy/issues/1

mining:
    https://github.com/zone117x/node-stratum-pool
    https://github.com/slush0/stratum-mining

generic gevent stuff:
    http://sdiehl.github.io/gevent-tutorial/
    http://www.gevent.org/intro.html#example
    http://www.gevent.org/servers.html
""" | |
# TODO: Make authentication use a real username/password. Each username should
# be associated with different mining pool configuration settings.

# TODO: Authenticate users and extract information against a database (probably
# postgresql).

# TODO: Record all messages to a database for replay and debugging.

# TODO: Store logs somewhere.

# TODO: pool_read_loop has no way of knowing if the connection to the pool has
# been killed until the miner client writes. So a connection to a miner will
# stay open until the miner sends something, at which point the connection is
# determined to be broken. There may be a way to send intermediate messages to
# the pool over the connection, and then filter out the responses so that they
# are not shown to the miner client. This way, the server will know when the
# connection is closed.

# TODO: Make each class have its own type of logger. Every logging message
# should be from "self.log" or something, and it should include enough
# information to identify which miner (address, port), which bound address and
# port, which port is being used to connect to the pool server, which pool
# server is being connected to (address, port), and the internal IDs for those
# objects.

# TODO: ProxyClient should handle "client.reconnect" messages from the upstream
# stratum server. Don't pass these along to the miner client.
import functools | |
import time | |
import logging | |
import json | |
import socket | |
import gevent | |
import gevent.server | |
import gevent.queue | |
import gevent.socket | |
# Address and port that the first-stage (authentication) listener binds to;
# both are passed to the StreamServer created in the __main__ block.
PROXY_ADDRESS = "0.0.0.0"
PROXY_PORT = 3333

# TODO: don't use a global logger, put it in a function, etc.
# Module-wide logging: everything at DEBUG and above, prefixed with a timestamp.
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(message)s")
logger = logging.getLogger(__name__)
class StratumProxy(object):
    """
    Basic server for handling incoming stratum connections. Every incoming
    stratum connection is subjected to authentication and is then told to
    reconnect to another port. The authentication is handled by
    AuthenticationStratumClient and everything else is handled by ProxyClient.
    """

    def __init__(self):
        # AuthenticationStratumClient instances currently being served
        self.clients = []
        self.client_counter = 0

        # ProxyClient instances currently being served
        self.proxy_clients = []
        self.proxy_client_counter = 0

        # listening sockets created for the per-miner proxy ports
        self.newsocks = []

        # place to put StreamServers
        self.proxy_servers = []

    @staticmethod
    def _peer_allowed(data, ip_address):
        """
        Decide whether a peer may use a proxy port.

        :param data: dict attached to the proxy port (may be None or lack the
            "ip_address_restriction" key, in which case everybody is allowed)
        :param ip_address: ip address (str) of the connecting peer
        :return: True when the peer is allowed to connect
        """
        restriction = (data or {}).get("ip_address_restriction")
        return restriction is None or restriction == ip_address

    def _flag_proxy_clients_disconnected(self):
        """
        Ask every running ProxyClient to leave its loops by setting its
        _disconnected flag.

        :return: number of proxy clients that were flagged
        """
        flagged = 0
        # iterate over a copy: handlers remove themselves from the list
        for proxy_client in list(self.proxy_clients):
            proxy_client._disconnected = True
            flagged += 1
        return flagged

    def handle_authentication_connection(self, sock, address):
        """
        Handle connections for the gevent.server.StreamServer instance. The
        incoming socket connection is from the miner, since pools don't
        initiate outgoing connections. This is only for
        AuthenticationStratumClient connections.

        :param sock: standard socket object
        :param address: (ip address (str), port (int))
        """
        logger.info("New connection from {0} port {1}".format(address[0], address[1]))

        # make the client handle the connection stuff
        client = AuthenticationStratumClient(
            sock=sock,
            address=address[0],
            port=address[1],
            server=self,
            id=self.client_counter,
        )
        self.clients.append(client)
        self.client_counter += 1

        try:
            client.run_forever()
        except Exception:
            logger.error("Unhandled exception!", exc_info=True)
        finally:
            logger.info("Server removing miner client {}".format(client))

            # The client handler is responsible for shutting down itself in a
            # sane manner inside the run_forever method. Here is only a small
            # line to handle global cleanup.
            self.clients.remove(client)

            try:
                sock.close()
            except socket.error:
                logger.debug("Exception encountered when trying to shut down a socket.")

    def handle_other_connection(self, data, sock, address):
        """
        Handle a miner client connecting to another port (the actual proxy
        port). Handle the connection with a ProxyClient instance.

        :param data: dict prepared by AuthenticationStratumClient for this
            port (pool target, credentials, ip address restriction)
        :param sock: standard socket object
        :param address: (ip address (str), port (int))
        """
        sock_port = sock.getsockname()[1]
        ip_address = address[0]
        port = address[1]

        # Enforce the restriction recorded when the port was opened: only the
        # ip address that authenticated may use this port.
        if not self._peer_allowed(data, ip_address):
            logger.warning(
                "Rejecting proxy connection to port {0} from {1} port {2}: ip address not allowed".format(
                    sock_port, ip_address, port
                )
            )
            try:
                sock.close()
            except socket.error:
                logger.debug("Exception encountered when trying to shut down a socket.")
            return

        logger.info("New proxy connection to port {0} from {1} port {2}".format(sock_port, ip_address, port))

        proxy_client = ProxyClient(
            sock=sock,
            address=ip_address,
            port=port,
            server=self,
            id=self.proxy_client_counter,
            data=data,
        )
        self.proxy_clients.append(proxy_client)
        self.proxy_client_counter += 1

        # Best-effort back-reference from the socket to its handler. Some
        # socket classes refuse new attributes; stop() does not rely on this
        # (it walks self.proxy_clients), so a failure here is harmless.
        try:
            sock.proxy_client = proxy_client
        except (AttributeError, TypeError):
            logger.debug("Couldn't attach the proxy client to its socket.")

        try:
            proxy_client.run_forever()
        except Exception:
            logger.error("Unhandled exception!", exc_info=True)
        finally:
            logger.info("Server removing miner proxy client {}".format(proxy_client))

            # The client handler is responsible for shutting down itself in a
            # sane manner inside the run_forever method. Here is only a small
            # line to handle global cleanup.
            self.proxy_clients.remove(proxy_client)

            try:
                sock.close()
            except socket.error:
                logger.debug("Exception encountered when trying to shut down a socket.")

            # StreamServer isn't dead. A miner could connect back to this port
            # again and another pool connection will be established.

    def stop(self):
        """
        Kill the StreamServer objects.

        Running proxy clients are first asked to stop on their own, then every
        proxy StreamServer is stopped and every listening socket is closed. A
        failure on one server does not prevent the others from being stopped.
        """
        # try to get the handlers to stop on their own
        if self._flag_proxy_clients_disconnected():
            gevent.sleep(0)

        for proxy_server in self.proxy_servers:
            try:
                logger.debug("Attempting to stop proxy server {}".format(proxy_server))

                # Timeout is in seconds, default is 1 second and will wait for
                # currently running handlers in the pool to be killed.
                proxy_server.stop(timeout=0)
            except Exception:
                logger.debug(
                    "Unhandled exception while attempting to kill proxy server {}".format(proxy_server),
                    exc_info=True,
                )

        # make sure the listening sockets are really closed
        for newsock in self.newsocks:
            try:
                newsock.close()
            except socket.error:
                logger.debug("Exception encountered when trying to shut down a socket.")

        del self.proxy_servers[:]
        del self.newsocks[:]
class StratumClient(object):
    """
    Base class for one miner connection. It owns the socket, a write queue
    drained by a dedicated greenlet, and a read loop that parses one JSON
    message per line and hands it to handle_stratum_message, which subclasses
    must implement.
    """

    ERROR_OTHER = 20
    ERROR_JOB_NOT_FOUND = 21
    ERROR_DUPLICATE_SHARE = 22
    ERROR_LOW_DIFFICULTY_SHARE = 23
    ERROR_UNAUTHORIZED = 24
    ERROR_NOT_SUBSCRIBED = 25

    errors = {
        ERROR_OTHER: "Other/Unknown",
        ERROR_UNAUTHORIZED: "Unauthorized worker",
        ERROR_NOT_SUBSCRIBED: "Not subscribed",

        # these stratum errors probably shouldn't be generated by this proxy
        ERROR_JOB_NOT_FOUND: "Job not found (stale)",
        ERROR_DUPLICATE_SHARE: "Duplicate share",
        ERROR_LOW_DIFFICULTY_SHARE: "Low difficulty share",
    }

    def __init__(self, sock, address, port, server, id, **kwarg):
        """
        :param sock: connected socket to the miner client
        :param address: ip address (str) of the miner client
        :param port: port (int) of the miner client
        :param server: the StratumProxy that accepted the connection
        :param id: internal identifier of this client
        """
        self.id = id
        self.sock = sock
        self.address = address
        self.port = port
        self.server = server

        self.connection_initiation_time = int(time.time())

        # Cheap trick for informing the read loop that the write loop
        # encountered a broken socket connection.
        self._disconnected = False

        # not all connecting miners will have a good username/password
        self.authenticated = False
        self.subscribed = False

        # result of socket getpeername
        self.peer_name = None

        # socket connection goes here
        self.fp = None

        # for the write loop
        self.write_queue = gevent.queue.Queue()

        # just some greenlets...
        self.write_greenlet = None

    def configure_socket(self):
        """
        Configure the socket connection with some basic settings.
        """
        # The keepalive timers below have no effect unless keepalive itself
        # is switched on for the socket.
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        # Not every platform exposes every tuning knob, so skip missing ones.
        tcp_level = getattr(socket, "SOL_TCP", socket.IPPROTO_TCP)
        keepalive_options = (
            # seconds before sending keepalive probes
            ("TCP_KEEPIDLE", 120),
            # interval in seconds between keepalive probes
            ("TCP_KEEPINTVL", 1),
            # failed keepalive probes before declaring other end dead
            ("TCP_KEEPCNT", 5),
        )
        for option_name, value in keepalive_options:
            option = getattr(socket, option_name, None)
            if option is not None:
                self.sock.setsockopt(tcp_level, option, value)

        self.peer_name = self.sock.getpeername()

        # The same file object is used for reading and for writing, so it
        # must be opened in both modes (the default mode is read-only).
        self.fp = self.sock.makefile("rw")

    def before_looping(self):
        """
        Additional configuration and initialization to be executed prior to
        starting the read and write loops.
        """
        pass

    def after_looping(self):
        """
        Additional cleanup that might be required after ending the read/write
        loops with the locally connected miner client.
        """
        pass

    def handle_stratum_message(self, data):
        """
        Handle one decoded stratum message from the miner client. Subclasses
        must override this.

        :param data: decoded JSON object (dict)
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError("subclasses must implement handle_stratum_message")

    @classmethod
    def _format_error(cls, number, id):
        """
        Build the newline-terminated JSON text of a stratum error response.

        :param number: stratum error number; unknown numbers are reported
            with the description of ERROR_OTHER
        :param id: value for the "id" field of the response
        :return: serialized message (str) ending with a newline
        """
        description = cls.errors.get(number, cls.errors[cls.ERROR_OTHER])
        error = {
            "id": id,
            "result": None,
            "error": (number, description, None),
        }
        # every stratum message is terminated by a newline
        return json.dumps(error, separators=(",", ":")) + "\n"

    def send_error(self, number=20, id=1):
        """
        Add error message to the write queue. The write loop will dump the
        information over the connection.

        :param number: stratum error number (see the ERROR_* constants)
        :param id: value for the "id" field of the response
        """
        # peer_name is only known once configure_socket has been called
        peer = self.peer_name[0] if self.peer_name else self.address
        logger.warning("Error number {} on ip {}".format(number, peer))
        self.write_queue.put(self._format_error(number, id))

    def write_loop(self):
        """
        Drain the write queue onto the miner connection until the connection
        breaks. On failure the _disconnected flag is set for the read loop.
        """
        while True:
            try:
                # write messages to the miner
                for item in self.write_queue:
                    self.fp.write(item)
                    self.fp.flush()
            except socket.error:
                logger.debug("Write loop terminated. Informing the read loop.")

                # Inform the read loop, which needs to know about disconnection
                # as well.
                self._disconnected = True

                # exit the while loop
                break
            except Exception:
                logger.warning("Unhandled exception in write loop!", exc_info=True)
                self._disconnected = True

                # also exit the while loop
                break
            finally:
                # This still gets executed even in the case of
                # self._disconnected/break.
                gevent.sleep(0)

    def read_loop(self):
        """
        Read newline-delimited JSON messages from the miner and dispatch each
        one to handle_stratum_message. Returns when the connection is flagged
        as disconnected or an empty line / end of stream is read.
        """
        timeout_seconds = 300

        # unique marker, so that no received line can be mistaken for it
        timed_out = object()

        while True:
            if self._disconnected:
                logger.debug("Read loop encountered flag from write loop, exiting.")
                break

            line = gevent.with_timeout(
                timeout_seconds,
                self.fp.readline,
                timeout_value=timed_out,
            )

            # be forgiving and continue if timeout was hit anyway
            if line is timed_out:
                # mining pools probably do vardiff adjustment stuff here
                continue

            # stratum protocol includes \n at the end of every line
            line = line.strip()

            # empty messages will end the connection
            if not line:
                logger.debug("Socket data was empty, throwing the connection.")
                break

            try:
                data = json.loads(line)
            except ValueError:
                logger.warning("not json: {}".format(line))

                # communicate the error to the client
                self.send_error()

                # be forgiving and wait for the next message
                continue

            # Valid JSON that is not an object (a number, a list, ...) is not
            # a stratum message; the handlers expect a dict.
            if not isinstance(data, dict):
                logger.warning("not a json object: {}".format(line))
                self.send_error()
                continue

            logger.debug("Client {} received data: {}".format(self.id, data))
            self.handle_stratum_message(data)

            gevent.sleep(0)

    def run_forever(self):
        """
        gevent coroutine main loop stuff goes here

        Configures the socket, starts the write greenlet, runs the read loop
        and finally tears everything down. Exceptions derived from Exception
        are logged here and not propagated.
        """
        try:
            # setup the socket connection
            self.configure_socket()

            self.before_looping()

            # start writing down the socket
            self.write_greenlet = gevent.spawn(self.write_loop)

            # Inform the miner client that there was a problem connecting to
            # the pool.
            if getattr(self, "pool_connection_error", False):
                logger.info("Informing the miner client about the failure to connect to the pool.")

                message = {
                    "error": [82001, "The proxy couldn't establish a connection to the pool.", None],
                    "id": None,
                    "result": False,
                }
                self.write_queue.put(json.dumps(message) + "\n")
                self._disconnected = True

                # give the write loop a chance to send the message
                gevent.sleep(0)

            # start reading from the socket
            self.read_loop()
        except socket.error:
            logger.debug("Socket error closing connection", exc_info=True)
        except Exception:
            logger.error("Unhandled exception!", exc_info=True)
        finally:
            # No "return" in here: it would silently swallow exceptions that
            # are deliberately not caught above (e.g. greenlet kills).
            logger.info("Closing the connection.")

            if self.write_greenlet:
                self.write_greenlet.kill()

            try:
                self.sock.shutdown(socket.SHUT_RDWR)
            except socket.error:
                logger.debug("Couldn't shutdown the socket with SHUT_RDWR.")

            # Close both objects independently so that a failure (or a file
            # object that was never created) does not leak the socket.
            for closeable in (self.fp, self.sock):
                if closeable is None:
                    continue
                try:
                    closeable.close()
                except socket.error:
                    logger.debug("Couldn't close the socket.")

            try:
                self.after_looping()
            except Exception:
                logger.debug("Unhandled exception in after_looping (during shutdown).", exc_info=True)
class AuthenticationStratumClient(StratumClient):
    """
    Handle stratum miner authentication and tell the miner client to reconnect
    to another port automatically. Deploys the appropriate proxy server stuff
    to listen on the other port.
    """

    def start_proxy_client_server(self):
        """
        Start a new StreamServer for handling a connection from the miner
        client and a connection to a mining pool.

        :return: (address, port) that the miner should reconnect to
        """
        # bind a new socket to a new port
        newsock = gevent.socket.socket()
        newsock.bind(("0.0.0.0", 0))
        newsock.listen(0)

        # Information about which mining pool to connect to could be attached
        # to the socket here. Alternatively, it could be passed in by wrapping
        # self.server.handle_other_connection but that seems a tiny bit worse.
        # Oops... looks like the wrapper is needed. :( The data was not
        # preserved on the socket object.
        data = {}

        # Tell the new StreamServer about which IP address should only be
        # allowed to connect to the new port. Note that a username/password
        # restriction should not be allowed because maybe a stratum client will
        # one day know to send one set of passwords before the
        # "client.reconnect" message and a second set after the
        # "client.reconnect" message. The only attack that I can think of is
        # one where you're on the same IP address and you submit lots of stale
        # shares and maybe this penalizes the real miner somehow?
        data["ip_address_restriction"] = self.sock.getpeername()[0]

        # Tell the new StreamServer about which pool to connect to and the
        # allowable username and password.
        # TODO: get this from a database, based on the currently authenticated
        # user.
        data["mining"] = {
            # the port must be an int: socket.connect() rejects a str port
            "pool": ("0.0.0.0", 3334),
            "username": "username",
            "password": "password",
        }

        # Attaching the data to the socket object does not work, so it is
        # bound to the handler with functools.partial instead.
        newserver = gevent.server.StreamServer(
            newsock,
            functools.partial(self.server.handle_other_connection, data),
        )
        newserver.start()

        logger.debug(
            "Started a new server {} for miner at {}:{} to reconnect to.".format(
                newserver.address,
                self.address,
                self.port,
            )
        )

        # store the socket and StreamServer for later
        self.server.newsocks.append(newsock)
        self.server.proxy_servers.append(newserver)

        # TODO: call socket.close() eventually?
        # TODO: call stop() on newserver eventually (see StratumProxy.stop).

        port = newsock.getsockname()[1]

        # The new socket is bound to the wildcard address, which is useless as
        # a reconnect target for a remote miner. Advertise the local address
        # of the connection the miner is already using instead.
        try:
            address = self.sock.getsockname()[0]
        except socket.error:
            address = newsock.getsockname()[0]

        return (address, port)

    def handle_stratum_message(self, data):
        """
        The read loop encounters stratum protocol messages from the
        miner client.

        The stratum proxy protocol must look exactly like stratum to the miner
        client. This means that the miner client must use the same
        authorization protocol as if this was a pool server. After
        authorization, and after connecting to the pool server, all messages
        should be transmitted straight to the pool.

        The miner client connects, sends the "mining.subscribe" command, then
        sends the "mining.authorize" command. The server then responds with
        "client.reconnect" to force the stratum miner client to reconnect to
        another port.

        The "client.reconnect" message is so that the miner client can connect
        to a different port and start a unique session where it sends
        "mining.subscribe" again and gets a separate "mining.subscribe"
        response. This can't be avoided because each pool server will give a
        separate "mining.subscribe" response, and you don't know which pool to
        hook up the miner client to until the "mining.authorize" message, which
        comes after the "mining.subscribe" message.

        :param data: decoded JSON message from the miner client
        """
        # anything that is not a JSON object cannot be a stratum message
        if not isinstance(data, dict):
            logger.warning("Unparsed non-method message from {} says: {}".format(self.peer_name[0], data))
            self.send_error()
            return

        # responses have to carry the id of the request they answer
        request_id = data.get("id")

        if "method" not in data:
            logger.warning("Unparsed non-method message from {} says: {}".format(self.peer_name[0], data))
            self.send_error(id=request_id)
            return

        method = data["method"]
        if not hasattr(method, "lower"):
            # the method name has to be a string
            logger.warning("Unparsed method message from {} says: {}".format(self.peer_name[0], data))
            self.send_error(id=request_id)
            return
        method = method.lower()

        if method == "mining.subscribe":
            # TODO: the values in this message might need to be tweaked,
            # how should the values be picked?
            # just send an expected string
            logger.info("Received mining.subscribe request from {}".format(self.peer_name[0]))
            message = {
                "error": None,
                "id": request_id,
                "result": [
                    ["mining.notify", "ae6812eb4cd7735a302a8a9dd95cf71f"],
                    "f800002e",
                    4,
                ],
            }
            self.write_queue.put(json.dumps(message) + "\n")
        elif method == "mining.authorize":
            logger.debug("Received authorization message.")

            # TODO: do authentication
            authenticated = True

            if authenticated:
                self.authenticated = True

                # new port to listen on for the mining client
                (address, port) = self.start_proxy_client_server()

                # Inform the miner client to reconnect on the port for that
                # new StreamServer instance.
                message = {
                    "method": "client.reconnect",
                    "id": None,
                    "params": [address, port],
                }
                self.write_queue.put(json.dumps(message) + "\n")
            else:
                self.authenticated = False
                self.send_error(number=self.ERROR_UNAUTHORIZED, id=request_id)

            # disconnect in either case
            self._disconnected = True
        elif method == "mining.submit":
            logger.warning(
                "Share submitted by {} but not connected to pool server: {}".format(self.peer_name[0], data)
            )
            self.send_error(id=request_id)
            self._disconnected = True
        else:
            logger.warning("Unparsed method message from {} says: {}".format(self.peer_name[0], data))
            self.send_error(id=request_id)
class ProxyClient(StratumClient):
    """
    Handle a connection from a miner client. Establish a connection with the
    upstream pool server. Proxy messages between the miner client and the pool
    server.
    """

    def __init__(self, *args, **kwargs):
        """
        Accepts the StratumClient arguments plus a mandatory "data" keyword
        argument (dict) describing the pool to connect to.

        :raises KeyError: when the "data" keyword argument is missing
        """
        super(ProxyClient, self).__init__(*args, **kwargs)

        # assume there's no problem connecting to the pool
        self.pool_connection_error = False

        self.pool_write_greenlet = None
        self.pool_read_greenlet = None
        self.pool_write_queue = gevent.queue.Queue()

        # Connection to the pool; both are created in before_looping. They
        # start as None so that cleanup can tell what needs closing.
        self.poolsock = None
        self.poolfp = None

        # information from the last connection, like username, password, IP
        # address restrictions, which mining pool to connect to, etc.
        self.data = kwargs["data"]

    def pool_write_loop(self):
        """
        Send things to the pool. Monitor the connection to the pool. When the
        connection is terminated, start shutting down the other connections.
        """
        while True:
            try:
                for item in self.pool_write_queue:
                    self.poolfp.write(item)
                    self.poolfp.flush()
            except socket.error:
                logger.debug("Pool write loop terminated. Informing the read loops.")

                # Inform the read loop, which needs to know about disconnection
                # as well.
                self._disconnected = True

                # exit the while loop
                break
            except Exception:
                logger.warning("Unhandled exception in pool write loop!", exc_info=True)
                self._disconnected = True

                # also exit the while loop
                break
            finally:
                # This still gets executed even in the case of
                # self._disconnected/break.
                gevent.sleep(0)

    def pool_read_loop(self):
        """
        Read from the pool. Dump messages into the miner client write queue.

        Any failure while reading sets the _disconnected flag so that the
        miner-side loops shut down as well, instead of this greenlet dying
        silently.
        """
        timeout_seconds = 10

        # unique marker, so that no received line can be mistaken for it
        timed_out = object()

        try:
            while True:
                # when there's no way to read data, don't bother
                if self.poolfp.closed or self.poolsock.closed:
                    logger.debug("No way to read from the pool socket, exiting pool read loop.")
                    self._disconnected = True
                    break

                if self._disconnected:
                    logger.debug("The pool read loop encountered flag from some write loop, exiting.")
                    break

                line = gevent.with_timeout(
                    timeout_seconds,
                    self.poolfp.readline,
                    timeout_value=timed_out,
                )

                # be forgiving and continue if timeout was hit anyway
                if line is timed_out:
                    continue

                # end of stream: the pool closed the connection
                if line is None or line == "":
                    self._disconnected = True
                    break

                # pass the message along to the miner client
                logger.debug("Proxy client {} received pool message: {}".format(self.id, line))
                self.write_queue.put(line)

                gevent.sleep(0)
        except socket.error:
            logger.debug("Pool read loop terminated. Informing the other loops.")
            self._disconnected = True
        except Exception:
            logger.warning("Unhandled exception in pool read loop!", exc_info=True)
            self._disconnected = True

    def before_looping(self):
        """
        Additional configuration and initialization to be executed prior to
        starting the read and write loops.

        Establish a connection to the upstream pool or stratum endpoint thing.
        Really it can be any stratum endpoint, but I like to pretend that it is
        probably a pool because that's easier to fit in my head.
        """
        # make a new socket for the pool connection
        self.poolsock = gevent.socket.socket()

        try:
            # read stored data from the last connection
            pooltarget = self.data["mining"]["pool"]

            logger.debug("Attempting to connect to pool: {}".format(pooltarget))

            # connect to the pool
            self.poolsock.connect(pooltarget)

            # The same file object is used for reading and for writing, so it
            # must be opened in both modes (the default mode is read-only).
            self.poolfp = self.poolsock.makefile("rw")
        except Exception:
            # Can't just immediately disconnect because the miner client needs
            # to be told that there was a problem.
            logger.info(
                "Couldn't connect to the remote mining server. Waiting to tell miner client.",
                exc_info=True,
            )
            self.pool_connection_error = True
        else:
            # gevent.spawn() something to monitor the pool connection, when
            # the connection is terminated please inform some other variables and
            # probably set self._disconnected or something.
            self.pool_write_greenlet = gevent.spawn(self.pool_write_loop)
            self.pool_read_greenlet = gevent.spawn(self.pool_read_loop)

    def after_looping(self):
        """
        Additional cleanup that might be required after ending the read/write
        loops with the locally connected miner client.

        Close the connection to the upstream pool.
        """
        if self.pool_write_greenlet:
            self.pool_write_greenlet.kill()

        if self.pool_read_greenlet:
            self.pool_read_greenlet.kill()

        if self.poolsock is not None:
            try:
                self.poolsock.shutdown(socket.SHUT_RDWR)
            except socket.error:
                logger.debug("Couldn't shutdown the pool socket with SHUT_RDWR.")

        # Close both objects independently so that a file object that was
        # never created (failed connect) does not leak the socket.
        for closeable in (self.poolfp, self.poolsock):
            if closeable is None:
                continue
            try:
                closeable.close()
            except socket.error:
                logger.debug("Couldn't close the pool socket.")

    def handle_stratum_message(self, data):
        """
        Replace username/password authorization details with details from
        memory or database. Otherwise pass-through all messages straight to the
        other side.

        :param data: decoded JSON message from the miner client
        """
        # TODO: strip username/password and use values from memory (self.data)
        self.pool_write_queue.put(json.dumps(data) + "\n")
if __name__ == "__main__":
    stratum_proxy = StratumProxy()

    # first-stage listener: every miner connects here to authenticate
    server = gevent.server.StreamServer(
        (PROXY_ADDRESS, PROXY_PORT),
        stratum_proxy.handle_authentication_connection,
    )

    # start() plus gevent.wait() is used instead of server.serve_forever()
    server.start()

    try:
        # blocks for as long as the servers and their handlers are alive
        gevent.wait()
    except KeyboardInterrupt:
        logger.info("Interrupted, shutting down.")
    finally:
        # Runs on every exit path, including Ctrl-C, so the listener and all
        # per-miner proxy servers are always shut down.
        server.stop()
        stratum_proxy.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment