This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
syntax = "proto3"; | |
// The greeting service definition. | |
service Greeter { | |
// Sends a greeting | |
rpc SayHello (HelloRequest) returns (HelloReply) {} | |
} | |
// The request message containing the user's name. | |
message HelloRequest { | |
string name = 1; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<properties> | |
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | |
<grpc.version>1.19.0</grpc.version><!-- CURRENT_GRPC_VERSION --> | |
<protobuf.version>3.6.1</protobuf.version> | |
<protoc.version>3.6.1</protoc.version> | |
<!-- required for jdk9 --> | |
<maven.compiler.source>1.7</maven.compiler.source> | |
<maven.compiler.target>1.7</maven.compiler.target> | |
</properties> | |
<build> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private class GreeterImpl extends GreeterGrpc.GreeterImplBase { | |
@Override | |
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) { | |
//Build Proto Messages Object | |
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); | |
//Send the response, you can send multiple objects which will be trasmitted in streaming fashion | |
responseObserver.onNext(reply); | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Client { | |
private final ManagedChannel channel; | |
private final GreeterBlockingStub blockingStub; | |
private final GreeterStub asyncStub; | |
public Client(String host, String port){ | |
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); //remove .usePlainText for secure connections | |
blockingStub = GreeterGrpc.newBlockingStub(channel); // use it to make blocking calls | |
asyncStub = GreeterGrpc.newStub(channel); // or use this to make async calls | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public GreeterServer(int port) throws IOException { | |
this(ServerBuilder.forPort(port), port); | |
} | |
public GreeterServer(ServerBuilder<?> serverBuilder, int port) { | |
this.port = port; | |
server = serverBuilder.addService(new GreeterImpl()).build(); | |
} | |
public void start() throws IOException { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TestCheckpointJob { | |
public static void main(String[] args) throws Exception { | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); | |
Properties kafkaConsumerProperties = new Properties(); | |
kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092"); | |
kafkaConsumerProperties.setProperty("group.id", "test_group_id"); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TestCheckpointJobWithBootstrap { | |
public static void main(String[] args) throws Exception { | |
bootstrapConfig(); | |
... rest same as previous snippet ... | |
} | |
static void bootstrapConfig() throws IOException { | |
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); | |
ExistingSavepoint existingSavepoint = Savepoint.load(executionEnvironment, "oldSavepointPath", new MemoryStateBackend()); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ConfigBootstrapper extends KeyedStateBootstrapFunction<String, TestConfig> { | |
ValueState<Integer> threshold; | |
@Override | |
public void open(Configuration parameters) throws Exception { | |
threshold = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("thresholdState", Integer.class)); | |
} | |
@Override | |
public void processElement(TestConfig testConfig, Context context) throws Exception { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ClassifyData extends RichFlatMapFunction<TestData, LabeledTestData>{ | |
ValueState<Integer> threshold; | |
@Override | |
public void open(Configuration parameters) throws Exception { | |
super.open(parameters); | |
threshold = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("thresholdState", Integer.class)); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TestCheckpointJob { | |
public static void main(String[] args) throws Exception { | |
bootstrapConfig(); | |
//Rest same as previous code | |
} | |
} | |
static void bootstrapConfig() throws IOException { | |
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); |
OlderNewer