Skip to content

Instantly share code, notes, and snippets.

@AlaShibanAtKlo
Created January 8, 2023 11:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AlaShibanAtKlo/effa8d3091cbe569f202306bbb197c04 to your computer and use it in GitHub Desktop.
Save AlaShibanAtKlo/effa8d3091cbe569f202306bbb197c04 to your computer and use it in GitHub Desktop.
from aiocache import Cache
from fastapi import FastAPI
from pydantic import BaseModel
from fastapi import Response
import hashlib
import segment.analytics as analytics
analytics.write_key = 'REDACTED'
class EventItem(BaseModel):
userId: str
eventName: str
class EventCache(BaseModel):
hasCompiled: bool
# @klotho::persist {
# id = "seenCache"
# }
eventCache = Cache(Cache.MEMORY)
# @klotho::expose {
# id = "signalapi"
# target = "public"
# }
app = FastAPI()
@app.post("/event")
async def handleSignalEvent(eventItem: EventItem, response: Response):
userHash = hashlib.md5(eventItem.userId.encode()).hexdigest()
userEvent = EventCache(hasCompiled=True)
userEvent.hasCompiled = True
eventsFound = await eventCache.get(userHash)
if eventsFound is None:
analytics.track(eventItem.userId, 'FIRST_COMPILE')
await eventCache.set(userHash, userEvent)
return {"message": "FIRST_COMPILE"}
else:
return {"message": "ALREADY_COMPILED"}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment