Skip to content

Instantly share code, notes, and snippets.

View maxipavlovic's full-sized avatar
🎮
Focusing

Max Kolyubyakin maxipavlovic

🎮
Focusing
View GitHub Profile
@maxipavlovic
maxipavlovic / issue.py
Created March 6, 2024 22:39
AWS MP SearchAgreements Issue
from pprint import pprint
import boto3
_aws_key = 'KEY'
_aws_secret = 'SECRET'
_aws_customer_id = '943957561169'
_aws_product_id = '9950b668-6d73-4b74-9554-70919bbf9724'
@maxipavlovic
maxipavlovic / items_list.py
Created July 24, 2023 15:57
Connect - Get list of all product items
import json
import os
from connect.client import ConnectClient
client = ConnectClient(
api_key=os.environ.get('CONNECT_API_KEY'),
endpoint='https://api.conn.rocks/public/v1',
)
from django.db import models
from dj_cqrs.mixins import ReplicaMixin
class AccountReplica(ReplicaMixin, models.Model):
CQRS_ID = 'account'
id = models.IntegerField(primary_key=True)
CQRS = {
'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
'host': RABBITMQ_HOST,
'port': RABBITMQ_PORT,
'user': RABBITMQ_USERNAME,
'password': RABBITMQ_PASSWORD,
'queue': 'replica', # settings used only in replica
}
from django.db import models
from dj_cqrs.mixins import MasterMixin
class Order(MasterMixin, models.Model):
CQRS_ID = 'order'
import asyncio
from pusher_client import pusher_client
async def push():
await pusher_client.trigger('private-foobar', 'my-event', {'message': 'hello world'})
asyncio.run(push())
from fastapi import FastAPI, Form
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from pusher_client import pusher_client
class PusherAuth(BaseModel):
channel_name: str
import os
from dotenv import load_dotenv
import pusher
from pusher.aiohttp import AsyncIOBackend
load_dotenv()
pusher_client = pusher.Pusher(
app_id=os.getenv('PUSHER_APP_ID'),
<!DOCTYPE html>
<head>
<title>CRM</title>
</head>
<body>
<h1>PoC</h1>
<p>
Publish an event to channel <code>private-foobar</code>
with event name <code>my-event</code>; it will appear below:
</p>
from django.db import connection
from opencensus.trace.execution_context import get_opencensus_tracer
class SQLCorrelationMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):