This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pprint import pprint | |
import boto3 | |
# Module-level settings for the boto3-based script.
# NOTE(review): 'KEY' / 'SECRET' look like placeholders -- real credentials
# should not be committed to source; confirm how they are supplied in practice.
_aws_key = 'KEY'
_aws_secret = 'SECRET'
# Identifiers consumed further down the script (usage not visible in this excerpt).
_aws_customer_id = '943957561169'
_aws_product_id = '9950b668-6d73-4b74-9554-70919bbf9724'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
from connect.client import ConnectClient | |
# Module-level API client. The key is read from the CONNECT_API_KEY
# environment variable; os.environ.get returns None when it is unset.
client = ConnectClient(
    api_key=os.environ.get('CONNECT_API_KEY'),
    endpoint='https://api.conn.rocks/public/v1',
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db import models | |
from dj_cqrs.mixins import ReplicaMixin | |
class AccountReplica(ReplicaMixin, models.Model):
    """Django model combining ``ReplicaMixin`` with ``models.Model``.

    NOTE(review): the visible excerpt ends after ``id``; the full model may
    declare more fields -- confirm against the complete file.
    """

    # Identifier string read by the dj_cqrs mixin.
    CQRS_ID = 'account'

    # Explicit integer primary key (replaces Django's implicit auto field).
    id = models.IntegerField(primary_key=True)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# dj_cqrs settings dictionary. The RABBITMQ_* names are expected to be
# defined elsewhere in the settings module (not visible in this excerpt).
CQRS = {
    'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
    'host': RABBITMQ_HOST,
    'port': RABBITMQ_PORT,
    'user': RABBITMQ_USERNAME,
    'password': RABBITMQ_PASSWORD,
    'queue': 'replica',  # settings used only in replica
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db import models | |
from dj_cqrs.mixins import MasterMixin | |
class Order(MasterMixin, models.Model):
    """Django model combining ``MasterMixin`` with ``models.Model``.

    NOTE(review): the visible excerpt ends after ``CQRS_ID``; the model's
    fields are presumably declared further down -- confirm against the
    complete file.
    """

    # Identifier string read by the dj_cqrs mixin.
    CQRS_ID = 'order'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from pusher_client import pusher_client | |
async def push():
    """Trigger event 'my-event' on channel 'private-foobar' via ``pusher_client``.

    The payload is the fixed dictionary ``{'message': 'hello world'}``; the
    awaited result of ``trigger`` is discarded.
    """
    await pusher_client.trigger('private-foobar', 'my-event', {'message': 'hello world'})


# Script entry: run the coroutine to completion on a fresh event loop.
asyncio.run(push())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import FastAPI, Form | |
from fastapi.middleware.cors import CORSMiddleware | |
from pydantic import BaseModel | |
from pusher_client import pusher_client | |
class PusherAuth(BaseModel):
    """Pydantic model declaring a string field ``channel_name``.

    NOTE(review): the visible excerpt ends after ``channel_name``; additional
    fields may follow in the complete file -- confirm before relying on this
    as the full schema.
    """

    channel_name: str
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from dotenv import load_dotenv | |
import pusher | |
from pusher.aiohttp import AsyncIOBackend | |
load_dotenv() | |
pusher_client = pusher.Pusher( | |
app_id=os.getenv('PUSHER_APP_ID'), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!DOCTYPE html>
<head>
  <title>CRM</title>
</head>
<body>
  <h1>PoC</h1>
  <p>
    Publish an event to channel <code>private-foobar</code>
    with event name <code>my-event</code>; it will appear below:
  </p>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.db import connection | |
from opencensus.trace.execution_context import get_opencensus_tracer | |
class SQLCorrelationMiddleware: | |
def __init__(self, get_response): | |
self.get_response = get_response | |
def __call__(self, request): |
NewerOlder