Data preparation

-- set mapred.max.split.size=128000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.tez.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.mapjoin.smalltable.filesize=30000000;
-- set hive.optimize.s3.query=true;
set hive.exec.dynamic.partition.mode=nonstrict; 
set hive.optimize.sort.dynamic.partition=false;
create database criteo;
use criteo;

drop table train_raw;
create external table train_raw (
  rowid bigint,
  label int,
  -- quantitative variables
  i1 int,i2 int,i3 int,i4 int,i5 int,i6 int,i7 int,i8 int,i9 int,i10 int,i11 int,i12 int,i13 int,
  -- categorical variables
  c1 string,c2 string,c3 string,c4 string,c5 string,c6 string,c7 string,c8 string,c9 string,c10 string,c11 string,c12 string,c13 string,c14 string,c15 string,c16 string,c17 string,c18 string,c19 string,c20 string,c21 string,c22 string,c23 string,c24 string,c25 string,c26 string
) ROW FORMAT 
DELIMITED FIELDS TERMINATED BY '\t' 
STORED AS TEXTFILE LOCATION 's3://myui-dev/emr/warehouse/criteo/train_raw/';

drop table test_raw;
create external table test_raw (
  rowid bigint,
  -- quantitative variables
  i1 int,i2 int,i3 int,i4 int,i5 int,i6 int,i7 int,i8 int,i9 int,i10 int,i11 int,i12 int,i13 int,
  -- categorical variables
  c1 string,c2 string,c3 string,c4 string,c5 string,c6 string,c7 string,c8 string,c9 string,c10 string,c11 string,c12 string,c13 string,c14 string,c15 string,c16 string,c17 string,c18 string,c19 string,c20 string,c21 string,c22 string,c23 string,c24 string,c25 string,c26 string
) ROW FORMAT 
DELIMITED FIELDS TERMINATED BY '\t' 
STORED AS TEXTFILE LOCATION 's3://myui-dev/emr/warehouse/criteo/test_raw/';

awk 'BEGIN{FS=OFS="\t"} {print 10000000+NR-1,$0}' train.txt | \
hadoop fs -put - /dataset/criteo/train/train.txt

awk 'BEGIN{FS=OFS="\t"} {print 60000000+NR-1,$0}' test.txt | \
hadoop fs -put - /dataset/criteo/test/test.txt
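
If the prefixed files are then placed under each table's LOCATION, a quick sanity check of the rowid offsets (10000000 for train, 60000000 for test, per the awk expressions above) might look like this:

-- sketch: verify the rowid offsets added above
select min(rowid), max(rowid), count(1) from train_raw;
select min(rowid), max(rowid), count(1) from test_raw;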

Feature Engineering

drop table train;
create external table train (
  rowid bigint,
  features array<string>,
  label int
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/train/' 
TBLPROPERTIES ("orc.compress"="SNAPPY"); 

INSERT OVERWRITE TABLE train 
select
  rowid, 
  add_field_indicies(array_concat(
    quantitative_features(
      array('i1','i2','i3','i4','i5','i6','i7','i8','i9','i10','i11','i12','i13'),
      ln(i1+1),
      ln(i2+4), -- min(i2) is -3
      ln(i3+1),
      ln(i4+1),
      ln(i5+1),
      ln(i6+1),
      ln(i7+1),
      ln(i8+1),
      ln(i9+1),
      ln(i10+1),
      ln(i11+1),
      ln(i12+1),
      ln(i13+1),
      "-emit_null"
    ),
    categorical_features(
      array('c1','c2','c3','c4','c5','c6','c7','c8','c9','c10','c11','c12','c13','c14','c15','c16','c17','c18','c19','c20','c21','c22','c23','c24','c25','c26'),
      c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,
      "-emit_null -force_value"
    )
  )) as features,
  label
from 
  train_raw
where
  label IS NOT NULL -- rowid 54203165
CLUSTER BY rand(1)
;
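
To eyeball the generated feature format before training (field-indexed quantitative and categorical features), a minimal inspection query along these lines can help:

-- sketch: peek at a few generated training rows
select rowid, label, size(features) as num_features, features
from train
limit 5;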

drop table test;
create external table test (
  rowid bigint,
  features array<string>
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/test/' 
TBLPROPERTIES ("orc.compress"="SNAPPY"); 

INSERT OVERWRITE TABLE test 
select
  rowid, 
  add_field_indicies(array_concat(
    quantitative_features(
      array('i1','i2','i3','i4','i5','i6','i7','i8','i9','i10','i11','i12','i13'),
      ln(i1+1),
      ln(i2+4), -- min(i2) is -3
      ln(i3+1),
      ln(i4+1),
      ln(i5+1),
      ln(i6+1),
      ln(i7+1),
      ln(i8+1),
      ln(i9+1),
      ln(i10+1),
      ln(i11+1),
      ln(i12+1),
      ln(i13+1),
      "-emit_null"
    ),
    categorical_features(
      array('c1','c2','c3','c4','c5','c6','c7','c8','c9','c10','c11','c12','c13','c14','c15','c16','c17','c18','c19','c20','c21','c22','c23','c24','c25','c26'),
      c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,
      "-emit_null -force_value"
    )
  )) as features
from 
  test_raw
;

Training

EMR task configuration reference (per-instance memory defaults): http://docs.aws.amazon.com/ja_jp/emr/latest/ReleaseGuide/emr-hadoop-task-config.html

-- Hive on Tez
SET tez.task.resource.memory.mb=3840;
SET hive.tez.java.opts=-server -Xmx3328m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;

-- Hive on MapReduce
SET mapreduce.map.memory.mb=3840;
SET mapreduce.map.java.opts=-server -Xmx3328m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;

-- use more mappers to avoid OOM in mappers
SET mapred.max.split.size=64000000;
drop table ffm_model;
create external table ffm_model (
  model_id string,
  i int,
  Wi float,
  Vi array<float>
) 
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/ffm_model/' 
TBLPROPERTIES ("orc.compress"="SNAPPY"); 

use criteo;
add jar tmp/hivemall-core-0.4.2-rc.2-with-dependencies.jar;
source tmp/define-all.hive;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.tez.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.mapjoin.smalltable.filesize=30000000;
SET tez.task.resource.memory.mb=3840;
SET hive.tez.java.opts=-server -Xmx3328m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;
SET mapreduce.map.memory.mb=3840;
SET mapreduce.map.java.opts=-server -Xmx3328m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;
SET mapred.max.split.size=64000000;

INSERT OVERWRITE TABLE ffm_model
select
  train_ffm(features, label, "-c -iters 10 -factors 4 -feature_hashing 20")
from
  train
;
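
Since later steps ensemble over model_id, it can be useful to see how many sub-models train_ffm produced and how many parameters each holds; a hedged check:

-- sketch: sub-model and parameter counts
select model_id, count(1) as num_params
from ffm_model
group by model_id;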

Smaller prediction model

drop table ffm_model_int;
create external table ffm_model_int (
  model_id int,
  i int,
  Wi float,
  Vi array<float>
) 
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/ffm_model_int/' 
TBLPROPERTIES ("orc.compress"="SNAPPY"); 

WITH tmp as (
  select
    train_ffm(features, label, "-c -iters 10 -factors 4 -feature_hashing 20")
      as (model_id, i, wi, vi)
  from 
    train
)
INSERT OVERWRITE TABLE ffm_model_int
select
  cast(model_id as int) as model_id,
  i,
  wi,
  vi
from
  tmp
;
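
The cast assumes every model_id is a numeric task id; a quick hedged check that the conversion did not produce NULLs:

-- sketch: the int cast should not lose any model ids
select count(1) as bad_ids
from ffm_model_int
where model_id is null;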

Prediction
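
For reference, the score that ffm_predict accumulates over the exploded (i, j) pairs below corresponds to the usual FFM decision function (a sketch of the textbook formula, where f_j denotes the field of feature j; not necessarily the exact Hivemall internals):

\phi(\mathbf{x}) = \sum_{i} w_i x_i + \sum_{i < j} \langle \mathbf{v}_{i,f_j}, \mathbf{v}_{j,f_i} \rangle \, x_i x_j

The click probability is then taken as sigmoid(\phi(\mathbf{x})).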

drop table testing_exploded;
create external table testing_exploded (
  rowid bigint,
  i int,
  j int,
  xi double,
  xj double
) 
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/testing_exploded/' 
TBLPROPERTIES ("orc.compress"="SNAPPY"); 

INSERT OVERWRITE TABLE testing_exploded
select
  t1.rowid, 
  t2.i,
  t2.j,
  t2.Xi,
  t2.Xj
from
  test t1
  LATERAL VIEW feature_pairs(features, "-ffm -feature_hashing 20") t2 as i, j, Xi, Xj
;
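
Because each test row is exploded into all pairwise feature combinations, this table can be much larger than test; a hedged size check before the join-heavy prediction queries:

-- sketch: average number of (i, j) pairs per test row
select count(1) as num_pairs, count(distinct rowid) as num_rows
from testing_exploded;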

Prediction ensemble

-- ANALYZE TABLE ffm_model_int COMPUTE STATISTICS;
-- ANALYZE TABLE testing_exploded COMPUTE STATISTICS;
ANALYZE TABLE ffm_model_int COMPUTE STATISTICS FOR COLUMNS;
ANALYZE TABLE testing_exploded COMPUTE STATISTICS FOR COLUMNS;

use criteo;
add jar tmp/hivemall-core-0.4.2-rc.2-with-dependencies.jar;
source tmp/define-all.hive;

SET tez.runtime.compress=true;
SET tez.runtime.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- SET tez.runtime.pipelined-shuffle.enabled=true;
-- SET tez.runtime.shuffle.memory-to-memory.enable=true;
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;
SET hive.vectorized.execution.reduce.groupby.enabled=true;
SET tez.runtime.shuffle.keep-alive.enabled=true;
SET tez.runtime.optimize.local.fetch=true;
SET tez.runtime.io.sort.mb=512;
SET hive.optimize.correlation=true;
SET hive.optimize.reducededuplication=true;
SET hive.merge.nway.joins=true;
-- SET tez.runtime.pipelined.sorter.lazy-allocate.memory=true;
SET hive.tez.auto.reducer.parallelism=true;
SET hive.exec.reducers.bytes.per.reducer=536870912;
-- SET hive.exec.reducers.bytes.per.reducer=1073741824;
SET tez.am.grouping.split-waves=1.4;
-- SET tez.task.resource.memory.mb=2880;
-- SET hive.tez.java.opts=-server -Xmx2368m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;
-- SET mapreduce.map.memory.mb=2880;
-- SET mapreduce.map.java.opts=-server -Xmx2368m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC;

drop table ffm_predicted;
create table ffm_predicted
  ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY "\t"
    LINES TERMINATED BY "\n"
  STORED AS TEXTFILE
as
WITH predicted as (
  select
    t1.rowid,
    p1.model_id,
    sigmoid(ffm_predict(p1.Wi, p1.Vi, p2.Vi, t1.Xi, t1.Xj)) as score
  from 
    testing_exploded t1
    JOIN ffm_model_int p1 ON (p1.i = t1.i) -- at least p1.i = 0 and t1.i = 0 exist
    LEFT OUTER JOIN ffm_model_int p2 ON (p2.model_id = p1.model_id and p2.i = t1.j)  
  where
    p1.Wi is not null OR p2.Vi is not null
  group by
    t1.rowid, p1.model_id
)
select
  rowid,
  avg(score) as predicted -- classification
  -- avg(score) as predicted -- regression
from
  predicted
group by
  rowid
;
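
Note that this variant averages the per-model probabilities, avg_m(sigmoid(phi_m(x))), whereas the alternative below averages the raw scores first and then squashes, sigmoid(avg_m(phi_m(x))); the two differ slightly because the sigmoid is nonlinear.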

drop table ffm_predicted;
create table ffm_predicted
  ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY "\t"
    LINES TERMINATED BY "\n"
  STORED AS TEXTFILE
as
WITH predicted as (
  select
    t1.rowid,    
    ffm_predict(p1.Wi, p1.Vi, p2.Vi, t1.Xi, t1.Xj) / count(distinct p1.model_id) as score
  from 
    testing_exploded t1
    JOIN ffm_model_int p1 ON (p1.i = t1.i) -- at least p1.i = 0 and t1.i = 0 exist
    LEFT OUTER JOIN ffm_model_int p2 ON (p2.model_id = p1.model_id and p2.i = t1.j)  
  where
    p1.Wi is not null OR p2.Vi is not null
  group by
    t1.rowid
)
select
  rowid,
  sigmoid(score) as predicted -- classification
  -- score as predicted -- regression
from
  predicted
group by
  rowid
;
rm -rf ffm_predicted.*
hadoop fs -getmerge /user/hive/warehouse/criteo.db/ffm_predicted ffm_predicted.tsv
echo "Id,Predicted" > ffm_predicted.submit 
awk 'BEGIN{OFS=","} {print $1,$2}' ffm_predicted.tsv >> ffm_predicted.submit
zip ffm_predicted.zip ffm_predicted.submit 

Model ensemble

drop table ffm_model_ensemble;
create external table ffm_model_ensemble (
  i int,
  Wi float,
  Vi array<float>
) 
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/ffm_model_ensemble/' 
TBLPROPERTIES ("orc.compress"="SNAPPY"); 

INSERT OVERWRITE TABLE ffm_model_ensemble
select
  i,
  avg(wi) as wi,
  array_avg(vi) as vi
from
  ffm_model
group by
  i
;
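
As a hedged sanity check, the ensembled table should end up with exactly one averaged row per distinct feature index i:

-- sketch: one averaged parameter row per feature index
select count(1) as ensembled_rows, count(distinct i) as distinct_i
from ffm_model_ensemble;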

drop table ffm_ensemble_predicted;
create table ffm_ensemble_predicted
  ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY "\t"
    LINES TERMINATED BY "\n"
  STORED AS TEXTFILE
as
WITH predicted as (
  select
    t1.rowid,
    ffm_predict(
      p1.Wi,      
      p1.Vi, -- Vij
      p2.Vi, -- Vji
      t1.Xi,
      t1.Xj
    ) as predicted
  from 
    testing_exploded t1
    JOIN ffm_model_ensemble p1 ON (p1.i = t1.i) -- at least p1.i = 0 and t1.i = 0 exist
    LEFT OUTER JOIN ffm_model_ensemble p2 ON (p2.i = t1.j) 
  where
    p1.Wi is not null OR p2.Vi is not null
  group by
    t1.rowid
)
select
  rowid,
  sigmoid(predicted) as predicted -- classification
  -- predicted as predicted -- regression
from
  predicted
;
hadoop fs -getmerge /user/hive/warehouse/criteo.db/ffm_ensemble_predicted ffm_ensemble_predicted.tsv
echo "Id,Predicted" > ffm_ensemble_predicted.submit 
awk 'BEGIN{OFS=","} {print $1,$2}' ffm_ensemble_predicted.tsv >> ffm_ensemble_predicted.submit
zip ffm_ensemble_predicted.zip ffm_ensemble_predicted.submit 