@isaacl
Last active August 3, 2016 17:46
# Seed a local sqlite-backed datastore file with 100 entities, each holding
# a compressed TextProperty, for the memory-profiling script below to read.
import os
import sys

sys.path.insert(0, os.environ.get('GOOGLE_APPENGINE_SDK_PATH'))
sys.path.insert(0, os.environ.get('GOOGLE_APPENGINE_NDB_PATH'))

import dev_appserver
dev_appserver.fix_sys_path()

from google.appengine.ext import testbed

t = testbed.Testbed()
os.environ['APPLICATION_ID'] = 'foo'
t.activate()
t.init_datastore_v3_stub(use_sqlite=True, datastore_file='100GzipText.sqlite')
t.init_memcache_stub()

import ndb


class Entity(ndb.Model):
  val = ndb.TextProperty(compressed=True)


for i in range(100):
  e = Entity()
  e.val = 'a' * 10000
  e.put()

t.deactivate()
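
# Optional sanity check (a sketch, not part of the original gist): re-open
# the same sqlite file in a fresh testbed and confirm the entities are there.
#
# t2 = testbed.Testbed()
# t2.activate()
# t2.init_datastore_v3_stub(use_sqlite=True, datastore_file='100GzipText.sqlite')
# t2.init_memcache_stub()
# print Entity.query().count()  # expect 100 after a single run of this script
# t2.deactivate()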
import gc
import inspect
import logging
import os
import pprint
import sys
import time
import weakref
from pprint import pprint as pp
# Verbose logging (level 5 is below DEBUG) to a fresh file per run, so the
# tracing added by the ndb patch below can be studied afterwards.
logging.basicConfig(level=5,
                    format='%(asctime)s %(levelname)-8s %(message)s',
                    datefmt='%H:%M:%S',
                    filename='/tmp/ndb.%d.log' % int(time.time()),
                    filemode='w')
sys.path.insert(0, os.environ.get('GOOGLE_APPENGINE_SDK_PATH'))
sys.path.insert(0, os.environ.get('GOOGLE_APPENGINE_NDB_PATH'))
import dev_appserver
dev_appserver.fix_sys_path()
from google.appengine.ext import testbed
t = testbed.Testbed()
os.environ['APPLICATION_ID'] = 'foo'
t.activate()
t.init_datastore_v3_stub(use_sqlite=True, datastore_file='100GzipText.sqlite')
t.init_memcache_stub()
import ndb
class Entity(ndb.Model):
  val = ndb.TextProperty(compressed=True)

# Weak references only, so len(wd) counts entities that are still alive
# without this dict itself keeping them alive.
wd = weakref.WeakValueDictionary()
# Disable ndb's in-context cache and memcache so the caches cannot be what
# is pinning entities in memory.
ndb.get_context().set_cache_policy(False)
ndb.get_context().set_memcache_policy(False)
# UNCOMMENT TO USE HEAPY
# Some info here: http://smira.ru/wp-content/uploads/2011/08/heapy.html
# from guppy import hpy
# hp = hpy()
# hp.setref()
def fetch_entities(paged=True, batch_size=10):
  """Iterate over all Entity rows, via fetch_page() or via a plain iterator."""
  if paged:
    def fetcher():
      cursor = None
      while True:
        entities, cursor, more = Entity.query().fetch_page(batch_size, start_cursor=cursor)
        for e in entities:
          yield e
        if not more:
          return
    fetch_it = fetcher()
  else:
    fetch_it = Entity.query().iter(batch_size=batch_size)
  total_txt = 0
  for i, entity in enumerate(fetch_it):
    wd[i] = entity  # Weak reference only; lets us count survivors later.
    total_txt += len(entity.val)
    # if i % (batch_size // 2) == batch_size // 4:
    #   gc.collect()
    # if len(wd) > batch_size * 2:
    #   break
  gc.collect()
  return i, total_txt
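
# Example run (a sketch, not in the original gist): fetch everything both
# ways and see how many entities survive the final gc.collect(). len(wd)
# should be near zero if nothing inside ndb is holding on to them.
#
# count, total = fetch_entities(paged=True, batch_size=10)
# print 'paged: %d entities, %d chars, %d still alive' % (
#     count + 1, total, len(wd))
# count, total = fetch_entities(paged=False, batch_size=10)
# print 'iter:  %d entities, %d chars, %d still alive' % (
#     count + 1, total, len(wd))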
# HEAPY HELPERS
def ppn(es):
  """Pretty-print up to 10 nodes of a heapy object set."""
  ans = list(es.nodes)
  for e in ans[:10]:
    print pprint.pformat(e)[:150]
  if len(ans) > 10:
    print '...', len(ans) - 10


def g1(es):
  """Return the first node of a heapy object set."""
  return iter(es.nodes).next()


def print_gens(gens):
  """Dump the code object, locals and current line of each generator node."""
  return pp([(g.gi_code,
              [t for t in inspect.getmembers(g.gi_frame)
               if t[0] in ('f_locals', 'f_lineno')])
             for g in gens.nodes])
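
# Example heapy session (a sketch; assumes the guppy lines above are
# uncommented so hp.setref() ran before fetching):
#
# fetch_entities(paged=True)
# h = hp.heap()       # objects allocated since hp.setref()
# print h             # summary, one row per class / dict owner
# print h[0].byrcs    # largest row, regrouped by referrer
# ppn(h[0])           # peek at a few of the underlying objects
# print_gens(h[3])    # if row 3 of `print h` is the generators, dump their
#                     # locals and current line to see where they are stuck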
diff --git a/ndb/context.py b/ndb/context.py
index 00fdd36..b041157 100644
--- a/ndb/context.py
+++ b/ndb/context.py
@@ -826,6 +826,7 @@ class Context(object):
             else:
               val = callback(ent)
           mfut.putq(val)
+          batch = None
       except GeneratorExit:
         raise
       except Exception, err:
diff --git a/ndb/eventloop.py b/ndb/eventloop.py
index 468d37f..2571097 100644
--- a/ndb/eventloop.py
+++ b/ndb/eventloop.py
@@ -42,7 +42,7 @@ class EventLoop(object):
     self.idlers = collections.deque()  # Cyclic list of (callback, args, kwds)
     self.inactive = 0  # How many idlers in a row were no-ops
     self.queue = []  # Sorted list of (time, callback, args, kwds)
-    self.rpcs = {}  # Map of rpc -> (callback, args, kwds)
+    self.rpcs = collections.OrderedDict()  # Map of rpc -> (callback, args, kwds)

   def clear(self):
     """Remove all pending events without running any."""
@@ -108,6 +108,7 @@ class EventLoop(object):
     NOTE: If the rpc is a MultiRpc, the callback will be called once
     for each sub-RPC. TODO: Is this a good idea?
     """
+    logging.warning('QUEUE_RPC %r', rpc)
     if rpc is None:
       return
     if rpc.state not in (_RUNNING, _FINISHING):
@@ -129,6 +130,7 @@ class EventLoop(object):
       rpcs = [rpc]
     for rpc in rpcs:
       self.rpcs[rpc] = (callback, args, kwds)
+      import pprint; logging.warning('RPCS now: %s', pprint.pformat(self.rpcs.values()))

   def add_idle(self, callback, *args, **kwds):
     """Add an idle callback.
@@ -194,7 +196,7 @@ class EventLoop(object):
         return 0
     if self.rpcs:
       self.inactive = 0
-      rpc = datastore_rpc.MultiRpc.wait_any(self.rpcs)
+      rpc = datastore_rpc.MultiRpc.wait_any(reversed(self.rpcs))
       if rpc is not None:
         _logging_debug('rpc: %s.%s', rpc.service, rpc.method)
         # Yes, wait_any() may return None even for a non-empty argument.
@@ -203,6 +205,7 @@ class EventLoop(object):
           raise RuntimeError('rpc %r was not given to wait_any as a choice %r' %
                              (rpc, self.rpcs))
         callback, args, kwds = self.rpcs[rpc]
+        logging.warning('RPC FETCHED: %s.%s, %r', rpc.service, rpc.method, callback)
         del self.rpcs[rpc]
         if callback is not None:
           callback(*args, **kwds)
diff --git a/ndb/query.py b/ndb/query.py
index 11e29c1..ed7a425 100644
--- a/ndb/query.py
+++ b/ndb/query.py
@@ -939,6 +939,7 @@ class Query(object):
         rpc = batch.next_batch_async(options)
         for i, result in enumerate(batch.results):
           queue.putq((batch, i, result))
+        batch = None
       queue.complete()
     except GeneratorExit:
       raise
diff --git a/ndb/tasklets.py b/ndb/tasklets.py
index 3214def..724cec9 100644
--- a/ndb/tasklets.py
+++ b/ndb/tasklets.py
@@ -59,12 +59,13 @@ implementing an interface that expects tasklets but you have no need to
 suspend -- there's no need to insert a dummy yield in order to make
 the tasklet into a generator.
 """
-
+import inspect
 import collections
 import logging
 import os
 import sys
 import types
+import weakref

 from .google_imports import apiproxy_stub_map
 from .google_imports import apiproxy_rpc
@@ -103,7 +104,7 @@ class _State(utils.threading_local):

   def __init__(self):
     super(_State, self).__init__()
-    self.all_pending = set()
+    self.all_pending = weakref.WeakSet()

   def add_pending(self, fut):
     _logging_debug('all_pending: add %s', fut)
@@ -768,14 +769,29 @@ class SerialQueueFuture(Future):
   If, instead of complete(), set_exception() is called, the exception
   and traceback set there will be used instead of EOFError.
   """
+  i = 0

   def __init__(self, info=None):
+    self.id = SerialQueueFuture.i
+    SerialQueueFuture.i += 1
+
     self._full = False
     self._queue = collections.deque()
     self._waiting = collections.deque()
     super(SerialQueueFuture, self).__init__(info=info)

   # TODO: __repr__
+  def __repr__(self):
+    line = '?'
+    for line in self._where:
+      if 'tasklets.py' not in line:
+        break
+    if self._info:
+      line += ' for %s' % self._info
+    if self._geninfo:
+      line += ' %s' % self._geninfo
+    return '<%d %s %x created by %s>' % (
+        self.id, self.__class__.__name__, id(self), line)

   def complete(self):
     if self._full:
@@ -795,6 +811,7 @@ class SerialQueueFuture(Future):
         waiter.set_exception(exc, tb)

   def putq(self, value):
+    logging.warning('PUTQ %r, val %r, caller: %s', self, value, inspect.stack()[1][3])
     if isinstance(value, Future):
       fut = value
     else:
@@ -819,6 +836,7 @@ class SerialQueueFuture(Future):
       self._queue.append(fut)

   def getq(self):
+    logging.warning('GETQ %r queue %d wait %d, caller %s', self, len(self._queue), len(self._waiting), inspect.stack()[1][3])
     if self._queue:
       fut = self._queue.popleft()
       # TODO: Isn't it better to call self.set_result(None) in complete()?
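
One way to use the tracing this patch adds (a sketch, not part of the original
gist): the patched putq()/getq() write one WARNING line per call to the
/tmp/ndb.<timestamp>.log file configured above, and the patched __repr__ puts
the future's integer id first, so a small script can tally how many values
went into each SerialQueueFuture versus how many were ever taken out. Futures
with many PUTQs and no GETQs are candidates for what is pinning entities in
memory. The log path is passed as the first command-line argument.

import collections
import re
import sys

# Tally PUTQ/GETQ lines per SerialQueueFuture id from a patched-ndb log file.
counts = collections.defaultdict(lambda: [0, 0])
pat = re.compile(r'(PUTQ|GETQ) <(\d+) ')
for logline in open(sys.argv[1]):
  m = pat.search(logline)
  if m:
    counts[int(m.group(2))][0 if m.group(1) == 'PUTQ' else 1] += 1
for fut_id, (puts, gets) in sorted(counts.items()):
  print '%6d: %d putq, %d getq' % (fut_id, puts, gets)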