-
-
Save Pyprohly/6a8837812cbf7db96c65306e3a55d2a8 to your computer and use it in GitHub Desktop.
Candidate PRAW streaming implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections.abc import MutableSet, Sequence | |
from itertools import tee | |
class OrderedSet(MutableSet, Sequence):
    """A set that remembers insertion order and can optionally be bounded.

    Membership tests go through an internal ``set``; ordering and indexing go
    through an internal list. When ``maxsize`` is not None, adding a new item
    that would exceed the bound evicts the oldest items first.

    ``OrderedSet(n)`` with an int ``n`` is shorthand for ``OrderedSet(maxsize=n)``.
    """

    # Mutable containers must not be hashable. (The previous __hash__ tried to
    # hash a list and a set, so it could only ever raise TypeError anyway.)
    __hash__ = None

    @staticmethod
    def _ignore_duplicates(it):
        """Yield the items of ``it`` in order, skipping any already yielded."""
        seen = set()
        seen_add = seen.add
        for i in it:
            if i not in seen:
                seen_add(i)
                yield i

    def __init__(self, items=None, maxsize=None):
        """
        items: an iterable of hashable values, or an int used as ``maxsize``.
        maxsize: maximum number of values retained, or None for no bound.

        Raises TypeError if both ``items`` and ``maxsize`` are ints, and
        ValueError if the size bound is negative. If ``items`` contains more
        than ``maxsize`` distinct values, only the last ``maxsize`` are kept
        (the same values that repeated ``add`` calls would have retained).
        """
        if isinstance(items, int):
            if isinstance(maxsize, int):
                raise TypeError('Expected either `items` or `maxsize` as int')
            # Positional int form: OrderedSet(n) == OrderedSet(maxsize=n).
            maxsize, items = items, None
        if isinstance(maxsize, int) and maxsize < 0:
            raise ValueError('`maxsize` must be non-negative')
        self.maxsize = maxsize
        self._fifo = [] if items is None else list(self._ignore_duplicates(items))
        self._set = set(self._fifo)
        self._evict()

    def __repr__(self):
        return '%s(%r%s)' % (
            type(self).__name__,
            self._fifo,
            '' if self.maxsize is None else ', maxsize={}'.format(self.maxsize))

    def __contains__(self, item):
        return item in self._set

    def __iter__(self):
        return iter(self._fifo)

    def __len__(self):
        return len(self._fifo)

    def __getitem__(self, key):
        """Index like a list; a slice returns a new OrderedSet with the same bound."""
        if isinstance(key, slice):
            return type(self)(self._fifo[key], maxsize=self.maxsize)
        return self._fifo[key]

    def __eq__(self, other):
        """Equal when both are OrderedSets with the same order and bound."""
        if not isinstance(other, OrderedSet):
            return NotImplemented
        return self._fifo == other._fifo and self.maxsize == other.maxsize

    def _integrity_check(self):
        """Return True if the list and set views agree and respect ``maxsize``."""
        len_fifo = len(self._fifo)
        if len_fifo != len(self._set):
            return False
        if self.maxsize is not None and len_fifo > self.maxsize:
            return False
        return all(i in self._set for i in self._fifo)

    def popset(self):
        """Remove and return an arbitrary value (as chosen by ``set.pop``)."""
        p = self._set.pop()
        self._fifo.remove(p)
        return p

    def poplist(self, index=-1):
        """Remove and return the value at ``index`` in insertion order (default: newest)."""
        p = self._fifo.pop(index)
        self._set.remove(p)
        return p

    def _evict(self):
        """Drop the oldest values until at most ``maxsize`` remain."""
        if self.maxsize is None:
            return
        excess = len(self._fifo) - self.maxsize
        if excess > 0:
            self._set.difference_update(self._fifo[:excess])
            del self._fifo[:excess]

    def add(self, value):
        """Append ``value`` if absent, evicting the oldest values if over the bound."""
        if value not in self._set:
            self._fifo.append(value)
            self._set.add(value)
            # Evicting after the append also handles maxsize == 0 (keep nothing).
            self._evict()

    def discard(self, value):
        """Remove ``value`` if present; do nothing otherwise."""
        if value in self._set:
            self._set.remove(value)
            self._fifo.remove(value)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import traceback | |
import threading | |
import queue | |
from random import Random | |
#from praw.models.util import BoundedSet | |
from boundedset import OrderedSet | |
class Stream:
    """Poll a listing function from a background thread and iterate over new items.

    Each fetch calls ``func(limit=..., params={'before': ...})``, reverses the
    returned items, and queues those whose ``attribute_name`` value has not
    been seen before. Between fetches the ``limit`` is stepped and the
    ``before`` cursor is re-chosen based on a smoothed "activity" estimate.
    """

    # Per-fetch ``limit`` steps down by one each fetch and wraps back to
    # ``limit_upper`` once it reaches ``limit_lower``.
    limit_upper = 100
    limit_lower = 70
    # Endpoints for the ``before`` offset range; ``_get_before_range`` linearly
    # interpolates between the *_min (activity 0) and *_max (activity 1) values.
    before_upper_max = 60
    before_lower_max = 38
    before_upper_min = 9
    before_lower_min = 3

    @staticmethod
    def _ar(a, b, c):
        """Return a raw activity rating for one fetch.

        a: the ``before`` offset used (0 means no ``before`` cursor was sent).
        b: number of items the fetch returned.
        c: number of those items that had not been seen before.
        """
        if b == 0:
            return 1
        if a == 0 or c > 28:
            return 8*c/b
        if b > 14 and b - 4 > a:
            # a > 0 here: the a == 0 case returned above.
            return 3*b/a
        return c/b

    @classmethod
    def _get_before_range(cls, y):
        """Return ``(lower, upper)`` bounds for the ``before`` offset at activity ``y``."""
        lower = int((cls.before_lower_max - cls.before_lower_min)*y + cls.before_lower_min)
        upper = int((cls.before_upper_max - cls.before_upper_min)*y + cls.before_upper_min)
        return lower, upper

    @property
    def _activity(self):
        """Smoothed activity estimate; clamped to ``[0, 1]`` on assignment."""
        return self._activity_influence

    @_activity.setter
    def _activity(self, value):
        self._activity_influence = max(0, min(value, 1))

    @property
    def fetch_efficiency(self):
        """Return ``1 - activity``."""
        return 1 - self._activity

    def before_range(self):
        """Return the ``(lower, upper)`` before-offset bounds for the current activity."""
        return self._get_before_range(self._activity)

    def _q_wait(self):
        """Return the queue timeout, calling ``wait_for`` first if it is callable."""
        if callable(self.wait_for):
            return self.wait_for()
        return self.wait_for

    def __init__(self, func, batch_size=100, wait_for=1, skip_existing=True, attribute_name='fullname'):
        """
        func: called as ``func(limit=..., params={'before': ...})``; must return
            an iterable of items.
        batch_size: the number of items to yield before returning None. If there are fewer
            items in the stream queue than batch_size then None may be returned earlier.
            If batch_size is set to None then the stream will never yield a None.
            (A batch_size of 0, or negative, would return None the whole time.)
        wait_for: seconds (or a callable returning seconds) to wait for the queue to
            populate when it is empty, before yielding None. If batch_size is None a
            timeout just starts another wait instead of yielding None.
        skip_existing: if true, items from the first successful fetch are only
            marked as seen and are never yielded.
        attribute_name: name of the item attribute used to detect duplicates.
        """
        self.func = func
        self.wait_for = wait_for
        self._attribute_name = attribute_name
        self._batch_size = batch_size
        self._skip_first_batch = skip_existing
        self._limit = self.limit_upper
        self._target_qsize = self._limit if batch_size is None else max(batch_size, self._limit)
        # Presumably sized so that one fetch (at most ``limit`` items, assuming
        # ``func`` honours it) never blocks on ``put``.
        self._queue = queue.Queue(maxsize=self._limit + self._target_qsize)
        self._seen = OrderedSet(301)
        self._bubble = 0
        self._before = None
        self._before_select = 0
        self._fetching = False
        self._error_last_fetch = False
        self._exc_value = None
        self._rand = Random()
        self.activity_rating_samples = 16
        self._activity_rating_list = []
        self._fetch_count = None
        self._activity = 1
        self._before_lower, self._before_upper = self.before_range()

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next new item, or None at a batch boundary / when idle."""
        self._report_pending_error()
        if self._batch_size is not None:
            if self._bubble >= self._batch_size:
                self._bubble = 0
                return None
            self._bubble += 1
        while True:
            if not self._fetching and self._queue.qsize() < self._target_qsize:
                self._fetch_next_batch()
            try:
                item = self._queue.get(timeout=self._q_wait())
            except queue.Empty:
                if self._batch_size is None:
                    # Never yields None in this mode, so surface fetch errors here
                    # instead of waiting for an item that may never arrive.
                    self._report_pending_error()
                    continue
                self._bubble = 0
                return None
            return item

    def _report_pending_error(self):
        """Pass a stored fetch exception to ``error_handler``, then forget it."""
        exc, self._exc_value = self._exc_value, None
        if exc is not None:
            self.error_handler(exc)

    def _fetch_next_batch(self):
        """Start ``_fetch_procedure`` on a new background thread."""
        self._fetching = True
        thread = threading.Thread(target=self._fetch_procedure)
        thread.start()

    def _fetch_procedure(self):
        """Fetch one page, queue unseen items, and retune ``limit``/``before``.

        Runs on the worker thread. Any exception from fetching or processing is
        stored in ``_exc_value`` for ``__next__`` to report, and ``_fetching`` is
        always cleared on exit so a failure cannot stall the stream.
        """
        self._error_last_fetch = False
        try:
            try:
                result = reversed(list(self.func(
                    limit=self._limit,
                    params={'before': self._before})))
                self._ingest(result)
                self._adjust_limit()
            except Exception as e:
                self._error_last_fetch = True
                self._exc_value = e
            # Also runs after a failure so the ``before`` cursor gets reset.
            self._adjust_before()
        finally:
            self._fetching = False

    def _ingest(self, items):
        """Mark each item's identifier as seen and queue the ones not seen before.

        On the first batch when ``skip_existing`` was requested, items are only
        marked as seen. Counts are recorded in ``_fetch_count``.
        """
        counts = self._fetch_count = dict.fromkeys(['found', 'unseen'], 0)
        seen = self._seen
        attr_name = self._attribute_name
        if self._skip_first_batch:
            self._skip_first_batch = False
            for item in items:
                counts['found'] += 1
                seen.add(getattr(item, attr_name))
            counts['unseen'] = counts['found']
            return
        for item in items:
            counts['found'] += 1
            key = getattr(item, attr_name)
            if key in seen:
                continue
            counts['unseen'] += 1
            seen.add(key)
            self._queue.put(item)

    def _adjust_limit(self):
        """Step ``_limit`` down by one, wrapping to ``limit_upper`` at ``limit_lower``."""
        if self._limit <= self.limit_lower:
            self._limit = self.limit_upper
        else:
            self._limit -= 1

    def _record_activity_sample(self):
        """Store this fetch's rating; every ``activity_rating_samples`` fetches,
        fold the clamped mean into ``_activity`` and refresh the before range."""
        self._activity_rating_list.append(self._ar(
            self._before_select, self._fetch_count['found'], self._fetch_count['unseen']))
        if len(self._activity_rating_list) >= self.activity_rating_samples:
            mean = sum(self._activity_rating_list)/len(self._activity_rating_list)
            # Average the clamped sample mean with the previous estimate.
            self._activity = (self._activity + max(0, min(mean, 1)))/2
            self._before_lower, self._before_upper = self.before_range()
            self._activity_rating_list.clear()

    def _adjust_before(self):
        """Update the activity estimate and choose the ``before`` cursor for the next fetch.

        After a failed fetch no activity sample is recorded and the cursor is
        reset to None.
        """
        if not self._error_last_fetch:
            self._record_activity_sample()
        if (self._error_last_fetch
                or self._fetch_count['found'] == 0
                or self._activity > .90
                or len(self._seen) < self._before_upper
                or self._rand.randrange(200) == 0):
            self._before_select = 0
            self._before = None
        else:
            self._before_select = self._rand.randrange(self._before_lower, self._before_upper)
            self._before = self._seen[-min(self._before_select, len(self._seen))]

    def error_handler(self, exception):
        """Default handler: print the exception's traceback to stderr and continue."""
        print('Ignoring exception in stream.', file=sys.stderr)
        traceback.print_exception(type(exception), exception, exception.__traceback__, file=sys.stderr)

    def handler(self, name=None):
        """Decorator that installs ``func`` as an instance attribute, e.g. to
        override ``error_handler``. The attribute name defaults to ``func.__name__``."""
        def decorator(func):
            fname = func.__name__ if name is None else name
            setattr(self, fname, func)
            return func
        return decorator
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.