Skip to content

Instantly share code, notes, and snippets.

@raztud
Created May 15, 2016 20:51
Show Gist options
  • Save raztud/69ad220911cf28218c8b99a3cc94ab99 to your computer and use it in GitHub Desktop.
Save raztud/69ad220911cf28218c8b99a3cc94ab99 to your computer and use it in GitHub Desktop.
asyncio tcp server write number of requests to cassandra; golang client
import sys
import asyncio
import asyncio.streams
import uuid
from cassandra.cluster import Cluster
class MyServer:
def __init__(self, uid):
self.server = None # encapsulates the server sockets
self.clients = {} # task -> (reader, writer)
self.uid = uid
cluster = Cluster(['127.0.0.1'], protocol_version=4)
self.session = cluster.connect()
self.session.set_keyspace("raztud")
def _accept_client(self, client_reader, client_writer):
"""
This method accepts a new client connection and creates a Task
to handle this client. self.clients is updated to keep track
of the new client.
"""
# start a new Task to handle this specific client connection
task = asyncio.Task(self._handle_client(client_reader, client_writer))
self.clients[task] = (client_reader, client_writer)
def client_done(task):
print("client task done:", task, file=sys.stderr)
del self.clients[task]
task.add_done_callback(client_done)
@asyncio.coroutine
def _handle_client(self, client_reader, client_writer):
"""
This method actually does the work to handle the requests for
a specific client. The protocol is line oriented, so there is
a main loop that reads a line with a request and then sends
out one or more lines back to the client with the result.
"""
while True:
try:
data = (yield from client_reader.readline()).decode("utf-8")
except:
data = None
if not data: # an empty string means the client disconnected
break
self.session.execute("update requests set number = number+1 WHERE id = {0}".format(self.uid));
# client_writer.write("end\n".encode("utf-8"))
# This enables us to have flow control in our connection.
yield from client_writer.drain()
def start(self, loop):
"""
Starts the TCP server, so that it listens on port 8080.
For each client that connects, the accept_client method gets
called. This method runs the loop until the server sockets
are ready to accept connections.
"""
self.server = loop.run_until_complete(
asyncio.streams.start_server(self._accept_client,
'127.0.0.1', 8080,
loop=loop))
def stop(self, loop):
"""
Stops the TCP server, i.e. closes the listening socket(s).
This method runs the loop until the server sockets are closed.
"""
if self.server is not None:
self.server.close()
loop.run_until_complete(self.server.wait_closed())
self.server = None
def main():
loop = asyncio.get_event_loop()
uid = str(uuid.uuid1())
print ("Start with uuid: {0}".format(uid))
# creates a server and starts listening to TCP connections
server = MyServer(uid=uid)
server.start(loop)
try:
loop.run_forever()
finally:
server.close()
loop.close()
if __name__ == '__main__':
main()
package main
import (
"flag"
"fmt"
"net"
"os"
"sync"
)
func main() {
var numberOfMessages = flag.Int("noMsg", 100, "number of messages")
flag.Parse()
fmt.Printf("Send %d messagess. Please wait...\n", *numberOfMessages)
conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
fmt.Println(err)
conn.Close()
}
var wg sync.WaitGroup
for i := 0; i < *numberOfMessages; i++ {
wg.Add(1)
go sendRequest(&wg, conn)
}
wg.Wait()
conn.Close()
}
func sendRequest(wg *sync.WaitGroup, conn net.Conn) {
defer wg.Done()
strEcho := "Hello\n"
_, err := conn.Write([]byte(strEcho))
if err != nil {
println("Write to server failed:", err.Error())
os.Exit(1)
}
// reply := make([]byte, 1024)
// _, err = conn.Read(reply)
// if err != nil {
// println("Write to server failed:", err.Error())
// }
//
// println("reply from server=", string(reply))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment