Last active
April 23, 2019 19:15
-
-
Save ddebrunner/b24f0adffd5a66f5302eff8ad9ea3276 to your computer and use it in GitHub Desktop.
Use of functools.lru_cache with Streams metrics in an IBM Streams Python topology.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import lru_cache | |
import threading | |
import streamsx.ec | |
# Example taken from: https://docs.python.org/3/library/functools.html#functools.lru_cache
#
# With this example the cache will be Python VM wide, thus if
# multiple map(fibs_metrics.Fib()) were invoked in the topology they
# would be sharing the same cache.
class Fib(object):
    """Callable returning Fibonacci numbers, memoized with ``lru_cache``.

    Calling an instance with ``n`` returns ``fib(n)``.  Entering the
    instance as a context manager creates four custom metrics (``hits``,
    ``misses``, ``maxsize`` and ``currsize``) and refreshes them from
    ``fib.cache_info()`` every ``_METRICS_PERIOD`` seconds on a timer
    thread; leaving it stops the refresh.

    The cache is attached to the class rather than keyed per instance, so
    all ``Fib`` objects in one Python VM share cached values and
    statistics, and the cache never keeps an instance alive.
    """

    # Seconds between two metric refreshes.
    _METRICS_PERIOD = 5

    # Class-level defaults so an instance rebuilt without ``__init__``
    # (``__getstate__`` returns an empty state) can still be exited safely.
    _metrics_timer = None
    _metrics_lock = None
    _metrics_stopped = False

    def __init__(self):
        self._metrics_timer = None

    @staticmethod
    @lru_cache()
    def fib(n):
        """Return the n-th Fibonacci number (``n`` itself when ``n < 2``)."""
        if n < 2:
            return n
        # Recurse through the cached wrapper so intermediate results are
        # memoized as well.
        return Fib.fib(n - 1) + Fib.fib(n - 2)

    def __call__(self, n):
        """Return ``fib(n)``; lets an instance be used as a plain callable."""
        return self.fib(n)

    def __enter__(self):
        """Create the custom metrics and start the periodic refresh.

        Returns:
            This instance, so ``with Fib() as f:`` binds the callable.
        """
        self._hits = streamsx.ec.CustomMetric(
            self, 'hits', 'fibs.Fib number of cache hits', 'Counter')
        self._misses = streamsx.ec.CustomMetric(
            self, 'misses', 'fibs.Fib number of cache misses', 'Counter')
        self._maxsize = streamsx.ec.CustomMetric(
            self, 'maxsize', 'fibs.Fib maximum cache size', 'Gauge')
        self._currsize = streamsx.ec.CustomMetric(
            self, 'currsize', 'fibs.Fib current cache size', 'Gauge')
        # Guards the stop flag and the timer handle, which are touched by
        # both the caller's thread and the timer thread.
        self._metrics_lock = threading.Lock()
        self._metrics_stopped = False
        self._update_metrics()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop the periodic refresh; exceptions are never suppressed."""
        lock = self._metrics_lock
        if lock is None:
            # __enter__ never ran: there is nothing to stop.
            return
        with lock:
            # Set under the lock so a refresh that is running right now
            # cannot arm a new timer after this cancel.
            self._metrics_stopped = True
            if self._metrics_timer is not None:
                self._metrics_timer.cancel()
                self._metrics_timer = None

    def __getstate__(self):
        # Stateless class, avoids metrics being checkpointed.
        return {}

    def _update_metrics(self):
        """Copy ``fib.cache_info()`` into the metrics and re-arm the timer.

        Must only run after ``__enter__`` has created the metrics and lock.
        """
        ci = self.fib.cache_info()
        self._hits.value = ci.hits
        self._misses.value = ci.misses
        self._maxsize.value = ci.maxsize
        self._currsize.value = ci.currsize
        with self._metrics_lock:
            if self._metrics_stopped:
                return
            timer = threading.Timer(self._METRICS_PERIOD, self._update_metrics)
            # Daemon thread: a pending refresh must not keep the process
            # alive on its own.
            timer.daemon = True
            self._metrics_timer = timer
            timer.start()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from streamsx.topology.topology import Topology | |
import streamsx.topology.context | |
import fibs_metrics | |
def main():
    """Build and submit the ``LRUCacheMetrics`` topology.

    Demonstrates a function decorated with @lru_cache together with
    Streams metrics that mirror the cache info statistics provided by
    @lru_cache: the integers 0-15 are mapped through
    ``fibs_metrics.Fib()`` and each result is printed with the tag
    ``fib``.
    """
    topology = Topology("LRUCacheMetrics")
    numbers = topology.source(range(16))
    numbers.map(fibs_metrics.Fib()).print(tag='fib')
    # Submitted with an empty configuration; the submission result is not
    # used by this script.
    streamsx.topology.context.submit("DISTRIBUTED", topology, {})


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Extends https://gist.github.com/ddebrunner/1437cbbbba0aba9a2bad56771e1829d6 to include Streams metrics for the cache statistics provided by @lru_cache.