Skip to content

Instantly share code, notes, and snippets.

@mistotebe
Created April 17, 2019 15:57
Show Gist options
  • Save mistotebe/7bbc48ce20c8dc82f6885b370353be60 to your computer and use it in GitHub Desktop.
Save mistotebe/7bbc48ce20c8dc82f6885b370353be60 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import asyncio
import ldap
import ldap_wrapper
async def search(client):
    """Run a subtree search under ``cn=monitor`` and print what comes back.

    Every message produced while iterating the request is printed as it
    arrives.  The request is then awaited and its ``full_result`` attribute
    is printed.  If this coroutine is cancelled, ``"Search cancelled"`` is
    printed and the cancellation is not propagated any further.

    ``client`` must provide ``search(base, scope, attrlist=...)`` returning
    an object that supports both ``async for`` and ``await``.
    """
    # Named ``request`` so the local does not shadow this coroutine function.
    request = client.search("cn=monitor", ldap.SCOPE_SUBTREE, attrlist=['+', '*'])
    try:
        async for message in request:
            print(message)
        # Awaited only to wait for completion; the awaited value is not
        # needed because the final message is read from ``full_result``.
        await request
        print(request.full_result)
    except asyncio.CancelledError:
        print("Search cancelled")
async def main():
    """Connect to ``ldap://:1390`` and run two searches concurrently.

    Both ``search`` coroutines share a single client and are awaited
    together with ``asyncio.gather``.
    """
    # The module is imported as ``ldap_wrapper``; the bare name ``wrapper``
    # is not defined in this file and raised NameError.
    client = ldap_wrapper.AsyncClient("ldap://:1390")
    search1 = search(client)
    search2 = search(client)
    await asyncio.gather(search1, search2)
if __name__ == "__main__":
    # Start the event loop only when executed as a script, not on import.
    entry_point = main()
    asyncio.run(entry_point)
#!/usr/bin/env python3
"python ldap asyncio wrapper"
import asyncio
import collections
import collections.abc
import ldap
import ldap.ldapobject
import pprint
# One server response.  ``AsyncClient._read`` builds it directly from the
# tuple returned by ``result4()``, so the field order must match that tuple.
LDAPMessage = collections.namedtuple(
    'LDAPMessage',
    [
        'msgtype',
        'data',
        'msgid',
        'decoded_controls',
        'name',
        'value',
    ],
)

# Message types that are queued for iteration rather than completing a request.
INTERMEDIATE_RESULTS = (
    ldap.RES_SEARCH_REFERENCE, ldap.RES_SEARCH_ENTRY, ldap.RES_INTERMEDIATE,
)
class LDAPRequest(asyncio.Future):
    """Awaitable/iterable object that corresponds to one LDAP request.

    Awaiting the request gives the message type of the final response; the
    final message itself is stored in ``full_result``.  Iterating the request
    with ``async for`` yields the intermediate messages (those whose type is
    listed in ``INTERMEDIATE_RESULTS``) in arrival order.
    """

    def __init__(self, client, msgid):
        """Create the request for operation ``msgid`` sent through ``client``.

        ``client`` must provide ``abandon(msgid)``; it is called when the
        request is cancelled.
        """
        super().__init__()
        self.full_result = None  # final message, filled in by on_message()
        self._client = client
        self._msgid = msgid
        self._queue = asyncio.Queue()  # intermediate messages not yet iterated

    def on_message(self, message):
        """Message callback: queue an intermediate message or finish the request.

        A final message that arrives after the request is already done (for
        example because it was cancelled) is ignored instead of raising
        ``InvalidStateError`` from ``set_result``.
        """
        if message.msgtype in INTERMEDIATE_RESULTS:
            self._queue.put_nowait(message)
        elif not self.done():
            self.full_result = message
            self.set_result(message.msgtype)

    def cancel(self, abandon=False, msg=None):
        """Cancel the request and, unless told otherwise, abandon it.

        Args:
            abandon: pass ``True`` when the operation is already being
                abandoned by the caller (``AsyncClient.abandon`` does this),
                so the client is not asked to abandon it a second time.
            msg: optional cancellation message handed on to
                ``asyncio.Future.cancel``.

        Returns:
            ``True`` if this call cancelled the request, ``False`` if the
            request was already done (nothing is abandoned in that case).
        """
        if not isinstance(abandon, bool):
            # asyncio's task machinery may forward the cancellation message
            # as the first positional argument when a task awaiting this
            # future is cancelled; treat any non-bool value as that message
            # rather than as the ``abandon`` flag.
            msg, abandon = abandon, False
        if self.done():
            return False
        # Cancel first: ``client.abandon`` may call back into ``cancel``, and
        # doing it in the other order made this method report ``False`` for a
        # cancellation that did happen.
        cancelled = super().cancel() if msg is None else super().cancel(msg)
        if not abandon:
            self._client.abandon(self._msgid)
        return cancelled

    async def __aiter__(self):
        """Yield intermediate messages until the request is done.

        Messages still queued when the request finishes are yielded before
        iteration stops.
        """
        next_item = asyncio.create_task(self._queue.get())
        try:
            while not self.done():
                done, _ = await asyncio.wait(
                    {self, next_item}, return_when=asyncio.FIRST_COMPLETED)
                if next_item in done:
                    yield next_item.result()
                    next_item = asyncio.create_task(self._queue.get())
            if next_item.done():
                yield next_item.result()
        finally:
            # Also reached when the consumer stops iterating early, so the
            # helper task never outlives the iteration.
            if not next_item.done():
                next_item.cancel()
        while not self._queue.empty():
            yield self._queue.get_nowait()
class AsyncClient(ldap.ldapobject.LDAPObject):
    """Asyncio friendly LDAP client.

    Each operation method sends the request and returns an ``LDAPRequest``
    that can be awaited and/or iterated.  Responses are read by ``_read``,
    which is registered with the event loop as a reader callback the first
    time a request is sent.
    """

    def __init__(self, *args, loop=None, **kwargs):
        """Create the client.

        ``loop`` selects the event loop used for the reader callback
        (``asyncio.get_event_loop()`` when omitted); every other argument is
        passed to ``LDAPObject``.
        """
        super().__init__(*args, **kwargs)
        self._in_progress = {}  # msgid -> LDAPRequest still awaiting its final message
        self._loop = loop or asyncio.get_event_loop()
        self._have_reader = False

    def _shutdown(self):
        """Stop watching the connection and cancel every outstanding request."""
        if self._have_reader:
            self._loop.remove_reader(self)
            self._have_reader = False
        while self._in_progress:
            _, request = self._in_progress.popitem()
            # abandon=True: only cancel the future.  The connection is down or
            # about to be unbound, so no Abandon request should be sent.
            request.cancel(abandon=True)

    def _read(self):
        """Reader callback: dispatch every response that is ready.

        Polls ``result4`` without blocking until it reports no message
        (``msgtype`` is ``None``).  Any ``LDAPError`` shuts the client down;
        so does an unsolicited message.
        """
        while True:
            try:
                result = self.result4(msgid=ldap.RES_ANY, all=0, timeout=0,
                                      add_intermediates=1, add_extop=1)
            except ldap.SERVER_DOWN:
                self._shutdown()
                break
            except ldap.LDAPError as e:
                print("Can't process a result", e)
                self._shutdown()
                break
            message = LDAPMessage._make(result)
            if message.msgtype is None:
                break
            if message.msgid == ldap.RES_UNSOLICITED:
                self._shutdown()
                continue
            request = self._in_progress.get(message.msgid)
            if request is not None:
                request.on_message(message)
                if request.done():
                    # Final response delivered: forget the request so the
                    # table does not grow for the lifetime of the connection.
                    self._in_progress.pop(message.msgid, None)

    def abandon(self, msgid, *args, **kwargs):
        "Abandon the request and cancel any tasks waiting on it"
        request = self._in_progress.pop(msgid, None)
        if request:
            request.cancel(abandon=True)
        return super().abandon(msgid, *args, **kwargs)

    def unbind(self, *args, **kwargs):
        "Send unbind and cancel all tasks"
        self._shutdown()
        return super().unbind_ext(*args, **kwargs)

    def __send_request(self, name, *args, **kwargs):
        """Send a request and return the awaitable+iterable object.

        ``name`` is the operation name.  The parent's ``<name>_ext`` method
        is used when it exists, otherwise the parent's ``<name>`` method.
        The method is expected to return the msgid of the request it sent.
        """
        parent = super()
        # python-ldap does not provide an ``_ext`` variant for every
        # operation (e.g. ``extop``); fall back to the plain method name.
        method = getattr(parent, name + '_ext', None)
        if method is None:
            method = getattr(parent, name)
        msgid = method(*args, **kwargs)
        request = LDAPRequest(self, msgid)
        self._in_progress[msgid] = request
        if not self._have_reader:
            self._loop.add_reader(self, self._read)
            self._have_reader = True
        return request

    def add(self, *args, **kwargs):
        "Add operation"
        return self.__send_request('add', *args, **kwargs)

    def bind(self, *args, **kwargs):
        "Bind operation"
        return self.__send_request('bind', *args, **kwargs)

    def delete(self, *args, **kwargs):
        "Delete operation"
        return self.__send_request('delete', *args, **kwargs)

    def extended(self, *args, **kwargs):
        "Extended operation"
        return self.__send_request('extop', *args, **kwargs)

    def modify(self, *args, **kwargs):
        "Modify operation"
        return self.__send_request('modify', *args, **kwargs)

    def rename(self, *args, **kwargs):
        "ModRDN operation"
        return self.__send_request('rename', *args, **kwargs)

    def search(self, *args, **kwargs):
        "Search operation"
        return self.__send_request('search', *args, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment