@ogibayashi
Created March 24, 2016 09:28
Distinct count test code for Apache Flink
import java.sql.Timestamp
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger

object DistinctCountTest {
  def main(args: Array[String]): Unit = {
    val fileName = args(0)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Poll the file every 100 ms and read only the appended content.
    val input = env.readFileStream(fileName, 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
      // Split each line into lowercase words, dropping empty tokens.
      .flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
      // Put all words into one non-keyed 60-second window.
      .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
      // Fire the window every 5 seconds of processing time.
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
      // Accumulate the distinct words seen in the window.
      .fold(Set[String]()) { (r, i) => r + i }
      // Emit the current wall-clock time and the distinct count.
      .map { x => (new Timestamp(System.currentTimeMillis()), x.size) }

    input.print()
    env.execute("DistinctCount")
  }
}

To run the program:

./bin/flink run -c DistinctCountTest  ~/opt/flink-1.0.0/work/quickstart/target/quickstart-1.0-SNAPSHOT.jar ~/tmp/input.txt

Then append some lines to the input file:

% cat put.sh
echo "aaa"
echo "aaa"
sleep 10
echo "bbb"
sleep 60
echo "ccc"

% sh put.sh >> ~/tmp/input.txt 
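
To make it easier to reason about what the job should print for this input, here is a minimal sketch of the same tokenize-and-fold logic on plain Scala collections, without Flink. The names `DistinctCountSketch`, `tokenize`, and `distinctCount` are hypothetical and not part of the gist. How the lines are grouped into windows is an assumption: whether "aaa" and "bbb" share a window depends on where the 60-second window boundary falls in wall-clock time, while "ccc", written about 60 seconds after "bbb", should land in a later window.

import scala.collection.immutable.Set

object DistinctCountSketch {
  // Same tokenization as the Flink job: lowercase, split on non-word
  // characters, drop empty tokens.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  // Same accumulation as the job's fold: collect words into a Set,
  // then report its size.
  def distinctCount(lines: Seq[String]): Int =
    lines.flatMap(tokenize).foldLeft(Set.empty[String])((acc, w) => acc + w).size

  def main(args: Array[String]): Unit = {
    // Assumption: the first three lines fall into the same window.
    println(distinctCount(Seq("aaa", "aaa", "bbb"))) // prints 2
    // "ccc" is assumed to be alone in a later window.
    println(distinctCount(Seq("ccc")))               // prints 1
  }
}

The sketch only covers the per-window computation. It does not model the trigger, so it says nothing about how many intermediate results the job emits within a window.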