Skip to content

Instantly share code, notes, and snippets.

@rkojedzinszky
Created November 27, 2017 15:42
Show Gist options
  • Save rkojedzinszky/cbb6710c0cd6b205eda6f32b4bfcd7fc to your computer and use it in GitHub Desktop.
Save rkojedzinszky/cbb6710c0cd6b205eda6f32b4bfcd7fc to your computer and use it in GitHub Desktop.
Micropython poll try
try:
import usocket as socket
import uselect as select
import utime as time
import urandom as random
except ImportError:
import socket
import select
import time
import random
try:
import network
localhost = network.WLAN(network.STA_IF).ifconfig()[0]
except ImportError:
localhost = '127.0.0.1'
srv_addr = None
POLLFLAGS = (
(select.POLLIN, 'POLLIN'),
(select.POLLOUT, 'POLLOUT'),
(select.POLLHUP, 'POLLHUP'),
(select.POLLERR, 'POLLERR'),
)
def pollone(poller, timeout=0):
r = poller.poll(timeout)
if r:
print (' ' + ' | '.join([f[1] for f in POLLFLAGS if r[0][-1] & f[0]]))
else:
print (' timeout')
def nbtest(addr):
sock = socket.socket()
sock.setblocking(False)
p = select.poll()
p.register(sock, select.POLLIN | select.POLLOUT)
print (' - socket')
pollone(p)
try:
sock.connect(addr)
except OSError:
pass
print (' - connecting')
pollone(p)
print (' - after sleep')
time.sleep(1)
pollone(p)
def srvtest():
global srv_addr
sock = socket.socket()
sock.setblocking(False)
p = select.poll()
p.register(sock, select.POLLIN | select.POLLOUT)
while True:
srv_addr = socket.getaddrinfo(localhost, random.getrandbits(15) + 32768)[0][-1]
try:
sock.bind(srv_addr)
except OSError:
pass
else:
break
print ('* server socket bound to port {}'.format(srv_addr[1]))
pollone(p)
sock.listen(10)
print ('* server socket listening')
pollone(p)
cl = socket.socket()
cl.setblocking(False)
try:
cl.connect(srv_addr)
except OSError:
pass
print ('* server incomming connection')
pollone(p, timeout=30000)
def run(oport, cport, uport):
open_port = socket.getaddrinfo(*oport)[0][-1]
closed_port = socket.getaddrinfo(*cport)[0][-1]
unreach_port = socket.getaddrinfo(*uport)[0][-1]
for d, a in (
('open', open_port),
('closed', closed_port),
('unreach', unreach_port),
):
print ('* {}'.format(d))
nbtest(a)
srvtest()
if __name__ == '__main__':
import sys
run(
(sys.argv[1], int(sys.argv[2])),
(sys.argv[3], int(sys.argv[4])),
(sys.argv[5], int(sys.argv[6])),
)
# run this passing 3 (host, port) pairs to run() or to main:
# first is a host/port which is open
# seconf is a host/port which is closed
# third is a host/port which is unreachable (does not send ack/rst)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment