Skip to content

Instantly share code, notes, and snippets.

@kempei
Last active March 2, 2018 00:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kempei/94a16036fe7cb911ad6f289428d6ba21 to your computer and use it in GitHub Desktop.
Save kempei/94a16036fe7cb911ad6f289428d6ba21 to your computer and use it in GitHub Desktop.

Joining, Filtering with AWS Glue, and RDS

本ハンズオンでは、以下のサンプルを補完する形で、GlueにおいてRDBMS同士でデータを受け渡す例を紹介します。

Joining, Filtering, and Loading Relational Data with AWS Glue - aws-glue-samples - awslabs

1. Prepare RDS

RDS for PostgreSQL をマネジメントコンソールより作成します。本記事では手順を省略します。以下も参考にしてください。パラメータグループにおいて、log_statementsをallにしておくと、どのようなSQLが発行されたかを確認することができます。

PostgreSQL DB インスタンスを作成して PostgreSQL DB インスタンスのデータベースに接続する

このRDSをソース及びターゲットとします。サンプルスキーマ(dvdrental)を用意します。 PostgreSQL Sample Database

$ unzip dvdrental.zip
$ pg_restore -U <ユーザー名> -h <RDSエンドポイント(postgres-test.XXXX.ap-northeast-1.rds.amazonaws.com)> -d <データベース名> ../dvdrental.tar

2. Make connections

GlueのConnectionを作成します。Glue Consoleより Add Connection を押し、RDS for PostgreSQL を選択して、RDS向けのコネクションを作ります。ソースとターゲットで共用します。

3. Make DevPoint (複数名実施の場合は1名のみが実施)

GlueからRDSを使用するには、RDSが属するセキュリティグループを自己参照可能にする必要があります。

Setting Up a VPC to Connect to JDBC Data Stores

もしまだなければ、当該セキュリティグループに対して All TCP を自分のセキュリティグループから許可するインバウンドルールを追加してください。

GlueがS3にアクセスできるよう、RDSが属するルートテーブルに対してS3 Endpointを追加して下さい。

次に、AWSGlueServiceRole及びAmazonS3ReadOnlyAccessをアタッチしたロールを作成します。

Glue Consoleから Development Pointを作成します。名前は sample などで構いません。作成したコネクションを使用してサブネットを特定します。途中、SSH public key を求められますが、プライベートキーではなく、パブリックキーを登録するよう注意して下さい。

4. Joining, Filtering with AWS Glue

以下のサンプルを実施し、S3からのソースをジョインし、RDSに格納するまでを実施します。最後の格納では手順1のRDSを指定します。

Joining, Filtering, and Loading Relational Data with AWS Glue - aws-glue-samples - awslabs

5. RDS Source

RDSに対するCrawlerを作成します。Include Pathを次のようにしてpublicスキーマのテーブルをすべてカタログ化します。

Include Path: <DB名>/public/%

DevPointのプロパティから接続情報を確認し、DevPointに接続してスクリプトを実行します。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

city の DynamicFrame の例

city = glueContext.create_dynamic_frame_from_catalog(database='test01', table_name='<DB名>_public_city')

XSDB6の例外が発生する場合は次の回避策を実行します。toDF() isn't working on the shell

spark.stop()
glueContext = GlueContext(SparkContext.getOrCreate())

DynamicFrameを作成した時点ではデータにはアクセスされておらず、実際にDynamicFrameがデータを必要とした時点でアクセスされます。例えば

print city.count()

を実行すると以下のSQLが発行されます。

SELECT "city_id","city","country_id","last_update" FROM "public".city

必要な変更を行ったら、RDSへ書き込みます。以下では city01 という名前のテーブルに書き込んでいます。

glueContext.write_dynamic_frame.from_jdbc_conf(frame = city, catalog_connection = "test01", connection_options = {"dbtable": "city01", "database": "<DB名>"})

この書き込みは単純に insert で実施され、既存データのチェックは実施されません。

X. Delete RDS Schema

DevPoint、及び RDS を削除します。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment