Skip to content

Instantly share code, notes, and snippets.

@jvvk
Created October 20, 2019 14:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jvvk/66efd0a9adec587f473fcc3daadaccdf to your computer and use it in GitHub Desktop.
Save jvvk/66efd0a9adec587f473fcc3daadaccdf to your computer and use it in GitHub Desktop.
from subprocess import Popen
from datetime import date, datetime
import os
import math
import codecs
from itertools import islice
from sqlalchemy import create_engine, Column, Index
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.sqlite import FLOAT, INTEGER, TEXT
TODAY = date.today().strftime("%d%m%Y")
IMDB_URL = "https://datasets.imdbws.com/"
BASE_DIR = "C:\\vamshi\\programming\\python\\imdb\\data\\"
RAW_DATA_DIR = BASE_DIR + "raw_data_" + TODAY
TSV_DATA_DIR = BASE_DIR + "tsv_data_" + TODAY
DATA_BASE = BASE_DIR + "imdb_" + TODAY + ".db"
CHUNK_SIZE = 100000
def download_files(folder, url):
args = [
"wget",
"-r",
"-np",
"-l",
"1",
"-A",
"gz",
"-nd",
"--no-check-certificate",
"-P",
folder,
url,
]
child = Popen(args)
stdoutdata, stderrdata = child.communicate()
return child.returncode
def extract_gz_Files(srcDir, dstDir):
for filename in os.listdir(srcDir):
if filename.endswith(".gz"):
zipname = os.path.join(srcDir, filename)
child = Popen(
[
"C:\\Program Files\\7-Zip\\7z.exe",
"e",
zipname,
"-o" + dstDir
]
)
stdoutdata, stderrdata = child.communicate()
dstFile = os.path.join(
dstDir, os.path.splitext(os.path.split(zipname)[1])[0]
)
if child.returncode == 0:
try:
os.rename(os.path.join(dstDir, "data.tsv"), dstFile)
except Exception:
print("Couldnt rename file - ", zipname)
else:
print("Couldnt unzip file - ", zipname)
def next_n_lines(file_opened, N):
return [x.strip("\n") for x in islice(file_opened, N)]
def try_convering_to_num(astring):
""" Is the given string an integer? """
try:
n = float(astring)
except ValueError:
return astring
else:
try:
if math.isnan(n):
return ""
else:
return int(n)
except Exception:
return n
class IMDB_Database:
Base = declarative_base()
class Title_akas(Base):
__tablename__ = "title_akas"
titleId = Column(TEXT, primary_key=True)
ordering = Column(INTEGER, primary_key=True)
title = Column(TEXT)
region = Column(TEXT)
language = Column(TEXT)
types = Column(TEXT)
attributes = Column(TEXT)
isOriginalTitle = Column(INTEGER)
class Title_basics(Base):
__tablename__ = "title_basics"
tconst = Column(TEXT, primary_key=True)
titleType = Column(TEXT)
primaryTitle = Column(TEXT)
originalTitle = Column(TEXT)
isAdult = Column(INTEGER)
startYear = Column(INTEGER)
endYear = Column(INTEGER)
runtimeMinutes = Column(INTEGER)
genres = Column(TEXT)
class Title_crew(Base):
__tablename__ = "title_crew"
tconst = Column(TEXT, primary_key=True)
directors = Column(TEXT)
writers = Column(TEXT)
class Title_episode(Base):
__tablename__ = "title_episode"
tconst = Column(TEXT, primary_key=True)
parentTconst = Column(TEXT)
seasonNumber = Column(INTEGER)
episodeNumber = Column(INTEGER)
class Title_principals(Base):
__tablename__ = "title_principals"
tconst = Column(TEXT, primary_key=True)
ordering = Column(INTEGER, primary_key=True)
nconst = Column(TEXT)
category = Column(TEXT)
job = Column(TEXT)
characters = Column(TEXT)
class Title_ratings(Base):
__tablename__ = "title_ratings"
tconst = Column(TEXT, primary_key=True)
averageRating = Column(FLOAT)
numVotes = Column(INTEGER)
class Name_basics(Base):
__tablename__ = "name_basics"
nconst = Column(TEXT, primary_key=True)
primaryName = Column(TEXT)
birthYear = Column(INTEGER)
deathYear = Column(INTEGER)
primaryProfession = Column(TEXT)
knownForTitles = Column(TEXT)
FILE_TABLE_MAP = {
"title.akas.tsv": Title_akas,
"title.basics.tsv": Title_basics,
"title.crew.tsv": Title_crew,
"title.episode.tsv": Title_episode,
"title.principals.tsv": Title_principals,
"title.ratings.tsv": Title_ratings,
"name.basics.tsv": Name_basics,
}
def __init__(self, database):
self.engine = create_engine("sqlite:///" + database)
self.Base.metadata.drop_all(self.engine)
self.Base.metadata.create_all(self.engine)
# create indices
Index("title_crew_id_index", self.Title_ratings.tconst, unique=True)
Index("title_ratings_id_index", self.Title_ratings.tconst, unique=True)
Index("name_id_index", self.Name_basics.tconst, unique=True)
Index("title_id_index", self.Title_basics.tconst, unique=True)
Index("title_akas_id_index", self.Title_akas.titleId)
Index("title_principals_id_index", self.Title_principals.tconst)
Index("title_principals_movie_id_index", self.Title_principals.nconst)
def load_data(self, dataDir):
for file_name in os.listdir(dataDir):
tsv_file_path = os.path.join(dataDir, file_name)
with codecs.open(tsv_file_path, "r", encoding="utf8") as f_in:
fields = next_n_lines(f_in, 1)[0].split("\t")
while True:
lines = next_n_lines(f_in, CHUNK_SIZE)
if not lines:
break
records = []
for line in lines:
values = tuple(
map(
lambda x: try_convering_to_num(x),
["" if f == "\\N" else f
for f in line.split("\t")],
)
)
records.append(dict(zip(fields, values)))
self.engine.execute(
IMDB_Database.FILE_TABLE_MAP[file_name].__table__.insert(),
records,
)
if __name__ == "__main__":
print("start ", datetime.now())
download_files(RAW_DATA_DIR, IMDB_URL)
extract_gz_Files(RAW_DATA_DIR, TSV_DATA_DIR)
imdb = IMDB_Database(DATA_BASE)
imdb.load_data(TSV_DATA_DIR)
print("end ", datetime.now())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment