Skip to content

Instantly share code, notes, and snippets.

@euri10
Last active January 30, 2020 13:30
Show Gist options
  • Save euri10/bbb14991e73010725e6af54e872fb375 to your computer and use it in GitHub Desktop.
Save euri10/bbb14991e73010725e6af54e872fb375 to your computer and use it in GitHub Desktop.
class NgrokTunnel:
def __init__(self, port: str):
assert find_executable(
"ngrok"
), "ngrok command must be installed, see https://ngrok.com/"
self.port = port
def start(self, ngrok_die_check_delay: float = 0.5) -> Optional[str]:
logger.debug(f"Starting ngrok tunnel for port {self.port}")
pattern = r"^.*addr=(?P<addr>http://localhost:(?P<port>\d+)) url=(?P<ngrokurl>https://(?P<ngroksubdomain>.*)\.ngrok\.io)$"
self.ngrok = subprocess.Popen(
[
"ngrok",
"http",
self.port,
"--log",
"stdout",
],
stdout=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
)
found = False
ngrokurl = None
while not found:
for line in self.ngrok.stdout:
# logger.debug(line, end="")
m = re.match(pattern, line)
if m is not None:
# addr = m.group("addr")
# port = m.group("port")
ngrokurl = m.group("ngrokurl")
# ngroksubdomain = m.group("ngroksubdomain")
found = True
break
# See that we don't instantly die
time.sleep(ngrok_die_check_delay)
assert self.ngrok.poll() is None, "ngrok terminated abrutly"
return ngrokurl
def stop(self) -> None:
"""Tell ngrok to tear down the tunnel.
Stop the background tunneling process.
"""
self.ngrok.terminate()
@pytest.fixture(scope="session", autouse=True)
def ngrokurl_subscriber() -> Iterable[Optional[str]]:
ngrok_subscriber = NgrokTunnel("5002")
url = ngrok_subscriber.start()
yield url
ngrok_subscriber.stop()
@pytest.fixture(scope="session")
async def subscriber_server(subscriber_test_app: FastAPI) -> AsyncIterator[Server]:
config = Config(app=subscriber_test_app, port=5002)
server = Server(config=config)
task = asyncio.ensure_future(server.serve())
try:
while not server.started:
await asyncio.sleep(0.0001)
yield server
finally:
logger.debug("END SUBSCRIBER SERVER")
server.should_exit = True
await task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment