Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Sample of streaming log entries from a local logfile to an AMQP broker
package streams;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.contrib.FileTailSource;
import akka.stream.contrib.amqp.*;
import akka.stream.javadsl.Framing;
import akka.util.ByteString;
import scala.concurrent.duration.FiniteDuration;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
 * Sample that tails a local log file and publishes each new line to an AMQP broker.
 *
 * <p>Requires a running amqp-server and the dependencies
 * akka-streams 2.4.8+, akka-streams-contrib 0.3 and akka-streams-contrib-amqp 0.3
 */
public class LogsToAmqpSample {

    /**
     * Starts the tailing stream and keeps it running until input is read from stdin,
     * then shuts the actor system down.
     *
     * @param args unused
     * @throws Exception if reading from stdin fails
     */
    public static void main(String[] args) throws Exception {
        final ActorSystem system = ActorSystem.create();
        try {
            final Materializer materializer = ActorMaterializer.create(system);

            // Publish to (and declare) the "logs" queue on the default connection.
            final AmqpSinkSettings settings = AmqpSinkSettings.create(
                DefaultAmqpConnection.getInstance(),
                Optional.empty(),
                Optional.of("logs"),
                Collections.singletonList(QueueDeclaration.create("logs"))
            );

            final Path logfile = FileSystems.getDefault().getPath("/var/log/system.log");
            final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS);
            final int maxLineLength = 4096;

            // Tail the declared logfile (the original referenced undefined `fs`/`path`).
            FileTailSource.createLines(logfile, maxLineLength, pollingInterval)
                // NOTE(review): the simple AMQP sink is presumed to consume ByteString
                // elements, so each text line is converted before publishing - confirm
                // against the connector version in use.
                .map(ByteString::fromString)
                .to(AmqpSink.simple(settings))
                .run(materializer);

            System.out.println("Enter any key to stop streaming log to rabbitmq");
            System.in.read();
        } finally {
            // Always stop the actor system, even if setup or the stdin read fails,
            // so the JVM is not kept alive by its threads.
            system.terminate();
        }
    }
}
Owner

johanandren commented Nov 4, 2016 edited

Requires Alpakka

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment