Skip to content

Instantly share code, notes, and snippets.

@zodman zodman/app.py Secret
Last active Apr 17, 2019

Embed
What would you like to do?
small backend with pinpoint
import datetime
import json
import logging
import os
import random

import dj_database_url
import requests
from aws_requests_auth.aws_auth import AWSRequestsAuth
from django.contrib import admin
from django.db import models
from django.http import JsonResponse
from django_micro import configure, get_app_label
from django_micro import route, run
from dotenv import read_dotenv
# Load the dotenv environment before any os.getenv() call below.
read_dotenv(override=True)

## CONFIGURATION SECTION
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# STATIC_URL = '/static/'
STATIC_ROOT = os.path.join(BASE_DIR, "static")
# Environment values are strings, so "0" or "false" would be truthy if used
# directly. Parse the flag explicitly; debug stays on when DEBUG is unset.
DEBUG = os.getenv("DEBUG", "1").strip().lower() in ("1", "true", "yes", "on")
ALLOWED_HOSTS = ['*',]  # noqa
DBPATH = os.path.join(BASE_DIR, "app.db")
# DATABASE_URL-style configuration, falling back to a SQLite file next to this module.
DATABASES = {
    'default': dj_database_url.config(default='sqlite:///{}'.format(DBPATH))
}

## configure SECTION
# Must run at module level: the upper-case names defined above are handed over
# as the settings dictionary.
configure(locals(), django_admin=True)
# {{{ UTILS
def get_auth():
    """Build the auth object passed to ``requests`` for upstream AWS calls.

    Every constructor argument is read from the environment (AWS_ACCESS_KEY,
    AWS_SECRET_ACCESS_KEY, AWS_HOST, AWS_REGION, AWS_SERVICE); a variable that
    is not set is passed through as None.

    Returns:
        An ``AWSRequestsAuth`` instance.
    """
    env = os.environ
    return AWSRequestsAuth(
        aws_access_key=env.get("AWS_ACCESS_KEY"),
        aws_secret_access_key=env.get("AWS_SECRET_ACCESS_KEY"),
        aws_host=env.get("AWS_HOST"),
        aws_region=env.get("AWS_REGION"),
        aws_service=env.get("AWS_SERVICE"),
    )
# models
class Device(models.Model):
    """A device known to this backend: its id string and its channel type."""

    device_id = models.CharField(max_length=255, unique=True)
    type = models.CharField(max_length=20)

    class Meta:
        app_label = get_app_label()
        # device_id is already unique on its own, so uniqueness of the
        # (device_id, type) pair is implied by that constraint.
        unique_together = ("device_id", "type")

    def __str__(self):
        """Use the device id as the human-readable representation."""
        return self.device_id
@admin.register(Device)
class DeviceAdmin(admin.ModelAdmin):
    """Admin options for Device: list pages show the id, device_id and type columns."""

    list_display = (
        "id",
        "device_id",
        "type",
    )
# Expose the Django admin site URLs under the admin/ prefix.
route('admin/', admin.site.urls)
# {{{ Views
# NOTE: the route registration below is commented out, so this view is not
# reachable through any URL.
#@route('v1/apps/', name='apps')
def apps(request):
    """Relay the upstream ``/v1/apps/`` listing as JSON.

    Args:
        request: incoming HttpRequest (not inspected).

    Returns:
        JsonResponse wrapping the decoded upstream response body.

    Raises:
        requests.exceptions.Timeout: the upstream did not answer in time.
    """
    url = "https://{}/v1/apps/".format(os.getenv("ENDPOINT"))
    # Bound the upstream call so a stalled connection cannot hang the worker.
    r = requests.get(url, auth=get_auth(), timeout=10)
    return JsonResponse(r.json())
@route('v1/segments/', name='segments')
def segments(request):
    """Relay the segments of the configured application as JSON.

    The upstream host comes from ENDPOINT and the application from
    APPLICATION_ID, both read from the environment.

    Args:
        request: incoming HttpRequest (not inspected).

    Returns:
        JsonResponse wrapping the decoded upstream response body.

    Raises:
        requests.exceptions.Timeout: the upstream did not answer in time.
    """
    url = "https://{}/v1/apps/{}/segments/".format(
        os.getenv("ENDPOINT"), os.getenv("APPLICATION_ID")
    )
    # Bound the upstream call so a stalled connection cannot hang the worker.
    r = requests.get(url, auth=get_auth(), timeout=10)
    return JsonResponse(r.json())
def _build_campaign_payload(segment, title, body, click_action, data, open_section):
    """Assemble the campaign definition for an immediate, one-off push message."""
    now = datetime.datetime.now()
    return {
        # A random number plus the current minute keeps campaign names apart.
        'Name': 'Campaing {} {}'.format(random.randint(0, 100), now.strftime("%Y-%m-%d %H:%M")),
        'Description': "Description custom",
        'HoldoutPercent': 0,
        'SegmentId': segment,  # SEGMENTNAME: ALL
        'MessageConfiguration': {
            'DefaultMessage': {
                'Action': 'OPEN_APP',
                'Body': body,
                'Title': title,
                'SilentPush': False,
                # JsonBody is sent as a pre-serialized JSON string, not a nested object.
                'JsonBody': json.dumps({
                    'Title': title,
                    'Body': body,
                    'ClickAction': click_action,
                    'data': data,
                    'OPEN_SECTION': open_section,
                    'aps': {
                        'alert': 'cualquier cosa!',
                        'badge': 1,
                        'sound': 'default',
                    },
                }),
            },
        },
        'Schedule': {
            'IsLocalTime': False,
            'Timezone': 'UTC',
            'StartTime': 'IMMEDIATE',
            # 'EndTime': now.isoformat(),
            'Frequency': 'ONCE',
        },
    }


@route("v1/create/")
def create(request):
    """Create a campaign for the configured segment from POSTed form fields.

    Reads ``title``, ``body``, ``click_action``, ``data`` and ``open_section``
    from ``request.POST``; a missing field is forwarded as null. The target
    segment comes from the SEGMENT_ID environment variable.

    NOTE(review): the HTTP method is not checked, so a GET request also creates
    a campaign (with null fields) — confirm whether that is intended.

    Args:
        request: incoming HttpRequest carrying the form fields.

    Returns:
        JsonResponse wrapping the decoded upstream response body.

    Raises:
        requests.exceptions.Timeout: the upstream did not answer in time.
    """
    auth = get_auth()
    url = "https://{}/v1/apps/{}/campaigns/".format(
        os.getenv("ENDPOINT"), os.getenv("APPLICATION_ID")
    )
    form = request.POST
    payload = _build_campaign_payload(
        segment=os.getenv("SEGMENT_ID"),
        title=form.get("title"),
        body=form.get("body"),
        click_action=form.get("click_action"),
        data=form.get("data"),
        open_section=form.get("open_section"),
    )
    # Debug-level logging replaces the former print() so message content is
    # not written to stdout on every request.
    logging.getLogger(__name__).debug("campaign payload: %s", payload)
    # Bound the upstream call so a stalled connection cannot hang the worker.
    r = requests.post(url, data=json.dumps(payload), auth=auth, timeout=10)
    return JsonResponse(r.json())
# NOTE: the function name shadows the builtin ``list`` for the rest of this
# module; it is kept because renaming would change the view's public name.
@route('v1/list/', name='campaings')
def list(request):
    """Relay the campaigns of the configured application as JSON.

    Args:
        request: incoming HttpRequest (not inspected).

    Returns:
        JsonResponse wrapping the decoded upstream response body.

    Raises:
        requests.exceptions.Timeout: the upstream did not answer in time.
    """
    url = "https://{}/v1/apps/{}/campaigns/".format(
        os.getenv("ENDPOINT"), os.getenv("APPLICATION_ID")
    )
    # Bound the upstream call so a stalled connection cannot hang the worker.
    r = requests.get(url, auth=get_auth(), timeout=10)
    return JsonResponse(r.json())
@route('v1/info/<int:pk>', name='info')
def info(request, pk=None):
    """Relay a single upstream endpoint record as JSON.

    Args:
        request: incoming HttpRequest; its JSON body is read only when ``pk``
            is None.
        pk: endpoint id captured from the URL. When None, the id is taken from
            the ``requestid`` key of the JSON request body.

    Returns:
        JsonResponse wrapping the decoded upstream response body, or a 400
        response when no endpoint id can be determined.

    Raises:
        requests.exceptions.Timeout: the upstream did not answer in time.
    """
    if pk is not None:
        # Compare against None so an explicit id of 0 is not mistaken for "absent".
        endpoint_id = pk
    else:
        try:
            endpoint_id = json.loads(request.body).get("requestid")
        except (ValueError, AttributeError):
            # Body is not valid JSON, or is JSON but not an object.
            endpoint_id = None
        if endpoint_id is None:
            return JsonResponse({"error": "requestid is required"}, status=400)

    url = "https://{}/v1/apps/{}/endpoints/{}".format(
        os.getenv("ENDPOINT"), os.getenv("APPLICATION_ID"), endpoint_id
    )
    # Bound the upstream call so a stalled connection cannot hang the worker.
    r = requests.get(url, auth=get_auth(), timeout=10)
    return JsonResponse(r.json())
@route('v1/registerdevice/<str:type>/', name='registerdevice')
def registerdevice(request, type='GCM'):
    """Store a device locally and register it upstream as an endpoint.

    The device id is taken from the ``device_id`` form field, or, when that is
    empty, from the ``device_id`` key of a JSON request body. The local
    ``Device`` row's primary key is used as the upstream endpoint id.

    Args:
        request: incoming HttpRequest carrying the device id.
        type: channel type captured from the URL. Any value containing
            "APNS" is registered upstream as "APNS_SANDBOX".

    Returns:
        JsonResponse wrapping the decoded upstream response body, or a 400
        response when no device id was supplied.

    Raises:
        requests.exceptions.HTTPError: the upstream answered with an error status.
        requests.exceptions.Timeout: the upstream did not answer in time.
    """
    device_id = request.POST.get("device_id")
    if not device_id:
        try:
            device_id = json.loads(request.body)["device_id"]
        except (ValueError, KeyError, TypeError):
            # Not JSON, no such key, or JSON that is not an object.
            device_id = None
    if not device_id:
        return JsonResponse({"error": "device_id is required"}, status=400)

    # NOTE(review): Device.device_id is unique on its own, so a known device_id
    # sent with a different type makes get_or_create attempt an insert that
    # violates that constraint — confirm how re-registration should behave.
    device_obj, _created = Device.objects.get_or_create(device_id=device_id, type=type)

    url = "https://{}/v1/apps/{}/endpoints/{}".format(
        os.getenv("ENDPOINT"), os.getenv("APPLICATION_ID"), device_obj.id
    )
    # Only Address and ChannelType are sent. Fields left out: Attributes,
    # Demographic, EffectiveDate, EndpointStatus, Location, Metrics, OptOut,
    # RequestID, User.
    payload = {
        'Address': device_id,
        'ChannelType': "APNS_SANDBOX" if "APNS" in type else type,
    }
    # Bound the upstream call so a stalled connection cannot hang the worker.
    r = requests.put(url, data=json.dumps(payload), auth=get_auth(), timeout=10)
    r.raise_for_status()
    return JsonResponse(r.json())
# Module entry point: keep the result of django_micro's run() under the name
# `application` (presumably the WSGI callable a server imports — confirm).
application = run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.