Created September 28, 2013 01:11
-
-
Save ironpythonbot/6737321 to your computer and use it in GitHub Desktop.
CodePlex Issue #20143 Plain Text Attachments
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# test2_close.py
from __future__ import with_statement | |
import types | |
import itertools | |
# Patching itertools.chain:
class chain(object):
    """Stand-in for ``itertools.chain`` that supplies its own ``from_iterable``.

    Calling ``chain(*iterables)`` never creates an instance of this class:
    ``__new__`` returns whatever the original ``itertools.chain`` builds.
    ``from_iterable`` is a plain generator function defined here.

    NOTE(review): presumably this patch exists for interpreters whose
    ``itertools.chain`` has no ``from_iterable`` -- confirm against the
    original issue report.
    """

    # Captured while the class body executes, i.e. before the module-level
    # assignment below rebinds ``itertools.chain`` to this class.
    old_chain = itertools.chain

    def __new__(cls, *args):
        """Return ``old_chain(*args)``: the given iterables chained together."""
        return cls.old_chain(*args)

    @staticmethod
    def from_iterable(i):
        """Yield each item of each iterable produced by *i*, in order."""
        for iterable in i:
            for x in iterable:
                yield x


itertools.chain = chain
# This exception is never raised.
class StopProof(Exception):
    """Signal that ``stopIterator.next`` treats as "stop iterating for good"."""
    pass
class stopIteratorContext(object):
    """Context manager that wraps the result of another context manager.

    Entering delegates to the wrapped context and wraps whatever its
    ``__enter__`` returns in a ``stopIterator``; exiting delegates to the
    wrapped context's ``__exit__``.
    """

    def __init__(self, iterator_context):
        """Remember *iterator_context*, an object with ``__enter__``/``__exit__``."""
        self.context = iterator_context

    def __enter__(self):
        """Enter the wrapped context and return its result as a ``stopIterator``."""
        return stopIterator(self.context.__enter__())

    def __exit__(self, type, value, tb):
        """Forward the exit to the wrapped context and return its result.

        Returning the wrapped result (instead of dropping it) lets a wrapped
        context that answers true suppress the exception, as it would if it
        were used directly in the ``with`` statement.
        """
        return self.context.__exit__(type, value, tb)
class stopIterator(object): | |
def __init__(self, iterator): | |
self.iterator = iter(iterator) | |
def __iter__(self): return self | |
def next(self): | |
if self.iterator: | |
try: | |
return self.iterator.next() | |
except StopProof: | |
self.iterator = None | |
raise StopIteration | |
class chain_context(object):
    """Context manager that flattens an iterable of iterables.

    The outer iterable is wrapped in an ``outer_iterable``.  Entering returns
    ``itertools.chain.from_iterable`` over that wrapper; exiting calls the
    wrapper's ``close()``.
    """

    def __init__(self, outer_it):
        """Wrap *outer_it* (an iterable of inner iterables/contexts)."""
        self.outer_it = outer_iterable(outer_it)

    def __enter__(self):
        """Return an iterator over the items of all inner iterables."""
        return itertools.chain.from_iterable(self.outer_it)

    def __exit__(self, type, value, tb):
        """Close the wrapped outer iterable; the exception info is ignored."""
        self.outer_it.close()
class outer_iterable(object): | |
def __init__(self, outer_it): | |
self.outer_it = iter(outer_it) | |
self.inner_it = None | |
def __iter__(self): return self | |
def close(self): | |
print "outer_iterable: close: begin, inner_it is", repr(self.inner_it) | |
if hasattr(self.inner_it, '__exit__'): | |
print "outer_iterable: close: inner_it hasattr __exit__ branch" | |
self.inner_it.__exit__(None, None, None) | |
elif hasattr(self.inner_it, 'close'): | |
self.inner_it.close() | |
print "outer_iterable: close: inner_it hasattr close branch" | |
print "outer_iterable: close: outer_it is", repr(self.outer_it) | |
if hasattr(self.outer_it, 'close'): | |
self.outer_it.close() | |
print "outer_iterable: close: outer_it hasattr close branch" | |
def next(self): | |
ans = self.outer_it.next() | |
if hasattr(ans, '__enter__'): | |
self.inner_it = ans | |
return ans.__enter__() | |
ans = iter(ans) | |
self.inner_it = ans | |
return ans | |
def prove(fn_list, arg_list):
    """Return a context manager iterating all answers for all arguments.

    For each ``arg`` in *arg_list* a ``prove2(fn_list, arg)`` context is
    created lazily (generator expression).  Those contexts are chained by a
    ``chain_context``, which is in turn wrapped in a ``stopIteratorContext``.
    """
    return stopIteratorContext(chain_context(prove2(fn_list, arg)
                                             for arg in arg_list))
def prove2(fn_list, arg):
    """Return a ``chain_context`` over ``fn(arg)`` for each ``fn`` in *fn_list*.

    The calls are made lazily: the generator expression only invokes a
    function when the chain asks for the next inner iterable.
    """
    return chain_context(fn(arg) for fn in fn_list)
def inner_goal(seq): | |
try: | |
for ans in seq: | |
yield ans | |
except Exception, ex: | |
print "inner_goal: caught", repr(ex) | |
raise | |
finally: | |
print "inner_goal: finally" | |
def outer_goal(dummy): | |
try: | |
for seq in ((1, 2), (3, 4)): | |
with prove((inner_goal,), (seq,)) as it: | |
for ans in it: | |
print "outer_goal: got ans ==", ans | |
yield ans | |
break | |
print "outer_goal: after for" | |
print "outer_goal: after with" | |
finally: | |
print "outer_goal: finally" | |
print "outer_goal: done" | |
def test(): | |
with prove((outer_goal,), (10,)) as it: | |
for ans in it: | |
print "test got:", ans | |
print "test: done" |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.