Skip to content

Instantly share code, notes, and snippets.

@tzickel
Created November 15, 2018 21:52
Show Gist options
  • Save tzickel/27ef7bbc459ce70be850c2407fbd0091 to your computer and use it in GitHub Desktop.
Save tzickel/27ef7bbc459ce70be850c2407fbd0091 to your computer and use it in GitHub Desktop.
Start a Redis server in the background on a free TCP port
import os
import random
import subprocess
import time
class RedisServer(object):
    """Run a throwaway Redis server in the background on a random TCP port.

    The server is started either from a local ``redis-server`` binary or,
    when *dockerimage* is given, through ``docker run``.  A port is picked
    at random; if the child exits before announcing readiness (typically
    because the port is taken) another port is tried.

    The instance can be used as a context manager, or ``close()`` can be
    called explicitly; ``__del__`` remains as a best-effort fallback.

    Args:
        dockerimage: Optional docker image name (e.g. ``'redis:4.0'``).
            When falsy, the local ``redis-server`` executable is used.

    Raises:
        Exception: ``'Could not run redis'`` if the child could not be
            started or exited without ever printing a line containing
            ``Redis``; ``'Unix domain already in use'`` if the server
            reports that its Unix socket address is taken.
    """

    # Seconds to wait for a graceful exit after SIGTERM before killing.
    _STOP_TIMEOUT = 5.0

    def __init__(self, dockerimage=None):
        self._proc = None
        self._port = None
        while True:
            port = random.randint(1025, 65535)
            # An argument list (no shell) makes the child *be* the server /
            # docker client, so signals sent on shutdown reach it directly
            # instead of an intermediate shell.
            if dockerimage:
                cmd = ['docker', 'run', '--rm',
                       '-p', '{port}:6379'.format(port=port),
                       dockerimage, '--save']
            else:
                cmd = ['redis-server', '--save', '--port', str(port)]
            try:
                self._proc = subprocess.Popen(
                    cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            except OSError:
                # Executable missing / not runnable: keep the historical
                # error that callers saw when the shell reported this.
                raise Exception('Could not run redis')
            try:
                ready = self._wait_until_ready()
            except Exception:
                self.close()
                raise
            if ready:
                # NOTE(review): stdout stays a pipe that nobody reads from
                # here on; a very chatty server could eventually fill it.
                self._port = port
                return
            # Child exited early (most likely a port clash): reap it and
            # release its pipe before retrying on another port.
            self.close()

    def _wait_until_ready(self):
        """Read child output until it is ready or exits.

        Returns True once a readiness line is seen, False if the output
        ended first (caller should retry).  Raises on fatal conditions.
        """
        seen_redis = False
        ready = False
        for line in iter(self._proc.stdout.readline, b''):
            if b'Redis' in line:
                seen_redis = True
            # Case-insensitive: older servers print "... is now ready to
            # accept connections", newer ones "Ready to accept connections".
            if b'ready to accept connections' in line.lower():
                ready = True
                break
            if b'Opening Unix socket' in line and b'Address already in use' in line:
                raise Exception('Unix domain already in use')
        # usually could not find docker image
        if not seen_redis:
            raise Exception('Could not run redis')
        return ready

    @property
    def port(self):
        """TCP port the server listens on (None until startup succeeded)."""
        return self._port

    def close(self):
        """Stop the server process and release its resources (idempotent)."""
        proc = getattr(self, '_proc', None)
        self._proc = None
        if proc is None:
            return
        try:
            if proc.poll() is None:
                # SIGTERM first: unlike SIGKILL it can be handled, letting
                # redis shut down and the docker client stop its container.
                proc.terminate()
                deadline = time.time() + self._STOP_TIMEOUT
                while proc.poll() is None and time.time() < deadline:
                    time.sleep(0.01)
                if proc.poll() is None:
                    proc.kill()
            # Reap the child so it does not linger as a zombie.
            proc.wait()
        except OSError:
            # Process already gone; nothing left to stop.
            pass
        finally:
            if proc.stdout is not None:
                proc.stdout.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def __del__(self):
        try:
            self.close()
        except Exception:
            # Best-effort only: finalizers may run while the interpreter is
            # shutting down and module globals are already torn down.
            pass
if __name__ == "__main__":
    # Demo: start a dockerised Redis 4.0, then write and read back one key.
    from redis import Redis

    server = RedisServer(dockerimage='redis:4.0')
    client = Redis(port=server.port)
    key = value = 'a'
    print(client.set(key, value))
    print(client.get(key))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment