python simple_queues.py
Currently the MongoEngine library only supports a single database connection, managing collections through Document classes in global scope.
It supports changing the database connection with switch_db,
but this works through a global class attribute called ._meta['db_alias'].
When you define many nested ReferenceFields in a hierarchical tree, only the first Document object has its ._meta['db_alias']
attribute changed, so querying Documents that reference other Documents does not work.
import asyncio

async def create_async_results(total_items: int):
    for r in range(total_items):
        await asyncio.sleep(1)
        yield r

async def example_of_async_for_loop():
    async for item in create_async_results(10):
        print(item)
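The snippet above defines the coroutines but does not show how they are started. A minimal sketch of driving an async generator to completion, assuming Python 3.7+; `collect_results` and the `delay` parameter are names introduced here for illustration (the delay is shortened so the example finishes quickly):

```python
import asyncio


async def create_async_results(total_items: int, delay: float = 0.01):
    # Async generator: yields one value per iteration after a short pause.
    for r in range(total_items):
        await asyncio.sleep(delay)
        yield r


async def collect_results(total_items: int) -> list:
    # Consume the async generator with `async for` and collect the items.
    items = []
    async for item in create_async_results(total_items):
        items.append(item)
    return items


if __name__ == "__main__":
    # asyncio.run creates an event loop, runs the coroutine, and closes the loop.
    print(asyncio.run(collect_results(5)))
```

Returning a list keeps the example easy to check; in real code the `async for` body would usually process each item as it arrives.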
import asyncio
from random import randint

async def execute_your_task(task_number: int):
    """Specific task to run in parallel."""
    # print the start of the parallel task to the console
    print('-> starting parallel task:', task_number)
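The preview above is cut off before the task body and the code that launches the tasks. One possible way to run several such tasks together is `asyncio.gather`; this is a sketch under assumptions, not the original gist's code. The `max_delay_ms` parameter, the random sleep, and the `run_all` helper are hypothetical additions for illustration:

```python
import asyncio
from random import randint


async def execute_your_task(task_number: int, max_delay_ms: int = 20) -> int:
    """Simulated task: sleeps for a short random time, then returns its number."""
    print('-> starting parallel task:', task_number)
    await asyncio.sleep(randint(1, max_delay_ms) / 1000)
    print('<- finished parallel task:', task_number)
    return task_number


async def run_all(total_tasks: int) -> list:
    # gather schedules all coroutines concurrently and returns their
    # results in the order the coroutines were passed in.
    return await asyncio.gather(*(execute_your_task(n) for n in range(total_tasks)))


if __name__ == "__main__":
    print(asyncio.run(run_all(3)))
```

Note that asyncio interleaves these tasks on a single thread, so they run concurrently rather than in parallel across CPU cores. The tasks may finish in any order, but the result list follows the submission order.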
ssh -o "ProxyCommand=nc -X 5 -x 127.0.0.1:9050 %h %p" root@your-domain.com
def replace_dynamic_variable(element):
    # If the element is a string, replace "{variable_dynamic}" with "hacked"
    if isinstance(element, str):
        element = element.replace("{variable_dynamic}", "hacked")
    # If the element is a dictionary, iterate through each key-value pair
    elif isinstance(element, dict):
        for key, value in element.items():
            element[key] = replace_dynamic_variable(value)
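The preview above ends before the function returns anything, so the rest of the original is unknown. A self-contained sketch of the same recursive idea, assuming nested dicts and lists are the only containers that need handling; the name `replace_placeholder` and its `placeholder` and `value` parameters are hypothetical and chosen for illustration:

```python
def replace_placeholder(element, placeholder="{variable_dynamic}", value="hacked"):
    # Strings: return a copy with the placeholder substituted.
    if isinstance(element, str):
        return element.replace(placeholder, value)
    # Dictionaries: rebuild with every value processed recursively.
    if isinstance(element, dict):
        return {k: replace_placeholder(v, placeholder, value) for k, v in element.items()}
    # Lists: process each item recursively.
    if isinstance(element, list):
        return [replace_placeholder(item, placeholder, value) for item in element]
    # Anything else (numbers, None, ...) is returned unchanged.
    return element


if __name__ == "__main__":
    payload = {"name": "{variable_dynamic}", "tags": ["a-{variable_dynamic}", 1]}
    print(replace_placeholder(payload))
```

This version builds new containers instead of mutating the input, which avoids surprises when the same structure is reused elsewhere. Every branch returns a value, so nested calls never yield `None` by accident.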
import traceback

# define a sample function that references a nonexistent variable
def my_function():
    some_var = nonexisting_var

# create a dict to store all raised exceptions
raised_exceptions = {}

# execute the broken function 3 times (each call raises an exception)
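The preview stops before the loop itself. A minimal sketch of how the exceptions might be captured and stored, assuming the attempt index is used as the dictionary key (the original's key choice is not shown); `run_and_collect` is a hypothetical helper name:

```python
import traceback


def my_function():
    # References an undefined name, so calling it raises NameError.
    some_var = nonexisting_var  # noqa: F821


def run_and_collect(attempts: int = 3) -> dict:
    raised_exceptions = {}
    for attempt in range(attempts):
        try:
            my_function()
        except Exception:
            # format_exc() returns the active exception's traceback as a string.
            raised_exceptions[attempt] = traceback.format_exc()
    return raised_exceptions


if __name__ == "__main__":
    for attempt, tb in run_and_collect().items():
        print(f"attempt {attempt}:\n{tb}")
```

Storing the formatted string rather than the exception object keeps the dictionary easy to log or serialize later.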