Skip to content

Instantly share code, notes, and snippets.

@dcramer
Created January 29, 2015 06:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dcramer/3a8b4768dd3abb4732cd to your computer and use it in GitHub Desktop.
Save dcramer/3a8b4768dd3abb4732cd to your computer and use it in GitHub Desktop.
sentry-mailgun
import logging
from django.http import HttpResponse
from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from email_reply_parser import EmailReplyParser
from email.utils import parseaddr
from sentry.tasks.email import process_inbound_email
from sentry.utils.email import email_to_group_id
@never_cache
@csrf_exempt
@require_http_methods(['POST'])
def process_inbound_message(request):
    """
    Accept an inbound email delivered as a form POST and queue it for
    asynchronous processing.

    The ``To``, ``From`` and ``body-plain`` form fields are read from
    ``request.POST``.

    Responses:
      * 500 -- ``email_to_group_id`` raised for the recipient address.
      * 200 -- the parsed reply text is empty, so nothing is queued.
      * 201 -- ``process_inbound_email`` was queued with the sender address,
        the group id and the reply text.
    """
    form = request.POST

    # parseaddr() returns a (display name, address) pair; only the bare
    # address is needed here.
    _, recipient = parseaddr(form['To'])
    _, sender = parseaddr(form['From'])

    try:
        group_id = email_to_group_id(recipient)
    except Exception:
        logging.info('%r is not a valid email address', recipient)
        return HttpResponse(status=500)

    reply = EmailReplyParser.parse_reply(form['body-plain'])
    message = reply.strip()

    if message:
        process_inbound_email.delay(sender, group_id, message)
        return HttpResponse(status=201)

    # An empty reply body means there is nothing to hand off to the task.
    return HttpResponse(status=200)
import mock
from sentry.testutils import TestCase
from sentry.utils.email import group_id_to_email
# Plain-text body submitted by the test and expected, unchanged, as the
# payload passed to the queued task.
body_plain = "foo bar"


class TestProcessInboundMessage(TestCase):
    """Posts an inbound message to the endpoint via the test client."""

    urls = 'getsentry.conf.urls.app'

    def setUp(self):
        """Create an event and derive the reply address for ``self.group``."""
        super(TestProcessInboundMessage, self).setUp()
        self.event = self.create_event(event_id='a' * 32)
        self.mailto = group_id_to_email(self.group.pk)

    @mock.patch('getsentry.web.mailgun.process_inbound_email')
    def test_simple(self, inbound_task):
        """A well-formed POST returns 201 and queues the task exactly once."""
        sender_address = self.user.email
        form = {
            'To': 'Sentry <%s>' % (self.mailto,),
            'From': 'David <%s>' % (sender_address,),
            'body-plain': body_plain,
        }

        response = self.client.post('/api/0/mailgun/inbound/', form)

        assert response.status_code == 201
        # The task should receive the bare sender address, the group id and
        # the reply text.
        inbound_task.delay.assert_called_once_with(
            self.user.email,
            self.group.id,
            body_plain,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment