Created
June 22, 2011 13:13
-
-
Save robert-zaremba/1040068 to your computer and use it in GitHub Desktop.
Rpyc ssl connection bug. Files presenting the issue.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Robert Zaremba | |
# Date: 2011.06.22 | |
# This is the code to demonstrate an rpyc bug
# The bug occurs when one tries to make 1017 sequential connections to rpyc.utils.server.ThreadedServer with the SSL Authenticator.
# the errors are: | |
# ssl.SSLError: [Errno 8] _ssl.c:499: EOF occurred in violation of protocol trying 1016 connection request | |
# or ssl.SSLError: [Errno 185090050] _ssl.c:336: error:0B084002:x509 certificate routines:X509_load_cert_crl_file:system lib trying 1017 connection request | |
# | |
# The code below contains an ssl_connect function with a bugfix that sets the CERT_REQUIRED flag,
# so the SSL certificate is now required.
# see #43 Issue for more explanation (https://github.com/tomerfiliba/rpyc/issues/43) | |
# Then there are 2 classes to manage connection: | |
# * Interface - it reconnects on every send
# * SenderNotReconnecting - it keeps the connection object for the next send request.
# | |
# Then there is a service class to connect to | |
# And the last one is the test function, which demonstrates the problem
# - it occurs when one wants to send 2000 messages to the MyService object.
# | |
# In the repository there are 2 additional files required by this test: | |
# t_cert.pem, t_cert_key.pem - contains certificate and private key for ssl | |
# and one optional: | |
# connection_test_simple.py - shows that this issue is on the rpyc side, not the ssl standard module side
# | |
import rpyc | |
from rpyc.utils.authenticators import SSLAuthenticator | |
from rpyc.utils.server import ThreadedServer | |
import time | |
import threading | |
from Queue import Queue | |
import ssl | |
import socket | |
import logging | |
# Module-level logger used by the connection classes below.
# NOTE: no level is set on the logger itself here, only on the handler,
# so which records reach the handler depends on the logger's effective level.
logger = logging.getLogger('root')
handler = logging.StreamHandler()
handler.setLevel(logging.NOTSET)  # NOTSET == 0: the handler itself filters nothing
handler.setFormatter(
    # %(message)s is the fully interpolated message; the previous %(msg)s
    # printed the raw format string and silently dropped any %-arguments.
    logging.Formatter("%(asctime)s - %(levelname)s -\t%(pathname)s[%(lineno)d]:%(funcName)s():\t%(message)s")
)
logger.addHandler(handler)
def ssl_connect(host, port, keyfile=None, certfile=None, ca_certs=None,
                ssl_version=None, service=rpyc.VoidService, config=None):
    """Create an SSL-wrapped rpyc connection to the given host.

    host - the hostname to connect to
    port - the TCP port to use
    service - the local service to expose (defaults to Void)
    config - configuration dict (defaults to an empty dict)
    keyfile, certfile, ca_certs, ssl_version -- arguments to ssl.wrap_socket.
    see that module's documentation for further info.

    Returns the rpyc.Connection built on top of the SSL socket stream.
    """
    kwargs = {"server_side": False}
    if keyfile:
        kwargs["keyfile"] = keyfile
    if certfile:
        kwargs["certfile"] = certfile
    if ca_certs:
        kwargs["ca_certs"] = ca_certs
        # Here is the change: this flag is not set in the official rpyc 3.1
        # version, so the server certificate was never actually verified.
        # It is only set together with ca_certs, because requiring a
        # certificate without any CA file to check it against is rejected
        # by the ssl module.
        kwargs["cert_reqs"] = ssl.CERT_REQUIRED
    kwargs["ssl_version"] = ssl_version if ssl_version else ssl.PROTOCOL_TLSv1
    stream = rpyc.SocketStream.ssl_connect(host, port, kwargs)
    # A fresh dict per call replaces the former shared mutable default.
    if config is None:
        config = {}
    return rpyc.Connection(service, rpyc.Channel(stream), config=config)


# Overwrite SSL connect method from rpyc module, because it contains a bug.
rpyc.ssl_connect = ssl_connect
class Interface(object):
    """Client that opens a new SSL rpyc connection for every message sent."""

    def __init__(self, host, port, *args):
        """Remember the service address; no connection is opened yet.

        Extra positional arguments are accepted and ignored.
        """
        self.service_address = (host, port)
        self.out_conn = None
        self._inbox = Queue()
        self.connected = False

    def connect(self):
        """Open an SSL connection to the service, verified against t_cert.pem."""
        logger.info('connecting')
        host, port = self.service_address
        self.out_conn = rpyc.ssl_connect(host, port, ca_certs="t_cert.pem")
        self.connected = True

    def close_connection(self):
        """Close the current connection, if any, and mark this client disconnected."""
        # Guard against being called before any connect(): out_conn is None then.
        if self.out_conn is not None:
            self.out_conn.close()
        self.connected = False

    def send(self, msg):
        """Connect, deliver `msg` to the remote `inbox`, then disconnect."""
        self.connect()
        try:
            self.out_conn.root.inbox(msg)
        finally:
            # Always release the connection, even when the remote call fails,
            # so a failed send does not leak a socket.
            self.close_connection()
class SenderNotReconnecting(Interface):
    """Variant of Interface that reuses one connection across sends."""

    def send(self, *msg):
        """Send the positional arguments (as one tuple) to the remote `inbox`.

        A connection is opened only when none is currently marked as open;
        it is left open afterwards for the next call.
        """
        needs_connection = not self.connected
        if needs_connection:
            self.connect()
            self.connected = True
        remote_root = self.out_conn.root
        remote_root.inbox(msg)
class MyService(rpyc.Service):
    """rpyc service that stores every received message in a shared queue."""

    exposed_name = "MyService"
    ALIASES = ['AAA']
    # Class-level queue: shared by all service instances and read directly
    # by the test function through MyService.test_storage.
    test_storage = Queue()

    def exposed_inbox(self, msg):
        """Store `msg` in the shared queue."""
        self.test_storage.put(msg)

    def exposed_check(self, num):
        """Report whether exactly `num` messages were queued, then drain the queue.

        Returns True when the queue size equalled `num` before draining.
        The previous body referenced an undefined name `q` and never
        returned the result.
        """
        from Queue import Empty  # local import: only this method needs it

        ret_val = self.test_storage.qsize() == num
        while True:
            try:
                # Non-blocking get: another consumer may empty the queue
                # between iterations.
                self.test_storage.get_nowait()
            except Empty:
                break
            self.test_storage.task_done()
        return ret_val

    def on_connect(self):
        """Connection hook; nothing to do."""
        pass

    def on_disconnect(self):
        """Disconnection hook; nothing to do."""
        pass
def test(): | |
#logger.setLevel(50) | |
port = 18871 | |
myservice_server = ThreadedServer(MyService, port = port, authenticator = SSLAuthenticator(keyfile = "t_cert_key.pem", | |
certfile = "t_cert.pem")) | |
th = threading.Thread(target = myservice_server.start) # put service to separate thread to perform parallel other code | |
th.start() | |
time.sleep(.4) # wait for server | |
for test_group in range(100): | |
print 'start group test:', test_group | |
#cls = SenderNotReconnecting | |
cls = Interface | |
interface = cls('localhost', port) | |
TEST_AMOUNT = 200 | |
for i in xrange(TEST_AMOUNT): | |
interface.send((test_group, i)) | |
if test_group >= 5: | |
print " <debug print>: ",test_group, i | |
start = time.time() | |
for i in xrange(TEST_AMOUNT): | |
msg = MyService.test_storage.get() | |
if test_group >= 5: | |
print msg | |
print 'end group test, time:', time.time()- start | |
if interface.connected: | |
interface.close_connection() | |
myservice_server.close() | |
if __name__ == "__main__": | |
test() | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Robert Zaremba | |
# Date: 2011.06.22 | |
# This is an additional file to connection_test.py
# It shows that the bug in the ssl connection is on the rpyc side, not the ssl standard module side.
# Here we perform the same task as in connection_test.test, but without the rpyc machinery
import socket, ssl | |
import threading, time | |
from Queue import Queue | |
import logging as logger | |
class Interface(object):
    """Client that opens a new plain-ssl socket for every message sent."""

    def __init__(self, host, port, *args):
        """Remember the server address; no socket is opened yet.

        Extra positional arguments are accepted and ignored.
        """
        self.service_address = (host, port)
        self.out_conn = None
        self.active = True
        self.connected = False

    def connect(self):
        """Open an SSL socket to the server, verified against t_cert.pem."""
        logger.info('connecting')
        raw_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            ssl_sock = ssl.wrap_socket(raw_sock, ca_certs="t_cert.pem",
                                       cert_reqs=ssl.CERT_REQUIRED)
            ssl_sock.connect(self.service_address)
        except Exception:
            # Do not leak the descriptor when wrapping or connecting fails.
            raw_sock.close()
            raise
        self.out_conn = ssl_sock
        self.connected = True

    def close_connection(self):
        """Close the current socket, if any, and mark this client disconnected."""
        # Guard against being called before any connect(): out_conn is None then.
        if self.out_conn is not None:
            self.out_conn.close()
        self.connected = False

    def send(self, msg):
        """Connect, write `msg` to the server, then disconnect."""
        self.connect()
        try:
            #print ' sending', msg
            self.out_conn.write(msg)
        finally:
            # Always release the socket, even when the write fails.
            self.close_connection()
## this is simple server presentation from official python docs
## http://docs.python.org/library/ssl.html?highlight=ssl#server-side-operation
class Server(threading.Thread):
    """Single-threaded SSL server that counts connections and data reads.

    Clients are served one at a time. Receiving the string "close" stops
    the accept loop.
    """

    def __init__(self, host, port):
        super(Server, self).__init__()
        self.address = (host, port)
        self.active = True
        self.data_num = 0   # number of non-empty reads received
        self.conn_num = 0   # number of accept() calls started

    def _do_something(self, connstream, data):
        """Handle one chunk of data; return False when done with the client."""
        #print "received:", data
        if data == "close":
            self.active = False
            return False
        return True

    def _deal_with_client(self, connstream):
        """Read from one client until it finishes, then close its stream."""
        #print '\n new connection made', connstream.getpeername()
        try:
            data = connstream.read()
            # null data means the client is finished with us
            while data:
                self.data_num += 1
                if not self._do_something(connstream, data):
                    # _do_something returns False
                    # when we're finished with client
                    break
                data = connstream.read()
        finally:
            # finished with client (also on a read error)
            connstream.close()

    def run(self):
        """Accept and serve SSL clients sequentially until told to stop."""
        bindsocket = socket.socket()
        try:
            bindsocket.bind(self.address)
            bindsocket.listen(5)
            while self.active:
                # Incremented before accept(), so it counts accept attempts.
                self.conn_num += 1
                newsocket, fromaddr = bindsocket.accept()
                try:
                    connstream = ssl.wrap_socket(newsocket,
                                                 server_side=True,
                                                 certfile="t_cert.pem",
                                                 keyfile="t_cert_key.pem",
                                                 ssl_version=ssl.PROTOCOL_TLSv1)
                except Exception:
                    # Wrapping failed: release the accepted socket.
                    newsocket.close()
                    raise
                self._deal_with_client(connstream)
        finally:
            # Release the listening socket when the loop ends or fails.
            bindsocket.close()
def test(): | |
#logger.setLevel(50) | |
port = 18874 | |
myservice_server = Server('localhost', port) | |
myservice_server.daemon = True | |
myservice_server.start() | |
time.sleep(.4) # wait for server | |
for test_group in range(10): | |
print 'start group test:', test_group | |
interface = Interface('localhost', port) | |
TEST_AMOUNT = 200 | |
start = time.time() | |
for i in xrange(TEST_AMOUNT): | |
interface.send("{} {}".format(test_group, i)) | |
print 'end group test, time:', time.time()- start | |
if interface.connected: | |
interface.close_connection() | |
print 'close' | |
interface.send("close") | |
time.sleep(1) | |
print "\n\n ===========" | |
print "server stopped" | |
print "connections number:", myservice_server.conn_num | |
print "data transfered:", myservice_server.data_num | |
if __name__ == "__main__": | |
test() | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-----BEGIN CERTIFICATE----- | |
MIICmDCCAgGgAwIBAgIJANIOKVLrk84AMA0GCSqGSIb3DQEBBQUAMGUxCzAJBgNV | |
BAYTAnBsMRAwDgYDVQQIDAdXcm9jbGF3MRAwDgYDVQQHDAdXcm9jbGF3MSEwHwYD | |
VQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBlJvYmVydDAe | |
Fw0xMTA2MjAxMjExNDNaFw0xMjA2MTkxMjExNDNaMGUxCzAJBgNVBAYTAnBsMRAw | |
DgYDVQQIDAdXcm9jbGF3MRAwDgYDVQQHDAdXcm9jbGF3MSEwHwYDVQQKDBhJbnRl | |
cm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBlJvYmVydDCBnzANBgkqhkiG | |
9w0BAQEFAAOBjQAwgYkCgYEAr9kZTYlWJ4rKt1MCEGRm3XfTMyDJJAn2V80AfFNS | |
hO0e/HASSTnrXwdXPOVc9xGpI55Wof/9ntg4eUaA1BcojZTvtR+JOerpWfZ2sgKD | |
CWAyI7C1tC10lzJ921Efooqqt6Jei8T0l0liG5kWuCDKoqHnHggiCVl08f7sVmCS | |
B1UCAwEAAaNQME4wHQYDVR0OBBYEFCC2s3kO/ZvN1FRNHliAAx40CD/2MB8GA1Ud | |
IwQYMBaAFCC2s3kO/ZvN1FRNHliAAx40CD/2MAwGA1UdEwQFMAMBAf8wDQYJKoZI | |
hvcNAQEFBQADgYEABfuXCqRclj2995PcL4h89KzhEFCaOXxfhYte6nJKiXdmELd6 | |
xY9N2a334UNhr5UOuKz9HLgn7z6DI2IYNKRkyrlokcjldCTKqv7LAieXIVF/kQTs | |
Iy6N213BvAVOKUuNgmG/SuFNqIROm8786+Bw72LlGlcS9iAL/TN2+W2Iy7E= | |
-----END CERTIFICATE----- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-----BEGIN PRIVATE KEY----- | |
MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAK/ZGU2JVieKyrdT | |
AhBkZt130zMgySQJ9lfNAHxTUoTtHvxwEkk5618HVzzlXPcRqSOeVqH//Z7YOHlG | |
gNQXKI2U77UfiTnq6Vn2drICgwlgMiOwtbQtdJcyfdtRH6KKqreiXovE9JdJYhuZ | |
FrggyqKh5x4IIglZdPH+7FZgkgdVAgMBAAECgYA6RVGqLOlcvxQ/vYUAk5S2rN9S | |
oPmmNJrqK2eOyzmcDMto0WZt14CUg+U3q7WrtbF7uRtyIVcUzQehwGguX+Iyw7tu | |
DxGBqoCPcsqJlbBPeVbH4HtuDdEcyYD0p5s27sxVr6c+zlxKRRS8XDzpERyiZU3v | |
ZN8qWFw+FzRWXDXvvQJBAN03HZ5SoEe6Z/+MtOnJm7L3QznexoDLDtugkuPh0Kdd | |
V7r4rWiRRWHbD4WyMgNioFyBJw2+v5715K1PYTq8C7MCQQDLf8CmmSCcJK2sJ5mg | |
c92SkAHYQIHYwlnaGmznafKigyJMyY3mkNZ3xvfiGpYBPIpgRWLjQNE8th7IiYY6 | |
YPzXAkAJ9EHz7tnhgGTPngBCCN1ltzSiNQEPqJN5lIcnk/C0p0GcWzZKX0cU+SuA | |
o50Wg+idYP5l4vBycbQhjFHet7l7AkAmsIdEJWDZzu9mB7FhnCXIM7DmGH2XZHwI | |
x3VlGfOTijL/PmLIL0lXRHDkgAF9ArGcVBTU+AHP9SAtKFhoyP/JAkEAzbeAvAQP | |
4xabJUx9Wf3JFXqqkMQo21cxbWxGP1WyUg+ULUZ9ZOJOM4WyS9KMiXcnl0KYDfGN | |
lIGsvvTM9Bwt+w== | |
-----END PRIVATE KEY----- |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment