Skip to content

Instantly share code, notes, and snippets.

@hellojinjie
Last active December 27, 2015 08:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hellojinjie/7300093 to your computer and use it in GitHub Desktop.
Save hellojinjie/7300093 to your computer and use it in GitHub Desktop.
TrafficGenerator.scala
import scala.collection.mutable.LinkedList
import scala.collection.mutable.ListBuffer
import scala.compat._
import java.io._
import java.net._
import java.util._
import scala.util.Random
import java.nio.charset.Charset
import java.util.concurrent._
import java.util.concurrent.atomic._
import com.typesafe.config._
object TrafficGenerator {
val conf = ConfigFactory.load();
val clientCount = conf.getInt("tg.clientCount")
val receiverURL = conf.getStringList("tg.receiverURL")
val sendThreadNum = conf.getInt("tg.sendThreadNum")
val mode = conf.getString("tg.mode")
val siteIDs = conf.getStringList("tg.siteID")
val sender = if ("immortal".equals(mode)) new ImmortalViewSender else new SineViewSender
val rand = new Random
val queue = new Array[ListBuffer[Client]](30)
val messageQueue = new LinkedBlockingQueue[Client]
val sendPermit = new Semaphore(0)
val roundFinish = new Semaphore(0)
var roundStartSecond = Platform.currentTime
var tStartSecond =Platform.currentTime / 1000
def main(args: Array[String]) {
println("Run in mode " + mode + " with client count " + clientCount + " sendThreadNum " + sendThreadNum)
println("receiverURL " + receiverURL)
println("site " + siteIDs)
generateClient
1 to sendThreadNum foreach { num => new Thread(new Grunt(num)).start}
while (true) {
tStartSecond = Platform.currentTime / 1000
sender.send
val tEndSecond = Platform.currentTime / 1000
val lag = tStartSecond + 30 - tEndSecond
if (lag > 0) {
println("Ah. I'm " + lag + " seconds advanced, I'm going to have a rest.")
TimeUnit.SECONDS.sleep(lag)
} else if (lag == 0 ){
println("Yeah, just in time.")
} else {
println("Oops, I'm " + (-lag) + " seconds lag, I am trying...")
}
}
}
def generateClient = {
for (i <- 0 until 29 ) {
queue(i) = new ListBuffer[Client]
for (j <- 1 to clientCount / 30) {
val client = new Client
client.init
queue(i) append (client)
}
}
queue(29) = new ListBuffer[Client]
for (k <- 1 to (clientCount - (clientCount / 30) * 29)) {
val client = new Client
client.init
queue(29) append (client)
}
}
def getRandomIP: String = {
return "" + (rand.nextInt(254) + 1) + "." + (rand.nextInt(254) + 1) + "." + (rand.nextInt(254) + 1) + "." + (rand.nextInt(254) + 1)
}
}
trait Sender {
def send
def beforeRound {
TrafficGenerator.sendPermit.release(TrafficGenerator.sendThreadNum)
TrafficGenerator.roundStartSecond = Platform.currentTime
}
def endRound(size: Integer) {
1 to TrafficGenerator.sendThreadNum foreach { _ => {
val syncClient = new Client
syncClient.nextSecond = true
TrafficGenerator.messageQueue.put(syncClient)
}
}
TrafficGenerator.roundFinish.acquire(TrafficGenerator.sendThreadNum)
val roundEndSecond = Platform.currentTime
if (roundEndSecond / 1000 == TrafficGenerator.roundStartSecond / 1000) {
TimeUnit.MILLISECONDS.sleep((1000 - roundEndSecond + TrafficGenerator.roundStartSecond - 8))
}
println("" + (Platform.currentTime / 1000)+ " send " + size)
}
}
// y = midline / 2 * sin(x / 900 * PI) + midline
class SineViewSender extends Sender {
var firstSecond = 0l
override def send {
if (firstSecond == 0) {
firstSecond = Platform.currentTime / 1000
}
val x = Platform.currentTime / 1000 - firstSecond
val y = TrafficGenerator.clientCount / 2 * math.sin(x.toDouble / 600.0 * math.Pi) + TrafficGenerator.clientCount
val roundCount = (y / 30).toInt
for (i <- TrafficGenerator.queue) {
beforeRound
val delta = roundCount - i.size
if (delta > 0) {
for (j <- 1 to delta) {
val client = new Client
client.init
i append (client)
}
} else if (delta < 0) {
val index = TrafficGenerator.rand.nextInt(roundCount)
for (clientToStop <- i.view(index, index - delta)) {
clientToStop.stop
TrafficGenerator.messageQueue.put(clientToStop)
}
i.remove(index, -delta)
}
for (client <- i) {
TrafficGenerator.messageQueue.put(client)
}
endRound(i.size)
}
}
}
class ImmortalViewSender extends Sender {
override def send {
for (i <- TrafficGenerator.queue) {
beforeRound
for (client <- i) {
TrafficGenerator.messageQueue.put(client)
}
endRound(i.size)
}
}
}
class Grunt(num: Integer) extends Runnable {
var socket = new Socket()
val r = new URL(TrafficGenerator.receiverURL.get(num % TrafficGenerator.receiverURL.size))
val address = new InetSocketAddress(r.getHost(), r.getPort())
val headBytes = Array.concat(("POST " + r.getPath() + " HTTP/1.1\n").getBytes(), ("Host: " + r.getHost() + "\n").getBytes(), "Content-Type: application/x-www-form-urlencoded\n".getBytes())
def run() = {
println("I'am sending to " + r)
try {
socket.connect(address)
} catch {
case e: Exception => {
println("Error while connect to " + address.toString)
}
}
while (true) {
TrafficGenerator.sendPermit.acquire
var flag = true
while (flag) {
val client = TrafficGenerator.messageQueue.take
if (client.nextSecond) {
TrafficGenerator.roundFinish.release
flag = false
} else {
try {
send(client)
} catch {
case e: Exception => {
println("Error while sending to " + address.toString)
TrafficGenerator.messageQueue.put(client)
TimeUnit.SECONDS.sleep(1)
try {
socket = new Socket()
socket.connect(address)
} catch {
case e: Exception => {
println("Error while reconnect to " + address.toString)
}
}
}
}
}
}
}
}
def send(client: Client) = {
try {
var url = client.urlPartial + "&msgID=" + client.msgID + "&bytesLoaded=" + client.bytesLoaded + "&playTime=" + client.playtime
client.msgID = client.msgID + 1
client.playtime = client.playtime + 1
client.bytesLoaded += client.bytesLoadedDelta
val body = url.getBytes(Charset.forName("UTF-8"))
val os = socket.getOutputStream()
os.write(headBytes)
os.write(("Content-Length: " + body.length + "\n\n").getBytes())
os.write(body)
os.write("\n".getBytes())
// we should not flush here, message object is too small
//os.flush()
val buffer = new Array[Byte](1024)
socket.getInputStream().read(buffer)
} catch {
case e: Exception => {
println(e.getMessage)
throw new RuntimeException(e)
}
}
}
}
class Client {
def init() = {
val content = Dimension.contents(TrafficGenerator.rand.nextInt(Dimension.contents.length))
urlPartial = "appType=" + Dimension.appTypes(TrafficGenerator.rand.nextInt(Dimension.appTypes.length))
urlPartial += "&bitrate=" + Dimension.bitrates(TrafficGenerator.rand.nextInt(Dimension.bitrates.length))
urlPartial += "&streamLength=11057"
urlPartial += "&streamURL=" + content.streamURL
urlPartial += "&progType=home"
urlPartial += "&streamDescription=" + content.streamDescription
urlPartial += "&dropFrameCount=1&os=Mac OS 10.6.8&productID=nhlgc&eventType=HEARTBEAT&gameDate=10/12/2013&"
urlPartial += "siteID=" + TrafficGenerator.siteIDs.get(TrafficGenerator.rand.nextInt(TrafficGenerator.siteIDs.size))
urlPartial += "&windowMode=full normal&homeTeam=VAN&browserVersion=Safari 5.1&startupTime=1698&player=MAC 11,9,900,117&streamType=0&awayTeam=MON&bandwidth=12876&gameID=2013020073&updateInterval=30000&mockupIp="
urlPartial += TrafficGenerator.getRandomIP
urlPartial += ("&viewID=" + UUID.randomUUID.toString)
urlPartial += ("&clientID=" + UUID.randomUUID.toString)
urlPartial += ("&cdnName=" + Dimension.cdnNames(TrafficGenerator.rand.nextInt(Dimension.cdnNames.length)))
urlPartial += ("&bytesLoadedDelta=100000")
}
def stop {
urlPartial = urlPartial.replace("HEARTBEAT", "STOP")
}
var nextSecond = false
var urlPartial = ""
var msgID = 0
var bytesLoaded = 0
var bytesLoadedDelta = 100000
var playtime = 1
}
case class Content (
gameDate: String = "2013-11-04",
gameID: Long = 1,
homeTeam: String = "HDP",
awayTeam: String = "CDH",
streamURL: String = "http://CDHvsHDP.live.m3u8",
streamDescription: String = "CDH -- HDP",
productID: String = "hd",
programID: Long = 3,
streamType: Short = 0
)
object Dimension {
val bitrates = Array[Integer](1000, 2000, 3000, 3000, 4000, 5000)
val appTypes = Array[String]("desktop","desktop", "iphone", "android_phone", "ipad", "ipad", "ipad", "android_pad", "xbox")
val cdnNames = Array[String]("nlds12.cdnl3nl.ooo.com", "cdnak", "cdnak", "cdncd", "cdncd", "cdncd", "cdncd")
val contents = Array[Content](
new Content(),
new Content(streamDescription="JIN vs KJI", streamURL="http://JINvsKJI.live.m3u8", programID=1),
new Content(streamDescription="JIE vs KJI", streamURL="http://JHNvsKJI.live.m3u8", programID=14),
new Content(streamDescription="JRN vs KJI", streamURL="http://HINvsKJI.live.m3u8", programID=10),
new Content(streamDescription="JFN vs KJI", streamURL="http://JQNvsKJI.live.m3u8", programID=15),
new Content(streamDescription="DIN vs KJI", streamURL="http://JTNvsKJI.live.m3u8", programID=17),
new Content(streamDescription="VIN vs KJI", streamURL="http://JQNvsKJI.live.m3u8", programID=41),
new Content(streamDescription="JXN vs KJI", streamURL="http://JWNvsKJI.live.m3u8", programID=71),
new Content(streamDescription="DDN vs KJI", streamURL="http://JGNvsKJI.live.m3u8", programID=81),
new Content(streamDescription="AIN vs KJI", streamURL="http://DINvsKJI.live.m3u8", programID=21),
new Content(streamDescription="GIN vs KJI", streamURL="http://FFNvsKJI.live.m3u8", programID=1),
new Content(streamDescription="RIN vs KJI", streamURL="http://JFNvsKJI.live.m3u8", programID=187),
new Content(streamDescription="EIN vs KJI", streamURL="http://JFNvsKJI.live.m3u8", programID=144),
new Content(streamDescription="HIN vs KJI", streamURL="http://EINvsKJI.live.m3u8", programID=143),
new Content(streamDescription="JIN vs ERF", streamURL="http://JINvsERF.live.m3u8", programID=1),
new Content(streamDescription="JIE vs ERF", streamURL="http://JHNvsERF.live.m3u8", programID=14),
new Content(streamDescription="JRN vs ERF", streamURL="http://HINvsERF.live.m3u8", programID=10),
new Content(streamDescription="JFN vs ERF", streamURL="http://JQNvsERF.live.m3u8", programID=15),
new Content(streamDescription="DIN vs ERF", streamURL="http://JTNvsERF.live.m3u8", programID=17),
new Content(streamDescription="VIN vs ERF", streamURL="http://JQNvsERF.live.m3u8", programID=41),
new Content(streamDescription="JXN vs ERF", streamURL="http://JWNvsERF.live.m3u8", programID=71),
new Content(streamDescription="DDN vs ERF", streamURL="http://JGNvsERF.live.m3u8", programID=81),
new Content(streamDescription="AIN vs ERF", streamURL="http://DINvsERF.live.m3u8", programID=21),
new Content(streamDescription="GIN vs ERF", streamURL="http://FFNvsERF.live.m3u8", programID=1),
new Content(streamDescription="RIN vs ERF", streamURL="http://JFNvsERF.live.m3u8", programID=187),
new Content(streamDescription="EIN vs ERF", streamURL="http://JFNvsERF.live.m3u8", programID=144),
new Content(streamDescription="HIN vs ERF", streamURL="http://EINvsERF.live.m3u8", programID=143),
new Content(streamDescription="JIN vs EWD", streamURL="http://JINvsEWD.live.m3u8", programID=6),
new Content(streamDescription="JIE vs EWD", streamURL="http://JHNvsEWD.live.m3u8", programID=64),
new Content(streamDescription="JRN vs EWD", streamURL="http://HINvsEWD.live.m3u8", programID=60),
new Content(streamDescription="JFN vs EWD", streamURL="http://JQNvsEWD.live.m3u8", programID=65),
new Content(streamDescription="DIN vs EWD", streamURL="http://JTNvsEWD.live.m3u8", programID=67),
new Content(streamDescription="VIN vs EWD", streamURL="http://JQNvsEWD.live.m3u8", programID=46),
new Content(streamDescription="JXN vs EWD", streamURL="http://JWNvsEWD.live.m3u8", programID=76),
new Content(streamDescription="DDN vs EWD", streamURL="http://JGNvsEWD.live.m3u8", programID=86),
new Content(streamDescription="AIN vs EWD", streamURL="http://DINvsEWD.live.m3u8", programID=26),
new Content(streamDescription="GIN vs EWD", streamURL="http://FFNvsEWD.live.m3u8", programID=6),
new Content(streamDescription="RIN vs EWD", streamURL="http://JFNvsEWD.live.m3u8", programID=687),
new Content(streamDescription="EIN vs EWD", streamURL="http://JFNvsEWD.live.m3u8", programID=644),
new Content(streamDescription="HIN vs EWD", streamURL="http://EINvsEWD.live.m3u8", programID=643),
new Content(streamDescription="JIN vs EWD", streamURL="http://JINvsEWD.vod.m3u8", programID=6),
new Content(streamDescription="JIE vs EWD", streamURL="http://JHNvsEWD.vod.m3u8", programID=64),
new Content(streamDescription="JRN vs EWD", streamURL="http://HINvsEWD.vod.m3u8", programID=60),
new Content(streamDescription="JFN vs EWD", streamURL="http://JQNvsEWD.vod.m3u8", programID=65),
new Content(streamDescription="DIN vs EWD", streamURL="http://JTNvsEWD.vod.m3u8", programID=67),
new Content(streamDescription="VIN vs EWD", streamURL="http://JQNvsEWD.vod.m3u8", programID=46),
new Content(streamDescription="JXN vs EWD", streamURL="http://JWNvsEWD.vod.m3u8", programID=76),
new Content(streamDescription="DDN vs EWD", streamURL="http://JGNvsEWD.vod.m3u8", programID=86),
new Content(streamDescription="AIN vs EWD", streamURL="http://DINvsEWD.vod.m3u8", programID=26),
new Content(streamDescription="GIN vs EWD", streamURL="http://FFNvsEWD.vod.m3u8", programID=6),
new Content(streamDescription="RIN vs EWD", streamURL="http://JFNvsEWD.vod.m3u8", programID=687),
new Content(streamDescription="EIN vs EWD", streamURL="http://JFNvsEWD.vod.m3u8", programID=644),
new Content(streamDescription="HIN vs EWD", streamURL="http://EINvsEWD.vod.m3u8", programID=643)
)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment