Skip to content

Instantly share code, notes, and snippets.

@polymorphm
Created November 15, 2011 10:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save polymorphm/1366741 to your computer and use it in GitHub Desktop.
Save polymorphm/1366741 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- mode: python; coding: utf-8 -*-
#
# <header-blahblahblah>
import sys, functools, socket, logging
# Address family passed to socket.socket() for the client connection:
# socket.AF_INET (IPv4) or socket.AF_INET6 (IPv6).
# NOTE(review): presumably this has to agree with the form of SERVER_ADDR.
SERVER_FAMILY = socket.AF_INET
# Host and port the client connects to.
# NOTE(review): 11.22.33.44 looks like a placeholder -- set the real server.
SERVER_ADDR = '11.22.33.44'
SERVER_PORT = 8124
# When true, SO_KEEPALIVE is enabled on the client socket before connecting.
CLIENT_SO_KEEPALIVE = False
from tornado import ioloop
from tornado import iostream
def nb_raw_input(callback, prompt=None, io_loop=None):
    """Read one line from standard input without blocking the IOLoop.

    The blocking read is performed in a daemon thread. When it completes,
    the outcome is handed back to the loop through ``io_loop.add_callback``.

    Args:
        callback: Invoked as ``callback(value, error)``. On success
            ``value`` is the text returned by the input function and
            ``error`` is ``None``; on failure ``value`` is ``None`` and
            ``error`` is the exception raised by the read (for example
            ``EOFError`` at end of input).
        prompt: Optional prompt passed to the input function. When ``None``
            the input function is called without arguments.
        io_loop: Loop used to schedule ``callback``. Defaults to
            ``ioloop.IOLoop.instance()``.
    """
    from threading import Thread

    io_loop = io_loop or ioloop.IOLoop.instance()

    # Resolve the line reader once. Python 2 spells it raw_input(); Python 3
    # renamed it to input(). Without this, every read on Python 3 would end
    # in a NameError that the handler below reports as a read failure.
    try:
        read_line = raw_input  # Python 2
    except NameError:
        read_line = input  # Python 3

    def raw_input_daemon():
        value, error = None, None
        try:
            value = read_line() if prompt is None else read_line(prompt)
        except Exception as e:
            # Deliberately broad: any failure is delivered to the callback
            # instead of dying silently inside the worker thread.
            error = e
        # Hand the result over to the loop rather than calling back directly
        # from this worker thread.
        io_loop.add_callback(functools.partial(callback, value, error))

    reader = Thread(target=raw_input_daemon)
    # Daemon thread: a read that never returns must not keep the process alive.
    reader.daemon = True
    reader.start()
class TestKeepAliveClient(object):
    """Interactive client for watching how a stream connection behaves.

    Constructing an instance starts a connection to
    ``(SERVER_ADDR, SERVER_PORT)``. Once connected, everything received is
    logged and every non-empty line typed at the prompt is sent to the
    server followed by a newline. End of input on the keyboard stops the
    IOLoop.
    """

    def __init__(self):
        self._log = logging.getLogger('test-alive-client')
        self._log.info('begin test-keep-alive-client')

        # Becomes True once the keyboard prompt has been started; it is
        # never reset afterwards.
        self._is_reading_keyboard = False

        sock = socket.socket(SERVER_FAMILY, socket.SOCK_STREAM)
        if CLIENT_SO_KEEPALIVE:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        self._stream = iostream.IOStream(sock)
        self._stream.set_close_callback(self._close_handle)
        self._stream.connect((SERVER_ADDR, SERVER_PORT), self._connect_handle)

    def _exit(self):
        """Log the shutdown and stop the global IOLoop."""
        self._log.info('exit test-keep-alive-client')
        io_loop = ioloop.IOLoop.instance()
        io_loop.stop()

    def _connect_handle(self):
        """Start the keyboard prompt and begin reading from the stream."""
        self._log.info('connected! {}'.format(dict(closed=self._stream.closed())))
        self._begin_read_keyboard()
        # The same handler is registered twice: as streaming callback and,
        # with last=True, as the final callback.
        self._stream.read_until_close(
            functools.partial(self._read_handle, last=True),
            streaming_callback=self._read_handle,
        )

    def _close_handle(self):
        """Log that the stream closed; exit if the prompt never started."""
        self._log.info('closed!')
        # If the prompt is running, the loop is left alive: the user ends the
        # session with EOF. Otherwise nothing else would stop the loop.
        if not self._is_reading_keyboard:
            self._exit()

    def _read_handle(self, data, last=None):
        """Log received data; ``last`` is True for the final callback."""
        if last is None:
            last = False
        self._log.info('read: {}'.format(dict(
            data=data,
            last=last,
        )))

    def _begin_read_keyboard(self):
        """Request one line from the keyboard without blocking the loop."""
        self._is_reading_keyboard = True
        nb_raw_input(self._keyboard_handle, prompt='test-keep-alive-client> ')

    def _keyboard_handle(self, value, error):
        """Process the outcome of one keyboard read, then prompt again.

        Args:
            value: Line typed by the user, or ``None`` when the read failed.
            error: Exception raised by the read, or ``None`` on success.
        """
        if isinstance(error, EOFError):
            # Input is exhausted: stop the loop and do not start another
            # reader thread (it would only print a stray prompt and hit EOF
            # again).
            self._exit()
            return

        if error is not None:
            # Report unexpected read failures instead of dropping them.
            self._log.warning('keyboard read failed: {!r}'.format(error))
        elif value:
            self._log.info('sending from keyboard: {}'.format(dict(value=value)))
            if self._stream.closed():
                self._log.info('not sent -- stream is closed')
            else:
                line = value + '\n'
                # Python 2 raw_input() already yields a byte string; text
                # strings (Python 3) are encoded before writing.
                # NOTE(review): assumes the stream requires bytes and the
                # server expects UTF-8 -- confirm.
                if not isinstance(line, bytes):
                    line = line.encode('utf-8')
                self._stream.write(line)

        self._begin_read_keyboard()
def main():
    """Set up DEBUG logging, create the client and run the IOLoop.

    Returns when the IOLoop is stopped. No value is returned.
    """
    log_format = '%(levelname)s %(asctime)-15s %(name)s: %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=log_format)

    # The constructor starts the connection attempt; the instance itself is
    # not used here afterwards.
    TestKeepAliveClient()

    ioloop.IOLoop.instance().start()
if __name__ == '__main__':
    # A truthy return value from main() is passed on as the exit status.
    error = main()
    if error:
        # sys.exit() is always available; the bare exit() helper is injected
        # by the site module and is missing under ``python -S`` or in frozen
        # builds.
        sys.exit(error)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment