Created
September 5, 2011 08:36
-
-
Save anandnalya/1194444 to your computer and use it in GitHub Desktop.
A basic WordReceiver, SentenceReceiver application in Yahoo S4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<beans xmlns="http://www.springframework.org/schema/beans" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://www.springframework.org/schema/beans | |
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd"> | |
<bean id="wordCatcher" class="test.s4.WordReceiverPE"> | |
<property name="dispatcher" ref="dispatcher"/> | |
<property name="keys"> | |
<!-- Listen for both words and sentences --> | |
<list> | |
<value>RawWords *</value> | |
<value>Sentence *</value> | |
</list> | |
</property> | |
</bean> | |
<bean id="sentenceCatcher" class="test.s4.SentenceReceiverPE"> | |
<property name="keys"> | |
<list> | |
<value>Sentence *</value> | |
</list> | |
</property> | |
</bean> | |
<bean id="dispatcher" class="io.s4.dispatcher.Dispatcher" | |
init-method="init"> | |
<property name="partitioners"> | |
<list> | |
<!-- Partition on sentenceId, which is "1" for all sentences. --> | |
<ref bean="sentenceIdPartitioner" /> | |
</list> | |
</property> | |
<property name="eventEmitter" ref="commLayerEmitter" /> | |
<property name="loggerName" value="s4" /> | |
</bean> | |
<bean id="sentenceIdPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner"> | |
<property name="streamNames"> | |
<list> | |
<value>Sentence</value> | |
</list> | |
</property> | |
<property name="hashKey"> | |
<list> | |
<value>sentenceId</value> | |
</list> | |
</property> | |
<property name="hasher" ref="hasher" /> | |
<property name="debug" value="true" /> | |
</bean> | |
</beans> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test.s4; | |
/**
 * Event payload holding the text of a sentence.
 *
 * <p>Every instance reports the same sentence id, so all sentences share one key.
 */
public class Sentence {

    private String string;

    /** Creates a sentence whose text has not been set yet. */
    public Sentence() {
        super();
    }

    /**
     * Creates a sentence holding the given text.
     *
     * @param string the sentence text
     */
    public Sentence(String string) {
        super();
        this.string = string;
    }

    /**
     * @return the sentence text, or {@code null} if none was set
     */
    public String getString() {
        return this.string;
    }

    /**
     * Replaces the sentence text.
     *
     * @param message the new sentence text
     */
    public void setString(String message) {
        this.string = message;
    }

    /**
     * @return the constant id {@code "1"}, identical for every sentence
     */
    public String getSentenceId() {
        return "1";
    }

    /**
     * Accepts an id but deliberately ignores it; the id is fixed.
     *
     * @param id ignored
     */
    public void setSentenceId(String id) {
        // intentionally empty: the sentence id never changes
    }

    @Override
    public String toString() {
        return String.format("Sentence [string=%s]", this.string);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test.s4; | |
import io.s4.processor.AbstractPE; | |
/** | |
* This class receives Sentence events and print them on stdout. | |
*/ | |
public class SentenceReceiverPE extends AbstractPE { | |
/** | |
* Print received sentence event. | |
* @param sentence | |
*/ | |
public void processEvent(Sentence sentence){ | |
System.out.println("Received Sentence: " + sentence); | |
} | |
@Override | |
public void output() { | |
// TODO Auto-generated method stub | |
} | |
@Override | |
public String getId() { | |
return this.getClass().getName(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test.s4.generator; | |
import io.s4.client.Driver; | |
import io.s4.client.Message; | |
import java.io.BufferedReader; | |
import java.io.IOException; | |
import java.io.InputStreamReader; | |
import java.io.Reader; | |
public class TestMessageSender { | |
public static void main(String[] args) { | |
if (args.length < 1) { | |
System.err.println("No host name specified"); | |
System.exit(1); | |
} | |
String hostName = args[0]; | |
if (args.length < 2) { | |
System.err.println("No port specified"); | |
System.exit(1); | |
} | |
int port = -1; | |
try { | |
port = Integer.parseInt(args[1]); | |
} catch (NumberFormatException nfe) { | |
System.err.println("Bad port number specified: " + args[1]); | |
System.exit(1); | |
} | |
if (args.length < 3) { | |
System.err.println("No stream name specified"); | |
System.exit(1); | |
} | |
String streamName = args[2]; | |
if (args.length < 4) { | |
System.err.println("No class name specified"); | |
System.exit(1); | |
} | |
String clazz = args[3]; | |
Driver d = new Driver(hostName, port); | |
Reader inputReader = null; | |
BufferedReader br = null; | |
try { | |
if (!d.init()) { | |
System.err.println("Driver initialization failed"); | |
System.exit(1); | |
} | |
if (!d.connect()) { | |
System.err.println("Driver initialization failed"); | |
System.exit(1); | |
} | |
inputReader = new InputStreamReader(System.in); | |
br = new BufferedReader(inputReader); | |
for (String inputLine = null; (inputLine = br.readLine()) != null;) { | |
String string = "{\"string\":\""+inputLine+"\"}"; | |
System.out.println("sending " + string); | |
Message m = new Message(streamName, clazz, string); | |
d.send(m); | |
} | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
finally { | |
try { d.disconnect(); } catch (Exception e) {} | |
try { br.close(); } catch (Exception e) {} | |
try { inputReader.close(); } catch (Exception e) {} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test.s4; | |
/**
 * Event payload holding the text of a single word.
 */
public class Word {

    private String string;

    /**
     * @return the word text, or {@code null} if none was set
     */
    public String getString() {
        return this.string;
    }

    /**
     * Replaces the word text.
     *
     * @param message the new word text
     */
    public void setString(String message) {
        this.string = message;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Word [string=");
        text.append(this.string);
        text.append(']');
        return text.toString();
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test.s4; | |
import io.s4.dispatcher.Dispatcher; | |
import io.s4.processor.AbstractPE; | |
/** | |
* This class receive word and sentence events and print them to stdout | |
*/ | |
public class WordReceiverPE extends AbstractPE { | |
private StringBuilder builder = new StringBuilder(); | |
/** | |
* Dispatcher that will dispatch events on <code>Sentence *</code> stream. | |
*/ | |
private Dispatcher dispatcher; | |
public Dispatcher getDispatcher() { | |
return dispatcher; | |
} | |
public void setDispatcher(Dispatcher dispatcher) { | |
this.dispatcher = dispatcher; | |
} | |
/** | |
* Process word events. This prints out the received word and also builds | |
* the sentence that will be dispatched once we found the end of the | |
* sentence identified by <code>.</code> | |
* | |
* @param word | |
* Word received on <code>RawWords *</code> stream. | |
*/ | |
public void processEvent(Word word) { | |
System.out.println("Received: " + word); | |
// keep building the sentence | |
builder.append(' ').append(word.getString().trim()); | |
if (builder.toString().endsWith(".")) { | |
System.err.print("End of sentence found"); | |
// dispatch a Sentence event | |
dispatcher.dispatchEvent("Sentence", new Sentence(builder.toString())); | |
// reset buffer. | |
builder.setLength(0); | |
} | |
} | |
/** | |
* Process sentence events. This method demonstrated that one class can | |
* recieve multiple type of events on different Streams. | |
* | |
* @param sentence | |
* Sentence recieved on <code>Sentence *<code> stream. | |
*/ | |
public void processEvent(Sentence sentence) { | |
System.out.println("Received Sentence(WordReceiverPE) : " + sentence); | |
} | |
@Override | |
public void output() { | |
// TODO Auto-generated method stub | |
} | |
@Override | |
public String getId() { | |
return this.getClass().getName(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment