Skip to content

Instantly share code, notes, and snippets.

@ironpythonbot
Created September 28, 2013 01:11
Show Gist options
  • Save ironpythonbot/6737321 to your computer and use it in GitHub Desktop.
Save ironpythonbot/6737321 to your computer and use it in GitHub Desktop.
CodePlex Issue #20143 Plain Text Attachments
# test2_close.py
from __future__ import with_statement
import types
import itertools
# Patching itertools.chain:
class chain(object):
old_chain = itertools.chain
def __new__(cls, *args):
return cls.old_chain(*args)
@staticmethod
def from_iterable(i):
for iterable in i:
for x in iterable: yield x
itertools.chain = chain
# This exception is never raised.
class StopProof(Exception): pass
class stopIteratorContext(object):
def __init__(self, iterator_context):
self.context = iterator_context
def __enter__(self):
return stopIterator(self.context.__enter__())
def __exit__(self, type, value, tb):
self.context.__exit__(type, value, tb)
class stopIterator(object):
def __init__(self, iterator):
self.iterator = iter(iterator)
def __iter__(self): return self
def next(self):
if self.iterator:
try:
return self.iterator.next()
except StopProof:
self.iterator = None
raise StopIteration
class chain_context(object):
def __init__(self, outer_it):
self.outer_it = outer_iterable(outer_it)
def __enter__(self):
return itertools.chain.from_iterable(self.outer_it)
def __exit__(self, type, value, tb): self.outer_it.close()
class outer_iterable(object):
def __init__(self, outer_it):
self.outer_it = iter(outer_it)
self.inner_it = None
def __iter__(self): return self
def close(self):
print "outer_iterable: close: begin, inner_it is", repr(self.inner_it)
if hasattr(self.inner_it, '__exit__'):
print "outer_iterable: close: inner_it hasattr __exit__ branch"
self.inner_it.__exit__(None, None, None)
elif hasattr(self.inner_it, 'close'):
self.inner_it.close()
print "outer_iterable: close: inner_it hasattr close branch"
print "outer_iterable: close: outer_it is", repr(self.outer_it)
if hasattr(self.outer_it, 'close'):
self.outer_it.close()
print "outer_iterable: close: outer_it hasattr close branch"
def next(self):
ans = self.outer_it.next()
if hasattr(ans, '__enter__'):
self.inner_it = ans
return ans.__enter__()
ans = iter(ans)
self.inner_it = ans
return ans
def prove(fn_list, arg_list):
return stopIteratorContext(chain_context(prove2(fn_list, arg)
for arg in arg_list))
def prove2(fn_list, arg):
return chain_context(fn(arg) for fn in fn_list)
def inner_goal(seq):
try:
for ans in seq:
yield ans
except Exception, ex:
print "inner_goal: caught", repr(ex)
raise
finally:
print "inner_goal: finally"
def outer_goal(dummy):
try:
for seq in ((1, 2), (3, 4)):
with prove((inner_goal,), (seq,)) as it:
for ans in it:
print "outer_goal: got ans ==", ans
yield ans
break
print "outer_goal: after for"
print "outer_goal: after with"
finally:
print "outer_goal: finally"
print "outer_goal: done"
def test():
with prove((outer_goal,), (10,)) as it:
for ans in it:
print "test got:", ans
print "test: done"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment