Skip to content

Instantly share code, notes, and snippets.

@ctippur
Created July 28, 2017 20:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ctippur/9f0900b1719793d0c67f5bb143d16ec8 to your computer and use it in GitHub Desktop.
Save ctippur/9f0900b1719793d0c67f5bb143d16ec8 to your computer and use it in GitHub Desktop.
kstream to ktable join
package com.test.argos_streams.task;
/**
*/
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.fasterxml.jackson.core.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Left-joins the "raw" stream against the "cache" table and writes the result to the "parser" topic.
 *
 * <p>Each record read from "raw" is joined by key with the table built from the "cache" topic
 * (materialized in the state store "local-cache"). The joined value is a JSON object that carries
 * only the "L1" field of the matching cache entry, or {@code null} when the cache has no entry for
 * the key or the entry has no "L1" field. Every joined record is printed; for {@code null} results
 * the "local-cache" store is additionally range-scanned and the entries found are printed.
 *
 * <p>Before running this example the "raw", "cache" and "parser" topics must exist (e.g. created via
 * bin/kafka-topics.sh --create ...), and some data must be written to "raw" and "cache"
 * (e.g. via bin/kafka-console-producer.sh). Otherwise nothing arrives in "parser".
 */
public class ArgosStreamTask {

    /** Name of the state store that materializes the "cache" table. */
    private static final String CACHE_STORE_NAME = "local-cache";

    /**
     * Builds the join topology, starts the Kafka Streams application and registers a shutdown hook
     * that closes it.
     *
     * @param args unused
     * @throws Exception declared for compatibility with the original entry point
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Offset reset policy used when the application has no committed offsets: "latest" starts
        // from the newest records only.
        // NOTE(review): use "earliest" instead if data already in the topics must be processed
        // on a fresh run - confirm which is intended.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        KStreamBuilder builder = new KStreamBuilder();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
        final Serde<String> stringSerde = Serdes.String();

        final KStream<String, JsonNode> raw = builder.stream(stringSerde, jsonSerde, "raw");
        final KTable<String, JsonNode> cache =
                builder.table(stringSerde, jsonSerde, "cache", CACHE_STORE_NAME);

        final KStream<String, JsonNode> parser = raw.leftJoin(cache, ArgosStreamTask::joinWithCache);
        parser.to(stringSerde, jsonSerde, "parser");

        // The foreach step needs the KafkaStreams instance to query the store, but that instance
        // must only be created once the topology is complete. The reference is filled in below,
        // before start(), so it is always set by the time a record reaches this step.
        final AtomicReference<KafkaStreams> streamsRef = new AtomicReference<>();
        parser.foreach((key, value) -> {
            System.out.println(key + ": " + value);
            if (value == null) {
                System.out.println("null match");
                printCacheRange(streamsRef.get(), "test_nod", "test_node");
            }
        });

        // Every processing step is defined above; only now create and start the application.
        final KafkaStreams streams = new KafkaStreams(builder, props);
        streamsRef.set(streams);
        // Register the hook before starting so that an early shutdown still closes the instance.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    /**
     * Value joiner for the raw-to-cache left join.
     *
     * @param rawValue value of the record from the "raw" stream (not used in the result)
     * @param cacheValue matching value from the "cache" table, or {@code null} when the left join
     *     found no entry for the key
     * @return a new JSON object holding the text of the cache entry's "L1" field, or {@code null}
     *     when there is no cache entry or it has no "L1" field
     */
    private static JsonNode joinWithCache(JsonNode rawValue, JsonNode cacheValue) {
        System.out.println("IN Apply");
        // Explicit checks replace the former catch of NullPointerException; the message printed
        // for the no-match case is kept unchanged.
        JsonNode l1 = (cacheValue == null) ? null : cacheValue.get("L1");
        if (l1 == null) {
            System.out.println("IN npe");
            return null;
        }
        return JsonNodeFactory.instance.objectNode().put("L1", l1.textValue());
    }

    /**
     * Prints the entries of the cache store whose keys lie in the range {@code [from, to]}.
     *
     * <p>NOTE(review): {@code KafkaStreams#store} throws if the store is not queryable yet (for
     * example during a rebalance); this is called while a record is being processed, when the
     * store is expected to be available - confirm this is acceptable for the deployment.
     *
     * @param streams the started Kafka Streams instance that owns the store
     * @param from first key of the range
     * @param to last key of the range
     */
    private static void printCacheRange(KafkaStreams streams, String from, String to) {
        ReadOnlyKeyValueStore<String, JsonNode> store =
                streams.store(CACHE_STORE_NAME, QueryableStoreTypes.<String, JsonNode>keyValueStore());
        // Range iterators hold store resources and must be closed after use.
        try (KeyValueIterator<String, JsonNode> entries = store.range(from, to)) {
            while (entries.hasNext()) {
                System.out.println("cache entry: " + entries.next());
            }
        }
    }
}
@dguy
Copy link

dguy commented Jul 29, 2017

streams.start() needs to be called after parser.foreach(..). It is not clear why you want to look up the store in your foreach.
If you want to query the store you could instead do something like:

class TheProcessor extends AbstractProcessor<String, JsonNode> {
   KeyValueStore<String, JsonNode> store;  
   public void init(ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, JsonNode>) context.getStateStore("local-cache");
    }

   void process(String key, JsonNode value) {
         KeyValueIterator iterator = store.range(...,...);
   }
}
parser.process(() -> new TheProcessor(), "local-cache");

@ctippur
Copy link
Author

ctippur commented Aug 4, 2017

dguy -
Getting a weird syntax error at:
store = (KeyValueStore<String, JsonNode> context.getStateStore("local-cache");
')' expected and not a statement.
The syntax looks cryptic to me. Can you please comment on it?
Thanks,
S

@dguy
Copy link

dguy commented Aug 7, 2017

@ctippur, I just updated it; there was a missing closing )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment