下面是如何使用 Akka Streams 过滤日志并汇总上传总量的代码示例
Last active
November 21, 2015 10:53
-
-
Save binshuohu/26b0ce7844a713f8f83f to your computer and use it in GitHub Desktop.
A simple akka streams example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[2015-11-21 15:00:55] api=+AppDown,seq=3065477251,uin=249615433,appid=25,client_seq=635410220,client_ver=30013,client_net=30013,client_ip=221.1.115.136,caller=10.149.24.231,step=1,step_ip=10.149.20.140,step_port=30025,ret=0,retmsg=Success,ccd_delay=0,delay=61,fid=0971ea82-d031-4edd-b487-a78a4a1564b3,version=1448089142,sha=d358632365598d9b8d27fd7544e12385797a6c82,fsize=7783848
[2015-11-21 15:00:55] api=+AppUp,seq=3065477325,uin=1109610795,appid=25,client_seq=56929653,client_ver=30001,client_net=3,ret=-10112,retmsg=Apply upload file error,ccd_delay=0,delay=27,sha=596bbe882c4f01409847cc071492710927e2fa44,fsize=24465,attrbit=0,name=14-31-13-91fa5ae736d12f2e442d15ed4fc2d5628535682e.jpg,in_up_ip=0.0.0.0,out_up_ip=0.0.0.0,confilict_key=NULL
[2015-11-21 15:00:55] api=+BatchQueryDir,seq=3065477311,uin=133998050,appid=25,client_seq=38111776,client_ver=30013,client_net=0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample.stream | |
import java.io.File | |
import akka.actor.ActorSystem | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl._ | |
import akka.stream.io.Implicits._ | |
import akka.stream.io._ | |
import akka.util.ByteString | |
import scala.util.{ Success, Failure } | |
object Cycle {

  /** Substring that identifies upload entries in the log (e.g. `api=+AppUp,seq=...`). */
  private val UploadMarker = "api=+AppUp"

  /**
   * Captures the digits of an `fsize=` field anywhere in a line.
   *
   * Unanchored so that trailing characters the old whole-line pattern could not match
   * (such as a `\r` left over from CRLF files) no longer cause a miss. `\b` keeps it
   * from matching inside a longer key such as `xfsize=`.
   */
  private val FileSizePattern = """\bfsize=(\d+)""".r.unanchored

  /** Longest log line, in bytes, accepted by the framing stage; longer lines fail the stream. */
  private val MaxLineLength = 1024

  /**
   * Sums the `fsize` field of every upload (`api=+AppUp`) entry.
   *
   * Non-upload lines are dropped. Upload lines with no parsable `fsize` are also skipped,
   * rather than failing the stream with a `MatchError`. Because of the final `fold`,
   * exactly one element (the total, `0L` for no uploads) is emitted when upstream completes.
   *
   * NOTE(review): entries whose `ret` is non-zero (failed uploads, e.g. `ret=-10112` in the
   * sample log) are still counted — confirm this is the intended definition of "volume".
   */
  def sumUploadVolume: Flow[String, Long, Unit] =
    Flow[String]
      .filter(_.contains(UploadMarker))
      .collect { case FileSizePattern(size) => size.toLong }
      .fold(0L)(_ + _)

  /**
   * Reads the log file named by `args(0)`, prints the total upload volume and the elapsed
   * time, then shuts the actor system down.
   */
  def main(args: Array[String]): Unit = {
    import scala.concurrent.Future

    if (args.isEmpty) {
      System.err.println("usage: Cycle <log-file>")
      sys.exit(1)
    }

    implicit val system = ActorSystem()
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    val t1 = System.nanoTime()
    val file = new File(args(0))

    val result: Future[Long] = Source.synchronousFile(file) // raw ByteString chunks; no line boundaries yet
      .via(Framing.delimiter(                                // re-chunk into one frame per '\n'-terminated line
        ByteString("\n"),
        maximumFrameLength = MaxLineLength,
        allowTruncation = true))                             // still emit a final line lacking a trailing '\n'
      .map(_.utf8String)
      .via(sumUploadVolume)
      .runWith(Sink.head)                                    // the fold emits exactly one element

    result onComplete {
      case Success(total) =>
        val t2 = System.nanoTime()
        println(s"total upload is $total")
        println(s"time elapsed: ${(t2 - t1) / 1000000} ms")
        system.shutdown()
      case Failure(e) =>
        println(s"error is $e")
        system.shutdown()
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.Keep | |
import org.scalatest._ | |
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
import akka.stream.testkit.scaladsl._ | |
import sample.stream.Cycle | |
/** Behavioural tests for [[sample.stream.Cycle.sumUploadVolume]] driven by stream test probes. */
class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  /** Stops the actor system created for this suite so its threads do not leak past the run. */
  override protected def afterAll(): Unit =
    try super.afterAll()
    finally system.shutdown()

  /**
   * Materializes `Cycle.sumUploadVolume` between a test publisher and a test subscriber.
   *
   * @return (publisher probe feeding log lines, subscriber probe observing the total)
   */
  private def runSumUploadVolume() =
    TestSource.probe[String]
      .via(Cycle.sumUploadVolume)
      .toMat(TestSink.probe[Long])(Keep.both)
      .run()

  "Empty log" should "have zero upload volume" in {
    val (pub, sub) = runSumUploadVolume()
    sub.request(1)
    pub.sendComplete()
    sub.expectNext(0L)
    sub.expectComplete()
  }

  "Non upload log" should "have zero upload volume" in {
    val (pub, sub) = runSumUploadVolume()
    sub.request(1)
    pub.sendNext("api=+AppDown,fsize=1024")
    pub.sendComplete()
    sub.expectNext(0L)
    sub.expectComplete()
  }

  "upload log" should "have upload volume" in {
    val (pub, sub) = runSumUploadVolume()
    sub.request(1)
    pub.sendNext("api=+AppUp,fsize=1024")
    pub.sendNext("api=+AppUp,fsize=1024")
    pub.sendNext("api=+AppDown,fsize=1024") // download entries must not be counted
    pub.sendComplete()
    sub.expectNext(2048L)
    sub.expectComplete()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment