-- Session settings shared by the Criteo FFM pipeline below.
-- set mapred.max.split.size=128000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- Use the combine input format on Tez too, so many small S3 files are
-- grouped into fewer splits.
set hive.tez.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- Tables up to ~30 MB qualify for map-side (broadcast) joins.
set hive.mapjoin.smalltable.filesize=30000000;
-- set hive.optimize.s3.query=true;
-- Allow fully dynamic partition inserts (no static partition column required).
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=false;
-- Create the working database and the raw tab-separated training table.
-- IF NOT EXISTS / IF EXISTS make this section safe to re-run against a
-- metastore in any state (the original failed if criteo already existed).
CREATE DATABASE IF NOT EXISTS criteo;
USE criteo;

DROP TABLE IF EXISTS train_raw;
CREATE EXTERNAL TABLE train_raw (
rowid bigint,
label int,
-- quantitative variables
i1 int,i2 int,i3 int,i4 int,i5 int,i6 int,i7 int,i8 int,i9 int,i10 int,i11 int,i12 int,i13 int,
-- categorical variables
c1 string,c2 string,c3 string,c4 string,c5 string,c6 string,c7 string,c8 string,c9 string,c10 string,c11 string,c12 string,c13 string,c14 string,c15 string,c16 string,c17 string,c18 string,c19 string,c20 string,c21 string,c22 string,c23 string,c24 string,c25 string,c26 string
) ROW FORMAT
DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE LOCATION 's3://myui-dev/emr/warehouse/criteo/train_raw/';
-- Raw tab-separated test table: same layout as train_raw but without the
-- label column. IF EXISTS makes the drop idempotent across re-runs.
DROP TABLE IF EXISTS test_raw;
CREATE EXTERNAL TABLE test_raw (
rowid bigint,
-- quantitative variables
i1 int,i2 int,i3 int,i4 int,i5 int,i6 int,i7 int,i8 int,i9 int,i10 int,i11 int,i12 int,i13 int,
-- categorical variables
c1 string,c2 string,c3 string,c4 string,c5 string,c6 string,c7 string,c8 string,c9 string,c10 string,c11 string,c12 string,c13 string,c14 string,c15 string,c16 string,c17 string,c18 string,c19 string,c20 string,c21 string,c22 string,c23 string,c24 string,c25 string,c26 string
) ROW FORMAT
DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE LOCATION 's3://myui-dev/emr/warehouse/criteo/test_raw/';
# Shell step (not SQL): prepend a sequential rowid to each raw record and
# stream it into HDFS. Train rowids start at 10000000 and test rowids at
# 60000000 so the two ranges never collide.
# NOTE(review): setting IFS inside the awk action has no effect on awk's
# field splitting (awk uses FS, and $0 is printed whole here); only the
# OFS assignment matters, and it applies from the first record onward.
awk '{IFS="\t"; OFS="\t"} {print 10000000+NR-1,$0}' train.txt | \
hadoop fs -put - /dataset/criteo/train/train.txt
awk '{IFS="\t"; OFS="\t"} {print 60000000+NR-1,$0}' test.txt | \
hadoop fs -put - /dataset/criteo/test/test.txt
-- Preprocessed training table: one Snappy-compressed ORC row per labeled
-- record, holding the assembled feature-string array.
DROP TABLE IF EXISTS train;
CREATE EXTERNAL TABLE train (
rowid bigint,
features array<string>,
label int
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/train/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Build the training feature vectors from train_raw:
--  * quantitative_features log-scales the 13 integer columns,
--  * categorical_features encodes the 26 string columns,
--  * add_field_indicies tags each feature with its field index
--    (the field:feature form FFM training expects),
-- and CLUSTER BY rand(1) shuffles rows (fixed seed) for stochastic training.
INSERT OVERWRITE TABLE train
select
rowid,
add_field_indicies(array_concat(
quantitative_features(
array('i1','i2','i3','i4','i5','i6','i7','i8','i9','i10','i11','i12','i13'),
ln(i1+1),
ln(i2+4), -- min(i2) is -3
ln(i3+1),
ln(i4+1),
ln(i5+1),
ln(i6+1),
ln(i7+1),
ln(i8+1),
ln(i9+1),
ln(i10+1),
ln(i11+1),
ln(i12+1),
ln(i13+1),
-- -emit_null keeps missing quantitative values as explicit null features;
-- confirm exact flag semantics against the Hivemall docs.
"-emit_null"
),
categorical_features(
array('c1','c2','c3','c4','c5','c6','c7','c8','c9','c10','c11','c12','c13','c14','c15','c16','c17','c18','c19','c20','c21','c22','c23','c24','c25','c26'),
c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,
"-emit_null -force_value"
)
)) as features,
label
from
train_raw
where
label IS NOT NULL -- rowid 54203165
CLUSTER BY rand(1)
;
-- Preprocessed test table: same feature layout as `train` but unlabeled.
DROP TABLE IF EXISTS test;
CREATE EXTERNAL TABLE test (
rowid bigint,
features array<string>
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/test/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Build the test feature vectors with exactly the same transforms as the
-- training insert above (log-scaled quantitative columns, encoded
-- categorical columns, field indices added); no label and no shuffle.
INSERT OVERWRITE TABLE test
select
rowid,
add_field_indicies(array_concat(
quantitative_features(
array('i1','i2','i3','i4','i5','i6','i7','i8','i9','i10','i11','i12','i13'),
ln(i1+1),
ln(i2+4), -- min(i2) is -3
ln(i3+1),
ln(i4+1),
ln(i5+1),
ln(i6+1),
ln(i7+1),
ln(i8+1),
ln(i9+1),
ln(i10+1),
ln(i11+1),
ln(i12+1),
ln(i13+1),
"-emit_null"
),
categorical_features(
array('c1','c2','c3','c4','c5','c6','c7','c8','c9','c10','c11','c12','c13','c14','c15','c16','c17','c18','c19','c20','c21','c22','c23','c24','c25','c26'),
c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,
"-emit_null -force_value"
)
)) as features
from
test_raw
;
-- EMR Hadoop task configuration reference:
-- http://docs.aws.amazon.com/ja_jp/emr/latest/ReleaseGuide/emr-hadoop-task-config.html
-- Task memory sizing (see the EMR task-config reference above).
-- Hive on Tez
SET tez.task.resource.memory.mb=3840;
SET hive.tez.java.opts=-server -Xmx3328m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;
-- Hive on MapReduce
SET mapreduce.map.memory.mb=3840;
SET mapreduce.map.java.opts=-server -Xmx3328m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;
-- use more mappers to avoid OOM in mappers
SET mapred.max.split.size=64000000;
-- Per-model FFM parameters: model_id identifies the (string) sub-model,
-- i the hashed feature index, Wi the linear weight, Vi the latent factors.
DROP TABLE IF EXISTS ffm_model;
CREATE EXTERNAL TABLE ffm_model (
model_id string,
i int,
Wi float,
Vi array<float>
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/ffm_model/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- New session: re-register Hivemall and re-apply the settings used above
-- before training.
use criteo;
add jar tmp/hivemall-core-0.4.2-rc.2-with-dependencies.jar;
source tmp/define-all.hive;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.tez.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.mapjoin.smalltable.filesize=30000000;
SET tez.task.resource.memory.mb=3840;
SET hive.tez.java.opts=-server -Xmx3328m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;
SET mapreduce.map.memory.mb=3840;
SET mapreduce.map.java.opts=-server -Xmx3328m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;
SET mapred.max.split.size=64000000;
-- Train FFM models; train_ffm emits (model_id, i, Wi, Vi) rows matching
-- the ffm_model schema. Options: classification (-c), 10 iterations,
-- 4 latent factors, feature hashing into a 2^20 space — confirm flag
-- meanings against the Hivemall train_ffm documentation.
INSERT OVERWRITE TABLE ffm_model
select
train_ffm(features, label, "-c -iters 10 -factors 4 -feature_hashing 20")
from
train
;
-- Smaller prediction model
-- Same layout as ffm_model but with an int model_id to shrink the
-- prediction-side model table.
DROP TABLE IF EXISTS ffm_model_int;
CREATE EXTERNAL TABLE ffm_model_int (
model_id int,
i int,
Wi float,
Vi array<float>
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/ffm_model_int/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Train and load the compact model in one pass: name the train_ffm output
-- columns in the CTE, then narrow model_id from string to int on insert.
WITH tmp as (
select
train_ffm(features, label, "-c -iters 10 -factors 4 -feature_hashing 20")
as (model_id, i, wi, vi)
from
train
)
INSERT OVERWRITE TABLE ffm_model_int
select
cast(model_id as int) as model_id,
i,
wi,
vi
from
tmp
;
-- Test rows exploded into FFM feature pairs: one row per (rowid, i, j)
-- pair with the corresponding feature values xi and xj.
DROP TABLE IF EXISTS testing_exploded;
CREATE EXTERNAL TABLE testing_exploded (
rowid bigint,
i int,
j int,
xi double,
xj double
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/testing_exploded/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Explode each test feature vector into its pairwise terms with
-- feature_pairs; -feature_hashing 20 must match the value used at
-- training time so indices line up with the model tables.
INSERT OVERWRITE TABLE testing_exploded
select
t1.rowid,
t2.i,
t2.j,
t2.Xi,
t2.Xj
from
test t1
LATERAL VIEW feature_pairs(features, "-ffm -feature_hashing 20") t2 as i, j, Xi, Xj
;
-- Collect column statistics so the optimizer can plan the large
-- prediction joins below.
-- ANALYZE TABLE ffm_model_int COMPUTE STATISTICS;
-- ANALYZE TABLE testing_exploded COMPUTE STATISTICS;
ANALYZE TABLE ffm_model_int COMPUTE STATISTICS FOR COLUMNS;
ANALYZE TABLE testing_exploded COMPUTE STATISTICS FOR COLUMNS;
-- New session for prediction: re-register Hivemall, then tune Tez shuffle
-- compression, vectorized execution, and reducer sizing for the big joins.
use criteo;
add jar tmp/hivemall-core-0.4.2-rc.2-with-dependencies.jar;
source tmp/define-all.hive;
SET tez.runtime.compress=true;
SET tez.runtime.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- SET tez.runtime.pipelined-shuffle.enabled=true;
-- SET tez.runtime.shuffle.memory-to-memory.enable=true;
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;
SET hive.vectorized.execution.reduce.groupby.enabled=true;
SET tez.runtime.shuffle.keep-alive.enabled=true;
SET tez.runtime.optimize.local.fetch=true;
SET tez.runtime.io.sort.mb=512;
SET hive.optimize.correlation=true;
SET hive.optimize.reducededuplication=true;
SET hive.merge.nway.joins=true;
-- SET tez.runtime.pipelined.sorter.lazy-allocate.memory=true;
SET hive.tez.auto.reducer.parallelism=true;
-- Target ~512 MB of input per reducer.
SET hive.exec.reducers.bytes.per.reducer=536870912;
-- SET hive.exec.reducers.bytes.per.reducer=1073741824;
SET tez.am.grouping.split-waves=1.4;
-- SET tez.task.resource.memory.mb=2880;
-- SET hive.tez.java.opts=-server -Xmx2368m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;
-- SET mapreduce.map.memory.mb=2880;
-- SET mapreduce.map.java.opts=-server -Xmx2368m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;
-- Per-model prediction: compute a sigmoid score per (rowid, model), then
-- average the per-model probabilities for each test row.
drop table ffm_predicted;
create table ffm_predicted
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
LINES TERMINATED BY "\n"
STORED AS TEXTFILE
as
WITH predicted as (
-- One row per (rowid, model): ffm_predict aggregates over all exploded
-- feature pairs of the row against that model's parameters.
select
t1.rowid,
p1.model_id,
sigmoid(ffm_predict(p1.Wi, p1.Vi, p2.Vi, t1.Xi, t1.Xj)) as score
from
testing_exploded t1
JOIN ffm_model_int p1 ON (p1.i = t1.i) -- at least p1.i = 0 and t1.i = 0 exists
LEFT OUTER JOIN ffm_model_int p2 ON (p2.model_id = p1.model_id and p2.i = t1.j)
where
p1.Wi is not null OR p2.Vi is not null
group by
t1.rowid, p1.model_id
)
select
rowid,
avg(score) as predicted -- classification
-- avg(score) as predicted -- regression
from
predicted
group by
rowid
;
-- Alternative single-pass formulation: average over models inside the
-- per-rowid aggregate (summed ffm_predict divided by the model count),
-- then apply the sigmoid once per row.
drop table if exists ffm_predicted;
create table ffm_predicted
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
LINES TERMINATED BY "\n"
STORED AS TEXTFILE
as
WITH predicted as (
-- One row per rowid: ffm_predict aggregates over all feature pairs and
-- all models for the row.
select
t1.rowid,
ffm_predict(p1.Wi, p1.Vi, p2.Vi, t1.Xi, t1.Xj) / count(distinct p1.model_id) as score
from
testing_exploded t1
JOIN ffm_model_int p1 ON (p1.i = t1.i) -- at least p1.i = 0 and t1.i = 0 exists
LEFT OUTER JOIN ffm_model_int p2 ON (p2.model_id = p1.model_id and p2.i = t1.j)
where
p1.Wi is not null OR p2.Vi is not null
group by
t1.rowid
)
-- BUG FIX: the original outer query had GROUP BY rowid while selecting
-- the non-aggregated, non-grouped column `score`, which Hive rejects.
-- The CTE already yields exactly one row per rowid, so no outer
-- aggregation is needed.
select
rowid,
sigmoid(score) as predicted -- classification
-- score as predicted -- regression
from
predicted
;
# Shell step: merge the prediction output from HDFS and package it as a
# Kaggle-style "Id,Predicted" CSV submission.
rm -rf ffm_predicted.*
hadoop fs -getmerge /user/hive/warehouse/criteo.db/ffm_predicted ffm_predicted.tsv
echo "Id,Predicted" > ffm_predicted.submit
awk '{OFS=","} {print $1,$2}' ffm_predicted.tsv >> ffm_predicted.submit
zip ffm_predicted.zip ffm_predicted.submit
-- Single averaged model: parameters are ensembled across model_ids, so no
-- model_id column is needed here.
DROP TABLE IF EXISTS ffm_model_ensemble;
CREATE EXTERNAL TABLE ffm_model_ensemble (
i int,
Wi float,
Vi array<float>
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/ffm_model_ensemble/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Ensemble by parameter averaging: mean linear weight and element-wise
-- mean latent-factor vector per hashed feature index across all models.
INSERT OVERWRITE TABLE ffm_model_ensemble
select
i,
avg(wi) as wi,
array_avg(vi) as vi
from
ffm_model
group by
i
;
-- Predict with the single averaged model: one aggregate per rowid over
-- all exploded feature pairs, then sigmoid for the classification score.
drop table ffm_ensemble_predicted;
create table ffm_ensemble_predicted
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
LINES TERMINATED BY "\n"
STORED AS TEXTFILE
as
WITH predicted as (
select
t1.rowid,
ffm_predict(
p1.Wi,
p1.Vi, -- Vij
p2.Vi, -- Vji
t1.Xi,
t1.Xj
) as predicted
from
testing_exploded t1
JOIN ffm_model_ensemble p1 ON (p1.i = t1.i) -- at least p1.i = 0 and t1.i = 0 exists
-- No model_id predicate here: the ensemble table has a single model.
LEFT OUTER JOIN ffm_model_ensemble p2 ON (p2.i = t1.j)
where
p1.Wi is not null OR p2.Vi is not null
group by
t1.rowid
)
select
rowid,
sigmoid(predicted) as predicted -- classification
-- predicted as predicted -- regression
from
predicted
;
# Shell step: merge the ensemble prediction output and package it as a
# Kaggle-style "Id,Predicted" CSV submission.
hadoop fs -getmerge /user/hive/warehouse/criteo.db/ffm_ensemble_predicted ffm_ensemble_predicted.tsv
echo "Id,Predicted" > ffm_ensemble_predicted.submit
awk '{OFS=","} {print $1,$2}' ffm_ensemble_predicted.tsv >> ffm_ensemble_predicted.submit
zip ffm_ensemble_predicted.zip ffm_ensemble_predicted.submit