Skip to content

Instantly share code, notes, and snippets.

@toriningen
Last active May 28, 2021 19:59
Show Gist options
  • Save toriningen/be64f6b899e103d04030e1dd90d48c67 to your computer and use it in GitHub Desktop.
Save toriningen/be64f6b899e103d04030e1dd90d48c67 to your computer and use it in GitHub Desktop.
input() with timeout
from timed_input import timed_input
if __name__ == "__main__":
answer = timed_input(3, "You have 3 seconds to type something: ")
if answer is not None:
print(f"Good, your answer was {answer!r}")
else:
print("You've failed.")
from __future__ import annotations
from typing import TypeVar, Generic, Optional
T = TypeVar('T')
class ProtectedConstructorError(TypeError):
pass
class Result(Generic[T]):
__token = object()
def __init__(self, *, token, is_resolved: bool, value: T = None, exc: Optional[BaseException] = None):
if token is not self.__token:
raise ProtectedConstructorError()
self.__is_resolved = is_resolved
self.__value = value
self.__exc = exc
@classmethod
def resolve(cls, value: T) -> Result[T]:
return cls(token=cls.__token, is_resolved=True, value=value)
@classmethod
def reject(cls, exc: BaseException) -> Result[T]:
return cls(token=cls.__token, is_resolved=False, exc=exc)
@property
def is_resolved(self) -> bool:
return self.__is_resolved
@property
def value(self) -> T:
if not self.__is_resolved:
raise self.__exc
return self.__value
@property
def exc(self) -> Optional[BaseException]:
if self.__is_resolved:
return None
return self.__exc
def __repr__(self):
if self.__is_resolved:
return f'<Result resolved with {self.__value!r}>'
else:
return f'<Result rejected with {self.__exc!r}>'
from __future__ import annotations
import sys
from traceback import format_exc
from contextlib import contextmanager
from multiprocessing.connection import Connection, Pipe
from multiprocessing import Process
from os import fdopen
from typing import Optional
from result import Result
@contextmanager
def replace_stdin(fd: int):
orig = sys.stdin
sys.stdin = fdopen(fd)
try:
yield
finally:
sys.stdin = orig
class RemoteUnserializableError(RuntimeError):
pass
def _timed_input_entry(conn: Connection, stdin_fd: int, prompt: str) -> None:
try:
with replace_stdin(stdin_fd):
conn.send(Result.resolve(input(prompt)))
except BaseException as exc:
try:
conn.send(Result.reject(exc))
except TypeError:
conn.send(Result.reject(RemoteUnserializableError(format_exc())))
def timed_input(timeout: float, prompt: str = '', add_eol=False) -> str:
conn, child_conn = Pipe(False)
p = Process(
target=_timed_input_entry,
args=(
child_conn,
sys.stdin.fileno(),
str(prompt),
),
daemon=True,
)
p.start()
if conn.poll(timeout):
return conn.recv().value
else:
p.terminate()
if add_eol:
print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment