Skip to content

Instantly share code, notes, and snippets.

@MrHassanMurtaza
Last active August 9, 2020 17:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MrHassanMurtaza/6733e72b1fe0ab7165a55a30541dfee9 to your computer and use it in GitHub Desktop.
Save MrHassanMurtaza/6733e72b1fe0ab7165a55a30541dfee9 to your computer and use it in GitHub Desktop.
Poll Azure Storage Queue and Do Multiprocessing
import sys
from os import environ
from datetime import datetime
from multiprocessing import Pool
from azure.storage.queue import QueueServiceClient
CONNECTION_STRING = environ.get('CONNECTION_STRING')
def get_queue_client(queue_name='test-queue'):
""" Get queue poison client """
if not CONNECTION_STRING:
logger.critical('AZURE_STORAGE_CONNECTION_STRING required')
sys.exit(1)
queue_service_client = QueueServiceClient.from_connection_string(
conn_str=CONNECTION_STRING)
return queue_service_client.get_queue_client(queue_name)
def process_message(message):
""" process message from queue """
print(f'Processing message [{message}]')
def poll_queue():
""" Continously poll poison queue to get new messages """
while True:
logger.info('Polling for new messages at {}'.format(
datetime.now().strftime("%I:%M%p on %B %d, %Y")))
messages = queue_client.receive_messages()
p = Pool(2)
p.map(process_message, messages)
p.close()
p.join()
sleep(1000)
if __name__ == "__main__":
queue_client = get_queue_client()
poll_queue()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment