Incremental database copy with AWS Data Pipeline ref: https://qiita.com/kempe/items/49ff0232a88c6fffff97
{
  "access-id": "AKIAIXXXXXXXXXXXXXXX",
  "private-key": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}
create or replace function ins_tab(v_start_num integer, v_count integer) returns void as $$
declare
  v_id integer;
begin
  -- Insert v_count rows starting at v_start_num. Rows that already
  -- exist hit the primary-key conflict and are upserted instead,
  -- flipping value to 'UPDATE' and refreshing last_modified so the
  -- incremental copy picks them up again.
  for v_id in v_start_num .. v_start_num + v_count - 1 loop
    insert into t_source (id, value, last_modified)
    values (v_id, 'INSERT', current_timestamp)
    on conflict on constraint t_source_pkey do
      update set value = 'UPDATE'
               , last_modified = current_timestamp;
  end loop;
end;
$$ LANGUAGE plpgsql;
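
A quick way to see the upsert behavior (a minimal sketch; it assumes the t_source table defined next already exists, and the full demo session appears further down):

-- First call inserts ids 1..3 with value 'INSERT'; the second call
-- hits the primary-key conflict and flips those rows to 'UPDATE',
-- refreshing last_modified in both cases.
select ins_tab(1, 3);
select ins_tab(1, 3);
select id, value, last_modified from t_source order by id;
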
create table t_source (
  id numeric(50,0) primary key,
  value text,
  last_modified timestamp
);
create index idx_t_source_lm on t_source (last_modified);
create table t_dest (
  id numeric(50,0) primary key,
  value text,
  last_modified timestamp
);
create index idx_t_dest_lm on t_dest (last_modified);
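
The last_modified column and its index are what make the copy incremental: the pipeline's source query (defined later in rdstords-values.json) selects only rows touched within the last hour. As a standalone sketch, that extract looks like this:

-- Incremental extract: only rows modified in the last hour are
-- copied; this is the same predicate used by mySourceSelectQuery.
select id, value, last_modified
  from t_source
 where last_modified >= current_timestamp - interval '1 hours';
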
#!/bin/sh
# Create, define, and activate the Data Pipeline in one shot.
NAME=rds-to-rds-by-wg
UQID=rdstordswg
PIPELINE_DEF=file://rdstords.json
PIPELINE_PARAMETER_DEF=file://rdstords-parameters.json
PIPELINE_PARAMETER_VALUE=file://rdstords-values.json

echo "INFO: creating..."
COMMAND="aws datapipeline create-pipeline --name ${NAME} --unique-id ${UQID}"
echo "${COMMAND}"
PIPELINE_ID=$(${COMMAND} | jq -r ".pipelineId")
echo "INFO: created: ${PIPELINE_ID}"

echo "INFO: deploying..."
COMMAND="aws datapipeline put-pipeline-definition --pipeline-id ${PIPELINE_ID} --pipeline-definition ${PIPELINE_DEF} --parameter-objects ${PIPELINE_PARAMETER_DEF} --parameter-values-uri ${PIPELINE_PARAMETER_VALUE}"
echo "${COMMAND}"
PUT_RESULT=$(${COMMAND})
# put-pipeline-definition reports validation failures in its JSON
# output ("errored": true) rather than via the exit code, so check it.
ERRORED=$(echo "${PUT_RESULT}" | jq -r ".errored")
if [ "${ERRORED}" = "true" ]; then
  echo "${PUT_RESULT}" | python -m json.tool
  exit 1
elif [ "${ERRORED}" != "false" ]; then
  echo "${PUT_RESULT}"
  exit 1
fi

echo "INFO: activating..."
COMMAND="aws datapipeline activate-pipeline --pipeline-id ${PIPELINE_ID}"
echo "${COMMAND}"
${COMMAND}
if [ $? -ne 0 ]; then exit 1; fi
echo "INFO: completed."
$ psql -U ***** -h postgres-test.****************.ap-northeast-1.rds.amazonaws.com -d postgres_test
psql (9.6.6)
SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: off)
Type "help" for help.
postgres_test=>
$ ./dpdeploy.sh
INFO: creating...
aws datapipeline create-pipeline --name rds-to-rds-by-wg --unique-id rdstordswg
INFO: created: df-05080758GGYTM3CCZ0R
INFO: deploying...
aws datapipeline put-pipeline-definition --pipeline-id df-05080758GGYTM3CCZ0R --pipeline-definition file://rdstords.json --parameter-objects file://rdstords-parameters.json --parameter-values-uri file://rdstords-values.json
INFO: activating...
aws datapipeline activate-pipeline --pipeline-id df-05080758GGYTM3CCZ0R
INFO: completed.
postgres_test=> select ins_tab(1, 10000);
ins_tab
---------
(1 row)
postgres_test=> select count(*) from t_source;
count
-------
10000
(1 row)
postgres_test=> select count(*) from t_dest;
count
-------
0
(1 row)
$ aws datapipeline activate-pipeline --pipeline-id df-05080758GGYTM3CCZ0R
$
$ aws datapipeline list-runs --pipeline-id df-05080758GGYTM3CCZ0R
Name Scheduled Start Status
ID Started Ended
---------------------------------------------------------------------------------------------------
(omitted)
13. DestinationRDSTable 2018-03-15T09:43:27 FINISHED
@DestinationRDSTable_2018-03-15T09:43:27 2018-03-15T09:43:29 2018-03-15T09:43:35
14. RDSToRDSCopyActivity 2018-03-15T09:43:27 FINISHED
@RDSToRDSCopyActivity_2018-03-15T09:43:27 2018-03-15T09:43:29 2018-03-15T09:43:34
15. SourceRDSTable 2018-03-15T09:43:27 FINISHED
@SourceRDSTable_2018-03-15T09:43:27 2018-03-15T09:43:29 2018-03-15T09:43:30
postgres_test=> select count(*) from t_dest;
count
-------
10000
(1 row)
postgres_test=> select ins_tab(1, 100000);
ins_tab
---------
(1 row)
postgres_test=> select count(*) from t_source;
count
--------
100000
(1 row)
postgres_test=> select count(*) from t_dest;
count
-------
10000
(1 row)
$ aws datapipeline activate-pipeline --pipeline-id df-05080758GGYTM3CCZ0R
$
$ aws datapipeline list-runs --pipeline-id df-05080758GGYTM3CCZ0R
Name Scheduled Start Status
ID Started Ended
---------------------------------------------------------------------------------------------------
(omitted)
16. DestinationRDSTable 2018-03-15T09:51:47 WAITING_ON_DEPENDENCIES
@DestinationRDSTable_2018-03-15T09:51:47 2018-03-15T09:51:49
17. RDSToRDSCopyActivity 2018-03-15T09:51:47 RUNNING
@RDSToRDSCopyActivity_2018-03-15T09:51:47 2018-03-15T09:51:49
18. SourceRDSTable 2018-03-15T09:51:47 FINISHED
@SourceRDSTable_2018-03-15T09:51:47 2018-03-15T09:51:49 2018-03-15T09:51:49
$ aws datapipeline list-runs --pipeline-id df-05080758GGYTM3CCZ0R
Name Scheduled Start Status
ID Started Ended
---------------------------------------------------------------------------------------------------
(omitted)
16. DestinationRDSTable 2018-03-15T09:51:47 FINISHED
@DestinationRDSTable_2018-03-15T09:51:47 2018-03-15T09:51:49 2018-03-15T09:52:05
17. RDSToRDSCopyActivity 2018-03-15T09:51:47 FINISHED
@RDSToRDSCopyActivity_2018-03-15T09:51:47 2018-03-15T09:51:49 2018-03-15T09:52:04
18. SourceRDSTable 2018-03-15T09:51:47 FINISHED
@SourceRDSTable_2018-03-15T09:51:47 2018-03-15T09:51:49 2018-03-15T09:51:49
postgres_test=> select count(*) from t_dest;
count
--------
100000
(1 row)
19. DestinationRDSTable 2018-03-15T09:57:27 FINISHED
@DestinationRDSTable_2018-03-15T09:57:27 2018-03-15T09:57:29 2018-03-15T09:59:45
20. RDSToRDSCopyActivity 2018-03-15T09:57:27 FINISHED
@RDSToRDSCopyActivity_2018-03-15T09:57:27 2018-03-15T09:57:29 2018-03-15T09:59:44
21. SourceRDSTable 2018-03-15T09:57:27 FINISHED
@SourceRDSTable_2018-03-15T09:57:27 2018-03-15T09:57:30 2018-03-15T09:57:30
postgres_test=> \i cretab.sql
CREATE TABLE
CREATE INDEX
CREATE TABLE
CREATE INDEX
postgres_test=> \i crefunc.sql
CREATE FUNCTION
postgres_test=> select ins_tab(1, 5);
ins_tab
---------
(1 row)
postgres_test=> select count(*) from t_source;
count
-------
5
(1 row)
postgres_test=> select ins_tab(3, 5);
ins_tab
---------
(1 row)
postgres_test=> select * from t_source;
id | value | last_modified
----+--------+----------------------------
1 | INSERT | 2018-03-15 05:32:32.686245
2 | INSERT | 2018-03-15 05:32:32.686245
3 | UPDATE | 2018-03-15 05:32:55.760427
4 | UPDATE | 2018-03-15 05:32:55.760427
5 | UPDATE | 2018-03-15 05:32:55.760427
6 | INSERT | 2018-03-15 05:32:55.760427
7 | INSERT | 2018-03-15 05:32:55.760427
(7 rows)
$ java -version
openjdk version "1.8.0_161"
OpenJDK Runtime Environment (build 1.8.0_161-b14)
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)
$ curl https://s3.amazonaws.com/datapipeline-us-east-1/us-east-1/software/latest/TaskRunner/TaskRunner-1.0.jar -O
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 44.3M 100 44.3M 0 0 2498k 0 0:00:18 0:00:18 --:--:-- 3448k
$ ./runTaskRunner.sh
log4j:WARN No appenders could be found for logger (amazonaws.datapipeline.objects.PluginModule).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Starting log pusher...
Log Pusher Started. Region: ap-northeast-1, LogUri: s3://xxxxx-logs/datapipeline-error-logs
Build info: commit=unknown, timestamp=2017-11-01 03:35:38 UTC
Initializing drivers...
Starting task runner...
{
  "parameters": [
    {
      "id": "myRegion",
      "type": "String",
      "description": "Region"
    },
    {
      "id": "myRDSInstanceName",
      "type": "String",
      "description": "RDS Instance Name"
    },
    {
      "id": "myRDSUsername",
      "type": "String",
      "description": "RDS PostgreSQL username"
    },
    {
      "id": "*myRDSPassword",
      "type": "String",
      "description": "RDS PostgreSQL password"
    },
    {
      "id": "myErrorLogS3Bucket",
      "type": "String",
      "description": "Data Pipeline error log destination"
    },
    {
      "id": "myWorkGroup",
      "type": "String",
      "description": "Work Group"
    },
    {
      "id": "mySourceTable",
      "type": "String",
      "description": "Source Table"
    },
    {
      "id": "mySourceSelectQuery",
      "type": "String",
      "description": "Query for selecting from the source table"
    },
    {
      "id": "myDestTable",
      "type": "String",
      "description": "Destination Table"
    },
    {
      "id": "myDestInsertQuery",
      "type": "String",
      "description": "Query for inserting into the destination table"
    }
  ]
}
{
  "values": {
    "myRegion": "ap-northeast-1",
    "myRDSInstanceName": "<Your RDS Instance Name>",
    "myRDSUsername": "<Your RDS User Name>",
    "*myRDSPassword": "<Your RDS Password>",
    "myErrorLogS3Bucket": "s3://<Your S3 Bucket and Path>",
    "myWorkGroup": "rdstordswg",
    "mySourceTable": "t_source",
    "myDestTable": "t_dest",
    "mySourceSelectQuery": "select id, value, last_modified from t_source where last_modified >= current_timestamp - interval '1 hours'",
    "myDestInsertQuery": "insert into t_dest (id, value, last_modified) values (cast(? as integer), ?, cast(? as timestamp)) on conflict on constraint t_dest_pkey do update set value = excluded.value, last_modified = cast(excluded.last_modified as timestamp)"
  }
}
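
For each row returned by mySourceSelectQuery, CopyActivity fills the three ? placeholders of myDestInsertQuery with that row's columns. Per source row, the effect is equivalent to the following (literal values shown purely for illustration):

-- Hypothetical bound values for a single source row (id = 1); an
-- existing t_dest row with the same id is upserted instead of failing.
insert into t_dest (id, value, last_modified)
values (cast('1' as integer), 'INSERT', cast('2018-03-15 05:32:32.686245' as timestamp))
on conflict on constraint t_dest_pkey do
  update set value = excluded.value
           , last_modified = cast(excluded.last_modified as timestamp);
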
{
  "objects": [
    {
      "id": "Default",
      "name": "Default",
      "pipelineLogUri": "#{myErrorLogS3Bucket}",
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ondemand"
    },
    {
      "id": "rds_postgres",
      "name": "rds_postgres",
      "type": "RdsDatabase",
      "region": "#{myRegion}",
      "rdsInstanceId": "#{myRDSInstanceName}",
      "username": "#{myRDSUsername}",
      "*password": "#{*myRDSPassword}"
    },
    {
      "id": "SourceRDSTable",
      "name": "SourceRDSTable",
      "type": "SqlDataNode",
      "database": {
        "ref": "rds_postgres"
      },
      "table": "#{mySourceTable}",
      "selectQuery": "#{mySourceSelectQuery}"
    },
    {
      "id": "DestinationRDSTable",
      "name": "DestinationRDSTable",
      "type": "SqlDataNode",
      "database": {
        "ref": "rds_postgres"
      },
      "table": "#{myDestTable}",
      "insertQuery": "#{myDestInsertQuery}"
    },
    {
      "id": "RDSToRDSCopyActivity",
      "name": "RDSToRDSCopyActivity",
      "type": "CopyActivity",
      "output": {
        "ref": "DestinationRDSTable"
      },
      "input": {
        "ref": "SourceRDSTable"
      },
      "workerGroup": "#{myWorkGroup}"
    }
  ]
}
#!/bin/sh
# Start Task Runner polling the worker group referenced by the pipeline.
WORKERGROUP=rdstordswg
LOGURI=s3://<your bucket>/datapipeline-error-logs
REGION=ap-northeast-1
java -jar TaskRunner-1.0.jar --config credentials.json --workerGroup=${WORKERGROUP} --region=${REGION} --logUri=${LOGURI}