-
-
Save zmievsa/0459f9011114af8e87e7e347c9a9faf2 to your computer and use it in GitHub Desktop.
Code review screening problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pika import BlockingConnection, ConnectionParameters | |
from sqlalchemy.ext.declarative import declarative_base | |
import uuid | |
from fastapi import FastAPI | |
from pydantic import BaseModel | |
from sqlalchemy import Column, Integer, String, create_engine | |
from fastapi.encoders import jsonable_encoder | |
from sqlalchemy.orm import Session | |
import requests | |
from fastapi.responses import JSONResponse | |
Base = declarative_base() | |
app = FastAPI() | |
class User(Base): | |
__tablename__ = "users" | |
id = Column(Integer, primary_key=True) | |
username = Column(String) | |
age = Column(Integer) | |
class UserCreateRequest(BaseModel): | |
username: str | |
age: int | |
class UserResponse(UserCreateRequest): | |
id: uuid.UUID | |
DATABASE_URL = "sqlite:///./test.db" | |
engine = create_engine(DATABASE_URL) | |
def get_db_session(): | |
db_session = Session(bind=engine) | |
return db_session | |
@app.put("/create-user/") | |
async def create_user(user: UserCreateRequest) -> UserResponse: | |
db_user = User(**user.dict()) | |
db_session = get_db_session() | |
try: | |
db_session.add(db_user) | |
db_session.commit() | |
except: | |
db_session.rollback() | |
return JSONResponse(content=jsonable_encoder({"message": "user already exists"})) | |
finally: | |
db_session.close() | |
internal_service_response = requests.post("http://192.168.54.38/api/notify", data={"user_id": db_user.id}) | |
if internal_service_response.status_code != 200: | |
raise RuntimeError("Internal service failed.") | |
connection = BlockingConnection(ConnectionParameters("192.168.55.34")) | |
channel = connection.channel() | |
channel.queue_declare(queue="user_queue") | |
channel.basic_publish(exchange="", routing_key="user_queue", body=bytes(str(db_user.id))) | |
connection.close() | |
return db_user | |
@app.get("/get-user/") | |
async def get_user(user_id: uuid.UUID) -> UserResponse: | |
db_session = get_db_session() | |
db_user = db_session.get(User, user_id) | |
return db_user |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment