Skip to content

Instantly share code, notes, and snippets.

@pwmcintyre
Created January 24, 2023 02:24
Show Gist options
  • Save pwmcintyre/30ecfc5abcd2abb7abddf41bc2c8b1e9 to your computer and use it in GitHub Desktop.
Save pwmcintyre/30ecfc5abcd2abb7abddf41bc2c8b1e9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env node
/*
Migrates every table of a Glue database to a new S3 bucket.

Steps:
- list all tables of the database, then for each table:
  - list all objects at the table's storage location (S3 path)
  - copy all objects found to the new bucket (same keys; the source objects are left in place)
  - update the table config with the new S3 path

Example:
  AWS_PROFILE=admin AWS_REGION=us-west-2 \
  DATABASE_NAME=example_database \
  TARGET_BUCKET_NAME=example \
  node ./glue/MoveDatabase
*/
// validate params: every required variable must be set to a non-empty value.
// An empty value is rejected as well, because an empty bucket or database name
// can never identify a real resource and would only fail (or be silently
// misinterpreted) later on.
for ( const name of [ 'DATABASE_NAME', 'TARGET_BUCKET_NAME' ] ) {
  if ( !process.env[name] ) {
    console.error(`required env var not found or empty: ${name}`)
    process.exit(1)
  }
}
// deps
const url = require('url')
const AWS = require('aws-sdk')
// aws clients
// Both clients are constructed without explicit options, so neither region nor
// credentials are configured here — presumably the SDK resolves them from the
// environment (the usage example above sets AWS_PROFILE and AWS_REGION); TODO confirm.
const glue = new AWS.Glue()
const s3 = new AWS.S3()
// Fields present on the Glue `Table` returned by "getTable" which are read-only
// and must be stripped before the object is sent back as `TableInput` to "updateTable"
const READ_ONLY_TABLE_FIELDS = [
  'DatabaseName',
  'CreateTime',
  'UpdateTime',
  'CreatedBy',
  'IsRegisteredWithLakeFormation',
  'CatalogId',
  'VersionId',
]

// upper bound on the number of copy requests in flight at once, per table
const COPY_BATCH_SIZE = 1000

// builds a `.catch` handler which prefixes the error message and rethrows the same error
function rethrowAs( prefix ) {
  return ( error ) => {
    error.message = `${prefix}: ${error.message}`
    throw error
  }
}

// splits an S3 URI ("s3://bucket/some/path/") into { scheme, bucket, path };
// returns null for anything which is not an s3/s3a/s3n URI (including a missing location).
// The location is split textually (no URL parser) so the path is never escaped or altered.
function parseS3Location( location ) {
  if ( typeof location !== 'string' ) return null
  const match = /^(s3[an]?):\/\/([^\/]+)(\/.*)?$/i.exec( location )
  if ( !match ) return null
  const [ , scheme, bucket, path = '' ] = match
  return { scheme, bucket, path }
}

// lists every table of the database, following `NextToken` until the last page
async function listAllTables( DatabaseName ) {
  const tables = []
  let NextToken
  do {
    const params = NextToken ? { DatabaseName, NextToken } : { DatabaseName }
    const page = await glue.getTables( params ).promise()
      .catch( rethrowAs('failed to get tables') )
    tables.push( ...( page.TableList || [] ) )
    NextToken = page.NextToken
  } while ( NextToken )
  return tables
}

// lists every object key under the prefix, following continuation tokens until the last page
async function listAllKeys( Bucket, Prefix ) {
  const keys = []
  let ContinuationToken
  do {
    const params = ContinuationToken ? { Bucket, Prefix, ContinuationToken } : { Bucket, Prefix }
    const page = await s3.listObjectsV2( params ).promise()
      .catch( rethrowAs('failed to list objects') )
    for ( const object of page.Contents || [] ) {
      keys.push( object.Key )
    }
    ContinuationToken = page.IsTruncated ? page.NextContinuationToken : undefined
  } while ( ContinuationToken )
  return keys
}

// copies the given keys from the source bucket to the same keys in the target bucket
async function copyObjects( keys, SourceBucket, TargetBucket, TableName ) {
  let progress = 0
  for ( let start = 0; start < keys.length; start += COPY_BATCH_SIZE ) {
    const batch = keys.slice( start, start + COPY_BATCH_SIZE )
    await Promise.all( batch.map( (Key) => {
      // CopySource must be URL-encoded; encode each path segment but keep the '/' separators
      const encodedKey = Key.split('/').map( encodeURIComponent ).join('/')
      // NOTE(review): a single copyObject request is limited by S3 in object size
      // (5 GB per the S3 docs); larger objects would need a multipart copy — confirm if relevant
      return s3.copyObject({
        Bucket: TargetBucket,
        CopySource: `/${ SourceBucket }/${ encodedKey }`,
        Key,
      }).promise()
        .then( () => {
          progress++
          console.debug('copied', { TableName, key: Key, progress, total: keys.length })
        })
        .catch( rethrowAs('failed to copy object') )
    }))
      .catch( rethrowAs('failed to copy objects') )
  }
}

// migrates a single table: copies its data to the target bucket, then repoints the table config
async function migrateTable( table, context ) {
  const { DatabaseName, TargetBucket } = context
  const TableName = table.Name
  const OldLocation = table.StorageDescriptor && table.StorageDescriptor.Location

  // tables without an S3 location (e.g. views) have nothing to copy
  const source = parseS3Location( OldLocation )
  if ( !source ) {
    console.warn('no S3 storage location, skipping', { ...context, TableName, location: OldLocation })
    return
  }

  // migrate only if the table is not already stored in the target bucket
  // (exact bucket comparison: a substring test would also match e.g. "<target>-old")
  if ( source.bucket === TargetBucket ) {
    console.info('already migrated', { ...context, TableName, location: OldLocation })
    return
  }

  // trim leading/trailing slashes and add one trailing slash, so that only objects
  // "inside" the table location match; a bucket-root location maps to the empty prefix
  const trimmedPath = source.path.replace(/^\/+|\/+$/g, '')
  const Prefix = trimmedPath === '' ? '' : `${trimmedPath}/`

  // list objects
  const keys = await listAllKeys( source.bucket, Prefix )
  if ( keys.length <= 0 ) {
    console.warn('no data found', { ...context, TableName })
  }

  // copy all objects; the table is only repointed once every copy has succeeded
  await copyObjects( keys, source.bucket, TargetBucket, TableName )
  console.info('data copy success', { ...context, TableName })

  // get table
  const config = await glue.getTable({ DatabaseName, Name: TableName }).promise()
    .catch( rethrowAs('failed to get table config') )
  console.info('table config before', { ...context, TableName, config: JSON.stringify(config.Table) })

  // same location, different bucket
  const NewLocation = `${source.scheme}://${TargetBucket}${source.path}`

  // build the update payload from a copy, without the read-only properties
  const TableInput = {
    ...config.Table,
    StorageDescriptor: { ...config.Table.StorageDescriptor, Location: NewLocation },
  }
  for ( const field of READ_ONLY_TABLE_FIELDS ) {
    delete TableInput[field]
  }

  // do update
  await glue.updateTable({ DatabaseName, TableInput }).promise()
    .catch( rethrowAs('failed to update table config') )
  console.info('table config updated', { ...context, DatabaseName, TableName, OldLocation, NewLocation })

  // for humans to compare
  const configAfter = await glue.getTable({ DatabaseName, Name: TableName }).promise()
    .catch( rethrowAs('failed to get table config') )
  console.info('table config after', { ...context, TableName, config: JSON.stringify(configAfter.Table) })
  console.info("done")
}

/**
 * Migrates all tables of a Glue database to another S3 bucket.
 *
 * For every table whose storage location is an S3 URI outside of `TargetBucket`:
 * all objects under the location are copied to the same keys in `TargetBucket`,
 * then the table's storage location is updated to point at `TargetBucket`.
 * Source objects are not deleted. Tables are processed concurrently; a failing
 * table does not stop the others.
 *
 * @param {string} DatabaseName - name of the Glue database to migrate
 * @param {string} TargetBucket - name of the destination S3 bucket
 * @returns {Promise<void>} resolves once every table has been processed
 * @throws {Error} if the tables cannot be listed, or if at least one table failed
 *   to migrate (the individual failures are attached as `error.failures`)
 */
async function migrate( DatabaseName, TargetBucket ) {
  const context = {
    DatabaseName,
    TargetBucket,
  }

  // list tables for database
  const tables = await listAllTables( DatabaseName )

  // migrate each; failures are collected so that one bad table cannot hide the
  // outcome of the others, and so that no rejection is left unhandled
  const failures = []
  await Promise.all( tables.map( (table) =>
    migrateTable( table, context ).catch( (error) => {
      console.error('failed to migrate table', { ...context, TableName: table.Name, error })
      failures.push({ TableName: table.Name, error })
    })
  ))

  if ( failures.length > 0 ) {
    const names = failures.map( ({ TableName }) => TableName ).join(', ')
    const error = new Error(`failed to migrate ${failures.length} of ${tables.length} tables: ${names}`)
    error.failures = failures
    throw error
  }
}
// begin: run the migration with the validated env vars; on failure log the error
// and mark the process as failed, so that shells and CI see a non-zero exit status
migrate( process.env.DATABASE_NAME, process.env.TARGET_BUCKET_NAME )
  .catch( error => {
    console.error("failed to migrate", {error})
    process.exitCode = 1
  })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment