Skip to content

Instantly share code, notes, and snippets.

@appeltel
Created June 1, 2017 01:34
Show Gist options
  • Save appeltel/09d77eb489494ae1e2703933108cb60a to your computer and use it in GitHub Desktop.
Save appeltel/09d77eb489494ae1e2703933108cb60a to your computer and use it in GitHub Desktop.
Simple asyncio streams redis client for GET/SET
import asyncio
def build_command(*args):
command = '*' + str(len(args)) + '\r\n'
for arg in args:
command += '$' + str(len(arg)) + '\r\n'
command += arg + '\r\n'
return command.encode('utf-8')
class RedisClient:
def __init__(self, host='127.0.0.1', port=6379):
self.host = host
self.port = port
self.reader = None
self.writer = None
async def connect(self):
self.reader, self.writer = await asyncio.open_connection(
self.host,
self.port
)
def disconnect(self):
self.writer.close()
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc, tb):
self.disconnect()
async def get(self, key):
self.writer.write(build_command('GET', key))
return await self.get_string_response()
async def set(self, key, value):
self.writer.write(build_command('SET', key, value))
return await self.get_string_response()
async def get_string_response(self):
ctrl_char = await self.reader.readexactly(1)
if ctrl_char == b'+':
resp = await self.reader.readuntil(separator=b'\r\n')
return resp[:-2].decode('utf-8')
elif ctrl_char == b'$':
resp_length = int(await self.reader.readuntil(separator=b'\r\n'))
if resp_length == -1:
return None
resp = await self.reader.readexactly(resp_length)
await self.reader.readexactly(2)
return resp.decode('utf-8')
else:
raise Exception('Restricted Protocol Error')
async def test_redis():
async with RedisClient() as client:
result = await client.set("foo", "bar")
print(f'CMD "SET foo bar" result = {result}')
result = await client.get("foo")
print(f'CMD "GET foo" result = {result}')
result = await client.get("baz")
print(f'CMD "GET baz" result = {result}')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(test_redis())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment