Created
October 16, 2012 17:24
-
-
Save ceteri/3900702 to your computer and use it in GitHub Desktop.
Cascading for the Impatient, Part 2 - Word Count
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
; Paul Lam
; https://github.com/Quantisan/Impatient
;
; Cascalog word count (Part 2 of "Cascading for the Impatient").
; Reads a delimited file of lines, tokenizes each line, and writes
; (word, count) pairs to the output path.
(ns impatient.core
  (:use [cascalog.api]
        ;; hfs-delimited: tap for tab-delimited files on HDFS/local fs
        [cascalog.more-taps :only (hfs-delimited)])
  (:require [clojure.string :as s]
            ;; c/count etc. — Cascalog's built-in aggregator ops
            [cascalog.ops :as c])
  ;; Compile a Java class entry point so the jar can be run on Hadoop.
  (:gen-class))
;; Mapcat operation: splits one input line into zero or more tokens,
;; emitting one output tuple per token. The regex splits on runs of
;; brackets, parens, commas, periods, backslashes, and whitespace.
;; NOTE(review): the character class contains both an escaped \) and a
;; bare ) — the duplicate is harmless (char-class duplicates are no-ops)
;; but looks unintended; compare the Java version's "[ \[\]\(\),.]".
(defmapcatop split [line]
  "reads in a line of string and splits it by regex"
  (s/split line #"[\[\]\\\(\),.)\s]+"))
;; Entry point: in = input path (tab-delimited, first row is a header),
;; out = output path. The query generates (?word ?count) tuples: each
;; input line is bound to ?line (the first field is ignored via _),
;; split into ?word tokens, then counted per distinct ?word by the
;; implicit group of the c/count aggregator.
(defn -main [in out & args]
  (?<- (hfs-delimited out)
       [?word ?count]
       ((hfs-delimited in :skip-header? true) _ ?line)
       (split ?line :> ?word)
       (c/count ?count)))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Word Count — Part 2 of "Cascading for the Impatient".
 *
 * Reads a tab-delimited document file (args[0]), tokenizes the "text"
 * field with a regex, counts occurrences of each token, and writes
 * (token, count) pairs to a tab-delimited sink (args[1]) by running a
 * Cascading flow on Hadoop. Also emits a DOT rendering of the flow DAG.
 */
public class
  Main
  {
  public static void
  main( String[] args )
    {
    String docPath = args[ 0 ];
    String wcPath = args[ 1 ];

    Properties properties = new Properties();
    // record the jar containing Main so Hadoop ships it to the cluster
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create source and sink taps (TextDelimited( true, "\t" ): first row
    // is a header naming the fields, tab-separated)
    Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
    Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

    // specify a regex operation to split the "document" text lines into a token stream
    Fields token = new Fields( "token" );
    Fields text = new Fields( "text" );
    RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
    // Fields.RESULTS: the pipe's output tuple only returns "token"
    Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );

    // determine the word counts: group the token stream by token value,
    // then apply the Count aggregator to each group
    Pipe wcPipe = new Pipe( "wc", docPipe );
    wcPipe = new GroupBy( wcPipe, token );
    wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef()
     .setName( "wc" )
     .addSource( docPipe, docTap )
     .addTailSink( wcPipe, wcTap );

    // write a DOT file (flow DAG visualization) and run the flow to completion
    Flow wcFlow = flowConnector.connect( flowDef );
    wcFlow.writeDOT( "dot/wc.dot" );
    wcFlow.complete();
    }
  }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- kudos to Dmitriy Ryaboy
-- Word count in Pig: load tab-delimited documents, tokenize the text
-- field, and count occurrences of each token.

-- load (doc_id, text) rows; 'tagsource' option preserves source tagging
docPipe = LOAD '$docPath'
  USING PigStorage('\t', 'tagsource') AS (doc_id, text);
-- drop the header row (whose doc_id field literally reads 'doc_id')
docPipe = FILTER docPipe BY doc_id != 'doc_id';

-- specify regex to split "document" text lines into token stream
-- NOTE(review): the two-argument TOKENIZE(text, delimiters) form is
-- version-dependent — confirm the cluster's Pig release supports it.
tokenPipe = FOREACH docPipe
  GENERATE doc_id, FLATTEN(TOKENIZE(text, ' [](),.')) AS token;
-- keep only tokens that start with a word character
tokenPipe = FILTER tokenPipe BY token MATCHES '\\w.*';

-- determine the word counts
tokenGroups = GROUP tokenPipe BY token;
wcPipe = FOREACH tokenGroups
  GENERATE group AS token, COUNT(tokenPipe) AS count;

-- output the (token, count) pairs, tab-separated
STORE wcPipe INTO '$wcPath'
  USING PigStorage('\t', 'tagsource');
-- also emit a DOT visualization of the execution plan
EXPLAIN -out dot/wc_pig.dot -dot wcPipe;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sujit Pal
// http://sujitpal.blogspot.com/2012/08/scalding-for-impatient.html
package com.mycompany.impatient

import com.twitter.scalding._

/**
 * Scalding word count: reads a TSV of (docId, text) rows from
 * args("input"), splits each text field into words, and writes
 * (word, count) pairs to args("output").
 *
 * NOTE(review): this version splits on whitespace only (\s+), unlike
 * the Cascading/Pig versions which also split on punctuation — confirm
 * whether that difference is intended.
 */
class Part2(args : Args) extends Job(args) {
  val input = Tsv(args("input"), ('docId, 'text))
  val output = Tsv(args("output"))
  input.read.
    // one output tuple per whitespace-separated word in the text field
    flatMap('text -> 'word) { text : String => text.split("""\s+""") }.
    // group.size yields the per-word occurrence count
    groupBy('word) { group => group.size }.
    write(output)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Steve Severance
-- http://stackoverflow.com/questions/10039949/word-count-program-in-hive
-- Word count in Hive: load raw lines into a one-column table, explode
-- each line into words, then group and count.

-- one row per raw input line
CREATE TABLE input (line STRING);

-- load the local file, replacing any existing table contents
LOAD DATA LOCAL INPATH 'input.tsv'
OVERWRITE INTO TABLE input;

-- split each line on spaces; LATERAL VIEW explode() produces one row
-- per word, which is then aggregated.
-- FIX: the original selected split(text, ' '), but the table's only
-- column is `line` — `text` does not exist and the query would fail
-- with an invalid-column error.
SELECT
word, COUNT(*)
FROM input
LATERAL VIEW explode(split(line, ' ')) lTable AS word
GROUP BY word
;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment