Each document may have multiple categories, topics and entities. If we keep only the label with the highest confidence for each, the complexity is greatly reduced: the document-to-category, document-to-topic and document-to-entity relationships all become many-to-one.
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Database schema for the Outbrain data set, persisted to a local SQLite file.
import sqlalchemy
from sqlalchemy import MetaData, Table, Column, ForeignKey, Index
from sqlalchemy import String, Integer, Float, DateTime

from outbrain.config import db_file_path

metadata = MetaData()

#--- Atomic object ---
t_document = Table(
    "document", metadata,
    Column("document_id", Integer, primary_key=True),
    Column("source_id", Integer),
    Column("publisher_id", Integer),
    Column("publish_time", DateTime),
    # For category, topic and entity we keep only the single label with the
    # highest confidence_level, so each one is a plain many-to-one column.
    Column("category_id", Integer),
    Column("category_confidence", Float),
    Column("topic_id", Integer),
    Column("topic_confidence", Float),
    Column("entity_id", Integer),
    Column("entity_confidence", Float),
)

t_ad = Table(
    "ad", metadata,
    Column("ad_id", Integer, primary_key=True),
    Column("document_id", Integer),
    Column("campaign_id", Integer),
    Column("advertiser_id", Integer),
)

#--- Association ---
# BUG FIX: ForeignKey(...) is a *positional* argument of Column and must come
# before the keyword argument primary_key=True; the original order raised
# "SyntaxError: positional argument follows keyword argument".
t_ad_and_document = Table(
    "ad_and_document", metadata,
    Column("document_id", Integer, ForeignKey("document.document_id"),
           primary_key=True),
    Column("ad_id", Integer, ForeignKey("ad.ad_id"), primary_key=True),
    Column("clicked", Integer),
)

engine = sqlalchemy.create_engine("sqlite:///%s" % db_file_path)
metadata.create_all(engine)
Because the ad_id values in clicks_train and clicks_test are a subset of those in promoted_content, we only need to focus on promoted_content. This script feeds each ad_id and its metadata into the database:
def feed_ad():
    """Build the master ``ad`` table keyed by ``ad_id``.

    Reads ``promoted_content.csv`` and bulk-loads it into the ``ad`` table,
    replacing any existing contents.
    """
    logger.info("Run feed_ad() ...")
    logger.info("read promoted_content ...", 1)
    p = Path(config.data_dir, "promoted_content.csv")
    # BUG FIX: ``nrows`` was previously passed to Path(); it belongs to
    # pd.read_csv, otherwise the row limit was never applied.
    df = pd.read_csv(p.abspath, nrows=nrows)
    logger.info("write to database ...", 1)
    df.to_sql(database.t_ad.name, engine, index=False, if_exists="replace")

feed_ad()
The document table is a little tricky: first we have to choose the highest-confidence entry for category, topic and entity, and then join the three results by document_id.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
import numpy as np, pandas as pd
from pathlib_mate import Path
from sfm import sqlalchemy_mate, timer
from sqlalchemy import select, func, distinct
from outbrain import config, database
from outbrain.database import engine, metadata
from outbrain.logger import logger
from outbrain.util.iotool import csv_tuple_iterator
nrows = None
#--- Categories, Topics, Entities ---
def pick_highest_confidence(filename):
    """For each ``document_id`` in *filename*, keep only the row with the
    highest ``confidence_level``.

    The CSV is expected to have exactly three columns:
    ``document_id``, a label column (e.g. ``category_id``) and
    ``confidence_level``.

    :param filename: CSV file name under ``config.data_dir``.
    :return: DataFrame with columns ``document_id``, ``<label>``,
        ``confidence_level`` — one row per document.
    """
    logger.info("run pick_highest_confidence(%s) ..." % filename)
    p = Path(config.data_dir, filename)
    df_raw = pd.read_csv(p.abspath, nrows=nrows)
    label_name = df_raw.columns[1]  # e.g. "category_id" / "topic_id"
    data = dict()
    for document_id, label, confidence_level in df_raw.itertuples(index=False):
        try:
            if confidence_level > data[document_id]["confidence_level"]:
                data[document_id][label_name] = label
                # BUG FIX: the original wrote to a stray "conf" key here, so
                # the stored confidence_level was never updated and later
                # rows were compared against a stale value (and a spurious
                # "conf" column leaked into the result).
                data[document_id]["confidence_level"] = confidence_level
        except KeyError:  # first time we see this document_id
            data[document_id] = {
                "document_id": document_id,
                label_name: label,
                "confidence_level": confidence_level,
            }
    df = pd.DataFrame(list(data.values()))
    return df
def merge_category_topic_entity():
    """Outer-join the three per-document label tables (category, topic,
    entity — each reduced to its highest-confidence row) on ``document_id``
    and write the combined table to ``documents_labels.csv``.
    """
    logger.info("run merge_category_topic_entity() ...")
    frames = [
        pick_highest_confidence(fname)
        for fname in ("documents_categories.csv",
                      "documents_topics.csv",
                      "documents_entities.csv")
    ]
    logger.info("outer join dataframe ...", 1)
    merged = frames[0].merge(frames[1], how="outer", on="document_id")
    merged = merged.merge(frames[2], how="outer", on="document_id")
    # pandas suffixes the duplicated confidence_level columns with _x/_y on
    # the first merge; the third table's column keeps its plain name.
    merged = merged[[
        "document_id",
        "category_id", "confidence_level_x",
        "topic_id", "confidence_level_y",
        "entity_id", "confidence_level",
    ]]
    merged.columns = [
        "document_id",
        "category_id", "category_confidence",
        "topic_id", "topic_confidence",
        "entity_id", "entity_confidence",
    ]
    logger.info("write to csv ...", 1)
    out_path = Path(config.temp_data_dir, "documents_labels.csv")
    merged.to_csv(out_path.abspath, index=False)

# merge_category_topic_entity()
Then we merge the result with documents_meta and feed it into the database.
def feed_documents():
    """Build the master ``document`` table keyed by ``document_id``.

    Joins ``documents_meta.csv`` (rows missing ``source_id`` dropped) with
    the pre-computed ``documents_labels.csv`` and bulk-loads the result into
    the ``document`` table, replacing any existing contents.
    """
    # Consistency fix: emit the same "Run ..." banner that feed_ad() does.
    logger.info("Run feed_documents() ...")
    logger.info("read documents_meta ...", 1)
    p = Path(config.data_dir, "documents_meta.csv")
    df_meta = pd.read_csv(p.abspath, parse_dates=["publish_time", ], nrows=nrows)
    df_meta = df_meta.dropna(subset=["source_id"], inplace=False)
    logger.info("read documents_labels ...", 1)
    p = Path(config.temp_data_dir, "documents_labels.csv")
    df_label = pd.read_csv(p.abspath, nrows=nrows)
    logger.info("outer join ...", 1)
    df = df_meta.merge(df_label, how="outer", on="document_id")
    logger.info("write to database ...", 1)
    df.to_sql(database.t_document.name, engine, index=False, if_exists="replace")

feed_documents()