Last active
April 9, 2018 01:47
-
-
Save kempei/88ccbda6e4a8d358199d29c0a3ea9d3f to your computer and use it in GitHub Desktop.
AWS Data Pipeline によるデータベースの差分コピー ref: https://qiita.com/kempe/items/49ff0232a88c6fffff97
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"access-id": "AKIAIXXXXXXXXXXXXXXX", | |
"private-key": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- ins_tab: upsert a contiguous range of ids into t_source.
--
-- For every id in [v_start_num, v_start_num + v_count - 1]:
--   * an id that does not exist yet is inserted with value 'INSERT';
--   * an id that already exists (conflict on t_source_pkey) is updated to
--     value 'UPDATE'.
-- In both cases last_modified is set to current_timestamp, which PostgreSQL
-- fixes at transaction start, so every row touched by one call shares the
-- same timestamp.
--
-- Parameters:
--   v_start_num  first id of the range
--   v_count      number of consecutive ids; zero or a negative value (or a
--                NULL argument) produces an empty series and touches no rows
--
-- The range is written by one set-based INSERT ... SELECT over
-- generate_series rather than one INSERT statement per id. generate_series
-- yields each id exactly once, so ON CONFLICT DO UPDATE never has to touch
-- the same row twice within this command.
create or replace function ins_tab(v_start_num integer, v_count integer) returns void as $$
begin
    insert into t_source (id, value, last_modified)
    select g.id, 'INSERT', current_timestamp
    from generate_series(v_start_num, v_start_num + v_count - 1) as g(id)
    on conflict on constraint t_source_pkey do
        update set value         = 'UPDATE',
                   last_modified = current_timestamp;
end;
$$ language plpgsql;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Tables for the differential-copy setup: rows in t_source are copied into
-- t_dest by the Data Pipeline CopyActivity (SourceRDSTable -> DestinationRDSTable).
--
-- The primary-key constraints are named explicitly because the upserts refer
-- to them by name: ins_tab() uses "on conflict on constraint t_source_pkey"
-- and the pipeline insert query uses "on conflict on constraint t_dest_pkey".
-- These are the names PostgreSQL would generate by default, so the resulting
-- schema is unchanged; naming them here makes that dependency explicit.
--
-- NOTE(review): ins_tab() takes integer ids and the pipeline insert query
-- casts id to integer, so ids outside the integer range would fail even
-- though the column is numeric(50,0) -- confirm the intended key type.
-- NOTE(review): last_modified is timestamp without time zone, while the
-- pipeline select compares it with current_timestamp (timestamptz), so the
-- session TimeZone setting affects which rows fall inside the copy window.

create table t_source (
    id            numeric(50,0),
    value         text,
    last_modified timestamp,
    constraint t_source_pkey primary key (id)
);
-- Index on last_modified, the column the pipeline's incremental select filters on.
create index idx_t_source_lm on t_source (last_modified);

create table t_dest (
    id            numeric(50,0),
    value         text,
    last_modified timestamp,
    constraint t_dest_pkey primary key (id)
);
-- Mirrors the source-side index on last_modified.
create index idx_t_dest_lm on t_dest (last_modified);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh
# dpdeploy.sh: create, define and activate the RDS-to-RDS Data Pipeline.
#
# Steps:
#   1. create-pipeline          -> obtains the pipeline id
#   2. put-pipeline-definition  -> uploads definition, parameter objects and values
#   3. activate-pipeline        -> starts the pipeline
# Each AWS CLI command is echoed before it runs. The script exits with
# status 1 as soon as any step fails.
#
# Requires the aws CLI and jq on PATH.

NAME=rds-to-rds-by-wg
UQID=rdstordswg
PIPELINE_DEF=file://rdstords.json
PIPELINE_PARAMETER_DEF=file://rdstords-parameters.json
PIPELINE_PARAMETER_VALUE=file://rdstords-values.json

echo "INFO: creating..."
COMMAND="aws datapipeline create-pipeline --name ${NAME} --unique-id ${UQID}"
echo "${COMMAND}"
# ${COMMAND} is intentionally unquoted so the shell splits it into words;
# none of the arguments contain whitespace.
CREATE_RESULT=$(${COMMAND}) || {
    echo "ERROR: create-pipeline failed" >&2
    exit 1
}
# jq -r prints the raw string (no surrounding quotes); "// empty" turns a
# missing or null pipelineId into empty output so the check below catches it.
PIPELINE_ID=$(printf '%s\n' "${CREATE_RESULT}" | jq -r '.pipelineId // empty')
if [ -z "${PIPELINE_ID}" ]; then
    echo "ERROR: no pipelineId in create-pipeline output:" >&2
    printf '%s\n' "${CREATE_RESULT}" >&2
    exit 1
fi
echo "INFO: created: ${PIPELINE_ID}"

echo "INFO: deploying..."
COMMAND="aws datapipeline put-pipeline-definition --pipeline-id ${PIPELINE_ID} --pipeline-definition ${PIPELINE_DEF} --parameter-objects ${PIPELINE_PARAMETER_DEF} --parameter-values-uri ${PIPELINE_PARAMETER_VALUE}"
echo "${COMMAND}"
# Success is decided by the "errored" field of the result rather than the
# exit status; an empty or non-JSON result falls into the failure branch.
PUT_RESULT=$(${COMMAND})
ERRORED=$(printf '%s\n' "${PUT_RESULT}" | jq -r '.errored')
if [ "${ERRORED}" = "true" ]; then
    # The definition was rejected: pretty-print the result with its errors.
    printf '%s\n' "${PUT_RESULT}" | jq . >&2
    exit 1
elif [ "${ERRORED}" != "false" ]; then
    # Unexpected output (e.g. the CLI call itself failed): show it verbatim.
    printf '%s\n' "${PUT_RESULT}" >&2
    exit 1
fi

echo "INFO: activating..."
COMMAND="aws datapipeline activate-pipeline --pipeline-id ${PIPELINE_ID}"
echo "${COMMAND}"
${COMMAND} || exit 1
echo "INFO: completed."
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ psql -U ***** -h postgres-test.****************.ap-northeast-1.rds.amazonaws.com -d postgres_test | |
psql (9.6.6) | |
SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: off) | |
Type "help" for help. | |
postgres_test=> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ ./dpdeploy.sh | |
INFO: creating... | |
aws datapipeline create-pipeline --name rds-to-rds-by-wg --unique-id rdstordswg | |
INFO: created: df-05080758GGYTM3CCZ0R | |
INFO: deploying... | |
aws datapipeline put-pipeline-definition --pipeline-id df-05080758GGYTM3CCZ0R --pipeline-definition file://rdstords.json --parameter-objects file://rdstords-parameters.json --parameter-values-uri file://rdstords-values.json | |
INFO: activating... | |
aws datapipeline activate-pipeline --pipeline-id df-05080758GGYTM3CCZ0R | |
INFO: completed. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
postgres_test=> select ins_tab(1, 10000); | |
ins_tab | |
--------- | |
(1 行) | |
postgres_test=> select count(*) from t_source; | |
count | |
------- | |
10000 | |
(1 行) | |
postgres_test=> select count(*) from t_dest; | |
count | |
------- | |
0 | |
(1 行) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ aws datapipeline activate-pipeline --pipeline-id df-05080758GGYTM3CCZ0R | |
$ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ aws datapipeline list-runs --pipeline-id df-05080758GGYTM3CCZ0R | |
Name Scheduled Start Status | |
ID Started Ended | |
--------------------------------------------------------------------------------------------------- | |
(省略) | |
13. DestinationRDSTable 2018-03-15T09:43:27 FINISHED | |
@DestinationRDSTable_2018-03-15T09:43:27 2018-03-15T09:43:29 2018-03-15T09:43:35 | |
14. RDSToRDSCopyActivity 2018-03-15T09:43:27 FINISHED | |
@RDSToRDSCopyActivity_2018-03-15T09:43:27 2018-03-15T09:43:29 2018-03-15T09:43:34 | |
15. SourceRDSTable 2018-03-15T09:43:27 FINISHED | |
@SourceRDSTable_2018-03-15T09:43:27 2018-03-15T09:43:29 2018-03-15T09:43:30 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
postgres_test=> select count(*) from t_dest; | |
count | |
------- | |
10000 | |
(1 行) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
postgres_test=> select ins_tab(1, 100000); | |
ins_tab | |
--------- | |
(1 行) | |
postgres_test=> select count(*) from t_source; | |
count | |
-------- | |
100000 | |
(1 行) | |
postgres_test=> select count(*) from t_dest; | |
count | |
------- | |
10000 | |
(1 行) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ aws datapipeline activate-pipeline --pipeline-id df-05080758GGYTM3CCZ0R | |
$ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ aws datapipeline list-runs --pipeline-id df-05080758GGYTM3CCZ0R | |
Name Scheduled Start Status | |
ID Started Ended | |
--------------------------------------------------------------------------------------------------- | |
(省略) | |
16. DestinationRDSTable 2018-03-15T09:51:47 WAITING_ON_DEPENDENCIES | |
@DestinationRDSTable_2018-03-15T09:51:47 2018-03-15T09:51:49 | |
17. RDSToRDSCopyActivity 2018-03-15T09:51:47 RUNNING | |
@RDSToRDSCopyActivity_2018-03-15T09:51:47 2018-03-15T09:51:49 | |
18. SourceRDSTable 2018-03-15T09:51:47 FINISHED | |
@SourceRDSTable_2018-03-15T09:51:47 2018-03-15T09:51:49 2018-03-15T09:51:49 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ aws datapipeline list-runs --pipeline-id df-05080758GGYTM3CCZ0R | |
Name Scheduled Start Status | |
ID Started Ended | |
--------------------------------------------------------------------------------------------------- | |
(省略) | |
16. DestinationRDSTable 2018-03-15T09:51:47 FINISHED | |
@DestinationRDSTable_2018-03-15T09:51:47 2018-03-15T09:51:49 2018-03-15T09:52:05 | |
17. RDSToRDSCopyActivity 2018-03-15T09:51:47 FINISHED | |
@RDSToRDSCopyActivity_2018-03-15T09:51:47 2018-03-15T09:51:49 2018-03-15T09:52:04 | |
18. SourceRDSTable 2018-03-15T09:51:47 FINISHED | |
@SourceRDSTable_2018-03-15T09:51:47 2018-03-15T09:51:49 2018-03-15T09:51:49 | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
postgres_test=> select count(*) from t_dest; | |
count | |
-------- | |
100000 | |
(1 行) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
19. DestinationRDSTable 2018-03-15T09:57:27 FINISHED | |
@DestinationRDSTable_2018-03-15T09:57:27 2018-03-15T09:57:29 2018-03-15T09:59:45 | |
20. RDSToRDSCopyActivity 2018-03-15T09:57:27 FINISHED | |
@RDSToRDSCopyActivity_2018-03-15T09:57:27 2018-03-15T09:57:29 2018-03-15T09:59:44 | |
21. SourceRDSTable 2018-03-15T09:57:27 FINISHED | |
@SourceRDSTable_2018-03-15T09:57:27 2018-03-15T09:57:30 2018-03-15T09:57:30 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
postgres_test=> \i cretab.sql | |
CREATE TABLE | |
CREATE INDEX | |
CREATE TABLE | |
CREATE INDEX | |
postgres_test=> \i crefunc.sql | |
CREATE FUNCTION |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
postgres_test=> select ins_tab(1, 5); | |
ins_tab | |
--------- | |
(1 行) | |
postgres_test=> select count(*) from t_source; | |
count | |
------- | |
5 | |
(1 行) | |
postgres_test=> select ins_tab(3, 5); | |
ins_tab | |
--------- | |
(1 行) | |
postgres_test=> select * from t_source; | |
id | value | last_modified | |
----+--------+---------------------------- | |
1 | INSERT | 2018-03-15 05:32:32.686245 | |
2 | INSERT | 2018-03-15 05:32:32.686245 | |
3 | UPDATE | 2018-03-15 05:32:55.760427 | |
4 | UPDATE | 2018-03-15 05:32:55.760427 | |
5 | UPDATE | 2018-03-15 05:32:55.760427 | |
6 | INSERT | 2018-03-15 05:32:55.760427 | |
7 | INSERT | 2018-03-15 05:32:55.760427 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ java -version | |
openjdk version "1.8.0_161" | |
OpenJDK Runtime Environment (build 1.8.0_161-b14) | |
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ curl https://s3.amazonaws.com/datapipeline-us-east-1/us-east-1/software/latest/TaskRunner/TaskRunner-1.0.jar -O | |
% Total % Received % Xferd Average Speed Time Time Time Current | |
Dload Upload Total Spent Left Speed | |
100 44.3M 100 44.3M 0 0 2498k 0 0:00:18 0:00:18 --:--:-- 3448k |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ ./runTaskRunner.sh | |
log4j:WARN No appenders could be found for logger (amazonaws.datapipeline.objects.PluginModule). | |
log4j:WARN Please initialize the log4j system properly. | |
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. | |
Starting log pusher... | |
Log Pusher Started. Region: ap-northeast-1, LogUri: s3://xxxxx-logs/datapipeline-error-logs | |
Build info: commit=unknown, timestamp=2017-11-01 03:35:38 UTC | |
Initializing drivers... | |
Starting task runner... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"parameters": [ | |
{ | |
"id": "myRegion", | |
"type": "String", | |
"description": "Region" | |
}, | |
{ | |
"id": "myRDSInstanceName", | |
"type": "String", | |
"description": "RDS Instance Name" | |
}, | |
{ | |
"id": "myRDSUsername", | |
"type": "String", | |
"description": "RDS PostgreSQL username" | |
}, | |
{ | |
"id": "*myRDSPassword", | |
"type": "String", | |
"description": "RDS PostgreSQL password" | |
}, | |
{ | |
"id": "myErrorLogS3Bucket", | |
"type": "String", | |
"description": "Data Pipeline error log destination" | |
}, | |
{ | |
"id": "myWorkGroup", | |
"type": "String", | |
"description": "Work Group" | |
}, | |
{ | |
"id": "mySourceTable", | |
"type": "String", | |
"description": "Source Table" | |
}, | |
{ | |
"id": "mySourceSelectQuery", | |
"type": "String", | |
"description": "Query for selecting source table" | |
}, | |
{ | |
"id": "myDestTable", | |
"type": "String", | |
"description": "Destination Table" | |
}, | |
{ | |
"id": "myDestInsertQuery", | |
"type": "String", | |
"description": "Query for inserting destination table" | |
} | |
] | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"values": { | |
"myRegion": "ap-northeast-1", | |
"myRDSInstanceName": "<Your RDS Instance Name>", | |
"myRDSUsername": "<Your RDS User Name>", | |
"*myRDSPassword": "<Your RDS Password>", | |
"myErrorLogS3Bucket": "s3://<Your S3 Bucket and Path>", | |
"myWorkGroup": "rdstordswg", | |
"mySourceTable": "t_source", | |
"myDestTable": "t_dest", | |
"mySourceSelectQuery": "select id, value, last_modified from t_source where last_modified >= current_timestamp - interval '1 hours'", | |
"myDestInsertQuery": "insert into t_dest (id, value, last_modified) values (cast(? as integer), ?, cast(? as timestamp)) on conflict on constraint t_dest_pkey do update set value = excluded.value, last_modified = cast(excluded.last_modified as timestamp)" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"objects": [ | |
{ | |
"id": "Default", | |
"name": "Default", | |
"pipelineLogUri": "#{myErrorLogS3Bucket}", | |
"failureAndRerunMode": "CASCADE", | |
"resourceRole": "DataPipelineDefaultResourceRole", | |
"role": "DataPipelineDefaultRole", | |
"scheduleType": "ondemand" | |
}, | |
{ | |
"id": "rds_postgres", | |
"name": "rds_postgres", | |
"type": "RdsDatabase", | |
"region": "#{myRegion}", | |
"rdsInstanceId": "#{myRDSInstanceName}", | |
"username": "#{myRDSUsername}", | |
"*password": "#{*myRDSPassword}" | |
}, | |
{ | |
"id": "SourceRDSTable", | |
"name": "SourceRDSTable", | |
"type": "SqlDataNode", | |
"database": { | |
"ref": "rds_postgres" | |
}, | |
"table": "#{mySourceTable}", | |
"selectQuery": "#{mySourceSelectQuery}" | |
}, | |
{ | |
"id": "DestinationRDSTable", | |
"name": "DestinationRDSTable", | |
"type": "SqlDataNode", | |
"database": { | |
"ref": "rds_postgres" | |
}, | |
"table": "#{myDestTable}", | |
"insertQuery": "#{myDestInsertQuery}" | |
}, | |
{ | |
"id": "RDSToRDSCopyActivity", | |
"name": "RDSToRDSCopyActivity", | |
"type": "CopyActivity", | |
"output": { | |
"ref": "DestinationRDSTable" | |
}, | |
"input": { | |
"ref": "SourceRDSTable" | |
}, | |
"workerGroup": "#{myWorkGroup}" | |
} | |
] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh
# runTaskRunner.sh: start the AWS Data Pipeline Task Runner for this worker group.
#
# TaskRunner-1.0.jar is expected in the current directory. It serves tasks
# for WORKERGROUP, which must match the pipeline's "myWorkGroup" value
# (rdstordswg) used as the CopyActivity workerGroup. The --config file
# (credentials.json) supplies the access-id / private-key.
#
# Each setting can be overridden from the environment, e.g.
#   LOGURI=s3://my-bucket/datapipeline-error-logs ./runTaskRunner.sh
# Replace the "<your bucket>" placeholder (or set LOGURI) before real use.
# The values are quoted so the "<...>" placeholder is treated as literal text
# instead of being parsed as shell redirections.
WORKERGROUP="${WORKERGROUP:-rdstordswg}"
LOGURI="${LOGURI:-s3://<your bucket>/datapipeline-error-logs}"
REGION="${REGION:-ap-northeast-1}"

# exec replaces the shell with the JVM so signals such as Ctrl-C reach it directly.
exec java -jar TaskRunner-1.0.jar --config credentials.json \
    --workerGroup="${WORKERGROUP}" --region="${REGION}" --logUri="${LOGURI}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment