Skip to content

Instantly share code, notes, and snippets.

@anandnalya
Created September 5, 2011 08:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anandnalya/1194444 to your computer and use it in GitHub Desktop.
Save anandnalya/1194444 to your computer and use it in GitHub Desktop.
A basic WordReceiver, SentenceReceiver application in Yahoo S4
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<bean id="wordCatcher" class="test.s4.WordReceiverPE">
<property name="dispatcher" ref="dispatcher"/>
<property name="keys">
<!-- Listen for both words and sentences -->
<list>
<value>RawWords *</value>
<value>Sentence *</value>
</list>
</property>
</bean>
<bean id="sentenceCatcher" class="test.s4.SentenceReceiverPE">
<property name="keys">
<list>
<value>Sentence *</value>
</list>
</property>
</bean>
<bean id="dispatcher" class="io.s4.dispatcher.Dispatcher"
init-method="init">
<property name="partitioners">
<list>
<!-- Partition on senteceId which is "1" for all sentences. -->
<ref bean="sentenceIdPartitioner" />
</list>
</property>
<property name="eventEmitter" ref="commLayerEmitter" />
<property name="loggerName" value="s4" />
</bean>
<bean id="sentenceIdPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
<property name="streamNames">
<list>
<value>Sentence</value>
</list>
</property>
<property name="hashKey">
<list>
<value>sentenceId</value>
</list>
</property>
<property name="hasher" ref="hasher" />
<property name="debug" value="true" />
</bean>
</beans>
package test.s4;
public class Sentence {
private String string;
public Sentence(){
// default constructor
}
public Sentence(String string) {
this.string = string;
}
public String getString() {
return string;
}
public void setString(String message) {
this.string = message;
}
public String getSentenceId(){
// all sentences have the same key
return "1";
}
public void setSentenceId(String id){
// do nothing
}
@Override
public String toString() {
return "Sentence [string=" + string + "]";
}
}
package test.s4;
import io.s4.processor.AbstractPE;
/**
* This class receives Sentence events and print them on stdout.
*/
public class SentenceReceiverPE extends AbstractPE {
/**
* Print received sentence event.
* @param sentence
*/
public void processEvent(Sentence sentence){
System.out.println("Received Sentence: " + sentence);
}
@Override
public void output() {
// TODO Auto-generated method stub
}
@Override
public String getId() {
return this.getClass().getName();
}
}
package test.s4.generator;
import io.s4.client.Driver;
import io.s4.client.Message;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
public class TestMessageSender {
public static void main(String[] args) {
if (args.length < 1) {
System.err.println("No host name specified");
System.exit(1);
}
String hostName = args[0];
if (args.length < 2) {
System.err.println("No port specified");
System.exit(1);
}
int port = -1;
try {
port = Integer.parseInt(args[1]);
} catch (NumberFormatException nfe) {
System.err.println("Bad port number specified: " + args[1]);
System.exit(1);
}
if (args.length < 3) {
System.err.println("No stream name specified");
System.exit(1);
}
String streamName = args[2];
if (args.length < 4) {
System.err.println("No class name specified");
System.exit(1);
}
String clazz = args[3];
Driver d = new Driver(hostName, port);
Reader inputReader = null;
BufferedReader br = null;
try {
if (!d.init()) {
System.err.println("Driver initialization failed");
System.exit(1);
}
if (!d.connect()) {
System.err.println("Driver initialization failed");
System.exit(1);
}
inputReader = new InputStreamReader(System.in);
br = new BufferedReader(inputReader);
for (String inputLine = null; (inputLine = br.readLine()) != null;) {
String string = "{\"string\":\""+inputLine+"\"}";
System.out.println("sending " + string);
Message m = new Message(streamName, clazz, string);
d.send(m);
}
} catch (IOException e) {
e.printStackTrace();
}
finally {
try { d.disconnect(); } catch (Exception e) {}
try { br.close(); } catch (Exception e) {}
try { inputReader.close(); } catch (Exception e) {}
}
}
}
package test.s4;
public class Word {
private String string;
public String getString() {
return string;
}
public void setString(String message) {
this.string = message;
}
@Override
public String toString() {
return "Word [string=" + string + "]";
}
}
package test.s4;
import io.s4.dispatcher.Dispatcher;
import io.s4.processor.AbstractPE;
/**
* This class receive word and sentence events and print them to stdout
*/
public class WordReceiverPE extends AbstractPE {
private StringBuilder builder = new StringBuilder();
/**
* Dispatcher that will dispatch events on <code>Sentence *</code> stream.
*/
private Dispatcher dispatcher;
public Dispatcher getDispatcher() {
return dispatcher;
}
public void setDispatcher(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
/**
* Process word events. This prints out the received word and also builds
* the sentence that will be dispatched once we found the end of the
* sentence identified by <code>.</code>
*
* @param word
* Word received on <code>RawWords *</code> stream.
*/
public void processEvent(Word word) {
System.out.println("Received: " + word);
// keep building the sentence
builder.append(' ').append(word.getString().trim());
if (builder.toString().endsWith(".")) {
System.err.print("End of sentence found");
// dispatch a Sentence event
dispatcher.dispatchEvent("Sentence", new Sentence(builder.toString()));
// reset buffer.
builder.setLength(0);
}
}
/**
* Process sentence events. This method demonstrated that one class can
* recieve multiple type of events on different Streams.
*
* @param sentence
* Sentence recieved on <code>Sentence *<code> stream.
*/
public void processEvent(Sentence sentence) {
System.out.println("Received Sentence(WordReceiverPE) : " + sentence);
}
@Override
public void output() {
// TODO Auto-generated method stub
}
@Override
public String getId() {
return this.getClass().getName();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment