@danoyoung
Created April 3, 2012 16:08
1. The raw input looks like this:
4302653 df0cfc4f187e6f6258fbe732ed2cbcf5 42199 152 44390 cast Actor 3 Cliff Nazarro 2010-04-28 03:51:25 2010-04-28 03:51:25
4302654 df0cfc4f187e6f6258fbe732ed2cbcf5 42199 153 541 cast Actor 1 Russell Hayden 2010-04-28 03:51:25 2010-04-28 03:51:25
4302655 df0cfc4f187e6f6258fbe732ed2cbcf5 42199 154 46074 cast Actor 2 Inez Cooper 2010-04-28 03:51:25 2010-04-28 03:51:25
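To make the column layout explicit, here is a minimal sketch that maps one raw line onto the fields declared by the LOAD schema in step 2. It assumes the file is tab-delimited (PigStorage's default delimiter) and that the two columns that appear empty in the sample rows are character_name and middle_name. The helper name parseFranchisePeopleLine is hypothetical, and mapping empty columns to null is a choice made in this sketch.

```javascript
// Column names, in order, as declared in the LOAD statement of step 2.
const FIELDS = [
  'id', 'franchise_guid', 'franchise_id', 'person_id', 'mv_person_id',
  'role_type', 'role', 'character_name', 'ordEr', 'first_name',
  'last_name', 'middle_name', 'created_at', 'updated_at'
];
// Columns declared as int in that schema.
const INT_FIELDS = new Set(['id', 'franchise_id', 'person_id', 'mv_person_id', 'ordEr']);

// Hypothetical helper: map one tab-delimited line onto the LOAD schema.
// In this sketch, empty or missing columns become null and int columns are parsed.
function parseFranchisePeopleLine(line) {
  const cols = line.split('\t');
  const record = {};
  FIELDS.forEach((name, i) => {
    const raw = i < cols.length ? cols[i] : '';
    if (raw === '') {
      record[name] = null;
    } else if (INT_FIELDS.has(name)) {
      record[name] = parseInt(raw, 10);
    } else {
      record[name] = raw;
    }
  });
  return record;
}

// First sample row, assuming character_name and middle_name are the empty columns.
const sample = [
  '4302653', 'df0cfc4f187e6f6258fbe732ed2cbcf5', '42199', '152', '44390',
  'cast', 'Actor', '', '3', 'Cliff', 'Nazarro', '',
  '2010-04-28 03:51:25', '2010-04-28 03:51:25'
].join('\t');

const row = parseFranchisePeopleLine(sample);
console.log(row.franchise_id, row.role_type, row.ordEr, row.first_name, row.last_name);
// 42199 cast 3 Cliff Nazarro
```

If the real file uses a different delimiter, only the split() call would need to change.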
2. The raw data is then converted and stored as an Avro file with the following Pig script:
set io.sort.mb 150;
set mapred.reduce.tasks 0;
/*
gets all the people for a franchise.
rm avro/franchise_people;
*/
REGISTER '/Users/dan.young/projects/pig/indexer/f/udfs/udf.js' USING org.apache.pig.scripting.js.JsScriptEngine as myfuncs;
franchise_people_raw = LOAD 'hdfs://localhost:9000/user/hadoop/indexer/raw_data/franchise_people.txt' AS (id:int,franchise_guid:chararray,franchise_id:int,person_id:int,
mv_person_id:int,role_type:chararray,role:chararray,character_name:chararray,ordEr:int,first_name:chararray,last_name:chararray,middle_name:chararray,created_at:chararray,
updated_at:chararray);
a = FOREACH franchise_people_raw GENERATE franchise_id, myfuncs.get_full_name(first_name,last_name) AS full_name:chararray, TRIM(role_type) AS role_type:chararray, TRIM(role) AS role:chararray;
STORE a INTO 'hdfs://127.0.0.1:9000/user/hadoop/indexer/avro/franchise_people' using org.apache.pig.piggybank.storage.avro.AvroStorage('{"index":1,"schema":{"type":"record","name":"franchise_people","fields":[{"name":"franchise_id","type":"int","default":0},{"name":"full_name","type":"string","default":"NULL"},{"name":"role_type","type":"string","default":"NULL"},{"name":"role","type":"string","default":"NULL"}]}}');
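The AvroStorage argument above is a JSON document: an "index" plus a "schema" holding an Avro record whose fields mirror the FOREACH output, each with a default. Writing nested JSON by hand inside a Pig string literal makes unbalanced braces easy to miss. One option (a sketch, not something PiggyBank requires) is to build the object in JavaScript and serialize it with JSON.stringify; the variable names here are illustrative.

```javascript
// Avro record schema mirroring the FOREACH output (franchise_id, full_name, role_type, role).
const franchisePeopleSchema = {
  type: 'record',
  name: 'franchise_people',
  fields: [
    { name: 'franchise_id', type: 'int', default: 0 },
    { name: 'full_name', type: 'string', default: 'NULL' },
    { name: 'role_type', type: 'string', default: 'NULL' },
    { name: 'role', type: 'string', default: 'NULL' }
  ]
};

// The option string passed to AvroStorage(...); "index" is copied from the script above.
const avroStorageOption = JSON.stringify({ index: 1, schema: franchisePeopleSchema });

// Emit a STORE statement with the generated option embedded as a Pig string literal.
// (Safe here because the JSON contains no single quotes.)
console.log("STORE a INTO 'hdfs://127.0.0.1:9000/user/hadoop/indexer/avro/franchise_people' " +
  "using org.apache.pig.piggybank.storage.avro.AvroStorage('" + avroStorageOption + "');");
```

Because JSON.stringify keeps keys in insertion order and adds no whitespace, the generated option is character-for-character the same as the hand-written one in the STORE above.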
Here's the UDF (myfuncs.get_full_name) I'm using:
get_full_name.outputSchema = "full_name:chararray";
// Joins first and last name, trimming surrounding whitespace.
// Returns the string 'NULL' when both names are missing or blank.
function get_full_name(first_name, last_name) {
  if (first_name == null) {
    first_name = '';
  } else {
    // replace() returns a new string, so the trimmed result must be assigned back
    first_name = first_name.replace(/^\s+|\s+$/g, "");
  }
  if (last_name == null) {
    last_name = '';
  } else {
    last_name = last_name.replace(/^\s+|\s+$/g, "");
  }
  var full_name;
  if (first_name && last_name) {
    full_name = first_name + ' ' + last_name;
  } else if (first_name) {
    full_name = first_name;
  } else if (last_name) {
    full_name = last_name;
  } else {
    full_name = 'NULL';
  }
  return {full_name: full_name};
}
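Because the UDF is plain JavaScript, its name-joining logic can be exercised outside Pig before registering it. JavaScript strings are immutable: String.prototype.replace returns a new string and leaves the receiver unchanged, so a trimmed value only takes effect if it is assigned. The sketch below is a standalone Node.js version of the same rules; trimOrEmpty is a hypothetical helper, and the outputSchema property is left out because only Pig's JsScriptEngine reads it.

```javascript
// Trim leading/trailing whitespace; null or undefined become ''.
function trimOrEmpty(value) {
  return value == null ? '' : String(value).replace(/^\s+|\s+$/g, '');
}

// Same joining rules as get_full_name: "first last", either name alone, or the string 'NULL'.
function get_full_name(first_name, last_name) {
  const first = trimOrEmpty(first_name);
  const last = trimOrEmpty(last_name);
  let full_name;
  if (first && last) {
    full_name = first + ' ' + last;
  } else {
    full_name = first || last || 'NULL';
  }
  return { full_name: full_name };
}

console.log(get_full_name('  Cliff ', 'Nazarro')); // { full_name: 'Cliff Nazarro' }
console.log(get_full_name(null, 'Hayden'));        // { full_name: 'Hayden' }
console.log(get_full_name(null, null));            // { full_name: 'NULL' }
```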
3. Next, the following Pig script groups franchise_people by franchise_id, keeping only rows whose role_type is 'cast' or 'crew'.
set io.sort.mb 150;
/*
set mapred.reduce.tasks 1;
gets all the people for a franchise.
rm avro/franchise_people;
*/
franchise_people = LOAD 'hdfs://127.0.0.1:9000/user/hadoop/indexer/avro/franchise_people' using org.apache.pig.piggybank.storage.avro.AvroStorage();
a = FILTER franchise_people BY (role_type == 'cast') OR (role_type == 'crew');
b = GROUP a BY (franchise_id);
/* For each franchise_id, c has to be stored in Avro with franchise_id as one field
and the array of names as another field. */
c = FOREACH b {GENERATE group AS franchise_id, a.full_name AS cast_and_crew;};
/* I tried both of the following approaches (alternatives, run one at a time), but each fails with errors from PiggyBank's AvroStorage. */
STORE c INTO 'hdfs://127.0.0.1:9000/user/hadoop/indexer/avro/franchise_cast_and_crew' using org.apache.pig.piggybank.storage.avro.AvroStorage('{"index":1,"schema":{"type":"record","name":"franchise_cast_and_crew","fields":[{"name":"franchise_id","type":"int"},{"name":"cast_and_crew","type":{"type":"array","items":"string"}}]}}');
STORE c INTO 'hdfs://127.0.0.1:9000/user/hadoop/indexer/avro/franchise_cast_and_crew' using org.apache.pig.piggybank.storage.avro.AvroStorage();
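Since the explicit-schema option is parsed as JSON, a quick local parse catches syntax errors such as unbalanced braces before the job is submitted. A minimal sketch, assuming Node.js; checkAvroStorageOption is a hypothetical helper that parses the string and lists the declared field names so they can be compared with the relation's aliases (franchise_id, cast_and_crew). It checks a balanced franchise_cast_and_crew option alongside the same text with its last brace removed.

```javascript
// Hypothetical helper: parse an AvroStorage option string and report problems.
// Returns { ok: true, fields: [...] } or { ok: false, error: '...' }.
function checkAvroStorageOption(option) {
  let parsed;
  try {
    parsed = JSON.parse(option);
  } catch (e) {
    return { ok: false, error: 'invalid JSON: ' + e.message };
  }
  const schema = parsed.schema;
  if (!schema || schema.type !== 'record' || !Array.isArray(schema.fields)) {
    return { ok: false, error: 'expected "schema" to be an Avro record with a "fields" array' };
  }
  return { ok: true, fields: schema.fields.map(f => f.name) };
}

// Balanced: the outer object is closed after the schema.
const balanced = '{"index":1,"schema":{"type":"record","name":"franchise_cast_and_crew",' +
  '"fields":[{"name":"franchise_id","type":"int"},' +
  '{"name":"cast_and_crew","type":{"type":"array","items":"string"}}]}}';
// Same text with the final closing brace dropped.
const unbalanced = balanced.slice(0, -1);

console.log(checkAvroStorageOption(balanced));      // { ok: true, fields: [ 'franchise_id', 'cast_and_crew' ] }
console.log(checkAvroStorageOption(unbalanced).ok); // false
```

This only checks JSON syntax and field names; it says nothing about whether the Avro types match the Pig types.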
NOTES:
grunt> set io.sort.mb 150;
grunt> /*
grunt> set mapred.reduce.task 1;
grunt> gets all the people for a franchise.
grunt> rm avro/franchise_people;
grunt> */
grunt> franchise_people = LOAD 'hdfs://127.0.0.1:9000/user/hadoop/indexer/avro/franchise_people' using org.apache.pig.piggybank.storage.avro.AvroStorage();
grunt>
grunt> a = FILTER franchise_people BY (role_type == 'cast') OR (role_type == 'crew');
grunt> b = GROUP a BY (franchise_id);
grunt>
grunt> /*per_franchise_id, need to get c stored in avro with the franchise_id as one field, and another
grunt> field which has the array of names.......*/
grunt> c = FOREACH b {GENERATE group AS franchise_id, a.full_name AS cast_and_crew;};
grunt> describe c;
c: {franchise_id: int,cast_and_crew: {(full_name: chararray)}}
grunt>
grunt> illustrate c;
.....
.....
2012-04-03 10:06:51,261 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
--------------------------------------------------------------------------------------------------------------------
| franchise_people | franchise_id:int | full_name:chararray | role_type:chararray | role:chararray |
--------------------------------------------------------------------------------------------------------------------
| | 200994 | John Hurt | cast | Actor |
| | 200994 | Armando Acosta | crew | Director |
| | 200994 | John Hurt | 0 | Actor |
--------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------
| a | franchise_id:int | full_name:chararray | role_type:chararray | role:chararray |
-----------------------------------------------------------------------------------------------------
| | 200994 | John Hurt | cast | Actor |
| | 200994 | Armando Acosta | crew | Director |
-----------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------------
| b | group:int | a:bag{:tuple(franchise_id:int,full_name:chararray,role_type:chararray,role:chararray)} |
-------------------------------------------------------------------------------------------------------------------------------------
| | 200994 | {(200994, ..., Actor), (200994, ..., Director)} |
-------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------
| c | franchise_id:int | cast_and_crew:bag{:tuple(full_name:chararray)} |
-------------------------------------------------------------------------------------------
| | 200994 | {(John Hurt), (Armando Acosta)} |
-------------------------------------------------------------------------------------------
grunt>
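The describe output above shows cast_and_crew as a bag of single-field tuples, {(full_name: chararray)}, not a bag of plain strings, so an Avro field declared as {"type":"array","items":"string"} describes a different shape from what Pig holds. Whether that mismatch is what triggers the PiggyBank errors is an assumption, since the error messages aren't shown. The sketch below replays the FILTER/GROUP/FOREACH pipeline in JavaScript over the rows shown by illustrate, modeling each bag element as a one-field object; flattenNames is a hypothetical helper that converts it to the array-of-strings shape the explicit schema declares.

```javascript
// Rows as shown by `illustrate` for franchise_people (role_type '0' is copied from that output).
const franchisePeople = [
  { franchise_id: 200994, full_name: 'John Hurt', role_type: 'cast', role: 'Actor' },
  { franchise_id: 200994, full_name: 'Armando Acosta', role_type: 'crew', role: 'Director' },
  { franchise_id: 200994, full_name: 'John Hurt', role_type: '0', role: 'Actor' }
];

// a = FILTER franchise_people BY (role_type == 'cast') OR (role_type == 'crew');
const a = franchisePeople.filter(r => r.role_type === 'cast' || r.role_type === 'crew');

// b = GROUP a BY franchise_id; c = ... GENERATE group AS franchise_id, a.full_name
// Each bag element is modeled as a one-field tuple { full_name }.
// (Pig bags are unordered; this sketch simply keeps input order.)
function groupCastAndCrew(rows) {
  const groups = new Map();
  for (const r of rows) {
    if (!groups.has(r.franchise_id)) groups.set(r.franchise_id, []);
    groups.get(r.franchise_id).push({ full_name: r.full_name });
  }
  return Array.from(groups, ([franchise_id, bag]) => ({ franchise_id, cast_and_crew: bag }));
}

// Hypothetical helper: unwrap single-field tuples into plain strings,
// i.e. the shape an Avro {"type":"array","items":"string"} field describes.
function flattenNames(record) {
  return {
    franchise_id: record.franchise_id,
    cast_and_crew: record.cast_and_crew.map(t => t.full_name)
  };
}

const c = groupCastAndCrew(a);
console.log(JSON.stringify(c));
// [{"franchise_id":200994,"cast_and_crew":[{"full_name":"John Hurt"},{"full_name":"Armando Acosta"}]}]
console.log(JSON.stringify(c.map(flattenNames)));
// [{"franchise_id":200994,"cast_and_crew":["John Hurt","Armando Acosta"]}]
```

The first shape corresponds to an Avro array whose items are records with a full_name field; the second matches the array-of-strings schema.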