Skip to content

Instantly share code, notes, and snippets.

@gregburek
Created December 7, 2011 01:51
Show Gist options
  • Save gregburek/1441055 to your computer and use it in GitHub Desktop.
Save gregburek/1441055 to your computer and use it in GitHub Desktop.
Rate limiting function calls with Python Decorators
import time
def RateLimited(maxPerSecond):
    """Return a decorator that limits a function to ``maxPerSecond`` calls per second.

    The wrapper sleeps before invoking the wrapped function whenever less
    than ``1 / maxPerSecond`` seconds have passed since the previous call
    finished.  The very first call is never delayed.

    Args:
        maxPerSecond: Maximum call rate; must be non-zero.

    Returns:
        A decorator producing the throttled wrapper.

    Raises:
        ZeroDivisionError: If ``maxPerSecond`` is zero.
    """
    minInterval = 1.0 / float(maxPerSecond)

    def decorate(func):
        # One-element list so the closure can update the value without
        # ``nonlocal`` (keeps the code runnable on Python 2 as well).
        lastTimeCalled = [0.0]

        def rateLimitedFunction(*args, **kargs):
            # Wall-clock time is required here: ``time.clock()`` measured CPU
            # time on Unix (which does not advance while sleeping) and was
            # removed in Python 3.8.
            elapsed = time.time() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait > 0:
                time.sleep(leftToWait)
            ret = func(*args, **kargs)
            lastTimeCalled[0] = time.time()
            return ret
        return rateLimitedFunction
    return decorate
@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    """Print ``num``; the decorator throttles calls to at most two per second."""
    # The parenthesised form prints the same on Python 2 and is valid Python 3.
    print(num)
if __name__ == "__main__":
    # Demo: the numbers 1..99 should appear at roughly two per second.
    # Single-argument print(...) behaves identically on Python 2 and 3.
    print("This should print 1,2,3... at about 2 per second.")
    for i in range(1, 100):
        PrintNumber(i)
@utilitarianexe
Copy link

make it work nice across threads

def rate_limited(max_per_second):
    '''
    Decorator that makes a function not be called faster than
    ``max_per_second`` times per second, including when it is called
    from several threads.

    The first call is never delayed.  Raises ZeroDivisionError when
    ``max_per_second`` is zero.
    '''
    lock = threading.Lock()
    minInterval = 1.0 / float(max_per_second)

    def decorate(func):
        # One-element list so the closure can update the value without
        # ``nonlocal`` (Python 2 compatible).
        lastTimeCalled = [0.0]

        def rateLimitedFunction(*args, **kargs):
            # ``with`` guarantees the lock is released even if the wait is
            # interrupted by an exception.
            with lock:
                # Wall-clock time: ``time.clock()`` was CPU time on Unix and
                # no longer exists since Python 3.8.
                elapsed = time.time() - lastTimeCalled[0]
                leftToWait = minInterval - elapsed
                if leftToWait > 0:
                    time.sleep(leftToWait)
                # Stamp while still holding the lock so a thread that enters
                # right after us measures against this call, not a stale one.
                lastTimeCalled[0] = time.time()
            ret = func(*args, **kargs)
            # Re-stamp on completion: the interval is counted from the end
            # of the previous call, as before.
            lastTimeCalled[0] = time.time()
            return ret
        return rateLimitedFunction
    return decorate

@rjnienaber
Copy link

Here's the correctly formatted version of the above python code:

import time, threading

#make it work nice across threads
def rate_limited(max_per_second):
  '''
  Decorator that makes a function not be called faster than
  ``max_per_second`` times per second, including when it is called
  from several threads.

  The first call is never delayed.  Raises ZeroDivisionError when
  ``max_per_second`` is zero.
  '''
  lock = threading.Lock()
  minInterval = 1.0 / float(max_per_second)

  def decorate(func):
    # One-element list so the closure can update the value without
    # ``nonlocal`` (Python 2 compatible).
    lastTimeCalled = [0.0]

    # ``*args, **kargs`` (not ``args, *kargs``) so that calls without
    # positional arguments and calls with keyword arguments are forwarded.
    def rateLimitedFunction(*args, **kargs):
      # ``with`` releases the lock even if the wait raises.
      with lock:
        # Wall-clock time: ``time.clock()`` was CPU time on Unix and no
        # longer exists since Python 3.8.
        elapsed = time.time() - lastTimeCalled[0]
        leftToWait = minInterval - elapsed

        if leftToWait > 0:
          time.sleep(leftToWait)

        # Stamp while still holding the lock so the next thread measures
        # against this call rather than a stale timestamp.
        lastTimeCalled[0] = time.time()

      ret = func(*args, **kargs)
      # Re-stamp on completion: the interval is counted from the end of
      # the previous call, as before.
      lastTimeCalled[0] = time.time()
      return ret
    return rateLimitedFunction
  return decorate

@andresriancho
Copy link

My version, with minor improvements and PEP-8:

import time
import threading

from functools import wraps


def rate_limited(max_per_second):
    """
    Decorator that makes a function not be called faster than
    ``max_per_second`` times per second, including when it is called
    from several threads.

    The first call is never delayed.  Raises ZeroDivisionError when
    ``max_per_second`` is zero.
    """
    lock = threading.Lock()
    min_interval = 1.0 / float(max_per_second)

    def decorate(func):
        # One-element list so the closure can update the value without
        # ``nonlocal`` (Python 2 compatible).
        last_time_called = [0.0]

        @wraps(func)
        def rate_limited_function(*args, **kwargs):
            # ``with`` releases the lock even if the wait raises.
            with lock:
                # Wall-clock time: ``time.clock()`` was CPU time on Unix and
                # no longer exists since Python 3.8.
                elapsed = time.time() - last_time_called[0]
                left_to_wait = min_interval - elapsed

                if left_to_wait > 0:
                    time.sleep(left_to_wait)

                # Stamp while still holding the lock so the next thread
                # measures against this call rather than a stale timestamp.
                last_time_called[0] = time.time()

            ret = func(*args, **kwargs)
            # Re-stamp on completion: the interval is counted from the end
            # of the previous call, as before.
            last_time_called[0] = time.time()
            return ret

        return rate_limited_function

    return decorate

@timeyyy
Copy link

timeyyy commented Jan 11, 2015

  1. The first call to the function goes through without the time delay.
  2. Setting added to either allow successive calls to be queued as the above code versions, or to be simply ignored.
  3. Added Test code
import time
import threading
from functools import wraps

def rate_limited(max_per_second, mode='wait', delay_first_call=False):
    """
    Decorator that makes a function not be called faster than
    ``max_per_second`` times per second.

    mode:
        'wait' (default) sleeps until the call is allowed, then runs it.
        'kill' ignores calls that arrive faster than the rate; such calls
        return None without invoking the function.  Any other value is
        treated like 'kill'.

    delay_first_call:
        When False (default) the very first call always runs immediately.
        When True the first call is subject to the same interval check as
        every other call.

    Raises ZeroDivisionError when ``max_per_second`` is zero.
    """
    lock = threading.Lock()
    min_interval = 1.0 / float(max_per_second)

    def decorate(func):
        # One-element list so the closure can update the value in place.
        last_time_called = [0.0]

        @wraps(func)
        def rate_limited_function(*args, **kwargs):
            # ``with`` releases the lock on every path.  Previously an
            # unrecognised mode, or an elapsed time exactly equal to the
            # interval, returned with the lock still held and the following
            # call blocked forever.
            with lock:
                elapsed = time.perf_counter() - last_time_called[0]
                left_to_wait = min_interval - elapsed
                # A zero timestamp means the function has never run yet.
                first_call_free = (not delay_first_call
                                   and not last_time_called[0])
                if left_to_wait > 0 and not first_call_free:
                    if mode != 'wait':
                        # Too early and not allowed to wait: drop the call.
                        return None
                    time.sleep(left_to_wait)
                # Stamp while still holding the lock so a concurrent caller
                # measures against this call rather than a stale timestamp.
                last_time_called[0] = time.perf_counter()
            ret = func(*args, **kwargs)
            # Re-stamp on completion: the interval is counted from the end
            # of the previous call, as before.
            last_time_called[0] = time.perf_counter()
            return ret
        return rate_limited_function
    return decorate

# Demo targets for the different rate_limited configurations.

@rate_limited(2, mode='wait')
def print_num_wait(num):
    # Two calls per second; early calls sleep until they are allowed.
    print(num)


@rate_limited(1/2, mode='kill')
def print_num_kill(num):
    # One call every two seconds; early calls are ignored.
    print(num)


@rate_limited(2, mode='kill', delay_first_call=True)
def print_num_kill_delay(num):
    # Two calls per second, early calls ignored, first call checked as well.
    print(num)


@rate_limited(1/3, mode='wait', delay_first_call=True)
def print_num_wait_delay(num):
    # One call every three seconds, early calls sleep, first call checked too.
    print(num)


# NOTE(review): 'refresh_timer' is not one of the modes described in the
# rate_limited docstring ('wait' / 'kill') -- confirm the intended mode.
@rate_limited(1/3, mode='refresh_timer')
def print_num_wait_refresh(num):
    print(num)
if __name__ == "__main__":
    # --- 'kill' mode: flood the function, only calls outside the interval print
    print('Rate limited at 2 per second at most')
    print()
    print("Mode is Kill")
    print("1 000 000 print requests sent to decorated function")
    for i in range(1, 1000000):
        print_num_kill(i)

    # --- 'wait' mode: every request prints, spaced out by sleeping
    print()
    print('Mode is Wait - default')
    print("10 print requests sent to decorated function")
    for i in range(1, 11):
        print_num_wait(i)

    # --- 'kill' mode with the first call also subject to the interval check
    print()
    print('Mode is Kill with Delay on first request')
    print("1 000 000 print requests sent to decorated function")
    for i in range(1, 1000000):
        print_num_kill_delay(i)

    # --- 'wait' mode with the first call also subject to the interval check
    print()
    print('Mode is Wait with Delay on first request')
    print("5 print requests sent to decorated function")
    for i in range(1, 6):
        print_num_wait_delay(i)

@gfranxman
Copy link

Why is last_time_called a list?

@zcmyth
Copy link

zcmyth commented Jul 23, 2015

Because it is not a local variable, and we can't modify it unless nonlocal is used.
However, nonlocal is a new feature in python 3. In order to modify last_time_called in python 2.x, as a workaround, we use array and modify its element.

@sintezcs
Copy link

sintezcs commented Aug 12, 2016

Multi-threading issues occur in both of the lock-based versions posted above: last_time_called is updated after the lock has been released. That leads to unpredictable fluctuations in the rate limit when the decorated function is called from multiple threads. The final fixed version that is being used in production:

def rate_limited(max_per_second):
    """Return a decorator limiting a function to ``max_per_second`` calls per second.

    Calls are serialised: the lock is held while waiting, while the wrapped
    function runs and while the timestamp is updated, so concurrent threads
    cannot slip in between.  The timer starts at decoration time, so the
    first call may be delayed as well.

    Raises ZeroDivisionError when ``max_per_second`` is zero.
    """
    lock = threading.Lock()
    min_interval = 1.0 / max_per_second

    def decorate(func):
        last_time_called = time.perf_counter()

        @wraps(func)
        def rate_limited_function(*args, **kwargs):
            nonlocal last_time_called
            # ``with`` guarantees the lock is released when ``func`` raises;
            # otherwise every later call would block forever.
            with lock:
                try:
                    elapsed = time.perf_counter() - last_time_called
                    left_to_wait = min_interval - elapsed

                    if left_to_wait > 0:
                        time.sleep(left_to_wait)

                    return func(*args, **kwargs)
                finally:
                    # A failed call still counts against the rate.
                    last_time_called = time.perf_counter()

        return rate_limited_function

    return decorate

Note: nonlocal keyword has been used, so the code is only compatible with python 3

@yifeikong
Copy link

@sintezcs great work

@manurueda
Copy link

love it

@mwek
Copy link

mwek commented May 25, 2017

Version posted by @sintezcs will not release the lock if func raises Exception.

def rate_limited(max_per_second: int):
    """Throttle the decorated function within a single process.

    Successive calls are spaced at least ``1 / max_per_second`` seconds
    apart, measured from the end of the previous call.  The clock starts
    when the decorator is applied.  Calls are serialised by a lock that is
    held for the duration of the wait and of the wrapped call.
    """
    min_interval = 1.0 / max_per_second
    lock = threading.Lock()

    def decorate(func):
        last_time_called = time.perf_counter()

        @wraps(func)
        def throttled(*args, **kwargs):
            nonlocal last_time_called
            with lock:
                try:
                    remaining = min_interval - (time.perf_counter()
                                                - last_time_called)
                    if remaining > 0:
                        time.sleep(remaining)
                    return func(*args, **kwargs)
                finally:
                    # Runs before the lock is released, also on exceptions.
                    last_time_called = time.perf_counter()

        return throttled

    return decorate

@tim-seoss
Copy link

tim-seoss commented Sep 20, 2017

This is great. I can't see any mention of a license tho, which means:

Code without an explicit license is protected by copyright and is by default All Rights Reserved. The person or people who wrote the code are protected as such. Any time you're using software you didn't write, licensing should be considered and abided. - Brian Doll, GitHub's VP of Marketing

https://www.infoworld.com/article/2615869/open-source-software/github-needs-to-take-open-source-seriously.html

i.e. using the code will be copyright infringement in most parts of the world, and so is risky... Any chance we could make it open source so that it can be used as well?

https://choosealicense.com/no-license/

At this stage, this would require the original poster, and anyone who has modified a given version (including intermediate versions), to declare licensing. Can I suggest that the original poster make a choice following the guidance here:

https://choosealicense.com/

maybe MIT? MIT licenced code can of course be re-licenced under other terms by modifiers if they want to, so that their modifications fall under copyleft (e.g. MIT -> GPL, but not the other way around).

Thanks!

Tim.

@oPromessa
Copy link

oPromessa commented Mar 4, 2018

Thanks team for sharing this. I've built a class for use in multiprocessing mode. Hope it helps others.

  • I'm using time.time()
  • Using multiprocessing instead of threading
  • it's compatible with python 2.7 and 3.6
"""
    by oPromessa, 2017
    Published on https://github.com/oPromessa/flickr-uploader/

    Inspired by: https://gist.github.com/gregburek/1441055

    Helper class and functions to rate limiting function calls with Python Decorators
"""

# ----------------------------------------------------------------------------
# Import section for Python 2 and 3 compatible code
# from __future__ import absolute_import, division, print_function, unicode_literals
from __future__ import division    # This way: 3 / 2 == 1.5; 3 // 2 == 1

# ----------------------------------------------------------------------------
# Import section
#
import sys
import logging
import multiprocessing
import time
from functools import wraps


# -----------------------------------------------------------------------------
# class LastTime to be used with rate_limited
#
class LastTime:
    """
    Shared bookkeeping for ``rate_limited``: a lock, a call counter and the
    timestamp of the last call, all built on ``multiprocessing`` primitives
    so the state can be used from several processes.

        >>> a = LastTime()
        >>> a.add_cnt()
        >>> a.get_cnt()
        1
        >>> a.add_cnt()
        >>> a.get_cnt()
        2
    """

    def __init__(self, name='LT'):
        # name is only used to label log output.
        self.name = name

        # Control variables
        self.ratelock = multiprocessing.Lock()
        self.cnt = multiprocessing.Value('i', 0)
        # 0.0 means "never called"; rate_limited checks for that value.
        self.last_time_called = multiprocessing.Value('d', 0.0)

        logging.debug('\t__init__: name=[{!s}]'.format(self.name))

    def acquire(self):
        """Acquire the rate lock (blocks until available)."""
        self.ratelock.acquire()

    def release(self):
        """Release the rate lock."""
        self.ratelock.release()

    def set_last_time_called(self):
        """Record the current wall-clock time as the time of the last call."""
        self.last_time_called.value = time.time()

    def get_last_time_called(self):
        """Return the recorded time of the last call (seconds since epoch)."""
        return self.last_time_called.value

    def add_cnt(self):
        """Increment the call counter by one."""
        # ``+=`` on a shared Value is a read-modify-write; hold the Value's
        # own lock so the increment is atomic even without the rate lock.
        with self.cnt.get_lock():
            self.cnt.value += 1

    def get_cnt(self):
        """Return the current call counter."""
        return self.cnt.value

    @staticmethod
    def _format_timestamp(timestamp):
        """Return *timestamp* as local time ``HH:MM:SS.mmm``."""
        # Compute milliseconds arithmetically; slicing str() of the fraction
        # broke on exponent notation (e.g. '1e-05') and dropped zero padding.
        millis = int((timestamp - int(timestamp)) * 1000)
        # '%H:%M:%S' is the portable spelling of the platform-specific '%T'.
        return '{}.{:03d}'.format(
            time.strftime('%H:%M:%S', time.localtime(timestamp)), millis)

    def debug(self, debugname='LT'):
        """Log the counter, the last-call time and the current time."""
        now = time.time()
        logging.debug('___Rate name:[{!s}] '
                      'debug=[{!s}] '
                      '\n\t        cnt:[{!s}] '
                      '\n\tlast_called:{!s} '
                      '\n\t  timenow():{!s} '
                      .format(self.name,
                              debugname,
                              self.cnt.value,
                              self._format_timestamp(
                                  self.last_time_called.value),
                              self._format_timestamp(now)))


# -----------------------------------------------------------------------------
# rate_limited
#
# limits how often a decorated function may be executed
def rate_limited(max_per_second):
    """Return a decorator limiting a function to ``max_per_second`` calls per second.

    All functions decorated with the same ``rate_limited(...)`` result share
    one ``LastTime`` instance.  Its lock is held while waiting and while the
    wrapped function runs, so calls are serialised.

    Exceptions raised by the wrapped function are reported on stderr and
    re-raised.  Raises ZeroDivisionError when ``max_per_second`` is zero.
    """

    min_interval = 1.0 / max_per_second
    LT = LastTime('rate_limited')

    def decorate(func):
        LT.acquire()
        try:
            # Start the clock at decoration time if nothing has run yet.
            if LT.get_last_time_called() == 0:
                LT.set_last_time_called()
            LT.debug('DECORATE')
        finally:
            # Never leave the shared lock held if logging fails.
            LT.release()

        @wraps(func)
        def rate_limited_function(*args, **kwargs):

            logging.warning('___Rate_limited f():[{!s}]: '
                            'Max_per_Second:[{!s}]'
                            .format(func.__name__, max_per_second))

            # Acquire outside the try block: if acquiring fails, the
            # ``finally`` below must not release a lock we do not hold.
            LT.acquire()
            try:
                LT.add_cnt()
                xfrom = time.time()

                elapsed = xfrom - LT.get_last_time_called()
                left_to_wait = min_interval - elapsed
                # '%H:%M:%S' is the portable spelling of the
                # platform-specific '%T' directive.
                logging.debug('___Rate f():[{!s}] '
                              'cnt:[{!s}] '
                              '\n\tlast_called:{!s} '
                              '\n\t time now():{!s} '
                              'elapsed:{:6.2f} '
                              'min:{!s} '
                              'to_wait:{:6.2f}'
                              .format(func.__name__,
                                      LT.get_cnt(),
                                      time.strftime(
                                            '%H:%M:%S',
                                            time.localtime(
                                                LT.get_last_time_called())),
                                      time.strftime('%H:%M:%S',
                                                    time.localtime(xfrom)),
                                      elapsed,
                                      min_interval,
                                      left_to_wait))
                if left_to_wait > 0:
                    time.sleep(left_to_wait)

                ret = func(*args, **kwargs)

                LT.debug('OVER')
                LT.set_last_time_called()
                LT.debug('NEXT')

            except Exception as ex:
                # TODO: replace with reportError once it lives in a module.
                sys.stderr.write('+++000 '
                                 'Exception on rate_limited_function: [{!s}]\n'
                                 .format(ex))
                sys.stderr.flush()
                raise
            finally:
                LT.release()
            return ret

        return rate_limited_function

    return decorate
# -----------------------------------------------------------------------------
# Samples
#@rate_limited(5) # 5 calls per second
# def print_num(num):
#     print (num )


# -----------------------------------------------------------------------------
# If called directly run doctests
#
if __name__ == "__main__":
    # Demo driver: runs the doctests, then exercises rate_limited first from
    # a single process and afterwards from three child processes.

    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s]:[%(processName)-11s]' +
                               '[%(levelname)-8s]:[%(name)s] %(message)s')

    import doctest
    doctest.testmod()

    # Uncomment the following line to stop right after the doctests
    # sys.exit(0)

    # n for n calls per second  (ex. 3 means 3 calls per second)
    # 1/n for n seconds per call (ex. 0.5 means 2 seconds in between calls)
    @rate_limited(1)
    def print_num(prc, num):
        """
        Print the process number, the sequence number and the current time.
        """
        print('\t\t***prc:[{!s}] num:[{!s}] '
              'rate_limit timestamp:[{!s}]'
              .format(prc, num, time.strftime('%T')))

    print('-------------------------------------------------Single Processing')
    for process in range(1, 3):
        for j in range(1, 2):
            print_num(process, j)

    print('-------------------------------------------------Multi Processing')
    # Worker body: x-1 iterations of "sleep a random 0-5 s, then call the
    # rate-limited print_num".
    # NOTE(review): fmulti and print_num are defined under the __main__
    # guard; presumably this relies on the 'fork' start method so children
    # inherit them -- confirm on platforms that default to 'spawn'.
    def fmulti(x, prc):
        import random

        for i in range(1,x):
            r = random.randrange(6)
            print('\t\t[prc:{!s}] [{!s}]'
                  '->- WORKing {!s}s----[{!s}]'
                  .format(prc, i, r, time.strftime('%T')))
            time.sleep(r)
            print('\t\t[prc:{!s}] [{!s}]--> Before:---[{!s}]'
                  .format(prc, i, time.strftime('%T')))
            print_num(prc, i)
            print('\t\t[prc:{!s}] [{!s}]<-- After----[{!s}]'
                  .format(prc, i, time.strftime('%T')))

    TaskPool = []

    # Start three worker processes, each making four rate-limited calls.
    for j in range(1,4):
        Task = multiprocessing.Process(target=fmulti, args=(5,j))
        TaskPool.append(Task)
        Task.start()

    for j in TaskPool:
        print('{!s}.is_alive = {!s}'.format(j.name, j.is_alive()))

    # Poll until no child process is left, joining one child per pass with
    # a 60 second timeout.
    while (True):
        if not (any(multiprocessing.active_children())):
            print('===No active children Processes.')
            break
        for p in multiprocessing.active_children():
            print('==={!s}.is_alive = {!s}'.format(p.name, p.is_alive()))
            # The last child listed is the one waited on below.
            uploadTaskActive = p
        print('===Will wait for 60 on {!s}.is_alive = {!s}'
              .format(uploadTaskActive.name,
                      uploadTaskActive.is_alive()))
        uploadTaskActive.join(timeout=60)
        print('===Waited for 60s on {!s}.is_alive = {!s}'
              .format(uploadTaskActive.name,
                      uploadTaskActive.is_alive()))

    # Wait for join all jobs/tasks in the Process Pool
    # All should be done by now!
    for j in TaskPool:
        j.join()
        print('==={!s} (is alive: {!s}).exitcode = {!s}'
              .format(j.name, j.is_alive(), j.exitcode))

@3lixy
Copy link

3lixy commented May 19, 2018

@oPromessa thanks for sharing and nice job on the logging and example.

@slidenerd
Copy link

Why dont you use a counter instead of a list. Each time the function is called, increment the counter by 1 and reset the counter to 0 when the second or minute has changed

@luizfzs
Copy link

luizfzs commented Mar 19, 2020

Any updates regarding licensing of these pieces of code?

@codekiln
Copy link

Note, there are other libraries on PyPI that do this, such as ratelimit

@hugokernel
Copy link

A more simple version but with async compatibility https://gist.github.com/hugokernel/c478d639849e1e772b1395a546100031

@yeus
Copy link

yeus commented Dec 2, 2021

I created a similar asyncio version but using the token bucket algorithm for
max number of calls during/time interval which results in a maximum burst rate. it can be combined with the above rate limiter without burst rate..

https://gist.github.com/yeus/dff02dce88c6da9073425b5309f524dd

@teocns
Copy link

teocns commented Sep 28, 2023

An extended version of @oPromessa 's utility. Thank you for sharing @oPromessa !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment