@compnski
Last active April 14, 2023 03:20
Builds a SQL statement to hash a table in any of postgres/mysql/redshift, with the same result across all three.
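As a usage sketch (the file name and the table/column names here are hypothetical), an invocation could look like:

go run hashquery.go -dbType=postgres -schema=public -table=users -id=id -lower=0 -upper=1000000 -cols="id, bigint, email, varchar(255), created_at, timestamp"

This prints a single SELECT that md5-hashes each row in the given id range and sums the row hashes in four 32-bit chunks.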
package main
import (
    "flag"
    "fmt"
    "strings"
)
const DB_REDSHIFT = "redshift"
const DB_POSTGRES = "postgres"
const DB_MYSQL = "mysql"
type Table struct {
    Name    string
    Schema  string
    Columns []Column
}
type Column struct {
    Name     string
    Type     string
    NotNull  bool
    Encoding string
}
var (
    tableName  string
    schemaName string
    idCol      string
    lowerBound string
    upperBound string
    colList    string
    dbType     string
    tablePath  string
    debug      bool
)
func init() {
    flag.StringVar(&tableName, "table", "", "Table name.")
    flag.StringVar(&schemaName, "schema", "", "Schema name.")
    flag.StringVar(&colList, "cols", "", "Comma-separated list of column name/type pairs, e.g. (name1, type1, name2, type2, ...).")
    flag.StringVar(&idCol, "id", "id", "Id column.")
    flag.StringVar(&lowerBound, "lower", "0", "Lower value of id column, exclusive, to hash.")
    flag.StringVar(&upperBound, "upper", "1000000", "Upper value of id column, inclusive, to hash.")
    flag.StringVar(&dbType, "dbType", DB_POSTGRES, "Type of database; postgres / mysql / redshift are supported.")
    flag.BoolVar(&debug, "debug", false, "Emit a row-level debug query instead of the aggregate hash query.")
}
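// main parses the flags, builds a Table description from the -cols name/type pairs, and prints the generated hashing SQL.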
func main() {
    flag.Parse()
    if colList == "" {
        colList = strings.Join(flag.Args(), " ")
    }
    if tableName == "" || colList == "" {
        flag.Usage()
        return
    }
    table := makeTable(tableName, schemaName, strings.Split(colList, ",")...)
    fmt.Println("")
    if schemaName != "" {
        tablePath = fmt.Sprintf(`%s.%s`, schemaName, tableName)
    } else {
        tablePath = tableName
    }
    fmt.Println(hashAllCols(table, idCol, lowerBound, upperBound))
}
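// makeTable builds a Table from alternating column name/type values split out of the -cols flag.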
func makeTable(name, schema string, colPairs ...string) *Table {
    cols := []Column{}
    for i := 0; i+1 < len(colPairs); i += 2 { // i+1 guards against a trailing unpaired name
        cols = append(cols, Column{Name: strings.TrimSpace(colPairs[i]), Type: strings.TrimSpace(colPairs[i+1])})
    }
    return &Table{Name: name, Schema: schema, Columns: cols}
}
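// hashAllCols builds the full statement: each column is rendered as a hashable text expression,
// the per-row expressions are concatenated and md5-hashed, and getSumOfHash sums the row hashes
// in four 32-bit chunks so the result is comparable across databases. In debug mode it instead
// returns a row-level query that exposes the raw concatenated values.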
func hashAllCols(table *Table, idCol, lowerBound, upperBound string) string {
    var (
        colSqls = make([]string, len(table.Columns))
        rowSql  string
    )
    switch dbType {
    case DB_MYSQL:
        for idx, col := range table.Columns {
            colSqls[idx] = colAsString_mysql(&col)
        }
        rowSql = fmt.Sprintf("concat(%s)", strings.Join(colSqls, ", "))
        if debug {
            rowSql = fmt.Sprintf("concat_ws(',',%s)", strings.Join(colSqls, ","))
        }
        //colAsString = colAsString_mysql
    default:
        for idx, col := range table.Columns {
            colSqls[idx] = colAsString_pg(&col)
        }
        rowSql = strings.Join(colSqls, " || ")
        if debug {
            rowSql = strings.Join(colSqls, " || ',' || ")
        }
    }
    if debug {
        return fmt.Sprintf(`select md5(%s), %s from %s where %s > '%s' and %s <= '%s' order by %s desc`,
            strings.Replace(rowSql, "md5", "", -1), strings.Replace(rowSql, "md5", "", -1), tablePath, idCol, lowerBound, idCol, upperBound, idCol)
    }
    innerQuery := fmt.Sprintf(`select md5(%s) as hash from %s where %s > '%s' and %s <= '%s'`,
        rowSql, tablePath, idCol, lowerBound, idCol, upperBound)
    outerQuery := fmt.Sprintf("select %s from (%s) a;", getSumOfHash("hash"), innerQuery)
    return outerQuery
}
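// colAsString_pg renders a column as a text expression for postgres/redshift: dates become day
// counts since 0001-01-01, timestamps become epoch seconds, booleans become integers, everything
// is md5-hashed, and NULLs are coalesced to a single space.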
func colAsString_pg(col *Column) string {
    colSql := fmt.Sprintf(`"%s"`, col.Name)
    if col.Type == "date" {
        colSql = fmt.Sprintf("(%s - '0001-01-01'::date)", colSql)
    }
    if strings.Contains(col.Type, "timestamp") {
        colSql = fmt.Sprintf("floor(extract(epoch from %s))", colSql)
    }
    if col.Type == "boolean" {
        colSql = colSql + "::integer"
    }
    if strings.Contains(col.Type, "varchar") {
        colSql = fmt.Sprintf("md5(%s)", colSql)
    } else {
        colSql = fmt.Sprintf("md5(%s::text)", colSql)
    }
    if !col.NotNull {
        colSql = fmt.Sprintf("coalesce(%s, ' ')", colSql)
    }
    return colSql
}
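// colAsString_mysql renders a column as a text expression for mysql, mirroring colAsString_pg:
// dates become day counts since 0001-01-01 (hence the -366) and timestamps become epoch seconds,
// so the hashed text matches what postgres/redshift produce.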
func colAsString_mysql(col *Column) string {
    colSql := col.Name
    if col.Type == "date" {
        colSql = fmt.Sprintf("(to_days(%s) - 366)", colSql) // 366 to represent diff from Day 1, Year 1, not Year 0, which never existed.
    }
    if strings.Contains(col.Type, "timestamp") || col.Type == "datetime" {
        colSql = fmt.Sprintf("floor(unix_timestamp(%s - interval 7 hour))", colSql)
    }
    colSql = fmt.Sprintf("md5(%s)", colSql)
    if !col.NotNull {
        colSql = fmt.Sprintf("coalesce(%s, ' ')", colSql)
    }
    return colSql
}
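// getSumOfHash splits the 32-character md5 hex digest into four 8-character chunks and sums each
// chunk as an integer, using a database-specific hex-to-integer conversion for each backend.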
func getSumOfHash(col string) string {
    var queryPart string
    switch dbType {
    case DB_POSTGRES:
        queryPart = `sum(('x'||substring(%s,%d,8))::bit(32)::bigint)`
    case DB_REDSHIFT:
        queryPart = `sum(trunc(strtol(substring(%s,%d,8),16)))`
    case DB_MYSQL:
        queryPart = `sum(cast(conv(substring(%s,%d,8), 16, 10) as unsigned))`
    }
    queryParts := []string{fmt.Sprintf(queryPart, col, 1), fmt.Sprintf(queryPart, col, 9), fmt.Sprintf(queryPart, col, 17), fmt.Sprintf(queryPart, col, 25)}
    return strings.Join(queryParts, ", ")
}

aadant commented Feb 14, 2019

queryPart = sum(trunc(strtol(substring(%s,%d,8),16))) is probably wrong

Example

select cast(trunc(cast(9009946224037101 as bigint)) as bigint);
      trunc
------------------
 9009946224037100
(1 row)

I would remove the trunc in Redshift
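Following that suggestion, the Redshift branch of getSumOfHash could simply drop the trunc (a sketch, not verified against Redshift; strtol there already returns a bigint):

case DB_REDSHIFT:
    queryPart = `sum(strtol(substring(%s,%d,8),16))`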


sabitor commented Apr 14, 2022

Couldn't it theoretically happen that summing the MD5 chunks gives identical results for two different tables?
Of course this might never happen, or only in very rare cases, but theoretically it could.
What is your opinion on that?

@compnski
Author

A collision is certainly possible but exceedingly unlikely; I'm not sure it'd ever come up.

This was my attempt at some rsync-like syncing. We never ended up using much of this matching. In the end, we mostly relied on lastModifiedAt / lastUpdatedAt timestamps to know when to pull more rows.

There also might be other (faster) hash functions implemented by now -- Redshift was quite lacking at the time, but has made good progress.
