Skip to content

Instantly share code, notes, and snippets.

@rorre
Created November 21, 2021 03:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rorre/2e7223af1c43c29414aff1844f004d8e to your computer and use it in GitHub Desktop.
Save rorre/2e7223af1c43c29414aff1844f004d8e to your computer and use it in GitHub Desktop.
Karya Angkatan Importer
from PIL import Image
from pathlib import Path
# Write a down-scaled copy of every file in assets/student into
# assets/compressed, keeping the original file name.
compressed_dir = Path("assets/compressed")
# parents=True so the script also works when "assets" itself is missing.
compressed_dir.mkdir(parents=True, exist_ok=True)
for p in Path("assets/student").iterdir():
    print(p)
    # The context manager closes the underlying file handle after each image;
    # without it every opened picture stays open until garbage collection.
    with Image.open(p) as im:
        # thumbnail() resizes in place so the image fits within 720x720.
        im.thumbnail((720, 720))
        im.save(compressed_dir / p.name, quality=90)
from typing import List, Dict
import aiofiles
import gspread
import asyncio
from PIL import Image
from ka_backend.helper.database import database
from ka_backend.helper.files import save_file
from ka_backend.models import House, Student
from aiofiles.os import wrap
from pathlib import Path
from urllib.parse import urlparse
# Cache of House model instances keyed by house name; run() fills it on demand
# so each house is looked up or created only once.
house_instances = {}
# gspread client obtained through gspread's OAuth helper.
gc = gspread.oauth()
# Student data form.
# NOTE(review): "lol" looks like a placeholder spreadsheet key — confirm the
# real key before running.
file = gc.open_by_key("lol")
worksheet = file.get_worksheet(0)
# Every row of the first worksheet except the first one
# (presumably the header row — TODO confirm).
data_diri = worksheet.get_all_values()[1:]
# Introduction video form.
# NOTE(review): "asd" also looks like a placeholder spreadsheet key.
video_file = gc.open_by_key("asd")
video_ws = video_file.get_worksheet(0)
# Same slicing as above: all rows after the first.
video_stuffs = video_ws.get_all_values()[1:]
def format_video(url):
    """Normalise an introduction-video link, accepting only YouTube URLs.

    Args:
        url: Raw link text. Surrounding whitespace is ignored; ``None`` and
            the empty string are treated as "no video".

    Returns:
        ``None`` when the link is missing, cannot be parsed, or does not point
        at ``youtu.be`` / ``youtube.com`` (or one of their subdomains).
        For YouTube Shorts links, ``"https://youtu.be/<video id>"``.
        Otherwise the parsed URL serialised back to a string.
    """
    if not url:
        return None
    try:
        video_url = urlparse(url.strip())
        # hostname is lower-cased and has any port / credentials removed.
        host = video_url.hostname or ""
    except ValueError:
        # urlparse rejects malformed authorities (e.g. a broken IPv6 literal).
        return None

    # Match the domain itself or a real subdomain. A plain substring test
    # would also accept look-alike hosts such as "youtube.com.evil.example".
    allowed = ("youtu.be", "youtube.com")
    if not any(host == d or host.endswith("." + d) for d in allowed):
        # Not youtube :/
        return None

    prefix = "/shorts/"
    if video_url.path.startswith(prefix):
        # YouTube Shorts: the id is the first path segment after the prefix,
        # so a trailing slash or extra segments no longer produce a wrong id.
        video_id = video_url.path[len(prefix):].split("/")[0]
        if not video_id:
            return None
        return "https://youtu.be/" + video_id
    return video_url.geturl()
# NPM (column 2) -> normalised video link. Values are None whenever
# format_video() rejects the link. Start with the links given in the student
# data form (column 14).
video_urls: Dict[str, str] = {
    row[2]: format_video(row[14]) for row in data_diri
}
# Rows of the introduction video form (link in column 4) are applied second,
# so they replace any entry the data form produced for the same NPM.
video_urls.update({row[2]: format_video(row[4]) for row in video_stuffs})
async def update_videos():
    """Update the introduction video of existing students.

    Iterates over the module-level ``video_urls`` mapping (NPM -> video URL)
    inside a single database transaction. Entries without a usable URL and
    NPMs that match no ``Student`` are skipped; every other student gets its
    ``video_diri`` field set to the URL.

    The database connection opened here is closed again before returning,
    including when an error aborts the transaction, so a later
    ``asyncio.run(...)`` step does not inherit a connection that belongs to a
    finished event loop.

    Raises:
        ValueError: if an NPM key with a usable URL is not a valid integer.
    """
    await database.connect()
    try:
        async with database.transaction():
            for npm, vid in video_urls.items():
                if not vid:
                    continue
                print("NPM:", npm)
                student = await Student.objects.get_or_none(npm=int(npm))
                if not student:
                    continue
                await student.update(video_diri=vid)
    finally:
        await database.disconnect()
async def _get_house(house_name):
    """Return the House named *house_name*, creating it when it does not exist.

    Results are memoised in the module-level ``house_instances`` dict so each
    house costs at most one lookup per process.
    """
    if house_name not in house_instances:
        # The codename is the last word of the house name, lower-cased.
        codename = house_name.split()[-1].lower()
        house = await House.objects.get_or_none(nama=house_name)
        if not house:
            house = await House.objects.create(nama=house_name, codename=codename)
        house_instances[house_name] = house
    return house_instances[house_name]


def _student_fields(user, house):
    """Map one row of the student data form onto Student field values."""
    return dict(
        npm=user[2],
        username=user[3].lower(),
        nama=user[1].title(),
        jurusan=user[4].lower().replace(" ", "_"),
        ttl=f"{user[6].title()}, {user[7]}",
        hobi=user[8].title(),
        # Social handles are stored without the "@" sign.
        twitter=user[10].replace("@", ""),
        line=user[12].replace("@", ""),
        instagram=user[11].replace("@", ""),
        foto_diri="default-student.png",
        video_diri=video_urls.get(user[2], None),
        house=house,
        # Re-join the lines with a space followed by a newline.
        message=" \n".join(user[15].splitlines()),
        about=" \n".join(user[16].splitlines()),
        interests=user[9].split(", "),
    )


async def run():
    """Import every row of the student data form into the database.

    For each row of the module-level ``data_diri``:

    * the student's house is fetched or created (see ``_get_house``);
    * if a ``Student`` with the row's NPM already exists, it is updated only
      when at least one field other than ``npm`` differs, and its existing
      ``foto_diri`` is left alone;
    * otherwise a new ``Student`` is queued and all queued students are
      inserted with one ``bulk_create`` at the end.

    Everything happens in one transaction. The database connection opened
    here is closed again before returning, also on failure.
    """
    await database.connect()
    try:
        async with database.transaction():
            new_users = []
            for user in data_diri:
                print("User:", user[1])
                house = await _get_house(user[5])
                kwargs = _student_fields(user, house)

                existing = await Student.objects.get_or_none(npm=user[2])
                if not existing:
                    new_users.append(Student(**kwargs))
                    continue

                # Never overwrite the photo of a student that already exists.
                kwargs.pop("foto_diri")
                changed = any(
                    getattr(existing, key) != value
                    for key, value in kwargs.items()
                    if key != "npm"
                )
                if changed:
                    await existing.update(**kwargs)

            # Skip the insert entirely when there is nothing new to create.
            if new_users:
                await Student.objects.bulk_create(new_users)
    finally:
        await database.disconnect()
async def import_pics(directory):
    """Import pictures from directory.

    Each entry of *directory* is matched to a student through the first ten
    characters of its file name, which are used as the NPM. When such a
    ``Student`` exists, the file is handed to ``save_file`` and the returned
    name is stored in the student's ``foto_diri``.

    The student lookup happens before the file is saved, so pictures that
    belong to no known student are skipped without being stored. All updates
    run in one transaction, and the database connection opened here is closed
    again before returning, also on failure.

    Args:
        directory: Path of the directory that holds the pictures.
    """

    class FakeUploadFile:
        """Stand-in exposing the ``filename`` and ``read`` attributes given to save_file."""

        def __init__(self, fname, f):
            self.filename = fname
            self.read = f.read

    await database.connect()
    try:
        files: List[Path] = list(await wrap(Path(directory).iterdir)())
        async with database.transaction():
            for fname in files:
                print(fname)
                npm = fname.name[:10]
                student = await Student.objects.get_or_none(npm=npm)
                if not student:
                    continue
                async with aiofiles.open(fname, "rb") as f:
                    saved_name = await save_file(
                        "Student", FakeUploadFile(fname.name, f)
                    )
                await student.update(foto_diri=saved_name)
    finally:
        await database.disconnect()
def verify_images(directory):
    """Report the pictures in *directory* that are wider than they are tall.

    Prints one ``Landscape:`` line (file name and width/height ratio) for
    every image whose aspect ratio is above 1, followed by the number of such
    images.

    Args:
        directory: Path of the directory whose entries are opened as images.
    """
    count = 0
    for fname in Path(directory).iterdir():
        # Close each image as soon as its size is known instead of keeping
        # one open file handle per picture.
        with Image.open(str(fname)) as im:
            aspect = im.width / im.height
        if aspect > 1:
            print("Landscape:", fname.name, aspect)
            count += 1
    print(count)
# Run the three import steps in order, each on its own event loop, then check
# the orientation of the imported photos.
photo_dir = "fotodiri"
for make_job in (update_videos, run, lambda: import_pics(photo_dir)):
    # The coroutine is created right before it runs, one step at a time.
    asyncio.run(make_job())
verify_images(photo_dir)
import mimetypes
import gspread
from pydrive2.drive import GoogleDrive
from pydrive2.auth import GoogleAuth
# Rename every student's uploaded photo on Google Drive to
# "<npm>_FotoDiri<extension>".
gc = gspread.oauth()
# NOTE(review): the spreadsheet key is empty here — fill it in before running.
file = gc.open_by_key("")
worksheet = file.get_worksheet(0)
# All rows after the first one (presumably the header row — TODO confirm).
data_diri = worksheet.get_all_values()[1:]
gauth = GoogleAuth()
gauth.LocalWebserverAuth()
drive = GoogleDrive(gauth)
for u in data_diri:
    print("User:", u[1])
    url = u[13]
    npm = u[2]
    # The Drive file id is whatever follows the last "=" of the link.
    drive_id = url.split("=")[-1]
    if not drive_id:
        # No photo link in this row: there is nothing to rename.
        print("No photo link, skipping:", npm)
        continue
    f = drive.CreateFile({"id": drive_id})
    name = f["title"]
    ext = mimetypes.guess_extension(f["mimeType"])
    if ext is None:
        # Unknown MIME type: keep the extension of the current title rather
        # than building a name that ends in the literal text "None".
        _, dot, suffix = name.rpartition(".")
        ext = dot + suffix if dot else ""
    rename_file = f"{npm}_FotoDiri{ext}"
    if name == rename_file:
        continue
    f["title"] = rename_file
    f.Upload()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment