Created
April 17, 2019 15:57
-
-
Save mistotebe/7bbc48ce20c8dc82f6885b370353be60 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import ldap | |
import ldap_wrapper | |
async def search(client): | |
search = client.search("cn=monitor", ldap.SCOPE_SUBTREE, attrlist=['+', '*']) | |
try: | |
async for message in search: | |
print(message) | |
result = await search | |
print(search.full_result) | |
except asyncio.CancelledError: | |
print("Search cancelled") | |
async def main(): | |
"Main method" | |
client = wrapper.AsyncClient("ldap://:1390") | |
search1 = search(client) | |
search2 = search(client) | |
await asyncio.gather(search1, search2) | |
if __name__ == "__main__": | |
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"python ldap asyncio wrapper" | |
import asyncio | |
import collections | |
import collections.abc | |
import ldap | |
import ldap.ldapobject | |
import pprint | |
LDAPMessage = collections.namedtuple('LDAPMessage', ['msgtype', 'data', 'msgid', 'decoded_controls', 'name', 'value']) | |
INTERMEDIATE_RESULTS = ( | |
ldap.RES_SEARCH_REFERENCE, | |
ldap.RES_SEARCH_ENTRY, | |
ldap.RES_INTERMEDIATE, | |
) | |
class LDAPRequest(asyncio.Future): | |
"Awaitable/iterable object that corresponds to the request" | |
def __init__(self, client, msgid): | |
super().__init__() | |
self.full_result = None | |
self._client = client | |
self._msgid = msgid | |
self._queue = asyncio.Queue() | |
def on_message(self, message): | |
"Message callback to enqueue the result" | |
if message.msgtype in INTERMEDIATE_RESULTS: | |
self._queue.put_nowait(message) | |
else: | |
self.full_result = message | |
self.set_result(message.msgtype) | |
def cancel(self, abandon=False): | |
if not abandon: | |
self._client.abandon(self._msgid) | |
return super().cancel() | |
async def __aiter__(self): | |
next_item = asyncio.create_task(self._queue.get()) | |
while not self.done(): | |
done, _ = await asyncio.wait({self, next_item}, return_when=asyncio.FIRST_COMPLETED) | |
if next_item in done: | |
yield next_item.result() | |
next_item = asyncio.create_task(self._queue.get()) | |
if next_item.done(): | |
yield next_item.result() | |
else: | |
next_item.cancel() | |
while not self._queue.empty(): | |
yield self._queue.get_nowait() | |
class AsyncClient(ldap.ldapobject.LDAPObject): | |
"Asyncio friendly class" | |
def __init__(self, *args, loop=None, **kwargs): | |
super().__init__(*args, **kwargs) | |
self._in_progress = {} | |
self._loop = loop or asyncio.get_event_loop() | |
self._have_reader = False | |
def _shutdown(self): | |
self._loop.remove_reader(self) | |
self._have_reader = False | |
while self._in_progress: | |
self._in_progress.popitem()[1].cancel() | |
def _read(self): | |
while True: | |
try: | |
result = self.result4(msgid=ldap.RES_ANY, all=0, timeout=0, | |
add_intermediates=1, add_extop=1) | |
except ldap.SERVER_DOWN as e: | |
self._shutdown() | |
break | |
except ldap.LDAPError as e: | |
print("Can't process a result", e) | |
self._shutdown() | |
break | |
message = LDAPMessage._make(result) | |
if message.msgtype is None: | |
break | |
message = LDAPMessage._make(result) | |
if message.msgid == ldap.RES_UNSOLICITED: | |
self._shutdown() | |
else: | |
request = self._in_progress.get(message.msgid) | |
if request: | |
request.on_message(message) | |
def abandon(self, msgid, *args, **kwargs): | |
"Abandon the request and cancel any tasks waiting on it" | |
request = self._in_progress.pop(msgid, None) | |
if request: | |
request.cancel(abandon=True) | |
return super().abandon(msgid, *args, **kwargs) | |
def unbind(self, *args, **kwargs): | |
"Send unbind and cancel all tasks" | |
self._shutdown() | |
return super().unbind_ext(*args, **kwargs) | |
def __send_request(self, name, *args, **kwargs): | |
"Send a request and return the awaitable+iterable object" | |
method_name = name + '_ext' | |
method = getattr(super(), method_name) | |
msgid = method(*args, **kwargs) | |
request = LDAPRequest(self, msgid) | |
self._in_progress[msgid] = request | |
if not self._have_reader: | |
self._loop.add_reader(self, self._read) | |
self._have_reader = True | |
return request | |
def add(self, *args, **kwargs): | |
"Add operation" | |
return self.__send_request('add', *args, **kwargs) | |
def bind(self, *args, **kwargs): | |
"Bind operation" | |
return self.__send_request('bind', *args, **kwargs) | |
def delete(self, *args, **kwargs): | |
"Delete operation" | |
return self.__send_request('delete', *args, **kwargs) | |
def extended(self, *args, **kwargs): | |
"Extended operation" | |
return self.__send_request('extop', *args, **kwargs) | |
def modify(self, *args, **kwargs): | |
"Modify operation" | |
return self.__send_request('modify', *args, **kwargs) | |
def rename(self, *args, **kwargs): | |
"ModRDN operation" | |
return self.__send_request('rename', *args, **kwargs) | |
def search(self, *args, **kwargs): | |
"Search operation" | |
return self.__send_request('search', *args, **kwargs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment