Skip to content

Instantly share code, notes, and snippets.

@ddebrunner
Last active April 23, 2019 19:15
Show Gist options
  • Save ddebrunner/b24f0adffd5a66f5302eff8ad9ea3276 to your computer and use it in GitHub Desktop.
Save ddebrunner/b24f0adffd5a66f5302eff8ad9ea3276 to your computer and use it in GitHub Desktop.
Use of functools.lru_cache with Streams metrics in an IBM Streams Python topology.
from functools import lru_cache
import threading
import streamsx.ec
# Example taken from: https://docs.python.org/3/library/functools.html#functools.lru_cache
#
# With this example the cache is Python-VM wide, so if multiple
# map(fibs_metrics.Fib()) operators run in the same Python VM,
# they would share the same cache.
@lru_cache()
def _fib(n):
    """Return the n-th Fibonacci number, memoized in a VM-wide LRU cache.

    Kept at module level rather than decorating a method so the cache is
    keyed on ``n`` alone and does not hold references to ``Fib`` instances
    (an ``lru_cache`` on a method keys on, and keeps alive, ``self``).
    """
    if n < 2:
        return n
    return _fib(n - 1) + _fib(n - 2)


class Fib(object):
    """Callable that computes Fibonacci numbers through a shared LRU cache.

    Used as a context manager, it publishes the ``lru_cache`` statistics
    (hits, misses, maxsize, currsize) as Streams custom metrics. It then
    refreshes them periodically on a background daemon thread until
    ``__exit__`` is called.
    """

    # Seconds between refreshes of the cache-info metrics.
    _METRICS_INTERVAL_SECS = 5

    def __init__(self):
        self._metrics_thread = None
        self._stop_metrics = None

    def fib(self, n):
        """Return the n-th Fibonacci number using the shared cache."""
        return _fib(n)

    def __call__(self, n):
        return self.fib(n)

    def __enter__(self):
        self._hits = streamsx.ec.CustomMetric(self, 'hits', 'fibs.Fib number of cache hits', 'Counter')
        self._misses = streamsx.ec.CustomMetric(self, 'misses', 'fibs.Fib number of cache misses', 'Counter')
        self._maxsize = streamsx.ec.CustomMetric(self, 'maxsize', 'fibs.Fib maximum cache size', 'Gauge')
        self._currsize = streamsx.ec.CustomMetric(self, 'currsize', 'fibs.Fib current cache size', 'Gauge')
        # Publish once immediately, then refresh periodically in the background.
        self._update_metrics()
        self._stop_metrics = threading.Event()
        self._metrics_thread = threading.Thread(
            target=self._metrics_loop,
            args=(self._stop_metrics,),
            name='fibs.Fib metrics',
            daemon=True,  # never keep the process alive just for metrics
        )
        self._metrics_thread.start()

    def __exit__(self, exc_type, exc_value, traceback):
        # getattr: __getstate__ drops every attribute, so an unpickled
        # instance whose __enter__ never ran has none of these set.
        stop = getattr(self, '_stop_metrics', None)
        if stop is not None:
            stop.set()
        thread = getattr(self, '_metrics_thread', None)
        if thread is not None:
            # The loop wakes as soon as the event is set; wait briefly so no
            # metric update happens after exit.
            thread.join(timeout=self._METRICS_INTERVAL_SECS)

    def __getstate__(self):
        # Stateless class, avoids metrics being checkpointed.
        return {}

    def _metrics_loop(self, stop):
        """Refresh the metrics every interval until *stop* is set.

        ``Event.wait`` returns True as soon as the event is set. Unlike a
        self-rescheduling Timer, this loop cannot re-arm itself after
        shutdown has been requested.
        """
        while not stop.wait(self._METRICS_INTERVAL_SECS):
            self._update_metrics()

    def _update_metrics(self):
        """Copy the current ``lru_cache`` statistics into the Streams metrics."""
        ci = _fib.cache_info()
        self._hits.value = ci.hits
        self._misses.value = ci.misses
        self._maxsize.value = ci.maxsize
        self._currsize.value = ci.currsize
from streamsx.topology.topology import Topology
import streamsx.topology.context
import fibs_metrics
def main(ctx_type="DISTRIBUTED"):
    """
    Demonstrates use of a function decorated with @lru_cache
    and Streams metrics mirroring the cache info statistics
    provided by @lru_cache.

    Args:
        ctx_type: Context type passed to ``streamsx.topology.context.submit``;
            defaults to "DISTRIBUTED" as in the original example.

    Returns:
        Whatever ``streamsx.topology.context.submit`` returns for the
        submission (previously discarded).
    """
    topo = Topology("LRUCacheMetrics")
    values = topo.source(range(16))
    fib_values = values.map(fibs_metrics.Fib())
    fib_values.print(tag='fib')
    cfg = {}
    return streamsx.topology.context.submit(ctx_type, topo, cfg)


if __name__ == '__main__':
    main()
@ddebrunner
Copy link
Author

Extends https://gist.github.com/ddebrunner/1437cbbbba0aba9a2bad56771e1829d6 with Streams metrics that mirror the cache statistics reported by @lru_cache (cache_info()).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment