
@ctippur
Created July 28, 2017 20:49
kstream to ktable join
package com.test.argos_streams.task;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.fasterxml.jackson.core.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import java.util.*;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
// waitUntilStoreIsQueryable ships with the kafka-streams test artifact
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demonstrates, using the high-level KStream DSL, how to left-join a KStream against a KTable.
 *
 * Records are read from the source topic "raw" as a KStream and joined, by key, against a KTable
 * built from the topic "cache" (materialized in the local state store "local-cache"). The join
 * result is written to the sink topic "parser".
 *
 * Before running this example you must create the "raw", "cache", and "parser" topics (e.g. via
 * bin/kafka-topics.sh --create ...) and write some keyed data to "raw" and "cache" (e.g. via
 * bin/kafka-console-producer.sh); otherwise you won't see any data arriving in the output topic.
 * A minimal producer sketch for seeding the topics follows the class below.
 */
public class ArgosStreamTask {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Offset reset is "latest" here; use "earliest" to re-process pre-loaded data when re-running the demo.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        Map<String, Object> serdeProps = new HashMap<>(); // unused in this example

        KStreamBuilder builder = new KStreamBuilder();

        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
        final Serde<String> stringSerde = Serdes.String();

        // Stream of incoming JSON records, and a lookup table materialized as the state store "local-cache".
        final KStream<String, JsonNode> raw = builder.stream(stringSerde, jsonSerde, "raw");
        final KTable<String, JsonNode> cache = builder.table(stringSerde, jsonSerde, "cache", "local-cache");
        // Left-join each raw record against the cache table on the record key.
        final KStream<String, JsonNode> parser = raw.leftJoin(cache, new ValueJoiner<JsonNode, JsonNode, JsonNode>() {
            @Override
            public JsonNode apply(JsonNode raw, JsonNode cache) {
                System.out.println("IN Apply");
                ObjectNode jNode = JsonNodeFactory.instance.objectNode();
                try {
                    // cache is null when the table has no entry for this key (left-join semantics).
                    return jNode.put("L1", cache.get("L1").textValue());
                } catch (NullPointerException npe) {
                    System.out.println("IN npe");
                    return null;
                }
            }
        });

        parser.to(stringSerde, jsonSerde, "parser");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        // Note: operators attached after the KafkaStreams instance has been created and started
        // are not reliably part of the running topology; prefer wiring the full topology before
        // constructing KafkaStreams.
        parser.foreach(new ForeachAction<String, JsonNode>() {
            @Override
            public void apply(String key, JsonNode value) {
                System.out.println(key + ": " + value);
                if (value == null) {
                    System.out.println("null match");
                    ReadOnlyKeyValueStore<String, JsonNode> keyValueStore = null;
                    try {
                        // Query the store backing the "cache" table; the name must match the one
                        // used in builder.table(..., "local-cache").
                        keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable(
                                "local-cache", QueryableStoreTypes.<String, JsonNode>keyValueStore(), streams);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // The range result is not inspected further here; close the iterator to release resources.
                    KeyValueIterator<String, JsonNode> kviterator = keyValueStore.range("test_nod", "test_node");
                    kviterator.close();
                }
            }
        });
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
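
Not part of the original gist: a minimal producer sketch for seeding the "cache" and "raw" topics with identically keyed JSON records, so the leftJoin above finds a table entry instead of taking the null branch. The topic names, the key "test_node", and the field "L1" mirror the code above; the class name SeedTopics and the sample values are illustrative assumptions.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.json.JsonSerializer;

import java.util.Properties;

public class SeedTopics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (KafkaProducer<String, JsonNode> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new JsonSerializer())) {

            // Value for the "cache" topic; the streams job reads it into the KTable and
            // looks up the "L1" field during the join.
            ObjectNode cacheValue = JsonNodeFactory.instance.objectNode();
            cacheValue.put("L1", "some-lookup-value"); // assumed sample value

            // Value for the "raw" topic; only the key matters for the join result.
            ObjectNode rawValue = JsonNodeFactory.instance.objectNode();
            rawValue.put("payload", "hello"); // assumed sample value

            // The KStream-KTable join is keyed, so both records must share the same key.
            producer.send(new ProducerRecord<>("cache", "test_node", cacheValue));
            producer.send(new ProducerRecord<>("raw", "test_node", rawValue));
            producer.flush();
        }
    }
}

Because the gist leaves auto.offset.reset at "latest", either run this seeder while ArgosStreamTask is already running or switch the reset to "earliest"; otherwise the streams app will not see the pre-loaded records.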
@dguy
dguy commented Aug 7, 2017

@ctippur, I just updated it; there was a missing closing ).
