Skip to content

Instantly share code, notes, and snippets.

@Pyprohly
Last active March 20, 2019 16:54
Show Gist options
  • Save Pyprohly/6a8837812cbf7db96c65306e3a55d2a8 to your computer and use it in GitHub Desktop.
Save Pyprohly/6a8837812cbf7db96c65306e3a55d2a8 to your computer and use it in GitHub Desktop.
Candidate PRAW streaming implementation
from collections.abc import MutableSet, Sequence
from itertools import tee
class OrderedSet(MutableSet, Sequence):
    """Insertion-ordered set with an optional FIFO size bound.

    Items are kept both in a list (``_fifo``, insertion order, used for
    iteration and indexing) and in a set (``_set``, used for membership
    tests).  When ``maxsize`` is set, adding a new item to a full
    container evicts the oldest item(s) first.

    Parameters:
        items: either an iterable of initial (hashable) items, or an int,
            in which case it is taken as ``maxsize`` and the container
            starts empty.
        maxsize: maximum number of items to hold, or None for unbounded.

    Raises:
        TypeError: if both ``items`` and ``maxsize`` are ints.
        ValueError: if the requested size bound is negative.
    """

    # Instances are mutable and compare by value, so they must not be
    # hashable (the previous __hash__ tried to hash a list and always
    # raised TypeError anyway).
    __hash__ = None

    @staticmethod
    def _ignore_duplicates(it):
        """Yield the items of *it* in order, skipping repeats."""
        seen = set()
        seen_add = seen.add
        for i in it:
            if i not in seen:
                seen_add(i)
                yield i

    def __init__(self, items=None, maxsize=None):
        if isinstance(maxsize, int):
            if isinstance(items, int):
                raise TypeError('Expected either `items` or `maxsize` as int')
            if maxsize < 0:
                raise ValueError('`maxsize` must be non-negative')
        self.maxsize = maxsize
        self._fifo = []
        self._set = set()
        if items is None:
            return
        if isinstance(items, int):
            # Shorthand: OrderedSet(10) means "empty, bounded to 10 items".
            if items < 0:
                raise ValueError('`maxsize` must be non-negative')
            self.maxsize = items
            return
        self._fifo = list(self._ignore_duplicates(items))
        if isinstance(self.maxsize, int) and len(self._fifo) > self.maxsize:
            # Enforce the bound on construction too, keeping the newest
            # items, which is what repeated add() calls would have left.
            del self._fifo[:len(self._fifo) - self.maxsize]
        self._set = set(self._fifo)

    def __repr__(self):
        return '%s(%r%s)' % (
            type(self).__name__,
            self._fifo,
            '' if self.maxsize is None else ', maxsize={}'.format(self.maxsize))

    def __contains__(self, item):
        return item in self._set

    def __iter__(self):
        return iter(self._fifo)

    def __len__(self):
        return len(self._set)

    def __getitem__(self, key):
        """Return the item at an index, or a new OrderedSet for a slice."""
        if isinstance(key, slice):
            return type(self)(self._fifo[key], maxsize=self.maxsize)
        return self._fifo[key]

    def __eq__(self, other):
        """Equal when both order and ``maxsize`` match."""
        if not isinstance(other, OrderedSet):
            # Let Python try the reflected comparison instead of failing
            # with AttributeError on a foreign operand.
            return NotImplemented
        return (self._fifo == other._fifo
                and self.maxsize == other.maxsize)

    def _integrity_check(self):
        """Return True if the list and set views agree and fit ``maxsize``."""
        len_fifo = len(self._fifo)
        if len_fifo != len(self._set):
            return False
        if self.maxsize is not None and len_fifo > self.maxsize:
            return False
        for i in self:
            if i not in self._set:
                return False
        return True

    def popset(self):
        """Remove and return an arbitrary item (whatever ``set.pop`` picks)."""
        p = self._set.pop()
        self._fifo.remove(p)
        return p

    def poplist(self, index=-1):
        """Remove and return the item at *index* (default: the newest)."""
        p = self._fifo.pop(index)
        self._set.remove(p)
        return p

    def _evict(self):
        """Drop the oldest items until there is room for one more."""
        if self.maxsize is not None:
            # The emptiness guard keeps maxsize == 0 from popping an
            # empty list.
            while self._fifo and len(self._fifo) >= self.maxsize:
                self._set.remove(self._fifo.pop(0))

    def add(self, value):
        """Append *value* if it is new, evicting the oldest item when full."""
        if value in self._set:
            return
        if self.maxsize is not None and self.maxsize <= 0:
            # A zero-capacity container can never hold anything.
            return
        self._evict()
        self._fifo.append(value)
        self._set.add(value)

    def discard(self, value):
        """Remove *value* if present; do nothing otherwise."""
        try:
            index = self._fifo.index(value)
        except ValueError:
            pass
        else:
            self._set.remove(self._fifo.pop(index))
import sys
import traceback
import threading
import queue
from random import Random
#from praw.models.util import BoundedSet
from boundedset import OrderedSet
class Stream:
    """Iterator that polls a listing function on a background thread.

    ``func`` is called as ``func(limit=..., params={'before': ...})`` and
    must return an iterable of items; each item exposes an identifier
    under the attribute named by ``attribute_name``.  Identifiers already
    seen are skipped, new items are queued and handed out by ``next()``.

    Iterating never raises StopIteration.  ``next()`` returns ``None`` as
    a marker after ``batch_size`` items, or when the queue stays empty for
    ``wait_for`` seconds (see ``__init__``).
    """

    # Bounds between which the per-request ``limit`` cycles.
    limit_upper = 100
    limit_lower = 70
    # End points used by _get_before_range() to interpolate the range from
    # which the ``before`` look-back offset is drawn.
    before_upper_max = 60
    before_lower_max = 38
    before_upper_min = 9
    before_lower_min = 3

    @staticmethod
    def _ar(a, b, c):
        """Return an activity rating for one fetch.

        a: the look-back offset used for the request (``_before_select``).
        b: number of items the fetch returned.
        c: number of those items that had not been seen before.
        """
        if b == 0:
            return 1
        if a == 0 or c > 28:
            return 8*c/b
        if b > 14 and b - 4 > a:
            return 3*b/a
        return c/b

    @classmethod
    def _get_before_range(cls, y):
        """Interpolate the (lower, upper) look-back range for activity *y*."""
        return (int((cls.before_lower_max - cls.before_lower_min)*y + cls.before_lower_min),
                int((cls.before_upper_max - cls.before_upper_min)*y + cls.before_upper_min))

    @property
    def _activity(self):
        """Smoothed activity estimate, always clamped to [0, 1]."""
        return self._activity_influence

    @_activity.setter
    def _activity(self, value):
        self._activity_influence = max(0, min(value, 1))

    @property
    def fetch_efficiency(self):
        """Complement of the current activity estimate (``1 - activity``)."""
        return 1 - self._activity

    def before_range(self):
        """Return the (lower, upper) look-back range for the current activity."""
        return self._get_before_range(self._activity)

    def _q_wait(self):
        """Return the queue timeout; ``wait_for`` may be a number or a callable."""
        if callable(self.wait_for):
            return self.wait_for()
        return self.wait_for

    def __init__(self, func, batch_size=100, wait_for=1, skip_existing=True, attribute_name='fullname'):
        """
        func: callable invoked as ``func(limit=..., params={'before': ...})``
            that returns an iterable of items.
        batch_size: the number of items to yield before returning None. If there are fewer
            items in the stream queue than batch_size then None may be returned earlier.
            If batch_size is set to None then the stream will never yield a None.
            (A batch_size of 0, or negative, would return None the whole time.)
        wait_for: seconds to wait for the queue to populate when it is empty, before yielding
            None. If batch_size is None then an empty queue is simply waited on again.
            May also be a callable returning the number of seconds.
        skip_existing: when true, items of the first fetch are only recorded as seen
            and are not queued.
        attribute_name: name of the item attribute used to recognise items already seen.
        """
        self.func = func
        self.wait_for = wait_for
        self._attribute_name = attribute_name
        self._batch_size = batch_size
        self._skip_first_batch = skip_existing
        self._limit = self.limit_upper
        self._target_qsize = self._limit if batch_size is None else max(batch_size, self._limit)
        # A fetch is only started while qsize < _target_qsize and adds at
        # most _limit items, so this capacity is never exceeded.
        self._queue = queue.Queue(maxsize=self._limit + self._target_qsize)
        self._seen = OrderedSet(301)
        self._bubble = 0            # items handed out in the current batch
        self._before = None
        self._before_select = 0
        self._fetching = False
        self._error_last_fetch = False
        self._exc_value = None      # exception from a fetch, not yet reported
        self._rand = Random()
        self.activity_rating_samples = 16
        self._activity_rating_list = []
        self._fetch_count = None
        self._activity = 1
        self._before_lower, self._before_upper = self.before_range()

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next queued item, or None as a batch/timeout marker."""
        if self._exc_value is not None:
            # Take the pending exception before reporting it so each fetch
            # error is handled once rather than on every later call.
            exc, self._exc_value = self._exc_value, None
            self.error_handler(exc)
        if self._batch_size is not None:
            if self._bubble >= self._batch_size:
                self._bubble = 0
                return None
            self._bubble += 1
        while True:
            if not self._fetching and self._queue.qsize() < self._target_qsize:
                self._fetch_next_batch()
            try:
                item = self._queue.get(timeout=self._q_wait())
            except queue.Empty:
                if self._batch_size is None:
                    continue
                self._bubble = 0
                return None
            return item

    def _fetch_next_batch(self):
        """Start one background fetch; ``_fetching`` stays set until it ends."""
        self._fetching = True
        thread = threading.Thread(target=self._fetch_procedure)
        thread.start()

    def _fetch_procedure(self):
        """Fetch one batch, queue its unseen items and retune the parameters.

        Runs on the worker thread.  An exception raised by ``func`` is
        stored in ``_exc_value`` for ``__next__`` to report.
        """
        self._error_last_fetch = False
        try:
            try:
                # The listing is reversed before being queued.
                result = reversed(list(self.func(
                    limit=self._limit,
                    params={'before': self._before})))
            except Exception as e:
                self._error_last_fetch = True
                self._exc_value = e
                return

            self._fetch_count = dict.fromkeys(['found', 'unseen'], 0)
            stream_seen_add = self._seen.add
            stream_queue_put = self._queue.put
            if self._skip_first_batch:
                # First batch with skip_existing: remember, but do not emit.
                self._skip_first_batch = False
                for item in result:
                    self._fetch_count['found'] += 1
                    stream_seen_add(getattr(item, self._attribute_name))
                self._fetch_count['unseen'] = self._fetch_count['found']
            else:
                for item in result:
                    self._fetch_count['found'] += 1
                    attr = getattr(item, self._attribute_name)
                    if attr in self._seen:
                        continue
                    self._fetch_count['unseen'] += 1
                    stream_seen_add(attr)
                    stream_queue_put(item)
            self._adjust_limit()
            self._adjust_before()
        finally:
            # Always release the flag: if it stayed set after an unexpected
            # error, __next__ would never start another fetch.
            self._fetching = False

    def _adjust_limit(self):
        """Step ``_limit`` down by one, wrapping from limit_lower to limit_upper."""
        if self._limit <= self.limit_lower:
            self._limit = self.limit_upper
        else:
            self._limit -= 1

    def _adjust_before(self):
        """Record the last fetch's activity rating and pick the next ``before``."""
        self._activity_rating_list.append(self._ar(
            self._before_select, self._fetch_count['found'], self._fetch_count['unseen']))
        if len(self._activity_rating_list) >= self.activity_rating_samples:
            # Fold the average of the collected samples into the estimate.
            a = sum(self._activity_rating_list)/len(self._activity_rating_list)
            m = max(0, min(a, 1))
            self._activity = (self._activity + m)/2
            self._before_lower, self._before_upper = self.before_range()
            self._activity_rating_list.clear()
        # NOTE: _fetch_procedure returns before calling this method when the
        # fetch failed, so _error_last_fetch is always False on that path.
        if (self._error_last_fetch
                or self._fetch_count['found'] == 0
                or self._activity > .90
                or len(self._seen) < self._before_upper
                or self._rand.randrange(200) == 0):
            self._before_select = 0
            self._before = None
        else:
            # Anchor the next request on a randomly chosen recently seen id.
            self._before_select = self._rand.randrange(self._before_lower, self._before_upper)
            self._before = self._seen[-min(self._before_select, len(self._seen))]

    def error_handler(self, exception):
        """Default handler for fetch errors: print the traceback to stderr."""
        print('Ignoring exception in stream.', file=sys.stderr)
        traceback.print_exception(type(exception), exception, exception.__traceback__, file=sys.stderr)

    def handler(self, name=None):
        """Return a decorator that installs a function as an instance attribute.

        The function is stored under *name*, or under its own ``__name__``
        when *name* is None; e.g. a function named ``error_handler`` taking
        the exception as its only argument replaces the default handler
        for this instance.
        """
        def decorator(func):
            fname = func.__name__ if name is None else name
            setattr(self, fname, func)
            return func
        return decorator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment