Skip to content

Instantly share code, notes, and snippets.

@io7m
Created April 12, 2018 13:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save io7m/ddc4e94e22b56dbcbe41ef10dcc5b3ca to your computer and use it in GitHub Desktop.
Save io7m/ddc4e94e22b56dbcbe41ef10dcc5b3ca to your computer and use it in GitHub Desktop.
Aeron MDC
package com.io7m.aeron_guide.scratchpad;
import io.aeron.Aeron;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.ConcurrentPublication;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.Header;
import org.agrona.BufferUtil;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
 * Scratchpad experiment with Aeron publications that use a dynamic control
 * mode ("MDC").
 *
 * <p>The program launches three embedded media drivers in one JVM, each with
 * its own Aeron directory: one "server" and two "clients". The server adds two
 * publications that use the same channel and stream, each client adds one
 * subscription that names the server's control endpoint, and the main thread
 * then alternates forever between sending timestamped messages and polling
 * the subscriptions.</p>
 */
public final class ExampleMDC3
{
  private static final Logger LOG =
    LoggerFactory.getLogger(ExampleMDC3.class);

  /** The control endpoint named by every publication and subscription. */
  private static final String CONTROL_ENDPOINT = "127.0.0.1:10000";

  /** The stream used by every publication and subscription. */
  private static final int STREAM_ID = 23;

  /** The number of times a single message is offered before giving up. */
  private static final int SEND_ATTEMPTS = 5;

  private ExampleMDC3()
  {
  }

  /**
   * Run the experiment. The loop never terminates normally; it ends only when
   * an exception propagates (for example when a message cannot be sent), at
   * which point the Aeron clients and media drivers are closed.
   *
   * @param args Command-line arguments (unused)
   *
   * @throws Exception On any error
   */
  public static void main(
    final String[] args)
    throws Exception
  {
    // Resources are closed in reverse order when the loop is left by an
    // exception, so the clients shut down before the server.
    try (Node server = serverNode();
         Node client1 = clientNode(1);
         Node client2 = clientNode(2)) {

      // The order of the add* calls is deliberate and matches the original
      // experiment: the second publication is only added after both
      // subscriptions exist.
      final ConcurrentPublication server_pub =
        server.aeron.addPublication(publicationChannel(), STREAM_ID);
      final Subscription client_sub0 =
        client1.aeron.addSubscription(
          subscriptionChannel("127.0.0.2:10001"), STREAM_ID);
      final Subscription client_sub1 =
        client2.aeron.addSubscription(
          subscriptionChannel("127.0.0.3:10001"), STREAM_ID);
      final ConcurrentPublication server_pub2 =
        server.aeron.addPublication(publicationChannel(), STREAM_ID);

      final UnsafeBuffer buffer =
        new UnsafeBuffer(BufferUtil.allocateDirectAligned(2048, 16));

      while (true) {
        if (server_pub.isConnected()) {
          sendMessage(server_pub, buffer, "server_pub1 " + nowTimestamp());
        }
        if (server_pub2.isConnected()) {
          sendMessage(server_pub2, buffer, "server_pub2 " + nowTimestamp());
        }
        readMessages("client_sub0", client_sub0);
        readMessages("client_sub1", client_sub1);
        Thread.sleep(1000L);
      }
    }
  }

  /**
   * @return The channel URI shared by both server publications
   */
  private static String publicationChannel()
  {
    return new ChannelUriStringBuilder()
      .reliable(Boolean.TRUE)
      .media("udp")
      .controlMode("dynamic")
      .controlEndpoint(CONTROL_ENDPOINT)
      .endpoint("127.0.0.1:10001")
      .build();
  }

  /**
   * @param endpoint The local endpoint of the subscription
   *
   * @return The channel URI of a client subscription that names the server's
   * control endpoint
   */
  private static String subscriptionChannel(
    final String endpoint)
  {
    return new ChannelUriStringBuilder()
      .reliable(Boolean.TRUE)
      .media("udp")
      .controlEndpoint(CONTROL_ENDPOINT)
      .endpoint(endpoint)
      .build();
  }

  /**
   * @return A running driver/client pair using the server's Aeron directory
   */
  private static Node serverNode()
  {
    return Node.launch("/tmp/example-server");
  }

  /**
   * @param index The client number, used to pick a distinct Aeron directory
   *
   * @return A running driver/client pair for the given client
   */
  private static Node clientNode(
    final int index)
  {
    return Node.launch("/tmp/example-client-" + index);
  }

  /**
   * Poll a subscription ten times, pausing 100ms after each poll, and log
   * every fragment received. Does nothing if the subscription is not
   * connected.
   *
   * @param receiver The name logged alongside each received message
   * @param sub      The subscription to poll
   *
   * @throws InterruptedException If the thread is interrupted while pausing
   */
  private static void readMessages(
    final String receiver,
    final Subscription sub)
    throws InterruptedException
  {
    if (!sub.isConnected()) {
      return;
    }

    for (int index = 0; index < 10; ++index) {
      sub.poll(
        (buffer, offset, length, header) ->
          onMessage(receiver, buffer, offset, length, header),
        10);
      Thread.sleep(100L);
    }
  }

  /**
   * @return The current time formatted as an ISO zoned date-time
   */
  private static String nowTimestamp()
  {
    return ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
  }

  /**
   * Decode a received fragment as UTF-8 text and log it together with the
   * session ID taken from the fragment header.
   *
   * @param receiver The name of the receiving subscription
   * @param buffer   The buffer holding the fragment
   * @param offset   The offset of the fragment within the buffer
   * @param length   The length of the fragment in bytes
   * @param header   The fragment header
   */
  private static void onMessage(
    final String receiver,
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
  {
    final byte[] buf = new byte[length];
    buffer.getBytes(offset, buf);
    final String message = new String(buf, UTF_8);
    LOG.debug(
      "onMessage: [{}] {}: {}",
      Integer.valueOf(header.sessionId()),
      receiver,
      message);
  }

  /**
   * Encode {@code text} as UTF-8 into {@code buffer} and offer it to the
   * publication, retrying with a 100ms pause after each failed attempt.
   *
   * @param pub    The publication
   * @param buffer A scratch buffer; must be large enough to hold the encoded
   *               text
   * @param text   The message text
   *
   * @return {@code true} once the message has been accepted
   *
   * @throws IOException If every attempt is rejected; the message carries the
   *                     last (negative) result returned by the publication
   */
  private static boolean sendMessage(
    final Publication pub,
    final UnsafeBuffer buffer,
    final String text)
    throws IOException
  {
    LOG.debug("send: [{}] {}", Integer.valueOf(pub.sessionId()), text);

    final byte[] value = text.getBytes(UTF_8);
    buffer.putBytes(0, value);

    long result = 0L;
    for (int index = 0; index < SEND_ATTEMPTS; ++index) {
      // The length offered must be the number of encoded bytes, not the
      // number of chars in the string: the two differ for non-ASCII text,
      // and using the char count would truncate the message.
      result = pub.offer(buffer, 0, value.length);
      if (result >= 0L) {
        return true;
      }

      try {
        Thread.sleep(100L);
      } catch (final InterruptedException e) {
        // Preserve the interrupt status for the caller and keep retrying.
        Thread.currentThread().interrupt();
      }
    }

    throw new IOException("Could not send: error code " + result);
  }

  /**
   * An embedded media driver paired with the Aeron client connected to it,
   * so that both can be closed together.
   */
  private static final class Node implements AutoCloseable
  {
    private final MediaDriver driver;
    private final Aeron aeron;

    private Node(
      final MediaDriver in_driver,
      final Aeron in_aeron)
    {
      this.driver = in_driver;
      this.aeron = in_aeron;
    }

    /**
     * Launch a media driver in {@code directory} (deleting any existing
     * directory first) and connect an Aeron client to it.
     *
     * @param directory The Aeron directory shared by the driver and client
     *
     * @return The running driver/client pair
     */
    static Node launch(
      final String directory)
    {
      final MediaDriver.Context media_context =
        new MediaDriver.Context()
          .dirDeleteOnStart(true)
          .aeronDirectoryName(directory);
      final Aeron.Context aeron_context =
        new Aeron.Context()
          .aeronDirectoryName(directory);

      final MediaDriver launched = MediaDriver.launch(media_context);
      try {
        return new Node(launched, Aeron.connect(aeron_context));
      } catch (final RuntimeException e) {
        // Do not leave a driver running if the client could not connect.
        launched.close();
        throw e;
      }
    }

    @Override
    public void close()
    {
      // Close the client before the driver it depends on.
      try {
        this.aeron.close();
      } finally {
        this.driver.close();
      }
    }
  }
}
@io7m
Copy link
Author

io7m commented Apr 12, 2018

13:44:17.086 DEBUG [main] com.io7m.aeron_guide.scratchpad.ExampleMDC3: send: [389785209] server_pub2 2018-04-12T13:44:17.086Z[UTC]
13:44:17.112 DEBUG [main] com.io7m.aeron_guide.scratchpad.ExampleMDC3: onMessage: [389785209] client_sub0: server_pub1 2018-04-12T13:44:16.894Z[UTC]
13:44:17.113 DEBUG [main] com.io7m.aeron_guide.scratchpad.ExampleMDC3: onMessage: [389785209] client_sub0: server_pub2 2018-04-12T13:44:17.086Z[UTC]
13:44:18.116 DEBUG [main] com.io7m.aeron_guide.scratchpad.ExampleMDC3: onMessage: [389785209] client_sub1: server_pub1 2018-04-12T13:44:16.894Z[UTC]
13:44:18.117 DEBUG [main] com.io7m.aeron_guide.scratchpad.ExampleMDC3: onMessage: [389785209] client_sub1: server_pub2 2018-04-12T13:44:17.086Z[UTC]
13:44:20.120 DEBUG [main] com.io7m.aeron_guide.scratchpad.ExampleMDC3: send: [389785209] server_pub1 2018-04-12T13:44:20.119Z[UTC]
13:44:20.120 DEBUG [main] com.io7m.aeron_guide.scratchpad.ExampleMDC3: send: [389785209] server_pub2 2018-04-12T13:44:20.12Z[UTC]
13:44:20.221 DEBUG [main] com.io7m.aeron_guide.scratchpad.ExampleMDC3: onMessage: [389785209] client_sub0: server_pub1 2018-04-12T13:44:20.119Z[UTC]
13:44:20.222 DEBUG [main] com.io7m.aeron_guide.scratchpad.ExampleMDC3: onMessage: [389785209] client_sub0: server_pub2 2018-04-12T13:44:20.12Z[UTC]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment