Skip to content

Instantly share code, notes, and snippets.

@Charlyzzz
Created August 10, 2019 03:11
Show Gist options
  • Save Charlyzzz/3b128e87878b008688c408726d43245c to your computer and use it in GitHub Desktop.
Save Charlyzzz/3b128e87878b008688c408726d43245c to your computer and use it in GitHub Desktop.
/**
  * Builds a `Source` that emits the events of a single CloudWatch Logs stream,
  * paging through it with `GetLogEvents` from the earliest event onwards.
  *
  * Credentials are read from the "10pines" profile and the client is bound to
  * `Regions.US_WEST_2`. The client is created once per call of this method, is
  * shared by every materialization of the returned source, and is not shut
  * down here.
  *
  * @param logGroup   log group name passed to `GetLogEventsRequest`
  * @param streamName log stream name passed to `GetLogEventsRequest`
  * @return a source that emits the events of each fetched page in order and
  *         completes once the service returns the same forward token that was
  *         sent with the request
  */
private def cloudwatchLogs(logGroup: String, streamName: String): Source[OutputLogEvent, NotUsed] = {
  import scala.concurrent.blocking
  import scala.jdk.CollectionConverters._

  implicit val ec: ExecutionContextExecutor = ExecutionContext.global

  val credentialsProvider =
    new AWSStaticCredentialsProvider(new ProfileCredentialsProvider("10pines").getCredentials)
  val logsClient = AWSLogsClientBuilder.standard
    .withCredentials(credentialsProvider)
    .withRegion(Regions.US_WEST_2)
    .withClientConfiguration(new ClientConfiguration())
    .build

  // The unfold state is the forward token of the previous page; "" means no page was fetched yet.
  Source.unfoldAsync("") { token =>
    val request = new GetLogEventsRequest(logGroup, streamName)
      .withStartTime(0L)
      .withEndTime(System.currentTimeMillis())
      // Forward-token paging must read oldest-first. With the default (latest-first) the first
      // call returns only the newest page and the stream ends right after it.
      .withStartFromHead(true)
    if (token.nonEmpty) request.setNextToken(token)

    // getLogEvents is a synchronous network call; `blocking` lets the global pool compensate
    // for the parked thread instead of starving other tasks.
    Future(blocking(logsClient.getLogEvents(request))).map { logs =>
      val newToken = logs.getNextForwardToken
      // An unchanged token is the end-of-stream signal used to complete the source.
      if (token == newToken) None
      else {
        val events: List[OutputLogEvent] = logs.getEvents.asScala.toList
        Some((newToken, events))
      }
    }
  }.mapConcat(identity)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment