Created
April 23, 2019 17:13
-
-
Save ddebrunner/1437cbbbba0aba9a2bad56771e1829d6 to your computer and use it in GitHub Desktop.
Use of functools.lru_cache in an IBM Streams Python topology.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import lru_cache | |
# Example taken from: https://docs.python.org/3/library/functools.html#functools.lru_cache | |
# | |
# With this example the cache will be Python VM wide, thus if | |
# multiple map(fibs.fib) were invoked in the topology they | |
# wouldbe sharing the same cache. | |
@lru_cache() | |
def fib(n): | |
if n < 2: | |
return n | |
return fib(n-1) + fib(n-2) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from streamsx.topology.topology import Topology | |
import streamsx.topology.context | |
import fibs | |
import time | |
def main(): | |
""" | |
Demonstates use of a function decorated with @lru_cache. | |
The key point is that the decorated function (fibs.fib) | |
must be declared in a separate module to the main application. | |
Apart from running the function with the cache a separate | |
stream with the cache statistics is executed to show the | |
caching is working. | |
""" | |
topo = Topology("LRUCache1") | |
values = topo.source(range(16)) | |
fib_values = values.map(fibs.fib) | |
fib_values.print(tag='fib') | |
# Print out cache stats for fibs.fib. | |
# Note we need to do it as a stream and not locally | |
# as the Streams application is running in a different | |
# Python VM to this local one declaring the application. | |
stats = topo.source(lambda : [time.sleep(3), fibs.fib.cache_info()]) | |
stats.print(tag='stats') | |
cfg = {} | |
j = streamsx.topology.context.submit("STANDALONE", topo, cfg) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample of using
@lru_cache
in an IBM Streams topology.Key point is that the function decorated with
@lru_cache
cannot be a function declaration in the Python main module.https://docs.python.org/3/library/functools.html#functools.lru_cache