Populate SQL db with tracks data
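Three small scripts: (1) load artist and track rows from zipped CSV playlist-event files into MySQL, (2) index those rows into Elasticsearch, and (3) load precomputed track-similarity lists into Redis. Shared table definitions live in model.py; pip dependencies are listed at the end.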
# model.py: table definitions shared by the scripts below
from sqlalchemy import Column, ForeignKey, MetaData, String, Table

metadata = MetaData()

artists = Table(
    "artists", metadata,
    Column('uri', String(32), primary_key=True),
    Column('name', String(100), nullable=False)
)

tracks = Table(
    "tracks", metadata,
    Column('uri', String(32), primary_key=True),
    Column('name', String(100), nullable=False),
    Column('artist_uri', String(32), ForeignKey('artists.uri'), nullable=False)
)
# --- Script 1: populate the MySQL tables from the zipped CSV event files ---
import csv
import itertools
from io import TextIOWrapper
from zipfile import ZipFile

from sqlalchemy import create_engine
from sqlalchemy.engine import URL

from model import artists, metadata, tracks


def connect():
    url_object = URL.create(
        "mysql+mysqlconnector",
        username="user",
        password="user123",
        host="localhost",
        database='music'
    )
    return create_engine(url_object, echo=True, future=True)


def create_tables(engine):
    metadata.create_all(engine)


def lines_stream():
    # Stream CSV rows, one dict per row, from every file inside the archive.
    with ZipFile('./scripts/playlist_events.zip', 'r') as file:
        files = file.namelist()
        for f in files:
            with file.open(f, 'r') as myfile:
                myfile = TextIOWrapper(myfile)
                reader = csv.DictReader(myfile, delimiter=',')
                yield from reader
def row_batches(lines_iterator, bs):
    # Slice the row stream into lists of at most `bs` rows.
    while True:
        rows = list(itertools.islice(lines_iterator, bs))
        if not rows:
            break
        yield rows


def to_artist_row(line):
    return {
        'uri': line['artist_uri'],
        'name': line['artist_name']
    }


def to_track_row(line):
    return {
        'uri': line['track_uri'],
        'name': line['track_name'],
        'artist_uri': line['artist_uri']
    }


def prepare_insert(seen_artists, seen_tracks, batch):
    # Keep only rows whose URIs we haven't inserted yet, then remember them.
    artist_rows = [
        to_artist_row(r) for r in batch if r['artist_uri'] not in seen_artists
    ]
    track_rows = [
        to_track_row(r) for r in batch if r['track_uri'] not in seen_tracks
    ]
    seen_artists.update(r['uri'] for r in artist_rows)
    seen_tracks.update(r['uri'] for r in track_rows)
    return artist_rows, track_rows
def populate_tables(engine):
    batch_size = 1_000
    lines = lines_stream()
    seen_artists = set()
    seen_tracks = set()
    for i, batch in enumerate(row_batches(lines, batch_size)):
        batch_offset = i * batch_size
        print(f'Inserting [{batch_offset}, {batch_offset + batch_size}) values')
        artist_rows, track_rows = prepare_insert(seen_artists, seen_tracks, batch)
        with engine.connect() as connection:
            # INSERT IGNORE skips rows whose primary key already exists (MySQL-specific).
            if artist_rows:
                connection.execute(
                    artists.insert().prefix_with('IGNORE'),
                    artist_rows
                )
            if track_rows:
                connection.execute(
                    tracks.insert().prefix_with('IGNORE'),
                    track_rows
                )
            if artist_rows or track_rows:
                connection.commit()


engine = connect()
create_tables(engine)
populate_tables(engine)
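A quick sanity check after the load (not part of the gist; it assumes the engine and the tables defined in model.py) is to count what landed in each table:

from sqlalchemy import func, select

with engine.connect() as connection:
    # scalar_one() unwraps the single COUNT(*) value from each result.
    n_artists = connection.execute(select(func.count()).select_from(artists)).scalar_one()
    n_tracks = connection.execute(select(func.count()).select_from(tracks)).scalar_one()
    print(f'{n_artists} artists, {n_tracks} tracks loaded')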
# --- Script 2: index tracks and artists from MySQL into Elasticsearch ---
import itertools

from elasticsearch import Elasticsearch, helpers
from sqlalchemy import create_engine, select
from sqlalchemy.engine import URL

from model import artists, tracks


def create_db_connection():
    url_object = URL.create(
        "mysql+mysqlconnector",
        username="user",
        password="user123",
        host="localhost",
        database='music'
    )
    return create_engine(url_object, echo=True, future=True)


def create_elastic():
    return Elasticsearch(hosts='http://localhost:9200')
def create_indices(es):
    track_mappings = {
        'properties': {
            'name': {
                'type': 'text',
                'analyzer': 'english'
            },
            'artist': {
                'type': 'text',
                'analyzer': 'english'
            }
        }
    }
    artist_mappings = {
        'properties': {
            'name': {
                'type': 'text',
                'analyzer': 'english'
            }
        }
    }
    if not es.indices.exists(index='tracks'):
        es.indices.create(index='tracks', mappings=track_mappings)
    if not es.indices.exists(index='artists'):
        es.indices.create(index='artists', mappings=artist_mappings)
def to_es_action_track(item):
    return {
        '_op_type': 'create',
        '_index': 'tracks',
        '_id': item['uri'],
        'name': item['name'],
        'artist': item['artist']
    }


def to_es_action_artist(item):
    return {
        '_op_type': 'create',
        '_index': 'artists',
        '_id': item['uri'],
        'name': item['name']
    }


def to_actions_track(batch):
    return [to_es_action_track(i) for i in batch if i is not None]


def to_actions_artists(batch):
    return [to_es_action_artist(i) for i in batch if i is not None]
def read_tracks(connection):
    # Join tracks with artists so each document carries the artist name.
    # .mappings() yields dict-like rows, so the item['uri'] lookups above
    # work with 2.0-style (future=True) results.
    columns = select(tracks, artists.c.name.label('artist'))
    rows = connection.execute(columns.select_from(tracks).join(artists))
    return rows.mappings()


def read_artists(connection):
    return connection.execute(artists.select()).mappings()


def rows_batches(rows, bs=1000):
    while True:
        batch = list(itertools.islice(rows, bs))
        if not batch:
            return
        yield batch
def index_tracks(engine, es):
    with engine.connect() as connection:
        rows = read_tracks(connection)
        for batch in rows_batches(rows):
            print('indexing next batch')
            actions = to_actions_track(batch)
            resp = helpers.bulk(es, actions=actions)
            print(resp)
    print('finished indexing tracks')


def index_artists(engine, es):
    with engine.connect() as connection:
        rows = read_artists(connection)
        for batch in rows_batches(rows):
            print('indexing next batch')
            actions = to_actions_artists(batch)
            resp = helpers.bulk(es, actions=actions)
            print(resp)
    print('finished indexing artists')


engine = create_db_connection()
es = create_elastic()
create_indices(es)
index_tracks(engine, es)
index_artists(engine, es)
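With the indices populated, a minimal full-text search might look like the following (illustrative only, not part of the gist; 'love' is an arbitrary query string, and the call assumes the elasticsearch-py 8.x client created above):

# The 'name' field uses the english analyzer defined in create_indices,
# so stemmed matches like "loving" would also hit.
resp = es.search(index='tracks', query={'match': {'name': 'love'}}, size=5)
for hit in resp['hits']['hits']:
    print(hit['_id'], hit['_source']['name'], '-', hit['_source']['artist'])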
# --- Script 3: load precomputed similarity lists into Redis ---
from io import TextIOWrapper
from zipfile import ZipFile

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

index_names = {
    'matrix_factorisation': 'matrix_factorisation_similarity.txt',
    'mlp': 'multilayer_perceptron_similarity.txt',
    'coco': 'coco_similar.txt',
    'tracks': 'track_similarity.txt'
}


def populate_index(index_name, lines):
    # Each line holds a key followed by its space-separated similar-item values.
    for l in lines:
        [key, *values] = l.strip().split(' ')
        full_key = f'{index_name}:{key}'
        r.lpush(full_key, *values)


with ZipFile('./similar.zip', 'r') as file:
    files = file.namelist()
    for f in files:
        for index_name, suffix in index_names.items():
            if f.endswith(suffix):
                with file.open(f, 'r') as zip_file:
                    populate_index(index_name, TextIOWrapper(zip_file))
                break  # matched this file; move on to the next one
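Reading a similarity list back is a single LRANGE. The sketch below is not part of the gist: the track URI is a placeholder, and the key follows the 'index_name:key' layout used by populate_index. Note that LPUSH stores values in reverse of their order in the file.

track_uri = 'some_track_uri'  # placeholder; use a real URI from the data
similar = r.lrange(f'tracks:{track_uri}', 0, -1)
print([v.decode() for v in similar])  # redis-py returns bytes by default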
# requirements.txt
sqlalchemy
mysql-connector-python
redis
elasticsearch