Created
October 28, 2022 07:38
-
-
Save rootcss/734483e1bc2f3de384142a21242f7950 to your computer and use it in GitHub Desktop.
asyncio mongo script for testing in python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| from contextlib import contextmanager | |
| from motor import motor_asyncio | |
| import time | |
| _times = [] | |
| _results = [] | |
| @contextmanager | |
| def timeit(): | |
| st = time.monotonic() | |
| yield | |
| _times.append(time.monotonic()-st) | |
| def _get_mongo_client(conn_str): | |
| return motor_asyncio.AsyncIOMotorClient( | |
| conn_str, | |
| serverSelectionTimeoutMS=5000, | |
| ) | |
| async def get_server_info(client): | |
| try: | |
| server_info = await client.server_info() | |
| print("server version:", server_info['version']) | |
| except Exception as ex: | |
| print("Unable to connect to the server.", ex) | |
| async def work(db_conn): | |
| # limit the concurrency with asyncio.Semaphore(concurrency_value) contextmanager if needed | |
| with timeit(): | |
| result = await db_conn['mycollection'].insert_one({'hello': 'world'}) | |
| _results.append(result.inserted_id) | |
| async def print_pending_tasks(): | |
| while len(asyncio.all_tasks()) != 2: | |
| print("pending tasks: ", len(asyncio.all_tasks())) | |
| await asyncio.sleep(0.5) | |
| async def run_tasks(db_conn): | |
| tasks = [asyncio.ensure_future(print_pending_tasks())] | |
| tasks += [asyncio.ensure_future(work(db_conn)) for _ in range(10000)] | |
| print("Requested Tasks count: ", len(tasks)) | |
| await asyncio.gather(*tasks) | |
| async def run_all(): | |
| conn_str = "mongodb://localhost:27017" | |
| client = _get_mongo_client(conn_str) | |
| db_conn = client.my_test_db | |
| await run_tasks(db_conn) | |
| if __name__ == '__main__': | |
| """ | |
| How to run: | |
| 1. Install dependent libs. | |
| 2. Install mongodb. | |
| 3. run this script. | |
| """ | |
| loop = asyncio.get_event_loop() | |
| loop.run_until_complete(run_all()) | |
| print( | |
| f"Completed {len(_results)} tasks with avg {sum(_times)/len(_times)} seconds") | |
| print("Fin.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment