Skip to content

Instantly share code, notes, and snippets.

@johanandren
Last active November 4, 2016 11:58
Show Gist options
  • Save johanandren/41b096c9ee647863c6c04959be548b25 to your computer and use it in GitHub Desktop.
Save johanandren/41b096c9ee647863c6c04959be548b25 to your computer and use it in GitHub Desktop.
Sample of streaming log entries from a local logfile to an AMQP broker
package streams;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.contrib.FileTailSource;
import akka.stream.contrib.amqp.*;
import akka.stream.javadsl.Framing;
import akka.util.ByteString;
import scala.concurrent.duration.FiniteDuration;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* Requires a running amqp-server and the dependencies
* akka-streams 2.4.8+, akka-streams-contrib 0.3 and akka-streams-contrib-amqp 0.3
*/
public class LogsToAmqpSample {
public static void main(String[] args) throws Exception {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final AmqpSinkSettings settings = AmqpSinkSettings.create(
DefaultAmqpConnection.getInstance(),
Optional.empty(),
Optional.of("logs"),
Collections.singletonList(QueueDeclaration.create("logs"))
);
final Path logfile = FileSystems.getDefault().getPath("/var/log/system.log");
final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS);
final int maxLineLength = 4096;
FileTailSource.createLines(fs.getPath(path), maxLineLength, pollingInterval)
.to(AmqpSink.simple(settings))
.run(materializer);
System.out.println("Enter any key to stop streaming log to rabbitmq");
System.in.read();
system.terminate();
}
}
@johanandren
Copy link
Author

johanandren commented Nov 4, 2016

Requires Alpakka

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment