Skip to content

Instantly share code, notes, and snippets.

@afroisalreadyinu
Last active March 4, 2016 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save afroisalreadyinu/f3384f02f090da44c0da to your computer and use it in GitHub Desktop.
Save afroisalreadyinu/f3384f02f090da44c0da to your computer and use it in GitHub Desktop.
import socket
import json
from tornado.testing import AsyncTestCase, gen_test
from tornadoes import ESConnection
from tornado.iostream import IOStream
from SampleService import MainHandler
class SampleServiceTests(AsyncTestCase):
@gen_test
def test_indexing_line(self):
self.es_client = ESConnection(io_loop=self.io_loop)
main = MainHandler(io_loop=self.io_loop)
main.listen(8888, address='localhost')
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = IOStream(s, io_loop=self.io_loop)
yield stream.connect(("localhost", 8888))
yield stream.write(b"There is a tornado warning\n")
def search_result(results):
yield stream.read_until(b"\n")
result_json = json.loads(results.body.decode('utf-8'))
self.assertEqual(len(result_json['hits']['hits']), 1)
res = yield self.es_client.search(
search_result,
index='default',
type='doc',
source={"query": {"match_all": {}}}
)
import sys, uuid, json
from tornado import gen
import tornado.tcpserver
from tornado.httpclient import AsyncHTTPClient
class ConnectionHandler:
def __init__(self, stream, address):
self.stream = stream
self.stream.set_close_callback(self.on_close)
@gen.coroutine
def on_connect(self):
try:
while True:
line = yield self.stream.read_until(b"\n")
yield self.process_line(line)
except tornado.iostream.StreamClosedError:
pass
@gen.coroutine
def process_line(self, line):
client = AsyncHTTPClient()
data = dict(line=line.decode('utf-8'))
url = 'http://localhost:9200/default/doc/{}'.format(str(uuid.uuid4()))
resp = yield client.fetch(url, method='POST', headers=None, body=json.dumps(data))
yield self.write(b"\n")
@gen.coroutine
def on_close(self):
print("Closing connection")
yield []
class MainHandler(tornado.tcpserver.TCPServer):
@gen.coroutine
def handle_stream(self, stream, address):
cn = ConnectionHandler(stream, address)
yield cn.on_connect()
def main():
port = int(sys.argv[1])
main = MainHandler()
main.listen(port, address='localhost')
print("Listening on port {}".format(port))
tornado.ioloop.IOLoop.current().start()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment