# -*- coding: latin-1 -*- | |
# | |
# Copyright (C) AB Strakt | |
# Copyright (C) Jean-Paul Calderone | |
# See LICENSE for details. | |
""" | |
Simple SSL client, using blocking I/O | |
""" | |
from OpenSSL import SSL | |
import sys, os, select, socket | |
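def verify_cb(conn, cert, errnum, depth, ok):
    """Log each certificate of the server's chain and pass OpenSSL's verdict on.

    Registered through ``ctx.set_verify``. Under the pyOpenSSL callback
    contract ``ok`` carries the result of OpenSSL's own chain validation for
    ``cert`` at chain position ``depth`` and ``errnum`` the matching error
    code; returning ``ok`` unchanged keeps that result.

    NOTE(review): nothing here ties the certificate to the HOST given on the
    command line, so any certificate that chains to CA.cert is accepted.
    Compare the subject with the expected peer if the CA issues certificates
    to more than one party.
    """
    print('Got certificate: %s' % cert.get_subject())
    if not ok:
        # Say why the handshake is about to be refused instead of failing silently.
        print('Certificate verification failed: errnum=%s depth=%s' % (errnum, depth))
    return ok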
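if len(sys.argv) < 3:
    print('Usage: python[2] client.py HOST PORT')
    sys.exit(1)

# Key material is looked up next to the script, not in the working directory.
# Named base_dir so the builtin dir() is no longer shadowed.
base_dir = os.path.dirname(sys.argv[0]) or os.curdir

# Initialize context
# NOTE(review): DTLSv1_METHOD pins the protocol to DTLS 1.0, which RFC 8996
# deprecates. Move to a DTLS 1.2 capable method if the binding in use has one.
ctx = SSL.Context(SSL.DTLSv1_METHOD)
# VERIFY_PEER makes the handshake fail unless the server's chain validates
# against CA.cert. verify_cb adds no identity (hostname) check on top of that.
ctx.set_verify(SSL.VERIFY_PEER, verify_cb)
ctx.use_privatekey_file(os.path.join(base_dir, 'client.pkey'))
ctx.use_certificate_file(os.path.join(base_dir, 'client.cert'))
# Fail at start-up with a clear error if the key and certificate do not match.
ctx.check_privatekey()
ctx.load_verify_locations(os.path.join(base_dir, 'CA.cert'))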
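# Set up client
sock = SSL.Connection(ctx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
addr = (sys.argv[1], int(sys.argv[2]))
sock.connect(addr)

# NOTE(review): confirm that sendto()/recvfrom() on SSL.Connection go through
# the DTLS record layer in the binding in use. Stock pyOpenSSL forwards
# attributes the Connection does not define to the wrapped socket, which would
# send the payload unprotected.
died = False
while True:
    line = sys.stdin.readline()
    if line == '':
        break  # EOF on stdin ends the session
    try:
        sock.sendto(line, addr)
        reply = sock.recvfrom(1024)
        print('received %s' % (reply,))
        sys.stdout.flush()
    except SSL.Error as e:
        print(e)
        print('Connection died unexpectedly')
        died = True
        break

# Send close-notify only when the session is still healthy: calling shutdown()
# after an SSL.Error can raise again (the server's dropClient skips it for the
# same reason). The socket is closed either way.
try:
    if not died:
        sock.shutdown()
finally:
    sock.close()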
# -*- coding: latin-1 -*- | |
# | |
# Copyright (C) AB Strakt | |
# Copyright (C) Jean-Paul Calderone | |
# See LICENSE for details. | |
""" | |
Simple echo server, using nonblocking I/O | |
""" | |
from OpenSSL import SSL | |
import sys, os, select, socket | |
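def verify_cb(conn, cert, errnum, depth, ok):
    """Log each certificate of the client's chain and pass OpenSSL's verdict on.

    Registered through ``ctx.set_verify``. Under the pyOpenSSL callback
    contract ``ok`` carries the result of OpenSSL's own chain validation for
    ``cert`` at chain position ``depth`` and ``errnum`` the matching error
    code; returning ``ok`` unchanged keeps that result.

    NOTE(review): every client whose certificate chains to CA.cert is
    admitted. If only specific clients may connect, check the subject against
    an allow-list here.
    """
    print('Got certificate: %s' % cert.get_subject())
    if not ok:
        # Say why the handshake is about to be refused instead of failing silently.
        print('Certificate verification failed: errnum=%s depth=%s' % (errnum, depth))
    return ok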
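if len(sys.argv) < 2:
    print('Usage: python[2] server.py PORT')
    sys.exit(1)

# Key material is looked up next to the script, not in the working directory.
# Named base_dir so the builtin dir() is no longer shadowed.
base_dir = os.path.dirname(sys.argv[0]) or os.curdir

# Initialize context
# NOTE(review): DTLSv1_METHOD pins the protocol to DTLS 1.0, which RFC 8996
# deprecates. Move to a DTLS 1.2 capable method if the binding in use has one.
ctx = SSL.Context(SSL.DTLSv1_METHOD)
# Kept from the TLS example. SSLv2 is not a DTLS version, so this option
# presumably changes nothing on a DTLS context -- confirm before relying on it.
ctx.set_options(SSL.OP_NO_SSLv2)
# Mutual authentication: the handshake is refused when the client sends no
# certificate or one that does not validate against CA.cert.
ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb)
ctx.use_privatekey_file(os.path.join(base_dir, 'server.pkey'))
ctx.use_certificate_file(os.path.join(base_dir, 'server.cert'))
# Fail at start-up with a clear error if the key and certificate do not match.
ctx.check_privatekey()
ctx.load_verify_locations(os.path.join(base_dir, 'CA.cert'))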
# Set up server
# One SSL.Connection wraps the single UDP socket. There is no accept() step, so
# every peer that sends to this port shares the same Connection object.
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server = SSL.Connection(ctx, udp_socket)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# '' binds to every local interface, not just loopback.
listen_port = int(sys.argv[1])
server.bind(('', listen_port))
# server.listen(3)   (left disabled: a datagram socket has no listen backlog)
# NOTE(review): no DTLS cookie exchange (HelloVerifyRequest) is configured in
# this file, so handshake traffic from spoofed source addresses is not screened.
server.setblocking(0)

# clients: connection object -> peer address last seen on it
# writers: connection object -> data still waiting to be echoed back
clients, writers = {}, {}
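def dropClient(cli, errors=None):
    """Forget ``cli`` and close it.

    Args:
        cli: connection object used as the key in ``clients`` and ``writers``.
        errors: the SSL error that ended the session, or None when the peer
            closed cleanly. When set, ``shutdown()`` is skipped and the
            connection is only closed.

    The main loop can call this for a connection that was never recorded in
    ``clients`` (an SSL.Error on the very first datagram), so both tables are
    cleaned with ``pop`` rather than ``del``. An unregistered connection is
    reported with the address ``None``.
    """
    addr = clients.pop(cli, None)
    if errors:
        print('Client %s left unexpectedly:' % (addr,))
        print('  %s' % (errors,))
    else:
        print('Client %s left politely' % (addr,))
    writers.pop(cli, None)
    if not errors:
        cli.shutdown()
    cli.close()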
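# Echo loop: read a datagram, queue it in writers, send it back.
#
# NOTE(review): the only object ever handed to select() for reading is the
# listening Connection, so `cli` below is always `server`, and `clients` holds
# at most one entry keyed by `server` whose address is the source of the most
# recent datagram. Replies therefore go to whoever sent last, and a
# dropClient() call closes the listening socket itself; the next select() then
# fails and the loop ends.
# NOTE(review): confirm that recvfrom()/sendto() on SSL.Connection go through
# the DTLS record layer in the binding in use. Stock pyOpenSSL forwards
# attributes the Connection does not define to the wrapped socket, which would
# bypass DTLS.
while True:
    try:
        r, w, _ = select.select(
            [server] + list(clients.keys()), list(writers.keys()), [])
    except KeyboardInterrupt:
        break
    except Exception as e:
        # A select() failure still ends the loop, but no longer silently.
        print('select failed: %s' % (e,))
        break

    for cli in r:
        print('something happended %s %s' % (cli, cli == server))
        try:
            # cli, addr = server.accept()
            ret, addr = server.recvfrom(1024)
            print('Connection from %s' % (addr,))
            clients[cli] = addr
        except (SSL.WantReadError, SSL.WantWriteError, SSL.WantX509LookupError):
            pass  # handshake or record not complete yet; retry on the next wake-up
        except SSL.ZeroReturnError:
            dropClient(cli)
        except SSL.Error as errors:
            dropClient(cli, errors)
        except Exception as e:
            print(e)
        else:
            print('got %s' % (ret,))
            writers[cli] = writers.get(cli, '') + ret

    # Iterate over a snapshot: dropClient() removes entries from `clients`,
    # which would otherwise abort the loop with "dictionary changed size".
    for cli, addr in list(clients.items()):
        if cli not in writers:
            continue  # nothing queued for this peer; writers[cli] would raise KeyError
        try:
            print('sending response %s %s' % (cli, addr))
            ret = server.sendto(writers[cli], addr)
        except (SSL.WantReadError, SSL.WantWriteError, SSL.WantX509LookupError) as e:
            print(e)
        except SSL.ZeroReturnError:
            dropClient(cli)
        except SSL.Error as errors:
            dropClient(cli, errors)
        else:
            # Keep only the part that was not sent yet.
            writers[cli] = writers[cli][ret:]
            if writers[cli] == '':
                del writers[cli]

for cli in list(clients.keys()):
    cli.close()
server.close()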
Hi,
Will dtls_server.py meet the requirements for philips hue entertainment?
DTLS Handshaking
UDP port 2100 is used for DTLS handshaking and streaming. Only DTLS mode version 1.2 with Pre-Shared Key (PSK) Key exchange method with TLS_PSK_WITH_AES_128_GCM_SHA256 set as Cipher Suite is supported.
I installed the dtls module through the below command on windows 7
pip install Dtls
Also I installed openssl for python using the below command
pip install pyopenssl
But when I try to run the above sample Dtls code I am getting the below error
File "clientDTLS.py", line 28, in
ctx = SSL.Context(SSL.DTLSv1_METHOD)
File "C:\Python27\lib\site-packages\cryptography\utils.py", line 127, in __getattr__
obj = getattr(self._module, attr)
AttributeError: 'module' object has no attribute 'DTLSv1_METHOD'
Could you please suggest anything on this
The pyOpenSSL library does not support DTLSv1_METHOD; it is TLS only. (https://pyopenssl.org/en/stable/api/ssl.html)
Try this: rbit/pydtls#15 (comment)
How do I generate server.pkey, server.cert and CA.cert? Can you paste the command, please?