Last active
October 19, 2022 02:59
-
-
Save wirelessr/f1f87e1164b9d8e07507bfbcaaa89c33 to your computer and use it in GitHub Desktop.
PyFlink Connect Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging, sys | |
from pyflink.common import WatermarkStrategy, Row | |
from pyflink.common.serialization import Encoder | |
from pyflink.common.typeinfo import Types | |
from pyflink.datastream import StreamExecutionEnvironment | |
from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource | |
from pyflink.datastream.execution_mode import RuntimeExecutionMode | |
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext, MapFunction, CoMapFunction, CoFlatMapFunction | |
from pyflink.datastream.state import MapStateDescriptor, ValueStateDescriptor | |
from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema | |
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer | |
class SellTotal(CoFlatMapFunction):
    """Join an (id, name) stream with an (id, cnt) stream, keyed by id.

    Stream 1 maintains the id -> name mapping in keyed state and emits
    nothing; stream 2 accumulates a running per-key total and emits
    Row(id, name, total) for every count record it receives.
    """

    def open(self, runtime_context: RuntimeContext):
        # Keyed map state: id -> product name (one entry per key in practice,
        # since the stream is keyed by id).
        state_desc = MapStateDescriptor('map', Types.LONG(), Types.STRING())
        self.state = runtime_context.get_map_state(state_desc)
        # Keyed value state: running sum of counts for the current key.
        cnt_desc = ValueStateDescriptor('cnt', Types.LONG())
        self.cnt_state = runtime_context.get_state(cnt_desc)

    # (id, name)
    def flat_map1(self, value):
        """Record the name for this id; intentionally emits no output."""
        self.state.put(value[0], value[1])

    # (id, cnt)
    def flat_map2(self, value):
        """Add cnt to the keyed running total and emit (id, name, total).

        The name is "NONAME" until stream 1 has delivered one for this id.
        """
        # State is None before the first count for this key — default to 0.
        total = (self.cnt_state.value() or 0) + value[1]
        self.cnt_state.update(total)
        name = self.state.get(value[0]) if self.state.contains(value[0]) else "NONAME"
        yield Row(value[0], name, total)
def sell_total_demo(env):
    """Build the two demo streams, join them with SellTotal, sink to Kafka."""
    # Stream of (id, name) records.
    names_type = Types.ROW([Types.LONG(), Types.STRING()])
    names = env.from_collection(
        [(1, 'apple'), (2, 'banana'), (3, 'cherry'), (4, 'durian'), (6, 'fig'), (7, 'grape')],
        type_info=names_type)

    # Stream of (id, cnt) records; ids may repeat and may lack a name.
    counts_type = Types.ROW([Types.LONG(), Types.LONG()])
    counts = env.from_collection(
        [(1, 1), (2, 3), (3, 5), (1, 5), (5, 100), (6, 66), (1, 10)],
        type_info=counts_type)

    # Output rows are (id, name, running total), serialized as JSON.
    result_type = Types.ROW([Types.LONG(), Types.STRING(), Types.LONG()])
    json_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(result_type) \
        .build()
    sink = FlinkKafkaProducer(
        topic='TempResults',
        serialization_schema=json_schema,
        producer_config={'bootstrap.servers': 'kafka:9092', 'group.id': 'test_group'}
    )

    # Key both sides by id so SellTotal sees matching records on one key.
    names.connect(counts) \
        .key_by(lambda a: a[0], lambda a: a[0]) \
        .flat_map(SellTotal(), result_type) \
        .add_sink(sink)
    env.execute()
if __name__ == '__main__':
    # Send log output to stdout so it shows up alongside print() output.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # The Kafka connector jar must be available to the Flink runtime.
    env.add_jars("file:////home/ec2-user/flink-1.15.2/opt/flink-sql-connector-kafka-1.15.2.jar")

    print("connected demo gogo")
    sell_total_demo(env)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment