Skip to content

Instantly share code, notes, and snippets.

@ottomata
Last active December 8, 2022 18:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ottomata/6f4493042331f051f1651b8cd3da1e53 to your computer and use it in GitHub Desktop.
Save ottomata/6f4493042331f051f1651b8cd3da1e53 to your computer and use it in GitHub Desktop.
flink-kubernetes-operator python example
# Image for the PyFlink demo job, built on the Wikimedia Flink 1.16.0 image.
FROM docker-registry.wikimedia.org/flink:1.16.0-37

# Switch to root for the filesystem changes below.
USER root

# Create the application directory and install the job script into it.
# COPY is used rather than ADD: the source is a plain local file, so none of
# ADD's extra behaviour (URL fetch, archive extraction) is wanted.
RUN mkdir -p /srv/flink_app
COPY python_demo.py /srv/flink_app/python_demo.py

# Return to the unprivileged flink user for everything that follows,
# including the container's runtime user.
USER flink
# FlinkDeployment that runs the PyFlink demo script as a Flink application.
#
# To deploy this:
#   kubectl apply -f ./python-example.yaml
# To undeploy:
#   kubectl delete flinkdeployment python-example
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink-app0
  name: python-example
spec:
  # NOTE(review): presumably the image built from the accompanying Dockerfile,
  # which installs python_demo.py under /srv/flink_app -- confirm the tag.
  image: pyflink_demo:wmf_flink39
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    # NOTE(review): jarURI is intentionally left commented out, as in the
    # original; confirm the operator version in use accepts a job without it.
    # jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.0.jar # Note, this jarURI is actually a placeholder
    entryClass: "org.apache.flink.client.python.PythonDriver"
    # "-py" must point at the path where the image installs the script.
    args: ["-pyclientexec", "/usr/bin/python3", "-py", "/srv/flink_app/python_demo.py"]
    parallelism: 1
    upgradeMode: stateless
import logging
import sys
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
def python_demo() -> None:
    """Stream generated ``orders`` rows into a ``print`` sink table.

    Creates a stream execution environment with parallelism 1, wraps it in a
    ``StreamTableEnvironment``, and then issues three SQL statements:

    1. ``CREATE TABLE orders`` backed by the ``datagen`` connector, configured
       with ``'rows-per-second' = '1'``.
    2. ``CREATE TABLE print_table`` with the same columns, backed by the
       ``print`` connector.
    3. ``INSERT INTO print_table SELECT * FROM orders``.

    The result of the final ``execute_sql`` call is not inspected or waited on.
    NOTE(review): confirm that is the intended behaviour for the deployment
    mode this script is submitted in.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # Source table: rows produced by the datagen connector.
    t_env.execute_sql("""
CREATE TABLE orders (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
)""")

    # Sink table: same schema as the source, written via the print connector.
    t_env.execute_sql("""
CREATE TABLE print_table (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'print'
)""")

    # Wire source to sink.
    t_env.execute_sql("""
INSERT INTO print_table SELECT * FROM orders""")
if __name__ == '__main__':
    # Send INFO-and-above log records to stdout as bare messages
    # (no timestamp or level prefix).
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    python_demo()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment