Created
July 16, 2025 09:26
-
-
Save Anmol777-coder/8c1b9315027f76e3944919d3cd12ca69 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request, Depends, Query | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse | |
from fastapi.templating import Jinja2Templates | |
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
from dotenv import load_dotenv | |
import shutil | |
import os | |
import uuid | |
import hashlib | |
import sys | |
import base64 | |
import json | |
from typing import List, Optional | |
import uvicorn | |
from datetime import datetime, timedelta | |
from passlib.context import CryptContext | |
from sqlalchemy import create_engine, Column, String, DateTime, Boolean, Integer, Text, Float | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.orm import sessionmaker, Session | |
from pydantic import BaseModel, EmailStr | |
# Import custom modules | |
from auth import OAuth2Manager, getcurrentuser, createaccesstoken | |
from otp_service import OTPService | |
from models import User, UserCreate, UserResponse, TokenResponse, Base | |
# Load environment variables | |
load_dotenv() | |
# Database setup | |
DATABASE_URL = os.getenv("DATABASEURL", "sqlite:///./ainsoft_store.db") | |
engine = create_engine(DATABASE_URL) | |
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | |
# FastAPI app | |
app = FastAPI(title="Ainsoft Store", description="Made by Anmol Yadav - Student") | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# Directories | |
UPLOAD_DIR = "uploads" | |
CODE_DIR = "codeuploads" | |
os.makedirs(UPLOAD_DIR, exist_ok=True) | |
os.makedirs(CODE_DIR, exist_ok=True) | |
os.makedirs("static/css", exist_ok=True) | |
os.makedirs("static/js", exist_ok=True) | |
os.makedirs("templates", exist_ok=True) | |
# Mount static | |
app.mount("/static", StaticFiles(directory="static"), name="static") | |
app.mount("/uploads", StaticFiles(directory=UPLOAD_DIR), name="uploads") | |
app.mount("/code", StaticFiles(directory=CODE_DIR), name="code") | |
# Templates | |
templates = Jinja2Templates(directory="templates") | |
# Services | |
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
oauth2_manager = OAuth2Manager() | |
otp_service = OTPService() | |
security = HTTPBearer() | |
# Optional encryption setup | |
from cryptography.fernet import Fernet | |
SECRET_KEY = os.getenv("SECRET_KEY") | |
FERNET = Fernet(base64.urlsafe_b64encode(hashlib.sha256(SECRET_KEY.encode()).digest())) if SECRET_KEY else None | |
def encrypt_value(value: str) -> str: | |
return FERNET.encrypt(value.encode()).decode() if FERNET else value | |
def decrypt_value(value: str) -> str: | |
return FERNET.decrypt(value.encode()).decode() if FERNET else value | |
# Optional Razorpay | |
try: | |
import razorpay | |
RAZORPAY_KEY = os.getenv("RAZORPAYKEYID") | |
RAZORPAY_SECRET = os.getenv("RAZORPAYKEYSECRET") | |
razorpay_client = razorpay.Client(auth=(RAZORPAY_KEY, RAZORPAY_SECRET)) if RAZORPAY_KEY and RAZORPAY_SECRET else None | |
except ImportError: | |
razorpay_client = None | |
# In-memory storage (you can move to DB later) | |
file_store = [] | |
code_store = [] | |
payment_store = [] | |
# Helpers | |
def get_db(): | |
db = SessionLocal() | |
try: | |
yield db | |
finally: | |
db.close() | |
@app.post("/api/upload/file") | |
async def upload_file( | |
file: UploadFile = File(...), | |
title: str = Form(...), | |
description: str = Form(...), | |
category: str = Form(...), | |
price: float = Form(0.0), | |
currentuser: User = Depends(getcurrentuser) | |
): | |
file_id = str(uuid.uuid4()) | |
path = os.path.join(UPLOAD_DIR, f"{file_id}_{file.filename}") | |
with open(path, "wb") as f: | |
shutil.copyfileobj(file.file, f) | |
file_store.append({ | |
"id": file_id, | |
"title": title, | |
"description": description, | |
"category": category, | |
"price": price, | |
"filename": file.filename, | |
"path": path, | |
"uploaded_by": currentuser.email, | |
"uploaded_at": datetime.utcnow().isoformat() | |
}) | |
return {"message": "File uploaded", "file_id": file_id} | |
@app.post("/api/upload/code") | |
async def upload_code( | |
file: UploadFile = File(...), | |
title: str = Form(...), | |
description: str = Form(...), | |
language: str = Form(...), | |
currentuser: User = Depends(getcurrentuser) | |
): | |
file_id = str(uuid.uuid4()) | |
path = os.path.join(CODE_DIR, f"{file_id}_{file.filename}") | |
with open(path, "wb") as f: | |
shutil.copyfileobj(file.file, f) | |
with open(path, "r", encoding="utf-8", errors="ignore") as f: | |
content = f.read() | |
code_store.append({ | |
"id": file_id, | |
"title": title, | |
"description": description, | |
"language": language, | |
"filename": file.filename, | |
"content": content, | |
"path": path, | |
"uploaded_by": currentuser.email, | |
"uploaded_at": datetime.utcnow().isoformat() | |
}) | |
return {"message": "Code uploaded", "code_id": file_id} | |
@app.get("/api/search") | |
async def search_items(query: str = Query(...)): | |
query = query.lower() | |
results = [] | |
for file in file_store: | |
if query in file['title'].lower() or query in file['description'].lower(): | |
results.append({"type": "file", "data": file}) | |
for code in code_store: | |
if query in code['title'].lower() or query in code['description'].lower(): | |
results.append({"type": "code", "data": code}) | |
return {"results": results, "count": len(results)} | |
@app.post("/api/payment/create-order") | |
async def create_order(file_id: str = Form(...), currentuser: User = Depends(getcurrentuser)): | |
if not razorpay_client: | |
raise HTTPException(status_code=500, detail="Razorpay not configured") | |
file = next((f for f in file_store if f['id'] == file_id), None) | |
if not file: | |
raise HTTPException(status_code=404, detail="File not found") | |
if file['price'] <= 0: | |
raise HTTPException(status_code=400, detail="File is free") | |
order = razorpay_client.order.create({ | |
"amount": int(file['price'] * 100), | |
"currency": "INR", | |
"payment_capture": 1 | |
}) | |
payment_store.append({ | |
"user": currentuser.email, | |
"file_id": file_id, | |
"order_id": order['id'], | |
"status": "pending", | |
"created_at": datetime.utcnow().isoformat() | |
}) | |
return {"order_id": order['id'], "amount": file['price'], "currency": "INR"} | |
@app.post("/api/payment/verify") | |
async def verify_payment(order_id: str = Form(...), payment_id: str = Form(...), signature: str = Form(...)): | |
if not razorpay_client: | |
raise HTTPException(status_code=500, detail="Razorpay not configured") | |
try: | |
razorpay_client.utility.verify_payment_signature({ | |
"razorpay_order_id": order_id, | |
"razorpay_payment_id": payment_id, | |
"razorpay_signature": signature | |
}) | |
for record in payment_store: | |
if record['order_id'] == order_id: | |
record['status'] = 'completed' | |
record['payment_id'] = payment_id | |
record['verified_at'] = datetime.utcnow().isoformat() | |
return {"message": "Payment verified"} | |
except Exception as e: | |
raise HTTPException(status_code=400, detail=f"Verification failed: {str(e)}") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment