Skip to content

Instantly share code, notes, and snippets.

@KasunDon
Created July 12, 2018 10:18
Show Gist options
  • Save KasunDon/9d5eb911221e10b669c910fb0ff7e01a to your computer and use it in GitHub Desktop.
Save KasunDon/9d5eb911221e10b669c910fb0ff7e01a to your computer and use it in GitHub Desktop.
EmbededPubSub for Java Integration Test
package com.kasundon.pubsub;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class EmbededPubSub {
private final static Logger LOGGER = LoggerFactory.getLogger(EmbededPubSub.class);
private final static int PUBSUB_PORT = 5189;
private final static String PROJECT_ID = "my-project-id";
private final static String TOPIC_ID = "my-test-topic";
private final static String SUBSCRIPTION_ID = "my-subscription-id";
public static final GenericContainer PUBSUB_CONTAINER =
new GenericContainer("google/cloud-sdk:latest")
.withExposedPorts(PUBSUB_PORT)
.withCommand(
"/bin/sh",
"-c",
String.format(
"gcloud beta emulators pubsub start --project %s --host-port=0.0.0.0:%d",
PROJECT_ID,
PUBSUB_PORT
)
)
.waitingFor(new LogMessageWaitStrategy().withRegEx("(?s).*started.*$"));
private String serviceHostname;
private ManagedChannel channel;
private TransportChannelProvider channelProvider;
private CredentialsProvider credentialsProvider = NoCredentialsProvider.create();
public void start() {
PUBSUB_CONTAINER.start();
serviceHostname = String.format("127.0.0.1:%d", PUBSUB_CONTAINER.getMappedPort(PUBSUB_PORT));
channel = ManagedChannelBuilder
.forTarget(serviceHostname)
.usePlaintext(true)
.build();
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
createTopicAndSubscription(TOPIC_ID);
LOGGER.debug(" @@ PubSub service successfully started!");
}
public void stop() {
try {
channel.awaitTermination(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
//noop
}
PUBSUB_CONTAINER.stop();
}
public void createTopicAndSubscription(
String topicName
) {
ProjectTopicName projectTopicName = ProjectTopicName.of(PROJECT_ID, topicName);
ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID);
SubscriptionAdminSettings subscriptionAdminSettings;
try {
TopicAdminClient topicAdminClient = TopicAdminClient.create(
TopicAdminSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build());
Topic topic = topicAdminClient.createTopic(projectTopicName);
LOGGER.debug(" @@ Topic created : {} ", topic.getName());
subscriptionAdminSettings = SubscriptionAdminSettings
.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
SubscriptionAdminClient subscriptionAdminClient =
SubscriptionAdminClient
.create(subscriptionAdminSettings);
Subscription subscription =
subscriptionAdminClient
.createSubscription(projectSubscriptionName, projectTopicName, PushConfig.getDefaultInstance(), 0);
LOGGER.debug(" @@ Subscription created : {} ", subscription.getName());
} catch (IOException | AlreadyExistsException e) {
throw new RuntimeException(e);
}
}
public void publish(
String message
) {
try {
ProjectTopicName topic = ProjectTopicName.of(PROJECT_ID, TOPIC_ID);
Publisher publisher =
Publisher.newBuilder(topic)
.setChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
publisher.publish(pubsubMessage);
publisher.shutdown();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void subscriber(
Consumer<String> consumer,
long timeoutMs
) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
ProjectSubscriptionName subscription = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID);
MessageReceiver receiver = (message, ackReplyConsumer) -> {
consumer.accept(message.getData().toStringUtf8());
//ackReplyConsumer.ack();
};
Subscriber subscriber = null;
try {
subscriber =
Subscriber
.newBuilder(subscription, receiver)
.setChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
subscriber.addListener(new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
LOGGER.error(" @@ failure detected : ", failure);
}
}, MoreExecutors.directExecutor());
subscriber.startAsync().awaitRunning();
Thread.sleep(timeoutMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (subscriber != null) {
subscriber.stopAsync().awaitTerminated();
}
}
});
}
public String getServiceHostname() {
if (serviceHostname == null) {
throw new IllegalStateException("Embedded PubSub Service not started yet.");
}
return String.format("http://%s", serviceHostname);
}
public String getFullyQualifiedTopicName() {
return String.format("projects/%s/topics/%s", PROJECT_ID, TOPIC_ID);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment