Skip to content

Instantly share code, notes, and snippets.

@robert-zaremba
Created June 22, 2011 13:13
Show Gist options
  • Save robert-zaremba/1040068 to your computer and use it in GitHub Desktop.
Save robert-zaremba/1040068 to your computer and use it in GitHub Desktop.
Rpyc ssl connection bug. Files presenting the issue.
# Author: Robert Zaremba
# Date: 2011.06.22
# This is the code to present rpyc bug
# The bug appears when one makes 1017 sequential connections to rpyc.utils.server.ThreadedServer with the SSL authenticator.
# the errors are:
# ssl.SSLError: [Errno 8] _ssl.c:499: EOF occurred in violation of protocol trying 1016 connection request
# or ssl.SSLError: [Errno 185090050] _ssl.c:336: error:0B084002:x509 certificate routines:X509_load_cert_crl_file:system lib trying 1017 connection request
#
# The code below contains an ssl_connect function with a bugfix that sets the CERT_REQUIRED flag,
# so the SSL certificate is now required.
# See issue #43 for more explanation (https://github.com/tomerfiliba/rpyc/issues/43)
# Then there are 2 classes to manage the connection:
# * Interface - it reconnects on every send
# * SenderNotReconnecting - it keeps the connection object for the next send request.
#
# Then there is a service class to connect to.
# And the last one is the test function, which demonstrates the problem
# - it appears when one wants to send 2000 messages to the MyService object.
#
# In the repository there are 2 additional files required by this test:
# t_cert.pem, t_cert_key.pem - contain the certificate and private key for SSL
# and one optional:
# connection_test_simple.py - shows that this issue is on the rpyc side, not in the standard ssl module
#
import rpyc
from rpyc.utils.authenticators import SSLAuthenticator
from rpyc.utils.server import ThreadedServer
import time
import threading
from Queue import Queue
import ssl
import socket
import logging
# Build the stream handler first: it accepts records of every level
# (NOTSET == 0) and prefixes each message with timestamp, level and the
# call site (path, line number, function name).
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    "%(asctime)s - %(levelname)s -\t%(pathname)s[%(lineno)d]:%(funcName)s():\t%(msg)s"
))
handler.setLevel(logging.NOTSET)

# Attach the handler to the logger named 'root', which the rest of this
# script uses for its diagnostics.
logger = logging.getLogger('root')
logger.addHandler(handler)
def ssl_connect(host, port, keyfile=None, certfile=None, ca_certs=None,
                ssl_version=None, service=rpyc.VoidService, config=None):
    """Create an SSL-wrapped (encrypted and authenticated) rpyc connection.

    host - the hostname to connect to
    port - the TCP port to use
    service - the local service to expose (defaults to Void)
    config - configuration dict (defaults to an empty dict)
    keyfile, certfile, ca_certs, ssl_version -- arguments to ssl.wrap_socket;
        see that module's documentation for further info. When ssl_version
        is None, ssl.PROTOCOL_TLSv1 is used.

    When ca_certs is given, the peer certificate is required
    (cert_reqs=ssl.CERT_REQUIRED); this is the change relative to the
    official rpyc 3.1 version, which does not set that flag.

    Returns the rpyc.Connection built on top of the SSL socket stream.
    """
    kwargs = {"server_side": False}
    if keyfile:
        kwargs["keyfile"] = keyfile
    if certfile:
        kwargs["certfile"] = certfile
    if ca_certs:
        kwargs["ca_certs"] = ca_certs
        # Here is the change: this flag is not set in the official rpyc 3.1
        # version. It is tied to ca_certs because verification needs a CA file.
        kwargs["cert_reqs"] = ssl.CERT_REQUIRED
    # Compare against None rather than truthiness so that a protocol
    # constant whose integer value is 0 is not silently replaced by TLSv1.
    if ssl_version is None:
        ssl_version = ssl.PROTOCOL_TLSv1
    kwargs["ssl_version"] = ssl_version
    # A fresh dict per call avoids sharing one mutable default between calls.
    if config is None:
        config = {}
    stream = rpyc.SocketStream.ssl_connect(host, port, kwargs)
    return rpyc.Connection(service, rpyc.Channel(stream), config=config)
# Overwrite the SSL connect function of the rpyc module with the patched
# ssl_connect defined in this file, because the stock one contains a bug
# (it does not set the cert_reqs flag). Every later call to
# rpyc.ssl_connect in this script therefore uses the patched version.
rpyc.ssl_connect = ssl_connect
class Interface(object):
    """Client that opens a fresh SSL rpyc connection for every message sent."""

    def __init__(self, host, port, *args):
        """Remember the service address; no connection is opened yet.

        Extra positional arguments are accepted and ignored.
        """
        self.service_address = (host, port)
        self.out_conn = None
        self._inbox = Queue()
        self.connected = False

    def connect(self):
        """Open an SSL connection to the service, verified against t_cert.pem."""
        logger.info('connecting')
        self.out_conn = rpyc.ssl_connect(self.service_address[0], self.service_address[1], ca_certs="t_cert.pem")
        self.connected = True

    def close_connection(self):
        """Close the current connection, if any, and mark this client disconnected."""
        # Guard against being called before any connection was opened.
        if self.out_conn is not None:
            self.out_conn.close()
        self.connected = False

    def send(self, msg):
        """Connect, deliver msg to the remote inbox, then disconnect.

        The connection is closed even when the remote call raises, so a
        failed send does not leak an open socket.
        """
        self.connect()
        try:
            self.out_conn.root.inbox(msg)
        finally:
            self.close_connection()
class SenderNotReconnecting(Interface):
    """Interface variant that keeps its connection open between sends."""

    def send(self, *msg):
        """Deliver the positional arguments, as one tuple, to the remote inbox.

        A connection is opened only when this client is not marked as
        connected; it is left open afterwards for the next call.
        """
        needs_connection = not self.connected
        if needs_connection:
            self.connect()
            self.connected = True
        remote_root = self.out_conn.root
        remote_root.inbox(msg)
class MyService(rpyc.Service):
    """rpyc service that stores every received message in a shared queue."""

    exposed_name = "MyService"
    ALIASES = ['AAA']
    # Class-level queue: shared by every instance of the service, and read
    # directly by the test function through MyService.test_storage.
    test_storage = Queue()

    def exposed_inbox(self, msg):
        """Store msg in the shared queue."""
        self.test_storage.put(msg)

    def exposed_check(self, num):
        """Report whether num messages are queued, then empty the queue.

        Returns True when the queue size equalled num at the time of the
        call, False otherwise. The original version referenced an undefined
        name, used a method Queue does not have, and returned nothing.
        """
        ret_val = self.test_storage.qsize() == num
        while not self.test_storage.empty():
            # Non-blocking get: if another consumer empties the queue between
            # the check and the get, this raises instead of hanging forever.
            self.test_storage.get_nowait()
            self.test_storage.task_done()
        return ret_val

    def on_connect(self):
        """No per-connection setup is needed."""
        pass

    def on_disconnect(self):
        """No per-connection teardown is needed."""
        pass
def test():
#logger.setLevel(50)
port = 18871
myservice_server = ThreadedServer(MyService, port = port, authenticator = SSLAuthenticator(keyfile = "t_cert_key.pem",
certfile = "t_cert.pem"))
th = threading.Thread(target = myservice_server.start) # put service to separate thread to perform parallel other code
th.start()
time.sleep(.4) # wait for server
for test_group in range(100):
print 'start group test:', test_group
#cls = SenderNotReconnecting
cls = Interface
interface = cls('localhost', port)
TEST_AMOUNT = 200
for i in xrange(TEST_AMOUNT):
interface.send((test_group, i))
if test_group >= 5:
print " <debug print>: ",test_group, i
start = time.time()
for i in xrange(TEST_AMOUNT):
msg = MyService.test_storage.get()
if test_group >= 5:
print msg
print 'end group test, time:', time.time()- start
if interface.connected:
interface.close_connection()
myservice_server.close()
if __name__ == "__main__":
test()
# Author: Robert Zaremba
# Date: 2011.06.22
# This is an additional file for connection_test.py
# It shows that the SSL connection bug is on the rpyc side, not in the standard ssl module.
# Here we perform the same task as connection_test.test, but without the rpyc machinery
import socket, ssl
import threading, time
from Queue import Queue
import logging as logger
class Interface(object):
    """Client that opens a fresh SSL socket for every message sent."""

    def __init__(self, host, port, *args):
        """Remember the server address; no socket is opened yet.

        Extra positional arguments are accepted and ignored.
        """
        self.service_address = (host, port)
        self.out_conn = None
        self.active = True
        self.connected = False

    def connect(self):
        """Open an SSL socket to the server, verified against t_cert.pem.

        If wrapping or connecting fails, the socket is closed before the
        exception propagates, so a failed attempt does not leak a descriptor.
        """
        logger.info('connecting')
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock = ssl.wrap_socket(sock, ca_certs="t_cert.pem", cert_reqs=ssl.CERT_REQUIRED)
            sock.connect(self.service_address)
        except Exception:
            # Closes the wrapped socket if wrapping succeeded, the plain one otherwise.
            sock.close()
            raise
        self.out_conn = sock
        self.connected = True

    def close_connection(self):
        """Close the current socket, if any, and mark this client disconnected."""
        # Guard against being called before any connection was opened.
        if self.out_conn is not None:
            self.out_conn.close()
        self.connected = False

    def send(self, msg):
        """Connect, write msg to the socket, then disconnect.

        The socket is closed even when the write raises.
        """
        self.connect()
        try:
            #print ' sending', msg
            self.out_conn.write(msg)
        finally:
            self.close_connection()
## this is simple server presentation from official python docs
## http://docs.python.org/library/ssl.html?highlight=ssl#server-side-operation
class Server(threading.Thread):
    """Single-threaded SSL server that counts connections and data reads.

    Attributes:
        address:  (host, port) tuple the server binds to.
        active:   the accept loop runs while this is True; a client
                  sending "close" sets it to False.
        data_num: number of non-empty reads received so far.
        conn_num: number of accept rounds started so far.
    """

    def __init__(self, host, port):
        super(Server, self).__init__()
        self.address = (host, port)
        self.active = True
        self.data_num = 0
        self.conn_num = 0

    def _do_something(self, connstream, data):
        """Handle one chunk; return False when the client is finished."""
        #print "received:", data
        if data == "close":
            self.active = False
            return False
        return True

    def _deal_with_client(self, connstream):
        """Read from connstream until it is exhausted or the client says stop.

        The stream is closed even when reading raises.
        """
        #print '\n new connection made', connstream.getpeername()
        try:
            data = connstream.read()
            # null data means the client is finished with us
            while data:
                self.data_num += 1
                if not self._do_something(connstream, data):
                    # _do_something returns False when we're finished with client
                    break
                data = connstream.read()
        finally:
            # finished with client
            connstream.close()

    def run(self):
        """Accept clients one at a time until ``active`` becomes False.

        Each accepted socket is wrapped with TLSv1 using t_cert.pem and
        t_cert_key.pem. The listening socket is closed when the loop ends
        or an exception escapes.
        """
        bindsocket = socket.socket()
        try:
            bindsocket.bind(self.address)
            bindsocket.listen(5)
            while self.active:
                self.conn_num += 1
                newsocket, fromaddr = bindsocket.accept()
                try:
                    connstream = ssl.wrap_socket(newsocket,
                                                 server_side=True,
                                                 certfile="t_cert.pem",
                                                 keyfile="t_cert_key.pem",
                                                 ssl_version=ssl.PROTOCOL_TLSv1)
                except Exception:
                    # Do not leak the accepted socket when the TLS wrap fails.
                    newsocket.close()
                    raise
                self._deal_with_client(connstream)
        finally:
            bindsocket.close()
def test():
#logger.setLevel(50)
port = 18874
myservice_server = Server('localhost', port)
myservice_server.daemon = True
myservice_server.start()
time.sleep(.4) # wait for server
for test_group in range(10):
print 'start group test:', test_group
interface = Interface('localhost', port)
TEST_AMOUNT = 200
start = time.time()
for i in xrange(TEST_AMOUNT):
interface.send("{} {}".format(test_group, i))
print 'end group test, time:', time.time()- start
if interface.connected:
interface.close_connection()
print 'close'
interface.send("close")
time.sleep(1)
print "\n\n ==========="
print "server stopped"
print "connections number:", myservice_server.conn_num
print "data transfered:", myservice_server.data_num
if __name__ == "__main__":
test()
-----BEGIN CERTIFICATE-----
MIICmDCCAgGgAwIBAgIJANIOKVLrk84AMA0GCSqGSIb3DQEBBQUAMGUxCzAJBgNV
BAYTAnBsMRAwDgYDVQQIDAdXcm9jbGF3MRAwDgYDVQQHDAdXcm9jbGF3MSEwHwYD
VQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBlJvYmVydDAe
Fw0xMTA2MjAxMjExNDNaFw0xMjA2MTkxMjExNDNaMGUxCzAJBgNVBAYTAnBsMRAw
DgYDVQQIDAdXcm9jbGF3MRAwDgYDVQQHDAdXcm9jbGF3MSEwHwYDVQQKDBhJbnRl
cm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBlJvYmVydDCBnzANBgkqhkiG
9w0BAQEFAAOBjQAwgYkCgYEAr9kZTYlWJ4rKt1MCEGRm3XfTMyDJJAn2V80AfFNS
hO0e/HASSTnrXwdXPOVc9xGpI55Wof/9ntg4eUaA1BcojZTvtR+JOerpWfZ2sgKD
CWAyI7C1tC10lzJ921Efooqqt6Jei8T0l0liG5kWuCDKoqHnHggiCVl08f7sVmCS
B1UCAwEAAaNQME4wHQYDVR0OBBYEFCC2s3kO/ZvN1FRNHliAAx40CD/2MB8GA1Ud
IwQYMBaAFCC2s3kO/ZvN1FRNHliAAx40CD/2MAwGA1UdEwQFMAMBAf8wDQYJKoZI
hvcNAQEFBQADgYEABfuXCqRclj2995PcL4h89KzhEFCaOXxfhYte6nJKiXdmELd6
xY9N2a334UNhr5UOuKz9HLgn7z6DI2IYNKRkyrlokcjldCTKqv7LAieXIVF/kQTs
Iy6N213BvAVOKUuNgmG/SuFNqIROm8786+Bw72LlGlcS9iAL/TN2+W2Iy7E=
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAK/ZGU2JVieKyrdT
AhBkZt130zMgySQJ9lfNAHxTUoTtHvxwEkk5618HVzzlXPcRqSOeVqH//Z7YOHlG
gNQXKI2U77UfiTnq6Vn2drICgwlgMiOwtbQtdJcyfdtRH6KKqreiXovE9JdJYhuZ
FrggyqKh5x4IIglZdPH+7FZgkgdVAgMBAAECgYA6RVGqLOlcvxQ/vYUAk5S2rN9S
oPmmNJrqK2eOyzmcDMto0WZt14CUg+U3q7WrtbF7uRtyIVcUzQehwGguX+Iyw7tu
DxGBqoCPcsqJlbBPeVbH4HtuDdEcyYD0p5s27sxVr6c+zlxKRRS8XDzpERyiZU3v
ZN8qWFw+FzRWXDXvvQJBAN03HZ5SoEe6Z/+MtOnJm7L3QznexoDLDtugkuPh0Kdd
V7r4rWiRRWHbD4WyMgNioFyBJw2+v5715K1PYTq8C7MCQQDLf8CmmSCcJK2sJ5mg
c92SkAHYQIHYwlnaGmznafKigyJMyY3mkNZ3xvfiGpYBPIpgRWLjQNE8th7IiYY6
YPzXAkAJ9EHz7tnhgGTPngBCCN1ltzSiNQEPqJN5lIcnk/C0p0GcWzZKX0cU+SuA
o50Wg+idYP5l4vBycbQhjFHet7l7AkAmsIdEJWDZzu9mB7FhnCXIM7DmGH2XZHwI
x3VlGfOTijL/PmLIL0lXRHDkgAF9ArGcVBTU+AHP9SAtKFhoyP/JAkEAzbeAvAQP
4xabJUx9Wf3JFXqqkMQo21cxbWxGP1WyUg+ULUZ9ZOJOM4WyS9KMiXcnl0KYDfGN
lIGsvvTM9Bwt+w==
-----END PRIVATE KEY-----
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment