Created
July 28, 2017 20:49
-
-
Save ctippur/9f0900b1719793d0c67f5bb143d16ec8 to your computer and use it in GitHub Desktop.
kstream to ktable join
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.test.argos_streams.task; | |
/** | |
*/ | |
/** | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
import com.fasterxml.jackson.core.*; | |
import com.fasterxml.jackson.databind.JsonNode; | |
import com.fasterxml.jackson.databind.node.JsonNodeType; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.common.serialization.Deserializer; | |
import org.apache.kafka.common.serialization.Serde; | |
import org.apache.kafka.common.serialization.Serdes; | |
import org.apache.kafka.common.serialization.Serializer; | |
import org.apache.kafka.streams.KeyValue; | |
import org.apache.kafka.streams.kstream.*; | |
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.StreamsConfig; | |
import com.fasterxml.jackson.databind.node.ObjectNode; | |
import com.fasterxml.jackson.databind.node.JsonNodeFactory; | |
import org.apache.kafka.connect.json.JsonSerializer; | |
import org.apache.kafka.connect.json.JsonDeserializer; | |
import java.util.*; | |
import org.apache.kafka.streams.state.KeyValueIterator; | |
import org.apache.kafka.streams.state.QueryableStoreTypes; | |
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to | |
* write data to a sink (output) topic. | |
* | |
* In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input" | |
* and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output". | |
* | |
* Before running this example you must create the input topic and the output topic (e.g. via | |
* bin/kafka-topics.sh --create ...), and write some data to the input topic (e.g. via | |
* bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. | |
*/ | |
public class ArgosStreamTask { | |
public static void main(String[] args) throws Exception { | |
Properties props = new Properties(); | |
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); | |
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | |
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data | |
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); | |
Map<String, Object> serdeProps = new HashMap<>(); | |
KStreamBuilder builder = new KStreamBuilder(); | |
final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); | |
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); | |
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); | |
final Serde<String> stringSerde = Serdes.String(); | |
final KStream<String, JsonNode> raw = builder.stream(stringSerde, jsonSerde, "raw"); | |
final KTable<String, JsonNode> cache = builder.table( stringSerde, jsonSerde, "cache", "local-cache"); | |
final KStream<String, JsonNode> parser = raw.leftJoin(cache, new ValueJoiner<JsonNode, JsonNode, JsonNode>() { | |
@Override | |
public JsonNode apply(JsonNode raw, JsonNode cache) { | |
System.out.println("IN Apply"); | |
ObjectNode jNode = JsonNodeFactory.instance.objectNode(); | |
try { | |
return jNode.put("L1", cache.get("L1").textValue()); | |
}catch(NullPointerException npe){ | |
System.out.println("IN npe"); | |
return null; | |
} | |
//return jNode.put("source", "world"); | |
//return jNode; | |
} | |
}); | |
parser.to(stringSerde, jsonSerde, "parser"); | |
KafkaStreams streams = new KafkaStreams(builder, props); | |
streams.start(); | |
parser.foreach(new ForeachAction<String, JsonNode>() { | |
@Override | |
public void apply(String key, JsonNode value) { | |
System.out.println(key + ": " + value); | |
if (value == null){ | |
System.out.println("null match"); | |
ReadOnlyKeyValueStore<String, Long> keyValueStore = | |
null; | |
try { | |
keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable("local-store", QueryableStoreTypes.keyValueStore(), streams); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
KeyValueIterator kviterator = keyValueStore.range("test_nod","test_node"); | |
} | |
} | |
}); | |
Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); | |
} | |
} |
dguy -
Getting a weird syntax error at:
store = (KeyValueStore<String, JsonNode> context.getStateStore("local-cache");
')' expected and not a statement.
The syntax looks cryptic to me. Can you please comment on it?
Thanks,
S
@ctippur, i just updated it there was a missing closing )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
streams.start()
needs to be afterparser.foreach(..)
It is not clear why you want to look up the store in yourforeach
?If you want to query the store you could instead do something like: