Skip to content

Instantly share code, notes, and snippets.

View KKcorps's full-sized avatar
🏠
Working from home

Kartik Khare KKcorps

🏠
Working from home
View GitHub Profile
@KKcorps
KKcorps / greeter.proto
Created September 9, 2019 16:14
greeter.proto
syntax = "proto3";
// The greeting service definition.
// Declares a single RPC, SayHello.
service Greeter {
// Sends a greeting: takes one HelloRequest and returns one HelloReply
// (neither side is declared with the `stream` keyword).
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
@KKcorps
KKcorps / pom.xml
Last active September 9, 2019 16:32
Maven Proto Snippet
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.19.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protobuf.version>3.6.1</protobuf.version>
<protoc.version>3.6.1</protoc.version>
<!-- required for jdk9 -->
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<build>
@KKcorps
KKcorps / GreeterImpl.java
Created September 9, 2019 16:54
GreeterImpl.java
private class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
//Build Proto Messages Object
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
//Send the response, you can send multiple objects which will be transmitted in streaming fashion
responseObserver.onNext(reply);
@KKcorps
KKcorps / Client.java
Last active September 10, 2019 04:22
public class Client {
private final ManagedChannel channel;
private final GreeterBlockingStub blockingStub;
private final GreeterStub asyncStub;
public Client(String host, String port){
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); //remove .usePlainText for secure connections
blockingStub = GreeterGrpc.newBlockingStub(channel); // use it to make blocking calls
asyncStub = GreeterGrpc.newStub(channel); // or use this to make async calls
}
@KKcorps
KKcorps / server.java
Created September 10, 2019 04:32
server.java
/**
 * Creates a server for the given port by delegating to the
 * {@code (ServerBuilder, int)} constructor with {@code ServerBuilder.forPort(port)}.
 *
 * @param port port handed to {@code ServerBuilder.forPort} and forwarded to the delegated constructor
 * @throws IOException declared in the signature; NOTE(review): nothing in the visible
 *         body is shown to throw it — confirm whether the clause is still needed
 */
public GreeterServer(int port) throws IOException {
this(ServerBuilder.forPort(port), port);
}
public GreeterServer(ServerBuilder<?> serverBuilder, int port) {
this.port = port;
server = serverBuilder.addService(new GreeterImpl()).build();
}
public void start() throws IOException {
public class TestCheckpointJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Properties kafkaConsumerProperties = new Properties();
kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092");
kafkaConsumerProperties.setProperty("group.id", "test_group_id");
public class TestCheckpointJobWithBootstrap {
public static void main(String[] args) throws Exception {
bootstrapConfig();
// ... rest same as previous snippet ...
}
static void bootstrapConfig() throws IOException {
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint existingSavepoint = Savepoint.load(executionEnvironment, "oldSavepointPath", new MemoryStateBackend());
class ConfigBootstrapper extends KeyedStateBootstrapFunction<String, TestConfig> {
ValueState<Integer> threshold;
/**
 * Obtains the {@code Integer} value state registered under the name
 * {@code "thresholdState"} from the runtime context and stores it in {@code threshold}.
 *
 * @param parameters configuration passed in by the caller; not read here
 * @throws Exception declared by the overridden method
 */
@Override
public void open(Configuration parameters) throws Exception {
    final ValueStateDescriptor<Integer> thresholdDescriptor =
            new ValueStateDescriptor<>("thresholdState", Integer.class);
    threshold = getRuntimeContext().getState(thresholdDescriptor);
}
@Override
public void processElement(TestConfig testConfig, Context context) throws Exception {
class ClassifyData extends RichFlatMapFunction<TestData, LabeledTestData>{
ValueState<Integer> threshold;
/**
 * Runs the superclass {@code open} and then looks up the {@code Integer} value state
 * named {@code "thresholdState"} from the runtime context, keeping the handle in
 * {@code threshold}.
 *
 * @param parameters configuration forwarded unchanged to {@code super.open}
 * @throws Exception declared by the overridden method
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    final ValueStateDescriptor<Integer> stateDescriptor =
            new ValueStateDescriptor<>("thresholdState", Integer.class);
    this.threshold = getRuntimeContext().getState(stateDescriptor);
}
/**
 * Job entry-point class. Its {@code main} method calls {@code bootstrapConfig()} first;
 * the remainder of the job is elided in this snippet.
 */
public class TestCheckpointJob {
// NOTE(review): in this snippet bootstrapConfig() is declared after this class's closing
// brace; it has to be a member of this class for the call below to resolve — confirm
// against the full file.
public static void main(String[] args) throws Exception {
bootstrapConfig();
//Rest same as previous code
}
}
static void bootstrapConfig() throws IOException {
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();