Skip to content

Instantly share code, notes, and snippets.

@wecsam
Last active April 28, 2018 01:01
Show Gist options
  • Save wecsam/84ccd4e819a5c290efd9ddb24e2d38c9 to your computer and use it in GitHub Desktop.
Save wecsam/84ccd4e819a5c290efd9ddb24e2d38c9 to your computer and use it in GitHub Desktop.
Like cat, but with rate limiting.
#!/usr/bin/env python3
import collections, os.path, queue, sys, textwrap, threading, time
def _read_one_char(stream):
    '''Return the next character of stream ("" once the stream is exhausted).'''
    return stream.read(1)


def _read_one_line(stream):
    '''Return the next line of stream, newline included ("" once exhausted).'''
    return stream.readline()


# Command-line unit name -> function that pulls exactly one such unit from a
# file object. Insertion order matters: it is the order shown in the usage text.
UNITS = dict(char=_read_one_char, line=_read_one_line)
def producer(backlog, stop, unit_getter, fin):
    '''
    Repeatedly does backlog.put(unit_getter(fin)) until end of input, a
    KeyboardInterrupt, or until stop has been set by someone else.

    The empty string is used as the end-of-stream marker on backlog:
    - at end of input (unit_getter returned ""), "" is queued after all
      previously read data and this function returns without setting stop,
      so the consumer can drain everything that was read;
    - on KeyboardInterrupt, stop is set and "" is queued so that a consumer
      blocked in backlog.get() wakes up.
    Arguments:
    backlog:
    a queue.Queue into which data will be put
    stop:
    a threading.Event that is set here upon KeyboardInterrupt; if it is
    set elsewhere (e.g. because the consumer exited), this function stops
    reading and returns
    unit_getter:
    a function that takes a file object as the sole parameter and that
    returns a string, returning "" only at end of input
    fin:
    a file object from which data will be read
    '''
    while not stop.is_set():
        try:
            data = unit_getter(fin)
        except KeyboardInterrupt:
            # Ask the consumer to quit without draining the backlog, and wake
            # it up in case it is blocked in backlog.get().
            stop.set()
            backlog.put("")
            return
        # "" (end of input) is queued too: FIFO order makes it the marker the
        # consumer sees only after everything read before it.
        backlog.put(data)
        if not data:
            return


def consumer(backlog, stop, max_units_per_period, seconds_in_period, fout):
    '''
    Repeatedly does fout.write(backlog.get()) but limits that to
    max_units_per_period writes in any rolling window of seconds_in_period
    seconds.

    Returns when it dequeues the end-of-stream marker "" or when stop is set.
    "^C" is echoed on stderr (not fout, so redirected data stays clean) only
    when stop is set, i.e. on interruption rather than on a clean end of
    input. On exit for any reason (including an exception from fout.write),
    stop is set so that the producer stops reading.
    Arguments:
    backlog:
    a queue.Queue from which data will be gotten
    stop:
    a threading.Event that, when set, will cause this function to exit
    (at the latest after backlog.get stops blocking or a sleep of at
    most 0.1 s ends)
    max_units_per_period:
    an integer that represents the maximum number of writes in a period;
    must be at least 1
    seconds_in_period:
    a float that represents the length of a period in seconds
    fout:
    a file object to which data will be written
    '''
    write_times_in_period = collections.deque()
    try:
        while not stop.is_set():
            now = time.perf_counter()
            # Only keep times within the period in write_times_in_period.
            while write_times_in_period and \
                    now - write_times_in_period[0] > seconds_in_period:
                write_times_in_period.popleft()
            if len(write_times_in_period) < max_units_per_period:
                data = backlog.get()
                try:
                    if not data:
                        # End-of-stream marker from the producer.
                        break
                    fout.write(data)
                    fout.flush()
                finally:
                    backlog.task_done()
                # Save the time that this item was written.
                write_times_in_period.append(time.perf_counter())
            else:
                # Sleep until the oldest write leaves the period (no early
                # wake-up followed by busy-waiting), in slices of at most
                # 0.1 s so that a newly set stop is noticed promptly.
                delay = seconds_in_period - (now - write_times_in_period[0])
                if delay > 0.0:
                    time.sleep(min(0.1, delay))
        if stop.is_set():
            print("^C", file=sys.stderr)
    finally:
        # Whatever ended this thread (end of input, interrupt, or e.g. a
        # BrokenPipeError from fout), tell the producer to stop reading.
        stop.set()
def start(
    unit_getter,
    max_units_per_period,
    seconds_in_period,
    fin=None,
    fout=None
):
    '''
    Copy units from fin to fout, writing at most max_units_per_period units
    in any rolling window of seconds_in_period seconds.

    Reading (producer) runs in the calling thread and writing (consumer)
    runs in a separate thread. Returns after the producer returns and the
    consumer thread has exited.
    Arguments:
    unit_getter:
    a function that takes a file object and returns the next unit as a
    string
    max_units_per_period:
    an integer, the maximum number of writes in a period
    seconds_in_period:
    a float, the length of a period in seconds
    fin:
    the file object to read from; defaults to sys.stdin as bound at call
    time
    fout:
    the file object to write to; defaults to sys.stdout as bound at call
    time
    '''
    # Resolve the defaults now rather than at import time, so a later
    # rebinding of sys.stdin/sys.stdout (e.g. contextlib.redirect_stdout)
    # is honored.
    if fin is None:
        fin = sys.stdin
    if fout is None:
        fout = sys.stdout
    backlog = queue.Queue()
    stop = threading.Event()
    # Start the consumer in another thread.
    writer = threading.Thread(
        target=consumer,
        args=(backlog, stop, max_units_per_period, seconds_in_period, fout),
        name="rate-limited-writer",
    )
    writer.start()
    # Run the producer in this thread: Python delivers KeyboardInterrupt to
    # the main thread, which is presumably the caller here (true for main()).
    producer(backlog, stop, unit_getter, fin)
    # Wait for the writer so that returning means output has finished.
    writer.join()
def _parse_arguments(args):
    '''
    Convert the three command-line strings [unit, max units, seconds] into
    the (unit_getter, max_units_per_period, seconds_in_period) arguments of
    start().

    Raises ValueError whose message is the user-facing error text.
    '''
    import math  # local import keeps this block self-contained

    unit_name, max_units_text, seconds_text = args
    try:
        unit_getter = UNITS[unit_name]
    except KeyError:
        raise ValueError("Error: the unit was not understood.") from None
    try:
        max_units_per_period = int(max_units_text)
    except ValueError:
        raise ValueError(
            "Error: [maximum units per period] must be an integer."
        ) from None
    # The consumer indexes its (empty) list of write times when the limit is
    # reached, so a limit below 1 would crash it.
    if max_units_per_period < 1:
        raise ValueError(
            "Error: [maximum units per period] must be at least 1."
        )
    try:
        seconds_in_period = float(seconds_text)
    except ValueError:
        raise ValueError(
            "Error: [seconds in period] must be a float."
        ) from None
    # float() also accepts "nan" and "inf", with which the rolling window
    # never empties and output stalls forever.
    if not math.isfinite(seconds_in_period) or seconds_in_period < 0:
        raise ValueError(
            "Error: [seconds in period] must be a finite, non-negative number."
        )
    return unit_getter, max_units_per_period, seconds_in_period


def main():
    '''
    Command-line entry point.

    Expects exactly three arguments: a unit name (a key of UNITS), the
    maximum number of units per period, and the length of a period in
    seconds. With valid arguments, runs start() and returns 0. Otherwise it
    prints the error (if any) and the usage text to stderr, keeping stdout
    for data only, and returns 1.
    '''
    if len(sys.argv) == 4:
        try:
            arguments = _parse_arguments(sys.argv[1:])
        except ValueError as error:
            print(error, file=sys.stderr)
        else:
            start(*arguments)
            return 0
    # Wrong number of arguments or an invalid value: print the usage pattern.
    print(
        "Usage:",
        os.path.basename(sys.argv[0]),
        "(" + "|".join(UNITS.keys()) + ")",
        "[maximum units per period]",
        "[seconds in period]",
        end="\n\n",
        file=sys.stderr,
    )
    print(
        textwrap.fill(
            "This script continually reads data from STDIN and writes it to "
            "STDOUT. However, it limits the rate at which the data is written "
            "to STDOUT. You set the rate by choosing a unit (argument 1), the "
            "maximum number of those units that can be written within a "
            "rolling period, and the number of seconds that a period lasts. "
            "Press Ctrl+C to quit."
        ),
        file=sys.stderr,
    )
    return 1
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment