Skip to content

Instantly share code, notes, and snippets.

@ShigeoTejima
Last active August 9, 2023 19:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ShigeoTejima/330de9f695c81d745017735bea4acf4a to your computer and use it in GitHub Desktop.
Save ShigeoTejima/330de9f695c81d745017735bea4acf4a to your computer and use it in GitHub Desktop.
Code written by tracing ProcessChangeEventHeader in pub-sub-api
// Demo.java
import com.google.protobuf.ByteString;
import com.salesforce.eventbus.protobuf.*;
import io.grpc.CallCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.Utf8;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class Demo {
/**
 * Entry point: reads the connection settings from environment variables, runs the
 * subscription, and then releases the subscriber.
 *
 * <p>Required environment variables: {@code loginEndpoint}, {@code tenantId},
 * {@code accessToken} and {@code topicName}.
 *
 * @param args unused
 * @throws NullPointerException if any required environment variable is not set
 */
public static void main(String[] args) {
    String loginEndpoint = System.getenv("loginEndpoint");
    String tenantId = System.getenv("tenantId");
    String accessToken = System.getenv("accessToken");
    String topicName = System.getenv("topicName");
    Objects.requireNonNull(loginEndpoint, "loginEndpoint required in environment variable.");
    Objects.requireNonNull(tenantId, "tenantId required in environment variable.");
    Objects.requireNonNull(accessToken, "accessToken required in environment variable.");
    Objects.requireNonNull(topicName, "topicName required in environment variable.");
    Demo demo = new Demo(loginEndpoint, tenantId, accessToken, topicName);
    try {
        demo.startApp();
    } finally {
        // Always release the subscriber, even when the subscription fails with an exception.
        demo.stopApp();
    }
}
private final Subscriber subscriber;
private final String topicName;
/**
 * Creates an instance without a subscriber or topic; intended for calling {@link #hello()} only.
 */
Demo() {
    // Nothing is connected here: startApp() and stopApp() dereference the subscriber,
    // so they must not be used on an instance built this way.
    this.topicName = null;
    this.subscriber = null;
}
/**
 * Creates an instance that subscribes to the given topic.
 *
 * @param loginEndpoint value handed to the {@code Subscriber} together with the credentials
 * @param tenantId      tenant identifier handed to the {@code Subscriber}
 * @param accessToken   access token handed to the {@code Subscriber}
 * @param topicName     name of the topic that {@link #startApp()} subscribes to
 */
Demo(String loginEndpoint, String tenantId, String accessToken, String topicName) {
    this.topicName = topicName;
    this.subscriber = new Subscriber(loginEndpoint, tenantId, accessToken);
}
/**
 * Returns a fixed greeting.
 *
 * @return the string {@code "Hello"}
 */
public String hello() {
    final String greeting = "Hello";
    return greeting;
}
/**
 * Subscribes to {@code topicName} starting from the latest event and prints every received
 * event, followed by the names of its changed fields when the event carries a
 * {@code ChangeEventHeader}.
 *
 * <p>The call is delegated to {@code Subscriber.startSubscription}, which blocks the calling
 * thread until the stream reports an error or completes.
 */
void startApp() {
    this.subscriber.startSubscription(
            new StreamObserver<>() {
                /**
                 * Decodes and prints each event of the response.
                 *
                 * @throws RuntimeException wrapping the {@link IOException} when a payload cannot be decoded
                 */
                @Override
                public void onNext(FetchResponse fetchResponse) {
                    for (ConsumerEvent ce : fetchResponse.getEventsList()) {
                        try {
                            Schema schema = subscriber.getSchema(ce.getEvent().getSchemaId());
                            GenericRecord eventPayload = deserialize(schema, ce.getEvent().getPayload());
                            System.out.println(eventPayload.toString());
                            // Events without a ChangeEventHeader record simply have no changed fields;
                            // previously this case ended in a NullPointerException/ClassCastException.
                            Object header = eventPayload.get("ChangeEventHeader");
                            List<String> changedFields = (header instanceof GenericData.Record)
                                    ? getFieldListFromBitmap(schema, (GenericData.Record) header, "changedFields")
                                    : Collections.emptyList();
                            if (!changedFields.isEmpty()) {
                                System.out.println("=== Changed Fields ===");
                                changedFields.forEach(System.out::println);
                                System.out.println("======================");
                            }
                            System.out.println();
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }

                /** Logs the failure and marks the subscription inactive so the waiting caller returns. */
                @Override
                public void onError(Throwable throwable) {
                    System.out.println("onError");
                    throwable.printStackTrace();
                    subscriber.isActive.set(false);
                }

                /** Marks the subscription inactive so the waiting caller returns. */
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                    subscriber.isActive.set(false);
                }

                /** Decodes an Avro binary payload into a generic record using the given writer schema. */
                private GenericRecord deserialize(Schema schema, ByteString payload) throws IOException {
                    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
                    ByteArrayInputStream in = new ByteArrayInputStream(payload.toByteArray());
                    BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
                    return reader.read(null, decoder);
                }

                /**
                 * Reads the bitmap-encoded list stored in {@code fieldName} of the header and expands it
                 * into field names.
                 *
                 * @return a mutable list of field names; empty when the header field is absent
                 */
                private List<String> getFieldListFromBitmap(Schema schema, GenericData.Record eventHeader, String fieldName) {
                    Object raw = eventHeader.get(fieldName);
                    if (!(raw instanceof Collection)) {
                        return new ArrayList<>();
                    }
                    // expandBitmap mutates the list, so collect into an explicitly mutable ArrayList;
                    // Collectors.toList() gives no mutability guarantee.
                    List<String> values = ((Collection<?>) raw).stream()
                            .map(Object::toString)
                            .collect(Collectors.toCollection(ArrayList::new));
                    expandBitmap(schema, values);
                    return values;
                }

                /**
                 * Replaces bitmap entries of {@code values} in place with field names.
                 *
                 * <p>A leading entry starting with {@code 0x} is a bitmap over the fields of
                 * {@code schema}. Entries of the form {@code <index>-0x<hex>} are bitmaps over the
                 * fields of the record-typed field at {@code index}; they are only looked for when the
                 * last entry contains a {@code '-'}.
                 */
                private void expandBitmap(Schema schema, List<String> values) {
                    if (values == null || values.isEmpty()) {
                        return;
                    }
                    if (values.get(0).startsWith("0x")) {
                        String bitMap = values.remove(0);
                        values.addAll(0, fieldNamesFromBitmap(schema, bitMap));
                    }
                    if (values.isEmpty()) {
                        // The only entry was a bitmap with no bits set; nothing left to inspect.
                        return;
                    }
                    if (!values.get(values.size() - 1).contains("-")) {
                        return;
                    }
                    for (ListIterator<String> itr = values.listIterator(); itr.hasNext();) {
                        String[] parts = itr.next().split("-");
                        if (parts.length < 2) {
                            continue;
                        }
                        Schema.Field parentField = schema.getFields().get(Integer.parseInt(parts[0]));
                        Schema childSchema = getValueSchema(parentField.schema());
                        if (childSchema.getType() != Schema.Type.RECORD) {
                            continue;
                        }
                        int nestedSize = childSchema.getFields().size();
                        String parentFieldName = parentField.name();
                        List<String> fullFieldNames = fieldNamesFromBitmap(childSchema, parts[1]).stream()
                                .map(col -> parentFieldName + "." + col)
                                .collect(Collectors.toList());
                        if (fullFieldNames.isEmpty()) {
                            continue;
                        }
                        itr.remove();
                        if (fullFieldNames.size() == nestedSize) {
                            // Every nested field is flagged: report the parent field as a whole.
                            itr.add(parentFieldName);
                        } else {
                            fullFieldNames.forEach(itr::add);
                        }
                    }
                }

                /** Returns the names of the schema fields whose positions are set in the hex bitmap. */
                private List<String> fieldNamesFromBitmap(Schema schema, String bitmap) {
                    BitSet bitSet = convertHexStringToBitSet(bitmap);
                    return bitSet.stream()
                            .mapToObj(pos -> schema.getFields().get(pos).name())
                            .collect(Collectors.toList());
                }

                /**
                 * Converts a {@code 0x}-prefixed hex string into a {@link BitSet}; the last hex byte
                 * holds bits 0-7.
                 */
                private BitSet convertHexStringToBitSet(String hex) {
                    String digits = hex.substring(2);
                    if (digits.length() % 2 != 0) {
                        // Pad to whole bytes; an odd digit count used to overrun the byte array.
                        digits = "0" + digits;
                    }
                    int byteCount = digits.length() / 2;
                    // BitSet.valueOf expects little-endian bytes, so fill the array back to front.
                    byte[] littleEndian = new byte[byteCount];
                    for (int i = 0; i < byteCount; i++) {
                        int high = Character.digit(digits.charAt(2 * i), 16);
                        int low = Character.digit(digits.charAt(2 * i + 1), 16);
                        littleEndian[byteCount - 1 - i] = (byte) ((high << 4) + low);
                    }
                    return BitSet.valueOf(littleEndian);
                }

                /**
                 * Unwraps the value type of a union schema of the shapes {@code [null, T]},
                 * {@code [string, T]} or {@code [null, string, T]}; any other schema is returned as is.
                 */
                private Schema getValueSchema(Schema schema) {
                    if (schema.getType() == Schema.Type.UNION) {
                        List<Schema> types = schema.getTypes();
                        if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL) {
                            return types.get(1);
                        } else if (types.size() == 2 && types.get(0).getType() == Schema.Type.STRING) {
                            return types.get(1);
                        } else if (types.size() == 3 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.STRING) {
                            return types.get(2);
                        }
                    }
                    return schema;
                }
            },
            5,
            this.topicName,
            ReplayPreset.LATEST,
            null);
}
/**
 * Releases the subscriber by delegating to its {@code close()} method.
 */
void stopApp() {
    subscriber.close();
}
/**
 * Holds the gRPC channel and stubs used to subscribe to a topic and to look up event schemas.
 */
static class Subscriber {
    /** Set to {@code true} on construction; observers set it to {@code false} to end the wait in startSubscription. */
    public static final AtomicBoolean isActive = new AtomicBoolean(false);
    private final ManagedChannel channel;
    private final PubSubGrpc.PubSubStub asyncStub;
    private final PubSubGrpc.PubSubBlockingStub blockingStub;
    /** Parsed schemas keyed by schema id, so each schema is fetched at most once. */
    private final Map<String, Schema> schemaCache;
    private StreamObserver<FetchRequest> serverStream;

    /**
     * Opens a channel to the Pub/Sub API endpoint and prepares stubs that attach the given
     * credentials to every call.
     */
    Subscriber(String loginEndpoint, String tenantId, String accessToken) {
        String grpcHost = "api.pubsub.salesforce.com";
        int grpcPort = 7443;
        this.channel = ManagedChannelBuilder.forAddress(grpcHost, grpcPort).build();
        SessionTokenService sessionTokenService = new SessionTokenService();
        APISessionCredentials apiSessionCredentials = sessionTokenService.loginWithAccessToken(loginEndpoint, tenantId, accessToken);
        this.asyncStub = PubSubGrpc.newStub(channel).withCallCredentials(apiSessionCredentials);
        this.blockingStub = PubSubGrpc.newBlockingStub(channel).withCallCredentials(apiSessionCredentials);
        this.schemaCache = new ConcurrentHashMap<>();
        isActive.set(true);
    }

    /**
     * Opens the subscribe stream, sends one fetch request and then blocks until
     * {@link #isActive} becomes {@code false}.
     *
     * @param responseObserver     receives the fetch responses; expected to clear {@link #isActive} when the stream ends
     * @param providedBatchSize    number of events requested
     * @param providedTopicName    topic to subscribe to
     * @param providedReplayPreset where in the stream to start
     * @param providedReplayId     replay id, only used when the preset is {@code CUSTOM}
     * @throws RuntimeException if the waiting thread is interrupted (the interrupt flag is restored)
     */
    void startSubscription(
            StreamObserver<FetchResponse> responseObserver,
            int providedBatchSize,
            String providedTopicName,
            ReplayPreset providedReplayPreset,
            ByteString providedReplayId
    ) {
        this.serverStream = this.asyncStub.subscribe(responseObserver);
        FetchRequest.Builder fetchRequestBuilder = FetchRequest.newBuilder()
                .setNumRequested(providedBatchSize)
                .setTopicName(providedTopicName)
                .setReplayPreset(providedReplayPreset);
        if (providedReplayPreset == ReplayPreset.CUSTOM) {
            fetchRequestBuilder.setReplayId((providedReplayId));
        }
        // NOTE(review): this is the only FetchRequest ever sent, so no more than providedBatchSize
        // events are requested for the lifetime of the stream - confirm whether follow-up requests are needed.
        serverStream.onNext(fetchRequestBuilder.build());
        // Nothing notifies this monitor; the timed wait just re-checks the flag every five seconds.
        while (isActive.get()) {
            synchronized (this) {
                try {
                    this.wait(5_000);
                } catch (InterruptedException e) {
                    // Restore the flag so callers further up can still observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Completes the request stream (if one was opened) and shuts the channel down, waiting up to
     * twenty seconds for termination before forcing it.
     *
     * @throws RuntimeException if the thread is interrupted while waiting (the interrupt flag is restored)
     */
    void close() {
        try {
            if (this.serverStream != null) {
                this.serverStream.onCompleted();
            }
        } finally {
            // Shut the channel down even if completing the stream failed.
            shutdownChannel();
        }
    }

    /** Shuts the channel down gracefully, falling back to a forced shutdown on timeout or interrupt. */
    private void shutdownChannel() {
        try {
            if (!this.channel.shutdown().awaitTermination(20, TimeUnit.SECONDS)) {
                this.channel.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.channel.shutdownNow();
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the Avro schema for the given id, fetching and parsing it on first use.
     *
     * @param schemaId id of the schema to look up
     * @return the parsed schema, served from the cache after the first call
     */
    Schema getSchema(String schemaId) {
        return schemaCache.computeIfAbsent(schemaId, id -> {
            SchemaRequest request = SchemaRequest.newBuilder().setSchemaId(id).build();
            String schemaJson = this.blockingStub.getSchema(request).getSchemaJson();
            return new Schema.Parser().parse(schemaJson);
        });
    }
}
/**
 * Factory for call credentials built from an already issued access token.
 */
static class SessionTokenService {
    /**
     * Wraps the given values in call credentials; no remote call is made here.
     *
     * @return credentials carrying the login endpoint, tenant id and access token
     */
    APISessionCredentials loginWithAccessToken(String loginEndpoint, String tenantId, String accessToken) {
        final APISessionCredentials credentials =
                new APISessionCredentials(loginEndpoint, tenantId, accessToken);
        return credentials;
    }
}
/**
 * Call credentials that attach the instance URL, tenant id and access token as ASCII
 * metadata entries to every request.
 */
static class APISessionCredentials extends CallCredentials {
    private static final Metadata.Key<String> INSTANCE_URL_KEY =
            Metadata.Key.of("instanceUrl", Metadata.ASCII_STRING_MARSHALLER);
    private static final Metadata.Key<String> TENANT_ID_KEY =
            Metadata.Key.of("tenantId", Metadata.ASCII_STRING_MARSHALLER);
    private static final Metadata.Key<String> ACCESS_TOKEN_KEY =
            Metadata.Key.of("accessToken", Metadata.ASCII_STRING_MARSHALLER);

    private final String instanceUrl;
    private final String tenantId;
    private final String accessToken;

    APISessionCredentials(String instanceUrl, String tenantId, String accessToken) {
        this.instanceUrl = instanceUrl;
        this.tenantId = tenantId;
        this.accessToken = accessToken;
    }

    /** Builds the three metadata entries and hands them to the applier on the calling thread. */
    @Override
    public void applyRequestMetadata(RequestInfo requestInfo, Executor executor, MetadataApplier metadataApplier) {
        Metadata headers = new Metadata();
        headers.put(INSTANCE_URL_KEY, instanceUrl);
        headers.put(TENANT_ID_KEY, tenantId);
        headers.put(ACCESS_TOKEN_KEY, accessToken);
        metadataApplier.apply(headers);
    }

    @Override
    public void thisUsesUnstableApi() {
        // Intentionally empty.
    }
}
}
<!-- pom.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the demo: compiles the protobuf/gRPC definitions and the Java sources. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Sources are compiled for Java 17. -->
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Single gRPC version shared by the BOM import and the protoc-gen-grpc-java plugin below. -->
<grpc.version>1.57.1</grpc.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- Imported BOM; the io.grpc dependencies below declare no version of their own. -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>${grpc.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- NOTE(review): presumably needed for annotations referenced by the generated gRPC sources; confirm. -->
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<!-- Transport implementation; only required at runtime. -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Avro is used by Demo.java to decode event payloads. -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<extensions>
<!-- NOTE(review): presumably supplies the os.detected.classifier property used by the protobuf plugin; confirm. -->
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<!-- Generates Java sources from the .proto files using protoc and the grpc-java protoc plugin. -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.23.4:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment