@rjurney / gist 5418546, created April 19, 2013
TF-IDF.pig uses tfidf.macro.pig to compute TF-IDF scores for the lyric words. After that, classify.pig does a naive Bayes classification using the funcs.py Jython UDF. I spliced TF-IDF in where previously there was MPE. TF-IDF.pig has to run first, since classify.pig loads the /tmp/tf_idf_scores.txt output it writes. Note: the lyrics data covers the top 5,000 words only.
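/* classify.pig: naive Bayes genre classification over the TF-IDF scores, using the funcs.py Jython UDF */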
register /me/Software/elephant-bird/pig/target/elephant-bird-pig-3.0.6-SNAPSHOT.jar
register /me/Software/pig/build/ivy/lib/Pig/json-simple-1.1.jar
set elephantbird.jsonloader.nestedLoad 'true'
set default_parallel 4
/* Remove files from previous runs */
rmf /tmp/prior_words.txt
rmf /tmp/prior_genres.txt
rmf /tmp/p_word_given_genre.txt
rmf /tmp/p_genre_given_word.txt
rmf /tmp/test_result.txt
rmf /tmp/final.txt
rmf /tmp/scores_by_genre.txt
rmf /tmp/guess_category.txt
rmf /tmp/genre_lyrics.txt
/* Register Jython UDFs */
REGISTER 'funcs.py' USING jython AS funcs;
/* Load the lyrics TRAINING data - that we'll use to train our classifier, with ElephantBird's JsonLoader as nested maps */
lyrics_train = LOAD 'data/mxm_dataset_train.json' USING com.twitter.elephantbird.pig.load.JsonLoader() as json:map[];
lyrics = FOREACH lyrics_train GENERATE (chararray)$0#'track_id' as track_id,
$0#'word_counts' as word_counts:bag{t:tuple(m:map[])};
/* Load the Genre dataset, to join against the lyrics data for our training labels */
genres = LOAD 'data/msd_genre_dataset.csv' USING PigStorage(',') AS (genre:chararray,
track_id:chararray,
artist_name:chararray,
title:chararray,
loudness:chararray,
tempo:double,
time_signature:double,
key:double,
mode:double,
duration:double,
avg_timbre1:double,
avg_timbre2:double,
avg_timbre3:double,
avg_timbre4:double,
avg_timbre5:double,
avg_timbre6:double,
avg_timbre7:double,
avg_timbre8:double,
avg_timbre9:double,
avg_timbre10:double,
avg_timbre11:double,
avg_timbre12:double,
var_timbre1:double,
var_timbre2:double,
var_timbre3:double,
var_timbre4:double,
var_timbre5:double,
var_timbre6:double,
var_timbre7:double,
var_timbre8:double,
var_timbre9:double,
var_timbre10:double,
var_timbre11:double,
var_timbre12:double);
/* Trim unneeded fields, join lyrics and genres, then trim again */
genre_artist = FOREACH genres GENERATE genre, track_id;
genre_lyrics = JOIN genre_artist BY track_id, lyrics BY track_id;
genre_lyrics = FOREACH genre_lyrics GENERATE genre as genre,
word_counts as word_counts;
/* Project word_counts so that we get a genre/word pair for each word, then remove/flatten inner Map (from elephant-bird) */
flat_lyrics = FOREACH genre_lyrics GENERATE genre,
FLATTEN(word_counts);
flatter_lyrics = FOREACH flat_lyrics GENERATE (chararray)$1#'word' AS word:chararray,
(int)$1#'total' AS total:int,
genre;
STORE flatter_lyrics into '/tmp/genre_lyrics.txt';
/* Get the prior probability of each genre, P(genre): the total word occurrences in that genre */
/* divided by the total word occurrences across all genres */
per_genre_totals = FOREACH (GROUP flatter_lyrics BY genre) GENERATE group AS genre,
SUM(flatter_lyrics.total) AS genre_total;
all_doc_total = FOREACH (GROUP flatter_lyrics ALL) GENERATE SUM(flatter_lyrics.total) as all_total;
p_genre = FOREACH per_genre_totals GENERATE genre,
(double)genre_total/(double)all_doc_total.all_total as probability;
/* Now rejoin per-genre totals to lyrics */
with_genre_totals = JOIN flatter_lyrics BY genre, per_genre_totals BY genre;
/* with_genre_totals: { flatter_lyrics::genre: chararray,
flatter_lyrics::total: int,
flatter_lyrics::word: chararray,
per_genre_totals::genre: chararray,
per_genre_totals::genre_total: long} */
/* Trim fields */
with_genre_totals = FOREACH with_genre_totals GENERATE flatter_lyrics::genre as genre,
flatter_lyrics::word as word,
flatter_lyrics::total as word_count,
per_genre_totals::genre_total as genre_total;
/* Calculate P(word|genre) */
p_word_given_genre = FOREACH (GROUP with_genre_totals BY (genre, word))
GENERATE FLATTEN(group) as (genre, word),
(double)SUM(with_genre_totals.word_count)/(double)MAX(with_genre_totals.genre_total) as probability;
--STORE p_word_given_genre INTO '/tmp/p_word_given_genre.txt';
/* We will get P(genre|word) using Naive Bayes: P(A|B) = P(B|A)P(A)/P(B) or P(genre|word) = P(word|genre)P(genre)/P(word) */
/* Join in P(genre) and P(word) to do our maths */
with_p_genre = JOIN p_word_given_genre BY genre, p_genre BY genre;
/* P(word|genre) * P(genre) */
/* raw_p_genre_given_word = FOREACH with_p_genre GENERATE p_word_given_genre::word as word,
p_word_given_genre::genre as genre,
(p_word_given_genre::probability * p_genre::probability) as probability; */
/* HACK: Insert TF-IDF scores to replace probabilities */
raw_p_genre_given_word = LOAD '/tmp/tf_idf_scores.txt' AS (genre:chararray, word:chararray, probability:double);
/* Now group by word and sort genres, for access */
p_genre_given_word = FOREACH (GROUP raw_p_genre_given_word BY word) {
sorted = order raw_p_genre_given_word BY genre;
GENERATE group as word, sorted.(genre, probability) as probabilities;
};
STORE p_genre_given_word INTO '/tmp/p_genre_given_word.txt' using JsonStorage();
/* Now load the test data to compute against, to see the accuracy of our classifier. */
lyrics_test_json = LOAD 'data/mxm_dataset_train.json' USING com.twitter.elephantbird.pig.load.JsonLoader() as json:map[];
lyrics_test = foreach lyrics_test_json generate (chararray)$0#'track_id' as track_id:chararray,
$0#'word_counts' as word_counts:bag{t:tuple(m:map[])};
/* Join genre with test lyrics */
genre_test_lyrics = JOIN genre_artist BY track_id, lyrics_test BY track_id;
genre_test_lyrics = FOREACH genre_test_lyrics GENERATE lyrics_test::track_id as track_id,
genre as genre,
word_counts as word_counts;
/* Flatten the bag out to one pair of track_id/word with genre. */
flat_test_lyrics = FOREACH genre_test_lyrics GENERATE track_id,
genre,
FLATTEN(word_counts);
actual_test_lyrics = FOREACH flat_test_lyrics GENERATE track_id,
genre as actual_genre,
(int)$2#'total' AS total:int,
(chararray)$2#'word' AS word:chararray;
/* Now join the per-word genre probabilities to each word of the test lyrics. */
probs_and_test = JOIN p_genre_given_word BY word, actual_test_lyrics BY word;
/* What we have now is the genre probabilities for any given word, as well as the actual genre for a given track_id.
If we group by track_id, we will have gathered the probability for genre membership for each word in a song under
the track_id, which is to say we will have gathered all sets of conditional probabilities for that track. Having done
so, we are in a position to evaluate the probabilities together, to pick which genre is most likely (our prediction),
and to compare it to the actual genre in the test data.
*/
/*
probs_and_test: {
p_genre_given_word::word: chararray,
p_genre_given_word::probabilities: {
(genre: chararray,probability: double)
},
actual_test_lyrics::track_id: chararray,
actual_test_lyrics::actual_genre: chararray,
actual_test_lyrics::total: int,
actual_test_lyrics::word: chararray}
*/
by_track = GROUP probs_and_test BY track_id;
/*
by_track: {
group: chararray,
probs_and_test: {
(
p_genre_given_word::word: chararray,
p_genre_given_word::probabilities: {
(genre: chararray,probability: double)
},
actual_test_lyrics::track_id: chararray,
actual_test_lyrics::actual_genre: chararray,
actual_test_lyrics::total: int,
actual_test_lyrics::word: chararray
)
}
}
*/
test_result = FOREACH by_track GENERATE group AS track_id, funcs.calculate(probs_and_test);
test_result = FOREACH test_result GENERATE track_id, FLATTEN($1);
test_result = FOREACH test_result GENERATE answer,
actual_genre,
((answer == actual_genre) ? 1 : 0) as result;
STORE test_result into '/tmp/test_result.txt';
/* Overall success rate */
final = FOREACH (GROUP test_result BY result) GENERATE group AS result, COUNT_STAR(test_result) AS total;
STORE final INTO '/tmp/final.txt';
/* Success rate by actual genre */
scores_by_genre = FOREACH (GROUP test_result BY (actual_genre, result)) GENERATE FLATTEN(group) AS (actual_genre, result),
COUNT_STAR(test_result) AS total;
STORE scores_by_genre into '/tmp/scores_by_genre.txt';
/* Success rate by predicted (guessed) genre */
guess_category = FOREACH (GROUP test_result BY (answer, result)) GENERATE FLATTEN(group) AS (answer, result),
COUNT_STAR(test_result) AS total;
STORE guess_category INTO '/tmp/guess_category.txt';
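# funcs.py: Jython UDF used by classify.pig to pick the most likely genre for each track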
# probs_and_test: {
# (
# p_genre_given_word::word: chararray,
# p_genre_given_word::probabilities: {
# (genre: chararray,probability: double)
# },
# actual_test_lyrics::track_id: chararray,
# actual_test_lyrics::actual_genre: chararray,
# actual_test_lyrics::total: int,
# actual_test_lyrics::word: chararray
# )
# }
from collections import defaultdict

@outputSchema("tuple:(answer:chararray, actual_genre:chararray)")
def calculate(bag):
    # For all tuples in the bag...
    result = {}
    actual_genre = ''
    answer = ''
    for tup in bag:
        word = tup[0]; probs = tup[1]; track_id = tup[2]
        actual_genre = tup[3]; total = tup[4]
        # Loop through the probability tuples, multiplying each in series
        for ptup in probs:
            genre = ptup[0]; prob = ptup[1]
            result.setdefault(genre, 1)
            result[genre] *= prob
    # Pick the top probability after multiplying
    answer = sorted(result, key=result.get, reverse=True)[0]
    return (answer, actual_genre)
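# A minimal sketch, not part of the original gist, of how calculate() consumes its input:
# Pig passes a bag of tuples matching the probs_and_test schema above, with the per-word
# genre probabilities arriving as an inner bag of (genre, probability) pairs. The track,
# words, and numbers below are made up purely for illustration.
if __name__ == '__main__':
    toy_bag = [
        ('love',   [('pop', 0.05), ('rock', 0.02)], 'TRFAKE123', 'pop', 3),
        ('guitar', [('pop', 0.01), ('rock', 0.04)], 'TRFAKE123', 'pop', 1),
    ]
    # Multiplying across both words: pop = 0.05 * 0.01 = 0.0005, rock = 0.02 * 0.04 = 0.0008,
    # so the guess is 'rock' while the actual genre in the test data is 'pop'.
    print(calculate(toy_bag))   # ('rock', 'pop')

/* TF-IDF.pig: compute per-genre TF-IDF scores for the lyric words using tfidf.macro.pig; */
/* classify.pig reads the /tmp/tf_idf_scores.txt output this script writes. */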
register /me/Software/elephant-bird/pig/target/elephant-bird-pig-3.0.6-SNAPSHOT.jar
register /me/Software/pig/build/ivy/lib/Pig/json-simple-1.1.jar
set elephantbird.jsonloader.nestedLoad 'true'
set default_parallel 4
/* Macros are powerful! */
import 'tfidf.macro.pig';
rmf /tmp/term_counts.txt
rmf /tmp/tf_idf_scores.txt
rmf /tmp/top_scores.txt
/* Load the lyrics TRAINING data - that we'll use to train our classifier, with ElephantBird's JsonLoader as nested maps */
lyrics_train = LOAD 'data/mxm_dataset_train.json' USING com.twitter.elephantbird.pig.load.JsonLoader() as json:map[];
lyrics = FOREACH lyrics_train GENERATE (chararray)$0#'track_id' as track_id,
$0#'word_counts' as word_counts:bag{t:tuple(m:map[])};
/* Load the Genre dataset, to join against the lyrics data for our training labels */
genres = LOAD 'data/msd_genre_dataset.csv' USING PigStorage(',') AS (genre:chararray,
track_id:chararray,
artist_name:chararray,
title:chararray,
loudness:chararray,
tempo:double,
time_signature:double,
key:double,
mode:double,
duration:double,
avg_timbre1:double,
avg_timbre2:double,
avg_timbre3:double,
avg_timbre4:double,
avg_timbre5:double,
avg_timbre6:double,
avg_timbre7:double,
avg_timbre8:double,
avg_timbre9:double,
avg_timbre10:double,
avg_timbre11:double,
avg_timbre12:double,
var_timbre1:double,
var_timbre2:double,
var_timbre3:double,
var_timbre4:double,
var_timbre5:double,
var_timbre6:double,
var_timbre7:double,
var_timbre8:double,
var_timbre9:double,
var_timbre10:double,
var_timbre11:double,
var_timbre12:double);
/* Trim unneeded fields, join lyrics and genres, then trim again */
genre_artist = FOREACH genres GENERATE genre, track_id;
genre_lyrics = JOIN genre_artist BY track_id,
lyrics BY track_id;
genre_lyrics = FOREACH genre_lyrics GENERATE genre as genre,
word_counts as word_counts;
/* Project word_counts so that we get a genre/word pair for each word, then remove/flatten inner Map (from elephant-bird) */
flat_lyrics = FOREACH genre_lyrics GENERATE genre,
FLATTEN(word_counts);
/* Calculate the term count per document */
genre_word_totals = FOREACH flat_lyrics GENERATE genre,
(chararray)$1#'word' AS token:chararray,
(int)$1#'total' AS doc_total:int;
genre_word_totals = foreach (group genre_word_totals by (genre, token)) generate FLATTEN(group) as (genre, token),
SUM(genre_word_totals.doc_total) as doc_total;
term_counts = foreach (group genre_word_totals by genre) {
sorted = order genre_word_totals by doc_total desc;
top_20 = limit sorted 20;
generate group as genre, top_20;
}
store term_counts into '/tmp/term_counts.txt';
my_tf_idf_scores = tf_idf(genre_word_totals, 'genre', 'token');
store my_tf_idf_scores into '/tmp/tf_idf_scores.txt';
top_scores = foreach (group my_tf_idf_scores by genre) {
sorted = order my_tf_idf_scores by score desc;
top_50 = limit sorted 50;
generate group as genre, top_50.(token, score);
}
store top_scores into '/tmp/top_scores.txt';
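/* tfidf.macro.pig: reusable macro that computes TF-IDF scores for (document id, term) records */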
DEFINE tf_idf(in_relation, id_field, text_field) RETURNS out_relation {
/* Calculate the term count per document */
doc_word_totals = foreach (group $in_relation by ($id_field, $text_field)) generate
FLATTEN(group) as ($id_field, token),
COUNT_STAR($in_relation) as doc_total;
/* Calculate the document size */
pre_term_counts = foreach (group doc_word_totals by $id_field) generate
group AS $id_field,
FLATTEN(doc_word_totals.($text_field, doc_total)) as ($text_field, doc_total),
SUM(doc_word_totals.doc_total) as doc_size;
/* Calculate the TF */
term_freqs = foreach pre_term_counts generate $id_field as $id_field,
$text_field as $text_field,
((double)doc_total / (double)doc_size) AS term_freq;
/* Get count of documents using each $text_field, for idf */
text_field_usages = foreach (group term_freqs by $text_field) generate
FLATTEN(term_freqs) as ($id_field, $text_field, term_freq),
COUNT_STAR(term_freqs) as num_docs_with_$text_field;
/* Get document count */
just_ids = foreach $in_relation generate $id_field;
ndocs = foreach (group just_ids all) generate COUNT_STAR(just_ids) as total_docs;
/* Note the use of Pig Scalars to calculate idf */
$out_relation = foreach text_field_usages {
idf = LOG((double)ndocs.total_docs/(double)num_docs_with_$text_field);
tf_idf = (double)term_freq * idf;
generate $id_field as $id_field,
$text_field as token,
(double)tf_idf as score:double;
};
};
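For reference, here is a minimal Python sketch, not part of the gist, of the standard TF-IDF arithmetic the macro is built around: term frequency is a term's count in a document divided by that document's total count, IDF is the log of the document count over the number of documents containing the term, and the score is their product. The macro works in terms of Pig relations and grouping, so the correspondence is approximate; the function name and toy records below are made up for illustration.

from math import log
from collections import defaultdict

def tf_idf_scores(records):
    """records: (doc_id, token, count) triples, analogous to genre_word_totals."""
    doc_sizes = defaultdict(int)        # total token count per document
    docs_with_token = defaultdict(set)  # documents containing each token
    for doc_id, token, count in records:
        doc_sizes[doc_id] += count
        docs_with_token[token].add(doc_id)
    ndocs = len(doc_sizes)
    scores = []
    for doc_id, token, count in records:
        tf = float(count) / doc_sizes[doc_id]                  # term frequency
        idf = log(float(ndocs) / len(docs_with_token[token]))  # inverse document frequency
        scores.append((doc_id, token, tf * idf))
    return scores

# Toy corpus: two 'documents' (genres); 'love' appears in both, so its IDF (and score) is 0.
print(tf_idf_scores([('rock', 'guitar', 4), ('rock', 'love', 6),
                     ('pop',  'love',   8), ('pop',  'dance', 2)]))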