Flink Hive S3 Connection
From the jobmanager container, ran:
flink run --python /tmp/hive-dwh-3/scripts/run11.py
Exception:
org.apache.flink.connectors.hive.FlinkHiveException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.flink.connectors.hive.HiveParallelismInference.infer(HiveParallelismInference.java:98)
S3 connectivity works for the filesystem connector, and the plugin jar is located at
plugins/flink-s3-fs-hadoop/flink-s3-fs-hadoop-1.18.1.jar
Logs:
flink--client-3080ffd34e9b.log
flink--standalonesession-0-3080ffd34e9b.log
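The exception class (org.apache.hadoop.fs.UnsupportedFileSystemException) comes from Hadoop's own FileSystem lookup: the Hive connector lists files through org.apache.hadoop.fs.FileSystem, which does not go through Flink's plugin loader, so the flink-s3-fs-hadoop plugin that makes s3:// work for the filesystem connector does not appear to help here. As a narrowing experiment only (a sketch, not a verified fix for this setup), the documented option table.exec.hive.infer-source-parallelism can be turned off to check whether the planning-time listing in HiveParallelismInference is the only place the scheme fails to resolve:

# Sketch: same environment setup as the scripts below, with Hive source
# parallelism inference disabled. If Hadoop itself still cannot resolve s3://,
# the job will most likely fail again when splits are enumerated.
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

t_env = StreamTableEnvironment.create(
    environment_settings=EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("table.exec.hive.infer-source-parallelism", "false")

If the error only moves, the more likely gap is on the Hadoop side: hadoop-aws and the AWS SDK bundle on HADOOP_CLASSPATH, plus an fs.s3.impl mapping for the bare "s3" scheme, since stock Hadoop ships no FileSystem for that scheme.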
---
Previous issue: 'connector' = 'hive' cannot be discovered
Solution: the HiveCatalog must be the current catalog for 'connector' = 'hive' to be discovered
From the jobmanager container, ran:
flink run --python /tmp/hive-dwh-3/scripts/run10.py
This failed with the error message:
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='hive'
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
However, the client logs show that the Hive connector is on the classpath:
/opt/flink/lib/flink-sql-connector-hive-2.3.9_2.12-1.18.1.jar
And the jar does contain such a factory:
jar tvf /opt/flink/lib/flink-sql-connector-hive-2.3.9_2.12-1.18.1.jar | grep HiveDynamicTableFactory
6969 Tue Dec 19 22:35:10 UTC 2023 org/apache/flink/connectors/hive/HiveDynamicTableFactory.class
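A minimal sketch of the pattern that matches the solution above (catalog name, Hive conf directory, and table names taken from the scripts below): the Hive table is only referenced while the HiveCatalog is the current catalog, instead of being copied into default_catalog with CREATE TABLE ... LIKE and read from there.

# Sketch only: register the HiveCatalog and keep it current while reading the Hive table.
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.catalog import HiveCatalog

t_env = StreamTableEnvironment.create(
    environment_settings=EnvironmentSettings.in_streaming_mode())
hive_catalog = HiveCatalog("dwh", "default", "/tmp/hive-dwh-3/hive/conf-dwh-prod")
t_env.register_catalog("dwh", hive_catalog)
t_env.use_catalog("dwh")  # HiveCatalog must be the current catalog here
t_env.execute_sql("SELECT * FROM rdg_test.iris_test LIMIT 5").print()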
---
docker-compose.yml
version: '2.1'
services:
  jobmanager:
    container_name: jobmanager
    image: hive-dwh-3
    command: jobmanager
    ports:
      - "8081:8081"
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        fs.s3a.aws.credentials.provider: com.amazonaws.auth.profile.ProfileCredentialsProvider
    networks:
      - rapid
    volumes:
      - type: bind
        source: ~/.aws
        target: /opt/flink/.aws
      - type: bind
        source: .
        target: /tmp/hive-dwh-3
  taskmanager:
    container_name: taskmanager
    image: hive-dwh-3
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        fs.s3a.aws.credentials.provider: com.amazonaws.auth.profile.ProfileCredentialsProvider
    depends_on:
      - jobmanager
    networks:
      - rapid
    volumes:
      - type: bind
        source: ~/.aws
        target: /opt/flink/.aws
      - type: bind
        source: .
        target: /tmp/hive-dwh-3
  client:
    container_name: client
    image: hive-dwh-3
    command: sql-client.sh
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
        fs.s3a.aws.credentials.provider: com.amazonaws.auth.profile.ProfileCredentialsProvider
    depends_on:
      - jobmanager
      - taskmanager
    networks:
      - rapid
    volumes:
      - type: bind
        source: ~/.aws
        target: /opt/flink/.aws
      - type: bind
        source: .
        target: /tmp/hive-dwh-3
networks:
  rapid:
    name: rapid
---
Dockerfile
FROM flink:1.18.1
# JDK is required to install pyflink
RUN apt-get update -y && apt-get install -y openjdk-11-jdk
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64
# Flink 1.18.1 requires Python 3.8
# Install commands taken from here
# https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
RUN apt-get update -y && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev liblzma-dev jq
RUN wget https://www.python.org/ftp/python/3.8.3/Python-3.8.3.tgz && \
    tar -xvf Python-3.8.3.tgz && \
    cd Python-3.8.3 && \
    ./configure --without-tests --enable-shared && \
    make -j6 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.8.3.tgz && rm -rf Python-3.8.3 && \
    ln -s /usr/local/bin/python3 /usr/local/bin/python && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
# Install PyFlink
RUN python -m pip install --upgrade pip
RUN python -m pip install apache-flink==1.18.1
# Install Hadoop for Hive integration
# Set HADOOP_CLASSPATH=`hadoop classpath`
# https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/hive/overview/#dependencies
COPY build/hadoop /opt/hadoop
ENV PATH="${PATH}:/opt/hadoop/bin"
ENV HADOOP_CLASSPATH='/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*'
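# Note (assumption, not verified in this setup): the classpath above mirrors `hadoop classpath`
# and does not include share/hadoop/tools/lib, which is where a stock Hadoop distribution keeps
# hadoop-aws-*.jar and aws-java-sdk-bundle-*.jar. The Hive connector resolves s3:// through
# Hadoop's FileSystem rather than Flink's plugin-loaded one, so those jars (plus an fs.s3.impl
# mapping for the bare "s3" scheme) may also be needed.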
# S3 integration
# https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
RUN ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/
# Bugfix for SQL client bug in 1.18
# https://issues.apache.org/jira/browse/FLINK-33358
COPY sql-client.sh /opt/flink/bin/
# Connector JARs
COPY lib/flink-sql-connector-hive-2.3.9_2.12-1.18.1.jar /opt/flink/lib/
COPY lib/flink-sql-parquet-1.19.0.jar /opt/flink/lib/
# SQL client configuration
COPY conf /tmp/conf
RUN cat /tmp/conf/flink-conf.yaml >> /opt/flink/conf/flink-conf.yaml
---
run10.py
import logging
import sys

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.table_result import TableResult
from pyflink.table.catalog import HiveCatalog

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

catalog_name = "dwh"

print("Creating dwh catalog")
hive_catalog = HiveCatalog(
    catalog_name,
    "default",
    "/tmp/hive-dwh-3/hive/conf-dwh-prod",
)
t_env.register_catalog(catalog_name, hive_catalog)
t_env.use_catalog(catalog_name)

print("Creating table iris_shadow")
# Copy the Hive table definition into the in-memory default catalog.
t_env.execute_sql("""
CREATE TABLE default_catalog.default_database.iris_shadow LIKE rdg_test.iris_test
""")

print("Creating table iris_out")
# Same schema, written as JSON to S3 via the filesystem connector.
t_env.execute_sql("""
CREATE TABLE default_catalog.default_database.iris_out WITH (
    'connector' = 'filesystem',
    'path' = 's3://<path>',
    'format' = 'json'
) LIKE rdg_test.iris_test (EXCLUDING ALL)
""")

print("Switching back to default")
t_env.use_catalog("default_catalog")

print("Running insert")
t_env.execute_sql("INSERT INTO iris_out SELECT * FROM iris_shadow LIMIT 50")
---
run11.py
import logging
import sys

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.table_result import TableResult
from pyflink.table.catalog import HiveCatalog

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

catalog_name = "dwh"

print("Creating dwh catalog")
hive_catalog = HiveCatalog(
    catalog_name,
    "default",
    "/tmp/hive-dwh-3/hive/conf-dwh-prod",
)
t_env.register_catalog(catalog_name, hive_catalog)
t_env.use_catalog(catalog_name)

print("Creating table iris_out")
# Sink table in the in-memory default catalog, writing JSON to S3 via the filesystem connector.
t_env.execute_sql("""
CREATE TABLE default_catalog.default_database.iris_out WITH (
    'connector' = 'filesystem',
    'path' = 's3://aa.test.rdg/run11_out',
    'format' = 'json'
) LIKE rdg_test.iris_test (EXCLUDING ALL)
""")

print("Running insert")
# Read directly from the Hive table while the Hive catalog is current.
t_env.execute_sql("INSERT INTO default_catalog.default_database.iris_out SELECT * FROM rdg_test.iris_test LIMIT 50")