Skip to content

Instantly share code, notes, and snippets.

@cjsmeele
Created August 6, 2019 14:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cjsmeele/2606317ecc6f192a7b09faa76184d874 to your computer and use it in GitHub Desktop.
Save cjsmeele/2606317ecc6f192a7b09faa76184d874 to your computer and use it in GitHub Desktop.
Test iRODS python client library query auto-closing
from __future__ import print_function
from irods.session import iRODSSession
from irods.models import Collection, DataObject
from irods.exception import MultipleResultsFound
def make_session():
return iRODSSession(
host = 'localhost',
port = 1247,
user = 'rods',
password = 'rods',
zone = 'tempZone')
# Functions that run a query method that will result in query execution {{{
def test_one(q):
''' Test one(). Query should be closed after fetching one result. '''
try:
q.one()
except MultipleResultsFound:
# irrelevant result
pass
def test_first(q):
''' Test first(). Query should be closed after fetching one result. '''
q.first()
def test_iter(q):
''' Test __iter__() / get_results(). Query should be closed on generator exit (with break). '''
for x in q:
break # single iteration
# }}}
session = None
def run_test(f):
''' Run a test function in a loop, with a simple query as an argument. '''
global session
if not session:
session = make_session()
print('executing ' + f.__name__)
for i in range(100):
try:
f(session.query(Collection.name, DataObject.name))
except Exception as e:
print('... failed after {} successful queries: {}'.format(i, repr(e)))
session = None
return
print('... OK')
run_test(test_one)
run_test(test_first)
run_test(test_iter)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment