App.java
package ai.shane;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollSocketChannel;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.ArrayList;
import java.util.List;
/**
* This app can be run after building a suitable jar:
* /usr/bin/time java -cp $WHATEVER.jar ai.shane.App 10 500
*/
public class App {
private static AtomicLong msgCount = new AtomicLong();
public static void main(String[] args) {
// Apologies for the bare-bones docs.
// args[0] is the thread pool size. 5 is enough for 500 topics in my testing.
// args[1] is the number of publishers. I precreated 500 or so topics; you will need
// topics topic1 through topic<args[1]> to exist in Pub/Sub (see the topic-creation
// sketch after this class).
String projectId = "gcp-project";
List<Future<String>> msgIds = new ArrayList<Future<String>>();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(Integer.parseInt(args[0]));
FixedExecutorProvider executorProvider = FixedExecutorProvider.create(executor);
// On non-Linux platforms, swap in NIO instead (see the NIO sketch after the Slim class):
// NioEventLoopGroup loop = new NioEventLoopGroup(1, executor);
EpollEventLoopGroup loop = new EpollEventLoopGroup(4, executor);
InstantiatingGrpcChannelProvider channelProvider = PublisherStubSettings.defaultGrpcTransportProviderBuilder()
.setExecutorProvider(executorProvider)
.setChannelConfigurator(
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
@Override
public ManagedChannelBuilder apply(
ManagedChannelBuilder managedChannelBuilder) {
NettyChannelBuilder nettyChannelBuilder = (NettyChannelBuilder) managedChannelBuilder
.executor(loop);
nettyChannelBuilder.eventLoopGroup(loop);
nettyChannelBuilder.channelType(
EpollSocketChannel.class); // epoll is Linux-only; the channel type must
// match the event loop group chosen above.
return nettyChannelBuilder;
}
})
.build();
try {
final int count = Integer.parseInt(args[1]) + 1;
List<Future<Publisher>> plist = new ArrayList<Future<Publisher>>();
// Note: this creates args[1] publishers talking to args[1] topics using the naming
// convention topic1..topicN. You will need to create these topics if you are testing.
for (int i = 1; i < count; i++) {
plist.add(createPublisher(projectId, "topic" + i, executorProvider, channelProvider));
}
System.out.println("creating publishers in parallel using low fixed number of threads.");
List<Publisher> pubs = new ArrayList<>();
for (Future<Publisher> p : plist) {
pubs.add(p.get());
}
System.out.println("created publishers. Sending messages async.");
for(Publisher p: pubs) {
msgIds.add(publishAsync(p, "message", executorProvider));
}
System.out.println("Resolving message futures. Getting message IDs.");
// wait for everything
for (Future<String> msgId : msgIds) {
msgId.get();
}
System.out.println("shutdown event loops and executors");
loop.shutdownGracefully().get();
// also shut down the shared executor that backs the event loop and the publishers
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("sent " + msgCount.get() + " messages.");
} catch (Exception e) {
e.printStackTrace();
}
}
public static Future<String> publishAsync(Publisher p, String message, ExecutorProvider executorProvider)
throws IOException, ExecutionException, InterruptedException {
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the
// topic)
ApiFuture<String> msgId = p.publish(pubsubMessage);
ApiFutures.addCallback(msgId, new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable t) {
// TODO actual error handling
t.printStackTrace();
}
@Override
public void onSuccess(String result) {
msgCount.incrementAndGet();
}
}, executorProvider.getExecutor());
return msgId;
}
public static Future<Publisher> createPublisher(String projectId, String topicId, ExecutorProvider executorProvider,
InstantiatingGrpcChannelProvider channelProvider)
throws IOException {
TopicName topicName = TopicName.of(projectId, topicId);
return executorProvider.getExecutor().submit(() -> {
return Publisher.newBuilder(topicName).setExecutorProvider(executorProvider)
.setChannelProvider(channelProvider)
.build();
});
}
}
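The comments in App assume that topics topic1 through topicN already exist. A minimal sketch of pre-creating them, assuming the TopicAdminClient admin surface from google-cloud-pubsub and the same placeholder project ID (this helper class is mine, not part of the original gist):

package ai.shane;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.TopicName;
public class CreateTopics {
public static void main(String[] args) throws Exception {
String projectId = "gcp-project"; // same placeholder as in App
int count = Integer.parseInt(args[0]); // e.g. 500
// TopicAdminClient is AutoCloseable and picks up application default credentials.
try (TopicAdminClient admin = TopicAdminClient.create()) {
for (int i = 1; i <= count; i++) {
admin.createTopic(TopicName.of(projectId, "topic" + i));
}
}
}
}

Run it once with the desired topic count before benchmarking App.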
Slim.java

package ai.shane;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.api.core.ApiFunction;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollSocketChannel;
public class Slim {
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
String topicId = "topic1";
String projectId = "gcp-project";
// A fixed-size thread pool is not necessarily recommended, but is used here for demonstration.
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5); // 5 is plenty for 500 producers
FixedExecutorProvider executorProvider = FixedExecutorProvider.create(executor);
EpollEventLoopGroup loop = new EpollEventLoopGroup(4, executor);
InstantiatingGrpcChannelProvider channelProvider = PublisherStubSettings.defaultGrpcTransportProviderBuilder()
.setExecutorProvider(executorProvider)
.setChannelConfigurator(
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
@Override
public ManagedChannelBuilder apply(
ManagedChannelBuilder managedChannelBuilder) {
NettyChannelBuilder nettyChannelBuilder = (NettyChannelBuilder) managedChannelBuilder
.executor(loop);
nettyChannelBuilder.eventLoopGroup(loop);
nettyChannelBuilder.channelType(
EpollSocketChannel.class); // epoll is Linux-only; the channel type must
// match the event loop group chosen above.
return nettyChannelBuilder;
}
})
.build();
TopicName topicName = TopicName.of(projectId, topicId);
// the calls to setExecutorProvider and setChannelProvider are key
// to managing thread count and sharing resources across publishers.
Publisher p = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider)
.setChannelProvider(channelProvider)
.build();
ByteString data = ByteString.copyFromUtf8("hello, world!");
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
p.publish(pubsubMessage).get();
loop.shutdownGracefully().get();
// also shut down the shared executor that backs the event loop and the publisher
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
}
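Both classes hard-code the Linux-only epoll transport. Where epoll is unavailable, the commented-out NioEventLoopGroup line in App points at the portable alternative; here is a hedged sketch of the same shared channel provider wired up with NIO (the SlimNio class name and the helper shape are my own, not from the gist):

package ai.shane;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ScheduledThreadPoolExecutor;
public class SlimNio {
// Builds a channel provider like the ones above, but on the portable NIO
// transport instead of Linux-only epoll.
public static InstantiatingGrpcChannelProvider nioChannelProvider(
ScheduledThreadPoolExecutor executor) {
FixedExecutorProvider executorProvider = FixedExecutorProvider.create(executor);
NioEventLoopGroup loop = new NioEventLoopGroup(4, executor);
return PublisherStubSettings.defaultGrpcTransportProviderBuilder()
.setExecutorProvider(executorProvider)
.setChannelConfigurator(builder -> {
NettyChannelBuilder netty = (NettyChannelBuilder) builder.executor(loop);
netty.eventLoopGroup(loop);
// The channel type must match the event loop group.
netty.channelType(NioSocketChannel.class);
return netty;
})
.build();
}
}

The key constraint is that channelType must match the event loop group: EpollSocketChannel with EpollEventLoopGroup, NioSocketChannel with NioEventLoopGroup.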