Created
April 12, 2018 13:43
-
-
Save io7m/ddc4e94e22b56dbcbe41ef10dcc5b3ca to your computer and use it in GitHub Desktop.
Aeron MDC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.io7m.aeron_guide.scratchpad; | |
import io.aeron.Aeron; | |
import io.aeron.ChannelUriStringBuilder; | |
import io.aeron.ConcurrentPublication; | |
import io.aeron.Publication; | |
import io.aeron.Subscription; | |
import io.aeron.driver.MediaDriver; | |
import io.aeron.logbuffer.Header; | |
import org.agrona.BufferUtil; | |
import org.agrona.DirectBuffer; | |
import org.agrona.concurrent.UnsafeBuffer; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.time.ZonedDateTime; | |
import java.time.format.DateTimeFormatter; | |
import static java.nio.charset.StandardCharsets.UTF_8; | |
public final class ExampleMDC3 | |
{ | |
private static final Logger LOG = | |
LoggerFactory.getLogger(ExampleMDC3.class); | |
private ExampleMDC3() | |
{ | |
} | |
public static void main( | |
final String[] args) | |
throws Exception | |
{ | |
final Aeron server_aeron = serverAeron(); | |
final Aeron client_aeron1 = clientAeron(1); | |
final Aeron client_aeron2 = clientAeron(2); | |
final ConcurrentPublication server_pub = | |
server_aeron.addPublication( | |
new ChannelUriStringBuilder() | |
.reliable(Boolean.TRUE) | |
.media("udp") | |
.controlMode("dynamic") | |
.controlEndpoint("127.0.0.1:10000") | |
.endpoint("127.0.0.1:10001") | |
.build(), | |
23); | |
final Subscription client_sub0 = | |
client_aeron1.addSubscription( | |
new ChannelUriStringBuilder() | |
.reliable(Boolean.TRUE) | |
.media("udp") | |
.controlEndpoint("127.0.0.1:10000") | |
.endpoint("127.0.0.2:10001") | |
.build(), | |
23); | |
final Subscription client_sub1 = | |
client_aeron2.addSubscription( | |
new ChannelUriStringBuilder() | |
.reliable(Boolean.TRUE) | |
.media("udp") | |
.controlEndpoint("127.0.0.1:10000") | |
.endpoint("127.0.0.3:10001") | |
.build(), | |
23); | |
final ConcurrentPublication server_pub2 = | |
server_aeron.addPublication( | |
new ChannelUriStringBuilder() | |
.reliable(Boolean.TRUE) | |
.media("udp") | |
.controlMode("dynamic") | |
.controlEndpoint("127.0.0.1:10000") | |
.endpoint("127.0.0.1:10001") | |
.build(), | |
23); | |
final UnsafeBuffer buffer = | |
new UnsafeBuffer(BufferUtil.allocateDirectAligned(2048, 16)); | |
while (true) { | |
if (server_pub.isConnected()) { | |
sendMessage(server_pub, buffer, "server_pub1 " + nowTimestamp()); | |
} | |
if (server_pub2.isConnected()) { | |
sendMessage(server_pub2, buffer, "server_pub2 " + nowTimestamp()); | |
} | |
readMessages("client_sub0", client_sub0); | |
readMessages("client_sub1", client_sub1); | |
Thread.sleep(1000L); | |
} | |
} | |
private static Aeron serverAeron() | |
{ | |
final MediaDriver.Context server_media_context = | |
new MediaDriver.Context() | |
.dirDeleteOnStart(true) | |
.aeronDirectoryName("/tmp/example-server"); | |
final Aeron.Context server_aeron_context = | |
new Aeron.Context() | |
.aeronDirectoryName("/tmp/example-server"); | |
final MediaDriver server_media_driver = | |
MediaDriver.launch(server_media_context); | |
return Aeron.connect(server_aeron_context); | |
} | |
private static Aeron clientAeron( | |
final int index) | |
{ | |
final MediaDriver.Context client_media_context = | |
new MediaDriver.Context() | |
.dirDeleteOnStart(true) | |
.aeronDirectoryName("/tmp/example-client-" + index); | |
final Aeron.Context client_aeron_context = | |
new Aeron.Context() | |
.aeronDirectoryName("/tmp/example-client-" + index); | |
final MediaDriver client_media_driver = | |
MediaDriver.launch(client_media_context); | |
return Aeron.connect(client_aeron_context); | |
} | |
private static void readMessages( | |
final String receiver, | |
final Subscription sub) | |
throws InterruptedException | |
{ | |
if (sub.isConnected()) { | |
for (int index = 0; index < 10; ++index) { | |
sub.poll((buffer, offset, length, header) -> onMessage(receiver, | |
buffer, | |
offset, | |
length, | |
header), 10); | |
Thread.sleep(100L); | |
} | |
} | |
} | |
private static String nowTimestamp() | |
{ | |
return ZonedDateTime.now().format(DateTimeFormatter.ISO_ZONED_DATE_TIME); | |
} | |
private static void onMessage( | |
final String receiver, | |
final DirectBuffer buffer, | |
final int offset, | |
final int length, | |
final Header header) | |
{ | |
final byte[] buf = new byte[length]; | |
buffer.getBytes(offset, buf); | |
final String message = new String(buf, UTF_8); | |
LOG.debug("onMessage: [{}] {}: {}", Integer.valueOf(header.sessionId()), receiver, message); | |
} | |
private static boolean sendMessage( | |
final Publication pub, | |
final UnsafeBuffer buffer, | |
final String text) | |
throws IOException | |
{ | |
LOG.debug("send: [{}] {}", Integer.valueOf(pub.sessionId()), text); | |
final byte[] value = text.getBytes(UTF_8); | |
buffer.putBytes(0, value); | |
long result = 0L; | |
for (int index = 0; index < 5; ++index) { | |
result = pub.offer(buffer, 0, text.length()); | |
if (result < 0L) { | |
try { | |
Thread.sleep(100L); | |
} catch (final InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
continue; | |
} | |
return true; | |
} | |
throw new IOException("Could not send: error code " + result); | |
} | |
} |
Author
io7m
commented
Apr 12, 2018
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment