Skip to content

Instantly share code, notes, and snippets.

@MaxClerkwell
Created April 25, 2023 08:10
Show Gist options
  • Save MaxClerkwell/2176ef050d26ae6276ed4965e0bd4857 to your computer and use it in GitHub Desktop.
Save MaxClerkwell/2176ef050d26ae6276ed4965e0bd4857 to your computer and use it in GitHub Desktop.
FastAPI to upload .csv and write into Database
import csv
import io
from fastapi import FastAPI, File, UploadFile, HTTPException
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
# Datenbankkonfiguration
DATABASE_URL = "sqlite:///./sensordaten.db"
engine = create_engine(DATABASE_URL)
Base = declarative_base()
# Tabellen
class Sensor(Base):
__tablename__ = "sensors"
id = Column(Integer, primary_key=True)
sensor_id = Column(String, unique=True)
measurements = relationship("Measurement", back_populates="sensor")
class Measurement(Base):
__tablename__ = "measurements"
id = Column(Integer, primary_key=True)
sensor_id = Column(Integer, ForeignKey("sensors.id"))
timestamp = Column(String)
humidity = Column(Float)
sensor = relationship("Sensor", back_populates="measurements")
# Tabellen erstellen
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
app = FastAPI()
@app.post("/uploadcsv/")
async def upload_csv(file: UploadFile = File(...)):
if file.content_type != "text/csv":
raise HTTPException(status_code=400, detail="File must be in CSV format.")
content = (await file.read()).decode("utf-8")
csvfile = csv.reader(io.StringIO(content), delimiter=",")
session = SessionLocal()
try:
for row in csvfile:
sensor_id, timestamp, humidity = row
sensor = session.query(Sensor).filter(Sensor.sensor_id == sensor_id).first()
if not sensor:
sensor = Sensor(sensor_id=sensor_id)
session.add(sensor)
session.flush()
measurement = Measurement(sensor_id=sensor.id, timestamp=timestamp, humidity=float(humidity))
session.add(measurement)
session.flush()
session.commit()
except Exception as e:
session.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
session.close()
return {"status": "success"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment