Train a logistic regression CTR model on the KDDCup 2012 track 2 dataset with Spark MLlib, then score the test set in Hive. Dataset preparation is described here:
https://github.com/myui/hivemall/wiki/KDDCup-2012-track-2-CTR-prediction-dataset

-- Convert the training table to LibSVM text lines: "label idx:1.0 idx:1.0 ...".
-- sort_array() keeps feature indices in ascending order, as Spark's LibSVM
-- loader expects. Hive arrays are 0-indexed; features[1]..features[12] assumes
-- the sorted array's first element (feature index 0) is intentionally skipped,
-- since LibSVM feature indices are one-based.
create table training_libsvmfmt
  ROW FORMAT DELIMITED 
    LINES TERMINATED BY "\n"
  STORED AS TEXTFILE
AS
select
  concat(label, " ",
         features[1], ":1.0 ", features[2], ":1.0 ", features[3], ":1.0 ",
         features[4], ":1.0 ", features[5], ":1.0 ", features[6], ":1.0 ",
         features[7], ":1.0 ", features[8], ":1.0 ", features[9], ":1.0 ",
         features[10], ":1.0 ", features[11], ":1.0 ", features[12], ":1.0") as line
from (
  select label, sort_array(features) as features
  from training_orcfile
) t;
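
Spark's LibSVM loader expects one-based feature indices in ascending order on each line, which is why sort_array() is applied above. A quick spark-shell sanity check of the generated file (a minimal sketch; same path as used in the training run below):

import org.apache.spark.mllib.util.MLUtils

// Load lazily with an explicit dimensionality (avoids an extra pass to
// infer numFeatures) and eyeball one parsed LabeledPoint.
val check = MLUtils.loadLibSVMFile(sc,
  "hdfs://dm01:9000/user/hive/warehouse/kdd12track2.db/training_libsvmfmt",
  numFeatures = 16777217)
check.take(1).foreach(println)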
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Simple wall-clock timer: evaluates the block once and prints elapsed seconds.
def time[R](block: => R): R = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0) / 1000000000.0 + " seconds")
    result
}

time {
  // 16,777,217 = 2^24 + 1: covers one-based feature IDs in a 2^24 hash space.
  val training = MLUtils.loadLibSVMFile(sc,
    "hdfs://dm01:9000/user/hive/warehouse/kdd12track2.db/training_libsvmfmt",
    multiclass = false, numFeatures = 16777217, minPartitions = 64).coalesce(128)
  val model1 = LogisticRegressionWithSGD.train(training, numIterations = 1)
  // Export as "feature,weight" CSV; i + 1 maps the 0-based weight index back
  // to the one-based feature IDs used in the LibSVM file.
  val rdd1 = sc.parallelize(model1.weights.toArray.zipWithIndex)
    .map { case (x, i) => s"${i + 1},$x" }
  rdd1.saveAsTextFile("hdfs://dm01:9000/user/hive/warehouse/kdd12track2.db/lrmodel1_spark")
}
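
Before wiring the weights back into Hive, a quick AUC check inside Spark can validate the model. A minimal sketch, assuming the training RDD from above is still in scope; the 90/10 split and the seed are arbitrary:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

val Array(trainSet, holdout) = training.randomSplit(Array(0.9, 0.1), seed = 11L)
val m = LogisticRegressionWithSGD.train(trainSet, numIterations = 1)
m.clearThreshold()  // predict() now returns probabilities instead of 0/1 labels
val scoreAndLabel = holdout.map(p => (m.predict(p.features), p.label))
val auc = new BinaryClassificationMetrics(scoreAndLabel).areaUnderROC()
println(s"AUC = $auc")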
-- Map the Spark-exported weight files into Hive as an external table
-- (one "feature,weight" CSV line per model coefficient).
create external table lrmodel1_spark (
  feature int, 
  weight double
)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  LOCATION '/user/hive/warehouse/kdd12track2.db/lrmodel1_spark';
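
To confirm that the exported files line up with this schema, peek at the saved text from the spark-shell (same path as above):

// Expect one "feature,weight" line per coefficient, e.g. "1,0.01234".
sc.textFile("hdfs://dm01:9000/user/hive/warehouse/kdd12track2.db/lrmodel1_spark")
  .take(3)
  .foreach(println)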

-- Score the test set: testing_exploded holds one row per (rowid, feature).
-- Every feature value is 1.0, so sum(m.weight) over the matched rows equals
-- the dot product w.x, and sigmoid() (a Hivemall UDF) turns it into the
-- predicted CTR. The LEFT OUTER JOIN keeps rows whose features have no
-- learned weight; their NULL weights are ignored by sum().
drop table if exists lr_predict_spark1;
create table lr_predict_spark1
  ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY "\t"
    LINES TERMINATED BY "\n"
  STORED AS TEXTFILE
as
select
  t.rowid, 
  sigmoid(sum(m.weight)) as prob
from 
  testing_exploded t
  LEFT OUTER JOIN lrmodel1_spark m ON (t.feature = m.feature)
group by 
  t.rowid
order by 
  rowid ASC;
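
What this query computes, in plain Scala terms (a toy sketch with hypothetical weights, not part of the pipeline): with one-hot features the dot product w.x reduces to a sum over the weights of the active features, and unmatched features contribute nothing, exactly like the NULL weights under sum() after the LEFT OUTER JOIN.

def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

val weights = Map(3 -> 0.12, 10 -> -0.05, 42 -> 0.30)  // hypothetical model rows
val activeFeatures = Seq(3, 10, 99)                    // 99 has no learned weight
val prob = sigmoid(activeFeatures.flatMap(weights.get).sum)
println(prob)  // the "prob" column for one rowid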