本ハンズオンでは、以下のサンプルを補完する形で、GlueにおいてRDBMS同士でデータを受け渡す例を紹介します。
Joining, Filtering, and Loading Relational Data with AWS Glue - aws-glue-samples - awslabs
RDS for PostgreSQL をマネジメントコンソールより作成します。本記事では手順を省略します。以下も参考にしてください。パラメータグループにおいて、log_statementsをallにしておくと、どのようなSQLが発行されたかを確認することができます。
PostgreSQL DB インスタンスを作成して PostgreSQL DB インスタンスのデータベースに接続する
このRDSをソース及びターゲットとします。サンプルスキーマ(dvdrental)を用意します。 PostgreSQL Sample Database
$ unzip dvdrental.zip
$ pg_restore -U <ユーザー名> -h <RDSエンドポイント(postgres-test.XXXX.ap-northeast-1.rds.amazonaws.com)> -d <データベース名> ../dvdrental.tar
GlueのConnectionを作成します。Glue Consoleより Add Connection を押し、RDS for PostgreSQL を選択して、RDS向けのコネクションを作ります。ソースとターゲットで共用します。
GlueからRDSを使用するには、RDSが属するセキュリティグループを自己参照可能にする必要があります。
Setting Up a VPC to Connect to JDBC Data Stores
もしまだなければ、当該セキュリティグループに対して All TCP を自分のセキュリティグループから許可するインバウンドルールを追加してください。
GlueがS3にアクセスできるよう、RDSが属するルートテーブルに対してS3 Endpointを追加して下さい。
次に、AWSGlueServiceRole及びAmazonS3ReadOnlyAccessをアタッチしたロールを作成します。
Glue Consoleから Development Pointを作成します。名前は sample などで構いません。作成したコネクションを使用してサブネットを特定します。途中、SSH public key を求められますが、プライベートキーではなく、パブリックキーを登録するよう注意して下さい。
以下のサンプルを実施し、S3からのソースをジョインし、RDSに格納するまでを実施します。最後の格納では手順1のRDSを指定します。
Joining, Filtering, and Loading Relational Data with AWS Glue - aws-glue-samples - awslabs
RDSに対するCrawlerを作成します。Include Pathを次のようにしてpublicスキーマのテーブルをすべてカタログ化します。
Include Path: <DB名>/public/%
DevPointのプロパティから接続情報を確認し、DevPointに接続してスクリプトを実行します。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
city の DynamicFrame の例
city = glueContext.create_dynamic_frame_from_catalog(database='test01', table_name='<DB名>_public_city')
XSDB6の例外が発生する場合は次の回避策を実行します。toDF() isn't working on the shell
spark.stop()
glueContext = GlueContext(SparkContext.getOrCreate())
DynamicFrameを作成した時点ではデータにはアクセスされておらず、実際にDynamicFrameがデータを必要とした時点でアクセスされます。例えば
print city.count()
を実行すると以下のSQLが発行されます。
SELECT "city_id","city","country_id","last_update" FROM "public".city
必要な変更を行ったら、RDSへ書き込みます。以下では city01 という名前のテーブルに書き込んでいます。
glueContext.write_dynamic_frame.from_jdbc_conf(frame = city, catalog_connection = "test01", connection_options = {"dbtable": "city01", "database": "<DB名>"})
この書き込みは単純に insert で実施され、既存データのチェックは実施されません。
DevPoint、及び RDS を削除します。