Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Sample of streaming log entries from a local logfile to an AMQP broker
package streams;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.contrib.FileTailSource;
import akka.stream.contrib.amqp.*;
import akka.stream.javadsl.Framing;
import akka.util.ByteString;
import scala.concurrent.duration.FiniteDuration;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
 * Sample that tails a local log file and streams every new line to an AMQP broker.
 *
 * <p>Requires a running amqp-server and the dependencies
 * akka-streams 2.4.8+, akka-streams-contrib 0.3 and akka-streams-contrib-amqp 0.3
 */
public class LogsToAmqpSample {

  /**
   * Starts the log-to-AMQP stream and keeps it running until input arrives on stdin,
   * then shuts the actor system down.
   *
   * @param args command line arguments (not used)
   * @throws Exception if setting up the stream or reading from stdin fails
   */
  public static void main(String[] args) throws Exception {
    final ActorSystem system = ActorSystem.create();
    try {
      final Materializer materializer = ActorMaterializer.create(system);

      // NOTE(review): presumably "no exchange, routing key 'logs', declare queue 'logs'"
      // -- confirm the argument order against the AmqpSinkSettings API in use.
      final AmqpSinkSettings settings = AmqpSinkSettings.create(
          DefaultAmqpConnection.getInstance(),
          Optional.empty(),
          Optional.of("logs"),
          Collections.singletonList(QueueDeclaration.create("logs"))
      );

      final Path logfile = FileSystems.getDefault().getPath("/var/log/system.log");
      final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS);
      final int maxLineLength = 4096;

      // Tail the file declared above; the previous code referenced the undefined
      // names `fs` and `path` here and therefore did not compile.
      FileTailSource.createLines(logfile, maxLineLength, pollingInterval)
          // The tail source emits String lines while the simple AMQP sink consumes
          // ByteString payloads, so convert each line before handing it over.
          .map(ByteString::fromString)
          .to(AmqpSink.simple(settings))
          .run(materializer);

      System.out.println("Enter any key to stop streaming log to rabbitmq");
      System.in.read();
    } finally {
      // Terminate even when setup or the stdin read throws, so the actor system
      // is not left running after a failure.
      system.terminate();
    }
  }
}
@johanandren

This comment has been minimized.

Show comment Hide comment
@johanandren

johanandren Nov 4, 2016

Requires Alpakka

Owner

johanandren commented Nov 4, 2016

Requires Alpakka

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment