|
import gc |
|
import sys |
|
import time |
|
import weakref |
|
|
|
|
|
class LeakyReader: |
|
"""Simulates a MetricReader that can cause memory leaks.""" |
|
|
|
def __init__(self, name: str, data_size: int): |
|
self.name = name |
|
self.data = list(range(data_size)) # Simulate memory usage |
|
self._callbacks = [] |
|
|
|
def add_callback(self, callback): |
|
self._callbacks.append(callback) |
|
|
|
def process_metrics(self): |
|
temp_data = [x * 2 for x in self.data] |
|
return len(temp_data) |
|
|
|
def __repr__(self): |
|
return f"LeakyReader({self.name})" |
|
|
|
def __del__(self): |
|
print(f"Destroying {self.name}") |
|
|
|
|
|
class ProblematicProvider: |
|
"""A provider that keeps strong references to readers β causes memory leaks.""" |
|
|
|
def __init__(self): |
|
self.name = "ProblematicProvider" |
|
self._readers = set() |
|
|
|
def add_reader(self, reader: LeakyReader): |
|
self._readers.add(reader) |
|
reader.add_callback(lambda: self.name) # Creates circular reference |
|
|
|
def get_reader_count(self): |
|
return len(self._readers) |
|
|
|
def process_all(self): |
|
return sum(reader.process_metrics() for reader in self._readers) |
|
|
|
|
|
class FixedProvider: |
|
"""A provider that uses weak references β avoids memory leaks.""" |
|
|
|
def __init__(self): |
|
self.name = "FixedProvider" |
|
self._readers = weakref.WeakSet() |
|
|
|
def add_reader(self, reader: LeakyReader): |
|
self._readers.add(reader) |
|
reader.add_callback(lambda: self.name) |
|
|
|
def get_reader_count(self): |
|
return len(self._readers) |
|
|
|
def process_all(self): |
|
return sum(reader.process_metrics() for reader in list(self._readers)) |
|
|
|
|
|
def create_memory_leak_scenario(rounds, readers_per_round, data_size): |
|
"""Demonstrates a memory leak scenario.""" |
|
print("\nRunning memory leak scenario...") |
|
|
|
provider = ProblematicProvider() |
|
|
|
for round_num in range(rounds): |
|
print(f"\nRound {round_num + 1}") |
|
|
|
readers = [] |
|
for i in range(readers_per_round): |
|
reader = LeakyReader(f"LeakReader_{round_num+1}_{i+1}", data_size) |
|
provider.add_reader(reader) |
|
readers.append(reader) |
|
|
|
print(f"Readers created: {len(readers)}") |
|
print(f"Tracked by provider: {provider.get_reader_count()}") |
|
|
|
total = provider.process_all() |
|
print(f"Metrics processed: {total}") |
|
|
|
del readers |
|
gc.collect() |
|
|
|
print(f"After GC: {provider.get_reader_count()} still tracked") |
|
|
|
time.sleep(0.1) |
|
|
|
print(f"\nFinal: {provider.get_reader_count()} readers still in memory (expected: 0)") |
|
|
|
|
|
def create_fixed_scenario(rounds, readers_per_round, data_size): |
|
"""Demonstrates a no-leak scenario using weak references.""" |
|
print("\nRunning fixed scenario...") |
|
|
|
provider = FixedProvider() |
|
|
|
for round_num in range(rounds): |
|
print(f"\nRound {round_num + 1}") |
|
|
|
readers = [] |
|
for i in range(readers_per_round): |
|
reader = LeakyReader(f"FixedReader_{round_num+1}_{i+1}", data_size) |
|
provider.add_reader(reader) |
|
readers.append(reader) |
|
|
|
print(f"Readers created: {len(readers)}") |
|
print(f"Tracked by provider: {provider.get_reader_count()}") |
|
|
|
total = provider.process_all() |
|
print(f"Metrics processed: {total}") |
|
|
|
del readers |
|
gc.collect() |
|
|
|
print(f"After GC: {provider.get_reader_count()} still tracked") |
|
|
|
time.sleep(0.1) |
|
|
|
print(f"\nFinal: {provider.get_reader_count()} readers in memory (expected: 0)") |
|
|
|
|
|
def main(): |
|
# Parameters for the simulation |
|
rounds = 15 |
|
readers_per_round = 10 |
|
data_size = 20_000 |
|
|
|
if len(sys.argv) > 1 and sys.argv[1] == "--leak": |
|
create_memory_leak_scenario(rounds, readers_per_round, data_size) |
|
else: |
|
create_fixed_scenario(rounds, readers_per_round, data_size) |
|
|
|
time.sleep(1) # Allow time for memory profiler to finalize |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |