Skip to content

Instantly share code, notes, and snippets.

@andresanches
Created June 12, 2023 21:12
Show Gist options
  • Save andresanches/dbd7a605d0d86ee19362f02c418d1c85 to your computer and use it in GitHub Desktop.
Save andresanches/dbd7a605d0d86ee19362f02c418d1c85 to your computer and use it in GitHub Desktop.
package com.turo.security.authorization.spicedb;
import com.authzed.api.v1.Core.ObjectReference;
import com.authzed.api.v1.Core.Relationship;
import com.authzed.api.v1.Core.RelationshipUpdate;
import com.authzed.api.v1.Core.RelationshipUpdate.Operation;
import com.authzed.api.v1.Core.SubjectReference;
import com.authzed.api.v1.PermissionService.CheckPermissionRequest;
import com.authzed.api.v1.PermissionService.CheckPermissionResponse;
import com.authzed.api.v1.PermissionService.Consistency;
import com.authzed.api.v1.PermissionService.DeleteRelationshipsRequest;
import com.authzed.api.v1.PermissionService.DeleteRelationshipsResponse;
import com.authzed.api.v1.PermissionService.ExpandPermissionTreeRequest;
import com.authzed.api.v1.PermissionService.ExpandPermissionTreeResponse;
import com.authzed.api.v1.PermissionService.LookupResourcesRequest;
import com.authzed.api.v1.PermissionService.LookupResourcesResponse;
import com.authzed.api.v1.PermissionService.LookupSubjectsRequest;
import com.authzed.api.v1.PermissionService.LookupSubjectsResponse;
import com.authzed.api.v1.PermissionService.ReadRelationshipsRequest;
import com.authzed.api.v1.PermissionService.ReadRelationshipsResponse;
import com.authzed.api.v1.PermissionService.RelationshipFilter;
import com.authzed.api.v1.PermissionService.SubjectFilter;
import com.authzed.api.v1.PermissionService.WriteRelationshipsRequest;
import com.authzed.api.v1.PermissionService.WriteRelationshipsResponse;
import com.authzed.api.v1.PermissionsServiceGrpc;
import com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub;
import com.authzed.api.v1.SchemaServiceGrpc;
import com.authzed.api.v1.SchemaServiceGrpc.SchemaServiceBlockingStub;
import com.authzed.api.v1.SchemaServiceOuterClass.ReadSchemaRequest;
import com.authzed.api.v1.SchemaServiceOuterClass.ReadSchemaResponse;
import com.authzed.api.v1.SchemaServiceOuterClass.WriteSchemaRequest;
import com.authzed.api.v1.SchemaServiceOuterClass.WriteSchemaResponse;
import com.authzed.grpcutil.BearerToken;
import com.google.api.client.util.Preconditions;
import com.google.common.base.Stopwatch;
import com.newrelic.api.agent.NewRelic;
import com.turo.dunlop.platform.commons.metrics.MetricsSupport.TagLabel;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.Range;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
/**
* This SpiceDB client wrapper implements write/read replica logic for operations that can tolerate
* eventually consistent reads. Requests configured with `minimize_latency` are submitted to the
* read-only replica instance, if one is configured.
*
* <p>For the select few operations that require strong consistency the request is always submitted
* to the write replica; see the list of such operations at
* https://authzed.com/docs/reference/api-consistency.
*
* <p>This client conflates operations from both {@link com.authzed.api.v1.PermissionsServiceGrpc}
* and {@link com.authzed.api.v1.SchemaServiceGrpc}.
*
* <p>Usage:
*
* <pre>{@code
* final String writeReplicaHostname = "spicedbwrite.com.rr.mu";
* final int writeReplicaPort = 50051;
* final String writeReplicaBearerToken = "RDid7HLAXhBTINDfmzO9AFZCgRKHovTqeLKKlhBC";
*
* final String readReplicaHostname = "spicedbread.com.rr.mu";
* final int readReplicaPort = 50051;
* final String readReplicaBearerToken = "AFZCgRKHovTqeLKKlhBCRDid7HLAXhBTINDfmzO9";
*
* final boolean useTls = true;
*
* final SpiceDbClient spiceDbClient =
* new SpiceDbClientImpl.Builder(writeReplicaHostname, writeReplicaPort, writeReplicaBearerToken)
* .withReadReplica(readReplicaHostname, readReplicaPort, readReplicaBearerToken)
* .withTls(useTls)
* .build();
* }</pre>
*/
public class SpiceDbClientImpl implements SpiceDbClient {
private final PermissionsServiceBlockingStub permissionsWriteReplica;
private final Optional<PermissionsServiceBlockingStub> maybePermissionsReadReplica;
private final SchemaServiceBlockingStub schemaWriteReplica;
private final Optional<SchemaServiceBlockingStub> maybeSchemaReadReplica;
public static final long DEFAULT_READ_DEADLINE_IN_MILLIS = 1_500;
public static final long DEFAULT_WRITE_DEADLINE_IN_MILLIS = 4_000;
public static final String DEFAULT_CLIENT_NAME = "default_client";
public static final Collection<String> WRITE_OPERATIONS =
new HashSet<>(Arrays.asList("WriteRelationships", "DeleteRelationships", "WriteSchema"));
private SpiceDbClientImpl(
final PermissionsServiceBlockingStub permissionsWriteReplica,
final SchemaServiceBlockingStub schemaWriteReplica,
final Optional<PermissionsServiceBlockingStub> maybePermissionsReadReplica,
final Optional<SchemaServiceBlockingStub> maybeSchemaReadReplica) {
this.permissionsWriteReplica = permissionsWriteReplica;
this.maybePermissionsReadReplica = maybePermissionsReadReplica;
this.schemaWriteReplica = schemaWriteReplica;
this.maybeSchemaReadReplica = maybeSchemaReadReplica;
}
/**
* Wrapper for {@link
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#readRelationships}
*/
@Override
public Iterator<ReadRelationshipsResponse> readRelationships(
final ReadRelationshipsRequest request) {
return getPermissionsServiceReplicaForRequestConsistency(request.getConsistency())
.readRelationships(request);
}
/**
* Create a {@link com.authzed.api.v1.PermissionService.WriteRelationshipsRequest} based on the
* provided resource, relation, and subject.
*/
@Override
public WriteRelationshipsRequest buildWriteRelationshipsRequest(
final String resourceType,
final String resourceId,
final String relation,
final String subjectType,
final String subjectId) {
return WriteRelationshipsRequest.newBuilder()
.addUpdates(
RelationshipUpdate.newBuilder()
.setOperation(Operation.OPERATION_TOUCH)
.setRelationship(
Relationship.newBuilder()
.setResource(
ObjectReference.newBuilder()
.setObjectType(resourceType)
.setObjectId(resourceId)
.build())
.setRelation(relation)
.setSubject(
SubjectReference.newBuilder()
.setObject(
ObjectReference.newBuilder()
.setObjectType(subjectType)
.setObjectId(subjectId)
.build())
.build())
.build())
.build())
.build();
}
/**
* Wrapper for {@link
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#writeRelationships}
*
* @implNote This operation is always submitted to the write replica.
*/
@Override
public WriteRelationshipsResponse writeRelationships(final WriteRelationshipsRequest request) {
return permissionsWriteReplica.writeRelationships(request);
}
/**
* Create a {@link com.authzed.api.v1.PermissionService.DeleteRelationshipsRequest} based on the
* provided resource, relation, and subject.
*/
@Override
public DeleteRelationshipsRequest buildDeleteRelationshipsRequest(
final String resourceType,
final String resourceId,
final String relation,
final String subjectType,
final String subjectId) {
final RelationshipFilter.Builder relationshipFilterBuilder =
RelationshipFilter.newBuilder().setResourceType(resourceType);
if (!StringUtils.isEmpty(resourceId)) {
relationshipFilterBuilder.setOptionalResourceId(resourceId);
}
if (!StringUtils.isEmpty(relation)) {
relationshipFilterBuilder.setOptionalRelation(relation);
}
if (!StringUtils.isEmpty(subjectType) || !StringUtils.isEmpty(subjectId)) {
final SubjectFilter.Builder subjectFilterBuilder = SubjectFilter.newBuilder();
if (!StringUtils.isEmpty(subjectType)) {
subjectFilterBuilder.setSubjectType(subjectType);
}
if (!StringUtils.isEmpty(subjectId)) {
subjectFilterBuilder.setOptionalSubjectId(subjectId);
}
relationshipFilterBuilder.setOptionalSubjectFilter(subjectFilterBuilder);
}
return DeleteRelationshipsRequest.newBuilder()
.setRelationshipFilter(relationshipFilterBuilder)
.build();
}
/**
* Wrapper for {@link
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#deleteRelationships}
*
* @implNote This operation is always submitted to the write replica.
*/
@Override
public DeleteRelationshipsResponse deleteRelationships(final DeleteRelationshipsRequest request) {
return permissionsWriteReplica.deleteRelationships(request);
}
/**
* Wrapper for {@link
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#checkPermission}
*/
@Override
public CheckPermissionResponse checkPermission(final CheckPermissionRequest request) {
return getPermissionsServiceReplicaForRequestConsistency(request.getConsistency())
.checkPermission(request);
}
/**
* Wrapper for {@link
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#expandPermissionTree}
*/
@Override
public ExpandPermissionTreeResponse expandPermissionTree(
final ExpandPermissionTreeRequest request) {
return getPermissionsServiceReplicaForRequestConsistency(request.getConsistency())
.expandPermissionTree(request);
}
/**
* Create a {@link com.authzed.api.v1.PermissionService.LookupResourcesRequest} based on the
* provided resource, permission, and subject.
*/
@Override
public LookupResourcesRequest buildLookupResourcesRequest(
final String resourceType,
final String permission,
final String subjectType,
final String subjectId) {
return LookupResourcesRequest.newBuilder()
.setResourceObjectType(resourceType)
.setPermission(permission)
.setSubject(
SubjectReference.newBuilder()
.setObject(
ObjectReference.newBuilder()
.setObjectType(subjectType)
.setObjectId(subjectId)
.build()))
.build();
}
/**
* Wrapper for {@link
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#lookupResources}
*/
@Override
public Iterator<LookupResourcesResponse> lookupResources(final LookupResourcesRequest request) {
return getPermissionsServiceReplicaForRequestConsistency(request.getConsistency())
.lookupResources(request);
}
/**
* Create a {@link com.authzed.api.v1.PermissionService.LookupSubjectsRequest} based on the
* provided subject, permission, and resource.
*/
@Override
public LookupSubjectsRequest buildLookupSubjectsRequest(
final String subjectType,
final String permission,
final String resourceType,
final String resourceId) {
return LookupSubjectsRequest.newBuilder()
.setSubjectObjectType(subjectType)
.setPermission(permission)
.setResource(
ObjectReference.newBuilder()
.setObjectType(resourceType)
.setObjectId(resourceId)
.build())
.build();
}
/**
* Wrapper for {@link
* com.authzed.api.v1.PermissionsServiceGrpc.PermissionsServiceBlockingStub#lookupSubjects}
*/
@Override
public Iterator<LookupSubjectsResponse> lookupSubjects(final LookupSubjectsRequest request) {
return getPermissionsServiceReplicaForRequestConsistency(request.getConsistency())
.lookupSubjects(request);
}
/**
* Wrapper for {@link com.authzed.api.v1.SchemaServiceGrpc.SchemaServiceBlockingStub#readSchema}
*
* @implNote This operation is always submitted to the write replica.
*/
@Override
public ReadSchemaResponse readSchema(final ReadSchemaRequest request) {
return schemaWriteReplica.readSchema(request);
}
/**
* Wrapper for {@link com.authzed.api.v1.SchemaServiceGrpc.SchemaServiceBlockingStub#writeSchema}
*
* @implNote This operation is always submitted to the write replica.
*/
@Override
public WriteSchemaResponse writeSchema(final WriteSchemaRequest request) {
return schemaWriteReplica.writeSchema(request);
}
/**
* Getter for the underlying instance of PermissionsServiceBlockingStub for the write replica.
*
* <p>Use this in case there is an operation that is not yet implemented in this wrapper.
*/
@Override
public PermissionsServiceBlockingStub getPermissionsWriteReplicaClient() {
return this.permissionsWriteReplica;
}
/**
* Getter for the underlying instance of PermissionsServiceBlockingStub for the read replica.
*
* <p>Use this in case there is an operation that is not yet implemented in this wrapper.
*/
@Override
public Optional<PermissionsServiceBlockingStub> getPermissionsReadReplicaClient() {
return this.maybePermissionsReadReplica;
}
/**
* Getter for the underlying instance of SchemaServiceBlockingStub for the write replica.
*
* <p>Use this in case there is an operation that is not yet implemented in this wrapper.
*/
@Override
public SchemaServiceBlockingStub getSchemaWriteReplicaClient() {
return this.schemaWriteReplica;
}
/**
* Getter for the underlying instance of SchemaServiceBlockingStub for the read replica.
*
* <p>Use this in case there is an operation that is not yet implemented in this wrapper.
*/
@Override
public Optional<SchemaServiceBlockingStub> getSchemaReadReplica() {
return this.maybeSchemaReadReplica;
}
/**
* Helper method to determine which replica should handle the request based on its consistency.
*
* @return PermissionsServiceBlockingStub that points to the appropriate replica instance.
*/
private PermissionsServiceBlockingStub getPermissionsServiceReplicaForRequestConsistency(
final Consistency consistency) {
return Optional.ofNullable(consistency)
.filter(c -> c.hasMinimizeLatency() && c.getMinimizeLatency())
.flatMap(__ -> maybePermissionsReadReplica)
.orElse(permissionsWriteReplica);
}
/**
* A GRPC client interceptor that sets sensible defaults for SpiceDB clients and publishes metrics
* for latency and errors per operation.
*
* <p>Inspired by https://techdozo.dev/grpc-interceptor-unary-interceptor-with-code-example/
*/
static class SpiceDbGrpcClientInterceptor implements ClientInterceptor {
private final String uniqueName;
private final boolean isMetricsEnabled;
private final long readDeadlineInMillis;
private final long writeDeadlineInMillis;
private static final Logger log = LoggerFactory.getLogger(SpiceDbClientImpl.class);
public SpiceDbGrpcClientInterceptor(
final String uniqueName,
final boolean isMetricsEnabled,
final long readDeadlineInMillis,
final long writeDeadlineInMillis) {
super();
this.uniqueName = uniqueName;
this.isMetricsEnabled = isMetricsEnabled;
this.readDeadlineInMillis = readDeadlineInMillis;
this.writeDeadlineInMillis = writeDeadlineInMillis;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method,
final CallOptions callOptions,
final Channel next) {
final String operation = method.getBareMethodName();
final Stopwatch stopwatch = Stopwatch.createStarted();
final long deadline =
SpiceDbClientImpl.WRITE_OPERATIONS.contains(operation)
? writeDeadlineInMillis
: readDeadlineInMillis;
final CallOptions callOptionsWithDeadline =
callOptions.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptionsWithDeadline)) {
@Override
public void start(
final ClientCall.Listener<RespT> responseListener, final Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
@Override
public void onClose(final Status status, final Metadata trailers) {
stopwatch.stop();
final String statusCode = status.getCode().toString();
final Long latency = stopwatch.elapsed(TimeUnit.MILLISECONDS);
final Optional<String> maybeReason = Optional.ofNullable(status.getDescription());
final Optional<String> maybeCause =
Optional.ofNullable(status.getCause()).map(Throwable::toString);
log.debug(
"SpiceDbClient ({}): Operation: {} Status: {} Latency: {} ms. Reason: {}."
+ " Exception: {}.",
uniqueName,
operation,
statusCode,
latency,
maybeReason.orElse(null),
maybeCause.orElse(null));
recordCustomEvent(
uniqueName, operation, statusCode, latency, maybeReason, maybeCause);
super.onClose(status, trailers);
}
},
headers);
}
};
}
private void recordCustomEvent(
final String uniqueName,
final String operation,
final String statusCode,
final Long elapsedTime,
final Optional<String> maybeReason,
final Optional<String> maybeCause) {
if (!isMetricsEnabled) {
return;
}
final Map<String, String> attributes = new HashMap<>();
attributes.put(TagLabel.NAME.getKey(), uniqueName);
attributes.put(TagLabel.OPERATION.getKey(), operation);
attributes.put(TagLabel.RESULT.getKey(), statusCode);
attributes.put(TagLabel.ELAPSED_TIME.getKey(), String.valueOf(elapsedTime));
maybeReason.ifPresent(reason -> attributes.put(TagLabel.REASON.getKey(), reason));
maybeCause.ifPresent(cause -> attributes.put(TagLabel.FAILURE_CAUSE.getKey(), cause));
NewRelic.getAgent().getInsights().recordCustomEvent("spicedb_client_metrics", attributes);
}
}
/**
* Use this class to build a SpiceDbClient. At a minimum the hostname, port and bearer token for
* the write replica must be provided.
*
* <p>If a read replica is available then use the {@link Builder#withReadReplica} method to
* configure it.
*
* <p>If the SpiceDB instance uses SSL/TLS encryption then use the {@link Builder#withTls} to
* configure the client instance. Note that this setting defaults to plain text.
*/
public static class Builder {
private final String writeReplicaHostname;
private final int writeReplicaPort;
private final String writeReplicaBearerToken;
private Optional<String> maybeReadReplicaHostname = Optional.empty();
private Optional<Integer> maybeReadReplicaPort = Optional.empty();
private Optional<String> maybeReadReplicaBearerToken = Optional.empty();
private Optional<String> maybeUniqueNameForMetrics = Optional.empty();
private Optional<Long> maybeReadDeadlineInMillis = Optional.empty();
private Optional<Long> maybeWriteDeadlineInMillis = Optional.empty();
private boolean isTlsEnabled;
public Builder(
final String writeReplicaHostname,
final int writeReplicaPort,
final String writeReplicaBearerToken) {
this.writeReplicaHostname = writeReplicaHostname;
this.writeReplicaPort = writeReplicaPort;
this.writeReplicaBearerToken = writeReplicaBearerToken;
}
/**
* Only use this if your SpiceDb instance is self-hosted and has more than one read replica.
*
* @param readReplicaHostname The fully qualified domain name (FQDN) for the server
* @param readReplicaPort The TCP port for the server
* @param readReplicaBearerToken The bearer token for the replica
*/
public SpiceDbClientImpl.Builder withReadReplica(
final String readReplicaHostname,
final int readReplicaPort,
final String readReplicaBearerToken) {
this.maybeReadReplicaHostname = Optional.of(readReplicaHostname);
this.maybeReadReplicaPort = Optional.of(readReplicaPort);
this.maybeReadReplicaBearerToken = Optional.of(readReplicaBearerToken);
return this;
}
/**
* Set this to true if your SpiceDB instance uses TLS encryption. Defaults to false (plaintext).
*
* <p>Note: TLS is required when using AuthZed SaaS instances.
*/
public SpiceDbClientImpl.Builder withTls(final boolean isEnabled) {
this.isTlsEnabled = isEnabled;
return this;
}
/**
* Set this to a unique name for your permissions system (aka spicedb tenant) for which metrics
* will be published. Examples: "hosting_team", "payments, "gearbox", etc.
*
* <p>The SpiceDbClient will not publish metrics by default if built without this option.
*/
public SpiceDbClientImpl.Builder withMetrics(final String uniqueName) {
this.maybeUniqueNameForMetrics = Optional.of(uniqueName);
return this;
}
/**
* The maximum amount of time each read request to SpiceDB should wait for a response.
*
* <p>If not set it defaults to the value in {@link
* SpiceDbClientImpl#DEFAULT_READ_DEADLINE_IN_MILLIS}.
*/
public SpiceDbClientImpl.Builder withReadDeadline(final long timeInMillis) {
this.maybeReadDeadlineInMillis = Optional.of(timeInMillis);
return this;
}
/**
* The maximum amount of time each write request to SpiceDB should wait for a response.
*
* <p>If not set it defaults to the value in {@link
* SpiceDbClientImpl#DEFAULT_WRITE_DEADLINE_IN_MILLIS}.
*/
public SpiceDbClientImpl.Builder withWriteDeadline(final long timeInMillis) {
this.maybeWriteDeadlineInMillis = Optional.of(timeInMillis);
return this;
}
public SpiceDbClient build() {
// Left = Permissions Service client, Right = Schema Service client
final Pair<PermissionsServiceBlockingStub, SchemaServiceBlockingStub> writeReplicaClients =
validateAndBuildWriteReplicaClients();
final Optional<Pair<PermissionsServiceBlockingStub, SchemaServiceBlockingStub>>
readReplicaClients = maybeValidateAndBuildReadReplicaClients();
return new SpiceDbClientImpl(
writeReplicaClients.getLeft(),
writeReplicaClients.getRight(),
readReplicaClients.map(Pair::getLeft),
readReplicaClients.map(Pair::getRight));
}
private Pair<PermissionsServiceBlockingStub, SchemaServiceBlockingStub>
validateAndBuildWriteReplicaClients() {
// Ensure all required parameters for the write replica are valid.
Preconditions.checkArgument(
writeReplicaHostname != null, "writeReplicaHostname cannot be null");
Preconditions.checkArgument(
Range.between(1, 65_535).contains(writeReplicaPort), "writeReplicaPort is invalid");
Preconditions.checkArgument(
writeReplicaBearerToken != null, "writeReplicaBearerToken cannot be null");
// Create a single shared long-lived channel for both Permissions and Schema write replicas
final ManagedChannel writeReplicaChannel =
buildManagedChannel(writeReplicaHostname, writeReplicaPort, isTlsEnabled);
final PermissionsServiceBlockingStub permissionsServiceWriteReplica =
PermissionsServiceGrpc.newBlockingStub(writeReplicaChannel)
.withCallCredentials(new BearerToken(writeReplicaBearerToken));
final SchemaServiceGrpc.SchemaServiceBlockingStub schemaServiceWriteReplica =
SchemaServiceGrpc.newBlockingStub(writeReplicaChannel)
.withCallCredentials(new BearerToken(writeReplicaBearerToken));
return Pair.of(permissionsServiceWriteReplica, schemaServiceWriteReplica);
}
private Optional<Pair<PermissionsServiceBlockingStub, SchemaServiceBlockingStub>>
maybeValidateAndBuildReadReplicaClients() {
// If any of the read replica parameters are present then ensure they are all present and
// valid before building Grpc Stubs.
if (maybeReadReplicaHostname.isPresent()
|| maybeReadReplicaPort.isPresent()
|| maybeReadReplicaBearerToken.isPresent()) {
Preconditions.checkArgument(
maybeReadReplicaHostname.isPresent(), "readReplicaHostname is invalid or missing");
Preconditions.checkArgument(
maybeReadReplicaPort
.filter(port -> Range.between(1, 65_535).contains(port))
.isPresent(),
"readReplicaPort is invalid or missing");
Preconditions.checkArgument(
maybeReadReplicaBearerToken.isPresent(),
"readReplicaBearerToken is invalid is missing");
}
return maybeReadReplicaHostname.flatMap(
readReplicaHostName ->
maybeReadReplicaPort.flatMap(
readReplicaPort ->
maybeReadReplicaBearerToken.map(
readReplicaBearerToken -> {
// Create a single shared long-lived channel for both Permissions and
// Schema read replicas
final ManagedChannel readReplicaChannel =
buildManagedChannel(
readReplicaHostName, readReplicaPort, isTlsEnabled);
final PermissionsServiceBlockingStub permissionsServiceReadReplica =
PermissionsServiceGrpc.newBlockingStub(readReplicaChannel)
.withCallCredentials(new BearerToken(readReplicaBearerToken));
final SchemaServiceBlockingStub schemaServiceReadReplica =
SchemaServiceGrpc.newBlockingStub(readReplicaChannel)
.withCallCredentials(new BearerToken(readReplicaBearerToken));
return Pair.of(permissionsServiceReadReplica, schemaServiceReadReplica);
})));
}
private ManagedChannel buildManagedChannel(
final String hostname, final int port, final boolean useTransportSecurity) {
ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress(hostname, port);
if (useTransportSecurity) {
channelBuilder = channelBuilder.useTransportSecurity();
} else {
channelBuilder = channelBuilder.usePlaintext();
}
final String uniqueName =
maybeUniqueNameForMetrics.orElse(SpiceDbClientImpl.DEFAULT_CLIENT_NAME);
// will only publish metrics if a unique name for the client is provided.
final boolean isMetricsEnabled = maybeUniqueNameForMetrics.isPresent();
final long readDeadline =
this.maybeReadDeadlineInMillis.orElse(SpiceDbClientImpl.DEFAULT_READ_DEADLINE_IN_MILLIS);
final long writeDeadline =
this.maybeWriteDeadlineInMillis.orElse(
SpiceDbClientImpl.DEFAULT_WRITE_DEADLINE_IN_MILLIS);
channelBuilder =
channelBuilder.intercept(
new SpiceDbGrpcClientInterceptor(
uniqueName, isMetricsEnabled, readDeadline, writeDeadline));
return channelBuilder.build();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment