-
-
Save rorre/2e7223af1c43c29414aff1844f004d8e to your computer and use it in GitHub Desktop.
Karya Angkatan Importer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from PIL import Image | |
from pathlib import Path | |
# Downscale every picture found in assets/student and write the result,
# under the same file name, into assets/compressed.
compressed_dir = Path("assets/compressed")
# parents=True also creates any missing parent directory of the output folder.
compressed_dir.mkdir(parents=True, exist_ok=True)

for p in Path("assets/student").iterdir():
    # Sub-directories cannot be opened as images; skip them instead of crashing.
    if not p.is_file():
        continue
    print(p)
    # The context manager closes the underlying file handle as soon as the
    # resized copy has been written, so handles do not pile up over the batch.
    with Image.open(p) as im:
        # thumbnail() resizes in place to fit within the 720x720 bounding box.
        im.thumbnail((720, 720))
        im.save(compressed_dir / p.name, quality=90)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List, Dict | |
import aiofiles | |
import gspread | |
import asyncio | |
from PIL import Image | |
from ka_backend.helper.database import database | |
from ka_backend.helper.files import save_file | |
from ka_backend.models import House, Student | |
from aiofiles.os import wrap | |
from pathlib import Path | |
from urllib.parse import urlparse | |
# Cache of House rows that run() has already looked up or created,
# keyed by house name.
house_instances = dict()

# One authorised gspread client, used for both spreadsheets below.
gc = gspread.oauth()

# Student Data Form: all rows of the first worksheet, first row skipped
# (presumably the header row).
file = gc.open_by_key("lol")
worksheet = file.get_worksheet(0)
data_diri = worksheet.get_all_values()[1:]

# Introduction Video Form: same treatment as the student data form.
video_file = gc.open_by_key("asd")
video_ws = video_file.get_worksheet(0)
video_stuffs = video_ws.get_all_values()[1:]
def format_video(url):
    """Normalise a submitted introduction-video link.

    Args:
        url: Raw text of the spreadsheet cell.  Surrounding whitespace is
            ignored and a missing scheme (``youtu.be/<id>``) is treated as
            ``https://``.

    Returns:
        ``None`` when the value is empty, is not hosted on ``youtu.be`` /
        ``youtube.com`` (or one of their sub-domains), or is a Shorts link
        without a video id.  Shorts links are rewritten to
        ``https://youtu.be/<id>``; every other YouTube link is returned as
        re-assembled by ``urlparse``.
    """
    if not url:
        return None
    cleaned = url.strip()
    if not cleaned:
        return None

    video_url = urlparse(cleaned)
    if not video_url.scheme and not video_url.netloc:
        # Without a scheme urlparse puts the host into ``path``; parse again
        # as https so that "youtu.be/<id>" is recognised.
        video_url = urlparse("https://" + cleaned)

    # ``hostname`` is lower-cased and has port/credentials removed, and the
    # exact/suffix comparison rejects look-alikes such as "notyoutube.com".
    host = video_url.hostname or ""
    is_youtube = any(
        host == domain or host.endswith("." + domain)
        for domain in ("youtu.be", "youtube.com")
    )
    # Not youtube :/
    if not is_youtube:
        return None

    if video_url.path.startswith("/shorts/"):
        # YouTube Shorts: the id is the path segment right after "shorts".
        # Dropping empty segments makes a trailing slash harmless.
        segments = [part for part in video_url.path.split("/") if part]
        if len(segments) < 2:
            return None
        return "https://youtu.be/" + segments[1]

    return video_url.geturl()
# Normalised video link per student, keyed by column 2 of each row (the value
# update_videos() later treats as the NPM).  Links from the student data form
# (column 14) are loaded first; the introduction video form (column 4) is
# applied afterwards, so its value replaces the earlier one for the same key.
video_urls: Dict[str, str] = {row[2]: format_video(row[14]) for row in data_diri}
video_urls.update({row[2]: format_video(row[4]) for row in video_stuffs})
async def update_videos():
    """Updates introduction video entries.

    Walks the module-level ``video_urls`` mapping and, for every entry that
    has a URL, stores it in ``video_diri`` of the ``Student`` whose ``npm``
    matches.  Entries without a URL, with a non-numeric NPM, or without a
    matching student are skipped.  All updates run inside one transaction.
    """
    await database.connect()
    try:
        async with database.transaction():
            for npm, vid in video_urls.items():
                if not vid:
                    continue
                print("NPM:", npm)
                try:
                    npm_number = int(npm)
                except ValueError:
                    # A blank or mistyped cell must not abort the whole
                    # transaction; report it and move on.
                    print("Skipping invalid NPM:", repr(npm))
                    continue
                s = await Student.objects.get_or_none(npm=npm_number)
                if not s:
                    continue
                await s.update(video_diri=vid)
    finally:
        # Release the connection even when the import fails midway.
        # NOTE(review): assumes `database` exposes disconnect() as the
        # counterpart of connect() -- confirm against ka_backend.helper.database.
        await database.disconnect()
async def _get_or_create_house(house_name):
    """Return the House named *house_name*, creating it when it is missing."""
    if house_name not in house_instances:
        house = await House.objects.get_or_none(nama=house_name)
        if not house:
            # The codename is the last word of the house name, lower-cased.
            codename = house_name.split()[-1].lower()
            house = await House.objects.create(nama=house_name, codename=codename)
        # Remember the row so later students of the same house skip the query.
        house_instances[house_name] = house
    return house_instances[house_name]


def _student_fields(user, house):
    """Translate one student-data row into Student keyword arguments."""
    return dict(
        npm=user[2],
        username=user[3].lower(),
        nama=user[1].title(),
        jurusan=user[4].lower().replace(" ", "_"),
        ttl=f"{user[6].title()}, {user[7]}",
        hobi=user[8].title(),
        twitter=user[10].replace("@", ""),
        line=user[12].replace("@", ""),
        instagram=user[11].replace("@", ""),
        foto_diri="default-student.png",
        video_diri=video_urls.get(user[2], None),
        house=house,
        # Two spaces before the newline are kept from the original text join.
        message="  \n".join(user[15].splitlines()),
        about="  \n".join(user[16].splitlines()),
        interests=user[9].split(", "),
    )


async def run():
    """Import everything.

    For each row of the module-level ``data_diri``: resolves (or creates) the
    student's House, builds the Student fields, then either updates the
    existing Student with the same ``npm`` when any field other than ``npm``
    differs, or queues a new Student.  Queued students are inserted with a
    single ``bulk_create``.  Everything runs inside one transaction.
    """
    await database.connect()
    try:
        async with database.transaction():
            new_users = []
            for user in data_diri:
                print("User:", user[1])
                house = await _get_or_create_house(user[5])
                kwargs = _student_fields(user, house)

                s = await Student.objects.get_or_none(npm=user[2])
                if not s:
                    new_users.append(Student(**kwargs))
                    continue

                # Existing students keep their current picture; the default
                # placeholder must not overwrite it.
                kwargs.pop("foto_diri")
                changed = any(
                    getattr(s, key) != value
                    for key, value in kwargs.items()
                    if key != "npm"
                )
                if changed:
                    await s.update(**kwargs)

            # Nothing to insert means no bulk statement is issued at all.
            if new_users:
                await Student.objects.bulk_create(new_users)
    finally:
        # Release the connection even when the import fails midway.
        # NOTE(review): assumes `database` exposes disconnect() as the
        # counterpart of connect() -- confirm against ka_backend.helper.database.
        await database.disconnect()
async def import_pics(directory):
    """Import pictures from directory.

    Every regular file in *directory* is matched to a Student through the
    first 10 characters of its file name, which are used as the ``npm``.
    When a student is found, the file is handed to ``save_file`` and the
    returned name is stored in ``foto_diri``.  Files without a matching
    student are skipped before anything is saved.

    Args:
        directory: Path of the folder holding the pictures.
    """
    await database.connect()

    class FakeUploadFile:
        """Stand-in passed to save_file, exposing ``filename`` and ``read``."""

        def __init__(self, fname, f):
            self.filename = fname
            self.read = f.read

    try:
        # Materialise the listing inside the executor so the directory scan
        # itself, not just the creation of the generator, runs off the loop.
        files: List[Path] = await wrap(lambda: list(Path(directory).iterdir()))()
        async with database.transaction():
            for fname in files:
                # Sub-directories cannot be opened as pictures.
                if not fname.is_file():
                    continue
                print(fname)
                npm = fname.name[:10]
                s = await Student.objects.get_or_none(npm=npm)
                if not s:
                    # Look the student up first so that no file is saved for
                    # a picture nobody will reference.
                    continue
                async with aiofiles.open(fname, "rb") as f:
                    saved_name = await save_file(
                        "Student", FakeUploadFile(fname.name, f)
                    )
                await s.update(foto_diri=saved_name)
    finally:
        # Release the connection even when the import fails midway.
        # NOTE(review): assumes `database` exposes disconnect() as the
        # counterpart of connect() -- confirm against ka_backend.helper.database.
        await database.disconnect()
def verify_images(directory):
    """Report pictures in *directory* that are wider than they are tall.

    Prints one ``Landscape:`` line (file name and width/height ratio) for
    every such picture, followed by the total number found.

    Args:
        directory: Path of the folder holding the pictures.
    """
    count = 0
    for fname in Path(directory).iterdir():
        # Sub-directories cannot be opened as pictures.
        if not fname.is_file():
            continue
        # The context manager closes each file handle right after its size
        # has been read, so handles do not accumulate over a large folder.
        with Image.open(str(fname)) as im:
            aspect = im.width / im.height
        if aspect > 1:
            print("Landscape:", fname.name, aspect)
            count += 1
    print(count)
# Run the whole pipeline in order: refresh video links, import the student
# rows, attach the pictures, then report landscape pictures.
_PHOTO_DIR = "fotodiri"

asyncio.run(update_videos())
asyncio.run(run())
asyncio.run(import_pics(_PHOTO_DIR))
verify_images(_PHOTO_DIR)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import mimetypes | |
import gspread | |
from pydrive2.drive import GoogleDrive | |
from pydrive2.auth import GoogleAuth | |
# Rename every submitted Drive picture to "<npm>_FotoDiri<ext>".

# Spreadsheet rows of the first worksheet, first row skipped
# (presumably the header row).
gc = gspread.oauth()
file = gc.open_by_key("")
worksheet = file.get_worksheet(0)
data_diri = worksheet.get_all_values()[1:]

# Google Drive client, authorised through the local web-server flow.
gauth = GoogleAuth()
gauth.LocalWebserverAuth()
drive = GoogleDrive(gauth)

for u in data_diri:
    print("User:", u[1])
    url = u[13].strip()
    npm = u[2]
    if not url:
        # A blank link cell would otherwise be sent to Drive as an empty id.
        print("No photo link, skipping")
        continue

    # The file id is whatever follows the last "=" of the link.
    drive_id = url.split("=")[-1]
    f = drive.CreateFile({"id": drive_id})
    name = f["title"]

    ext = mimetypes.guess_extension(f["mimeType"])
    if ext is None:
        # Unknown MIME type: keep the extension the file already has rather
        # than producing a name that ends in the text "None".
        dot = name.rfind(".")
        ext = name[dot:] if dot > 0 else ""

    rename_file = f"{npm}_FotoDiri{ext}"
    if name == rename_file:
        # Already renamed on a previous run.
        continue
    f["title"] = rename_file
    f.Upload()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment