Skip to content

Instantly share code, notes, and snippets.

@binshuohu
Last active November 21, 2015 10:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save binshuohu/26b0ce7844a713f8f83f to your computer and use it in GitHub Desktop.
Save binshuohu/26b0ce7844a713f8f83f to your computer and use it in GitHub Desktop.
A simple Akka Streams example

下面是如何使用 Akka Streams 过滤日志并汇总上传总量的代码示例。待处理的日志样例如下:

[2015-11-21 15:00:55] api=+AppDown,seq=3065477251,uin=249615433,appid=25,client_seq=635410220,client_ver=30013,client_net=30013,client_ip=221.1.115.136,caller=10.149.24.231,step=1,step_ip=10.149.20.140,step_port=30025,ret=0,retmsg=Success,ccd_delay=0,delay=61,fid=0971ea82-d031-4edd-b487-a78a4a1564b3,version=1448089142,sha=d358632365598d9b8d27fd7544e12385797a6c82,fsize=7783848
[2015-11-21 15:00:55] api=+AppUp,seq=3065477325,uin=1109610795,appid=25,client_seq=56929653,client_ver=30001,client_net=3,ret=-10112,retmsg=Apply upload file error,ccd_delay=0,delay=27,sha=596bbe882c4f01409847cc071492710927e2fa44,fsize=24465,attrbit=0,name=14-31-13-91fa5ae736d12f2e442d15ed4fc2d5628535682e.jpg,in_up_ip=0.0.0.0,out_up_ip=0.0.0.0,confilict_key=NULL
[2015-11-21 15:00:55] api=+BatchQueryDir,seq=3065477311,uin=133998050,appid=25,client_seq=38111776,client_ver=30013,client_net=0
package sample.stream
import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.stream.io.Implicits._
import akka.stream.io._
import akka.util.ByteString
import scala.util.{ Success, Failure }
/**
 * Command-line tool that adds up the `fsize` field of every upload
 * (`api=+AppUp`) entry in a log file and prints the total.
 */
object Cycle {

  /** Substring that marks a log line as an upload entry. */
  private val UploadMarker = "api=+AppUp"

  /**
   * Captures the digits of the `fsize` field.
   *
   * Compiled once here instead of once per log line. `(?s)` lets `.` also
   * match line terminators, so a trailing `\r` left over from CRLF files does
   * not prevent the whole-line match.
   */
  private val FileSize = """(?s).*fsize=(\d+).*""".r

  /**
   * Flow that turns a stream of log lines into a single element: the sum of
   * the `fsize` values of all upload entries.
   *
   * Lines that do not contain the upload marker are ignored. Upload lines
   * without a parsable `fsize` (for example a line cut short by truncated
   * framing) are skipped rather than failing the stream with a `MatchError`.
   *
   * @return a flow emitting exactly one `Long`, `0` when no upload entry was seen
   */
  def sumUploadVolume: Flow[String, Long, Unit] =
    Flow[String]
      .filter(_.contains(UploadMarker)) // keep only upload-related entries
      .collect { case FileSize(size) => size.toLong } // extract the file size; drop lines without one
      .fold(0L)(_ + _) // accumulate the total volume

  /**
   * Reads the log file named by the first argument, prints the total upload
   * volume and the elapsed time, then shuts the actor system down.
   *
   * @param args command-line arguments; `args(0)` is the path of the log file
   */
  def main(args: Array[String]): Unit = {
    // `Future` is not covered by the file's imports; a block-local import keeps this definition self-contained.
    import scala.concurrent.Future

    // Validate the arguments before the actor system exists, so a missing
    // argument cannot leave its threads running after main has failed.
    if (args.isEmpty) {
      System.err.println("usage: Cycle <log-file>")
      sys.exit(1)
    }

    implicit val system = ActorSystem()
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    val startedAt = System.nanoTime()
    val logFile = new File(args(0))

    // The file source emits raw ByteString chunks that do not respect log-line boundaries.
    val total: Future[Long] = Source.synchronousFile(logFile)
      .via(Framing.delimiter( // re-frame the bytes using the newline as the entry boundary
        ByteString("\n"),
        maximumFrameLength = 1024,
        allowTruncation = true))
      .map(_.utf8String) // decode each frame into a String
      .via(sumUploadVolume) // compute the total upload volume
      .runWith(Sink.head) // the fold emits exactly one element, so head is sufficient

    total onComplete {
      case Success(x) =>
        val finishedAt = System.nanoTime()
        println(s"total upload is $x")
        println(s"time elapsed: ${(finishedAt - startedAt) / 1000000} ms")
        system.shutdown()
      case Failure(e) =>
        println(s"error is $e")
        system.shutdown()
    }
  }
}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import org.scalatest._
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.testkit.scaladsl._
import sample.stream.Cycle
/**
 * Behavioural tests for `Cycle.sumUploadVolume`, driven through Akka Streams
 * test probes.
 */
class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  /** Stops the actor system once all tests have run so its threads do not outlive the suite. */
  override protected def afterAll(): Unit =
    try super.afterAll()
    finally system.shutdown()

  /**
   * Materializes the flow under test between a manually driven publisher
   * probe and a subscriber probe.
   *
   * @return the (publisher probe, subscriber probe) pair
   */
  private def runSumUploadVolume() =
    TestSource.probe[String]
      .via(Cycle.sumUploadVolume)
      .toMat(TestSink.probe[Long])(Keep.both)
      .run()

  "Empty log" should "have zero upload volume" in {
    val (pub, sub) = runSumUploadVolume()
    sub.request(1)
    pub.sendComplete()
    sub.expectNext(0L)
    sub.expectComplete()
  }

  "Non upload log" should "have zero upload volume" in {
    val (pub, sub) = runSumUploadVolume()
    sub.request(1)
    pub.sendNext("api=+AppDown,fsize=1024")
    pub.sendComplete()
    sub.expectNext(0L)
    sub.expectComplete()
  }

  "upload log" should "have upload volume" in {
    val (pub, sub) = runSumUploadVolume()
    sub.request(1)
    pub.sendNext("api=+AppUp,fsize=1024")
    pub.sendNext("api=+AppUp,fsize=1024")
    pub.sendNext("api=+AppDown,fsize=1024") // download entry: must not be counted
    pub.sendComplete()
    sub.expectNext(2048L)
    sub.expectComplete()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment