Skip to content

Instantly share code, notes, and snippets.

@bbstilson
Created December 9, 2020 21:45
Show Gist options
  • Save bbstilson/e12232d140f5a1119ebfdd62b7d00a8c to your computer and use it in GitHub Desktop.
Save bbstilson/e12232d140f5a1119ebfdd62b7d00a8c to your computer and use it in GitHub Desktop.
import org.allenai.sqs.reader._
import scala.collection.parallel.CollectionConverters._
import scala.concurrent.ExecutionContext
import scala.collection.parallel.ExecutionContextTaskSupport
import java.util.concurrent.Executors
import com.amazonaws.services.sqs.AmazonSQSClientBuilder
/** Moves messages from the `FROM` SQS queue to the `TO` SQS queue while appending every message
  * body to files under `dataDir`.
  *
  * `main` starts `cores * 2` workers on a fixed thread pool. Each worker reads batches from the
  * upstream queue until an empty batch is returned, and for every batch:
  *   1. appends the message bodies (one per line) to its current file,
  *   1. forwards the messages to the downstream queue,
  *   1. deletes from upstream only the messages the downstream send did not report as failed.
  *
  * A worker switches to a fresh, randomly named file once it has written at least `FILE_SIZE`
  * messages to the current one.
  */
object Main {
  val cores = Runtime.getRuntime().availableProcessors()
  val FILE_SIZE = 100000
  val PREFIX = "https://sqs.us-west-2.amazonaws.com/896129387501"
  val FROM = s"$PREFIX/s2-espresso-prod"
  val TO = s"$PREFIX/s2-espresso-backfill-prod"
  val letters = 'a' to 'z'
  val dataDir = os.pwd / "data"
  val random = new scala.util.Random()

  /** Number of messages between two "check in" progress lines of a worker. */
  private val CheckInInterval = 10000

  /** Returns a random 20-letter lowercase file name. */
  def newFile: String = List.fill(20)(letters(random.nextInt(letters.length))).mkString

  /** Formats one progress line.
    *
    * @param threadName  name of the worker thread reporting progress
    * @param kind        label printed in parentheses ("check in" or "file complete")
    * @param count       messages written to the current file so far
    * @param startMillis wall-clock time (ms) at which the current file was started
    */
  private def progressLine(threadName: String, kind: String, count: Int, startMillis: Long): String = {
    val took = (System.currentTimeMillis() - startMillis) / 1000
    // Whole seconds can round down to 0; clamp the divisor so the rate never divides by zero.
    val rate = count / math.max(took, 1L)
    s"$threadName - ($kind) Processed $count messages in $took seconds (~$rate mps)."
  }

  /** Body of a single worker: drains the upstream queue until it returns an empty batch. */
  private def drainQueue(): Unit = {
    import scala.jdk.CollectionConverters._

    // Resolved on the worker thread so that every progress line names the thread doing the work.
    val tn = Thread.currentThread().getName()
    // NOTE(review): the meaning of the literals 10 and 10000 is defined by SingleQueueReader,
    // which is not visible here — presumably batch size and a timeout; confirm.
    val upstream = new SingleQueueReader(
      FROM,
      10,
      AmazonSQSClientBuilder.defaultClient(),
      10000
    )
    val downstream = new SQSUtil(TO)

    var file = newFile
    var fileLength = 0
    var nextCheckIn = CheckInInterval
    var start = System.currentTimeMillis()

    upstream.batches.takeWhile(_.nonEmpty).foreach { batch =>
      val messages = batch.map(_.message).map { m =>
        m.getMessageId() -> m.getBody()
      }

      // write contents to file
      os.write.append(
        dataDir / file,
        messages.map(_._2).mkString("\n") + "\n"
      )
      fileLength += batch.size

      // send to downstream queue; entry ids are the upstream message ids
      val failedIds = downstream
        .sendMessageBatch(messages)
        .getFailed()
        .asScala
        .map(_.getId())
        .toSet
      if (failedIds.nonEmpty) {
        println(
          s"$tn - (send failed) ${failedIds.size} messages were rejected downstream and are left in the upstream queue."
        )
      }

      // delete from upstream, but never a message that did not reach the downstream queue
      batch
        .filterNot(entry => failedIds.contains(entry.message.getMessageId()))
        .foreach(_.delete())

      // A threshold (rather than `% 10000 == 0`) still fires when batches are not full and the
      // running count steps over an exact multiple of the interval.
      if (fileLength >= nextCheckIn) {
        println(progressLine(tn, "check in", fileLength, start))
        nextCheckIn += CheckInInterval
      }

      if (fileLength >= FILE_SIZE) {
        println(progressLine(tn, "file complete", fileLength, start))
        start = System.currentTimeMillis()
        file = newFile
        fileLength = 0
        nextCheckIn = CheckInInterval
      }
    }
  }

  /** Entry point: runs `cores * 2` workers in parallel and returns once all of them are done. */
  def main(args: Array[String]): Unit = {
    // os.write.append does not create missing parent folders by default.
    os.makeDir.all(dataDir)

    val workers = cores * 2
    val pool = Executors.newFixedThreadPool(workers)
    // One task per pool thread (`until`, not `to`, which would create workers + 1 tasks).
    val col = (0 until workers).par
    col.tasksupport = new ExecutionContextTaskSupport(
      ExecutionContext.fromExecutorService(pool)
    )

    try col.foreach(_ => drainQueue())
    // The pool threads are non-daemon; without a shutdown the JVM never exits.
    finally pool.shutdown()
  }
}
import com.amazonaws.services.sqs._
import com.amazonaws.services.sqs.model._
import scala.jdk.CollectionConverters._
/** Thin helper for sending messages to a single SQS queue.
  *
  * @param queueUrl URL of the queue every batch is sent to
  */
class SQSUtil(queueUrl: String) {
  private[this] val sqs = AmazonSQSClientBuilder.defaultClient()

  /** SQS accepts at most this many entries in one SendMessageBatch request. */
  private[this] val MaxBatchEntries = 10

  /** Sends `messages` to the queue.
    *
    * The list is split into requests of at most 10 entries. When exactly one request is needed
    * its SDK result is returned untouched; otherwise the successful and failed entries of all
    * requests are merged into one result. An empty list sends nothing and yields an empty result
    * instead of a request that SQS would reject.
    *
    * @param messages pairs of (batch entry id, message body)
    * @return the per-entry outcome as reported by SQS
    */
  def sendMessageBatch(
      messages: List[(String, String)]
  ): SendMessageBatchResult = {
    val results = messages
      .grouped(MaxBatchEntries)
      .map { chunk =>
        val entries = chunk.map { case (id, body) =>
          new SendMessageBatchRequestEntry(id, body)
        }.asJava
        sqs.sendMessageBatch(
          new SendMessageBatchRequest(queueUrl).withEntries(entries)
        )
      }
      .toList

    results match {
      case single :: Nil => single
      case many =>
        new SendMessageBatchResult()
          .withSuccessful(many.flatMap(_.getSuccessful().asScala).asJava)
          .withFailed(many.flatMap(_.getFailed().asScala).asJava)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment