Last active
November 4, 2016 11:58
-
-
Save johanandren/41b096c9ee647863c6c04959be548b25 to your computer and use it in GitHub Desktop.
Sample of streaming log entries from a local logfile to an AMQP broker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package streams; | |
import akka.actor.ActorSystem; | |
import akka.stream.ActorMaterializer; | |
import akka.stream.Materializer; | |
import akka.stream.contrib.FileTailSource; | |
import akka.stream.contrib.amqp.*; | |
import akka.stream.javadsl.Framing; | |
import akka.util.ByteString; | |
import scala.concurrent.duration.FiniteDuration; | |
import java.nio.file.FileSystems; | |
import java.nio.file.Path; | |
import java.util.Collections; | |
import java.util.Optional; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* Requires a running amqp-server and the dependencies | |
* akka-streams 2.4.8+, akka-streams-contrib 0.3 and akka-streams-contrib-amqp 0.3 | |
*/ | |
public class LogsToAmqpSample { | |
public static void main(String[] args) throws Exception { | |
final ActorSystem system = ActorSystem.create(); | |
final Materializer materializer = ActorMaterializer.create(system); | |
final AmqpSinkSettings settings = AmqpSinkSettings.create( | |
DefaultAmqpConnection.getInstance(), | |
Optional.empty(), | |
Optional.of("logs"), | |
Collections.singletonList(QueueDeclaration.create("logs")) | |
); | |
final Path logfile = FileSystems.getDefault().getPath("/var/log/system.log"); | |
final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS); | |
final int maxLineLength = 4096; | |
FileTailSource.createLines(fs.getPath(path), maxLineLength, pollingInterval) | |
.to(AmqpSink.simple(settings)) | |
.run(materializer); | |
System.out.println("Enter any key to stop streaming log to rabbitmq"); | |
System.in.read(); | |
system.terminate(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Requires Alpakka