Skip to content

Instantly share code, notes, and snippets.

@westover
Last active January 3, 2016 23:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save westover/c663db93372f1e3f207f to your computer and use it in GitHub Desktop.
Save westover/c663db93372f1e3f207f to your computer and use it in GitHub Desktop.
Stream parse SQS spout
from __future__ import absolute_import, print_function, unicode_literals

import threading
import time
try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty

import boto
from boto.sqs.connection import SQSError
from streamparse.spout import Spout


class SQSSpout(Spout):
    """
    Generic SQS spout that emits the raw body of every message it reads.

    Subclasses configure the spout through class attributes:

    * ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY`` -- credentials
      handed to ``boto.connect_sqs``.
    * ``QUEUE`` -- name of the SQS queue to read from.
    * ``POLL_WAIT_SECONDS`` -- ``wait_time_seconds`` used for each read.
    * ``RECONNECT_DELAY_SECONDS`` -- pause after a failed reconnect.

    A background thread polls SQS and hands messages to ``next_tuple``
    through an in-memory queue.
    """
    AWS_ACCESS_KEY_ID = None
    AWS_SECRET_ACCESS_KEY = None
    QUEUE = ''
    POLL_WAIT_SECONDS = 20
    RECONNECT_DELAY_SECONDS = 5

    def initialize(self, stormconf, context):
        """
        Connect to SQS and start the background polling thread.

        :param stormconf: Storm configuration (unused here)
        :param context: topology context (unused here)
        """
        # Report only whether a key is configured; the key itself is kept
        # out of the logs.
        if self.AWS_ACCESS_KEY_ID is None:
            self.log('AWS_ACCESS_KEY_ID is not set on the spout')
        self.sqs = self.connect_to_sqs()
        self.message_queue = Queue()
        # Thread.start() returns None, so keep the Thread object itself.
        worker = threading.Thread(target=self.polling_thread)
        # The polling loop never returns; a daemon thread cannot keep the
        # interpreter alive at shutdown.
        worker.daemon = True
        worker.start()
        self.polling_worker = worker

    def connect_to_sqs(self):
        """
        Method to handle connecting to sqs

        :return: SQS Queue Object
        :raises SQSError: when the queue named by ``QUEUE`` is not found
        """
        sqs_conn = boto.connect_sqs(
            aws_access_key_id=self.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY)
        sqs_queue = sqs_conn.get_queue(self.QUEUE)
        if sqs_queue is None:
            raise SQSError(404, 'SQS Queue Not Found')
        self.log('Connected to SQS {}'.format(sqs_queue.url))
        self.log('SQS Queue size: {}'.format(sqs_queue.count()))
        return sqs_queue

    def polling_thread(self):
        """
        SQS polling loop to wait for new messages.

        Runs forever; every message read is put on ``self.message_queue``
        for ``next_tuple`` to pick up. Nothing is returned.
        """
        while True:
            try:
                sqs_message = self.sqs.read(
                    wait_time_seconds=self.POLL_WAIT_SECONDS)
            except SQSError:
                self.log('SQS Error!', 'error')
                # Timeouts are pretty clear cut but we should refresh our
                # connection
                self._reconnect()
                continue
            if sqs_message is None:
                self.log('Ok still waiting')
                continue
            self.log('Message Received, Raw Message: {}'.format(
                sqs_message.get_body()))
            self.message_queue.put(sqs_message)

    def _reconnect(self):
        """Refresh ``self.sqs``; on failure keep the old handle and pause."""
        try:
            self.sqs = self.connect_to_sqs()
        except SQSError:
            # Letting this escape would end the polling thread and the spout
            # would silently stop emitting; wait and let the loop try again.
            self.log('SQS reconnect failed, will retry', 'error')
            time.sleep(self.RECONNECT_DELAY_SECONDS)

    def next_tuple(self):
        """
        Implement the needed next tuple method to pass the raw sqs messages
        into the topology.

        Emits at most one message per call and returns immediately when
        nothing is queued. The message is deleted from SQS right after it is
        emitted, so it cannot be re-read from SQS afterwards.

        :return: None
        """
        # Only the queue lookup can raise Empty, so only it is guarded.
        try:
            this_message = self.message_queue.get_nowait()
        except Empty:
            return
        self.emit([this_message.get_body()])
        self.sqs.delete_message(this_message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment