Created
January 29, 2015 06:12
-
-
Save dcramer/3a8b4768dd3abb4732cd to your computer and use it in GitHub Desktop.
sentry-mailgun
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from django.http import HttpResponse | |
from django.views.decorators.cache import never_cache | |
from django.views.decorators.csrf import csrf_exempt | |
from django.views.decorators.http import require_http_methods | |
from email_reply_parser import EmailReplyParser | |
from email.utils import parseaddr | |
from sentry.tasks.email import process_inbound_email | |
from sentry.utils.email import email_to_group_id | |
@never_cache
@csrf_exempt
@require_http_methods(['POST'])
def process_inbound_message(request):
    """Accept an inbound-email POST and queue the reply for processing.

    Reads the ``To``, ``From`` and ``body-plain`` fields from
    ``request.POST``. The bare address is extracted from ``To`` and
    ``From`` with ``parseaddr``; the recipient address is then mapped to a
    group id via ``email_to_group_id``.

    Returns an ``HttpResponse`` whose status is:

    * 500 -- ``email_to_group_id`` raised for the recipient address
    * 200 -- the parsed reply body is empty, so nothing is queued
    * 201 -- ``process_inbound_email`` was queued with the sender address,
      the group id and the reply text
    """
    form = request.POST

    # parseaddr() yields (display name, address); only the address matters.
    _, recipient = parseaddr(form['To'])
    _, sender = parseaddr(form['From'])

    try:
        target_group_id = email_to_group_id(recipient)
    except Exception:
        # NOTE(review): a 500 may prompt the sending service to retry the
        # same undeliverable message -- confirm this status is intended.
        logging.info('%r is not a valid email address', recipient)
        return HttpResponse(status=500)

    # Keep only the newly written reply, without surrounding whitespace.
    reply_text = EmailReplyParser.parse_reply(form['body-plain']).strip()

    if reply_text:
        process_inbound_email.delay(sender, target_group_id, reply_text)
        return HttpResponse(status=201)

    # An empty reply leaves nothing to process; acknowledge and stop.
    return HttpResponse(status=200)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import mock | |
from sentry.testutils import TestCase | |
from sentry.utils.email import group_id_to_email | |
# Plain-text message body posted to the inbound endpoint by the test below.
body_plain = 'foo bar'
class TestProcessInboundMessage(TestCase):
    """Exercise the inbound-mail endpoint with the background task mocked."""

    # URLconf used while these tests run.
    urls = 'getsentry.conf.urls.app'

    def setUp(self):
        """Create an event and derive the reply-to address for its group."""
        super(TestProcessInboundMessage, self).setUp()
        self.event = self.create_event(event_id='a' * 32)
        # presumably self.group is supplied by the TestCase base -- verify
        self.mailto = group_id_to_email(self.group.pk)

    @mock.patch('getsentry.web.mailgun.process_inbound_email')
    def test_simple(self, process_inbound_email):
        """A well-formed POST returns 201 and queues exactly one task."""
        form_data = {
            'To': 'Sentry <%s>' % (self.mailto,),
            'From': 'David <%s>' % (self.user.email,),
            'body-plain': body_plain,
        }

        response = self.client.post('/api/0/mailgun/inbound/', form_data)

        assert response.status_code == 201
        # The task must receive the bare sender address, the group id and
        # the message body.
        process_inbound_email.delay.assert_called_once_with(
            self.user.email,
            self.group.id,
            body_plain,
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment