Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vmihailenco/5f553d12173a5a9e64d8a3ba19c9b43d to your computer and use it in GitHub Desktop.
Save vmihailenco/5f553d12173a5a9e64d8a3ba19c9b43d to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"github.com/go-pg/pg"
"github.com/go-pg/pg/orm"
"github.com/go-pg/sharding"
)
// Parrot is the model mapped to the ?shard.parrots table.
//
// Parrots are sharded by AccountId, i.e. parrots with the same account id are
// placed on the same shard (CreateParrot picks the shard from AccountId).
type Parrot struct {
// tableName only carries the table name in its struct tag.
tableName string `sql:"?shard.parrots"`
Id int64
// AccountId is the sharding key.
AccountId int64
Name string
// Tele is the related Tele row; GetParrot requests it via Column(..., "Tele").
Tele *Tele
Emails []string
}
// Tele is the model mapped to the ?shard.teles table. It holds a number that
// belongs to a Parrot.
type Tele struct {
// tableName only carries the table name in its struct tag.
tableName string `sql:"?shard.teles"`
Id int64
// AccountId is the sharding key: CreateTele picks the shard from it.
AccountId int64
Number int
// ParrotId holds the Id of the owning Parrot (GetShardedParrots joins on
// t.parrot_id = p.id).
ParrotId int64
}
// String returns the parrot's name, so printing a Parrot with the fmt package
// shows just that name.
func (p Parrot) String() string {
	return p.Name
}
// CreateParrot inserts parrot into the shard selected by its AccountId.
func CreateParrot(cluster *sharding.Cluster, parrot *Parrot) error {
	shard := cluster.Shard(parrot.AccountId)
	return shard.Insert(parrot)
}
// CreateTele inserts tele into the shard selected by its AccountId.
func CreateTele(cluster *sharding.Cluster, tele *Tele) error {
	shard := cluster.Shard(tele.AccountId)
	return shard.Insert(tele)
}
// GetParrot splits the shard out of the parrot id and fetches the parrot's
// name together with its Tele relation from that shard.
//
// On failure it returns a nil *Parrot and the error from Select, unwrapped, so
// callers never receive a half-populated value next to a non-nil error.
//
// NOTE(review): SplitShard presumably expects ids generated by the shard's
// next_id() SQL function; createShard builds the tables from the models, so
// confirm the id column really gets such ids.
func GetParrot(cluster *sharding.Cluster, id int64) (*Parrot, error) {
	var parrot Parrot
	err := cluster.SplitShard(id).Model(&parrot).
		Column("parrot.name", "Tele").
		Where("parrot.id = ?", id).
		Select()
	if err != nil {
		return nil, err
	}
	return &parrot, nil
}
// GetParrots selects every parrot whose account_id equals accountId from the
// shard chosen by that same account id. The slice and the error from Select
// are returned as they are.
func GetParrots(cluster *sharding.Cluster, accountId int64) ([]Parrot, error) {
	var found []Parrot
	query := cluster.Shard(accountId).
		Model(&found).
		Where("account_id = ?", accountId)
	err := query.Select()
	return found, err
}
// tmpParrot is the row shape returned by GetShardedParrots: a tele number
// paired with the name of the parrot it belongs to.
type tmpParrot struct {
Number int
Name string
}
// GetShardedParrots runs a raw join between shard0.parrots and shard0.teles
// on db and returns one tmpParrot (parrot name, tele number) per matching
// pair. The schema name shard0 is hard-coded in the query.
func GetShardedParrots(db *pg.DB) ([]tmpParrot, error) {
	const joinQuery = `select p.name, t.number from shard0.parrots AS p, shard0.teles as t where t.parrot_id = p.id`

	var rows []tmpParrot
	_, err := db.Query(&rows, joinQuery)
	return rows, err
}
// createShard (re)creates the database schema for a given shard.
//
// It drops the shard's schema if it exists, creates it again, installs the
// sequence and functions from sqlFuncs, and then creates the tables for the
// Parrot and Tele models. The first failing step aborts the setup and its
// error is returned; table-creation errors carry the model type for context.
func createShard(shard *pg.DB) error {
	queries := []string{
		`DROP SCHEMA IF EXISTS ?shard CASCADE`,
		`CREATE SCHEMA ?shard`,
		sqlFuncs,
		// Hand-written DDL kept for reference; the tables are created from
		// the models below instead.
		// `CREATE TABLE ?shard.parrots (id bigint DEFAULT ?shard.next_id(), account_id int, name text, emails jsonb)`,
		// `CREATE TABLE ?shard.teles (id bigint DEFAULT ?shard.next_id(), parrot_id bigint, account_id int, number int)`,
	}
	for _, q := range queries {
		if _, err := shard.Exec(q); err != nil {
			return err
		}
	}

	for _, model := range []interface{}{&Parrot{}, &Tele{}} {
		err := shard.CreateTable(model, &orm.CreateTableOptions{
			IfNotExists: true,
		})
		if err != nil {
			// Report the failure to the caller instead of printing it and
			// pretending the shard was set up.
			return fmt.Errorf("create table for %T: %w", model, err)
		}
	}
	return nil
}
// ExampleCluster walks through the sharding workflow against a local
// PostgreSQL server (user "postgres", database "test"): it builds a cluster
// of 2 logical shards on 1 physical server, recreates the shard schemas,
// inserts three parrots and one tele, and reads them back through GetParrot,
// GetParrots and GetShardedParrots.
//
// Errors are printed rather than returned, so the walkthrough keeps going
// after a failed step; results are only indexed when they are non-empty.
func ExampleCluster() {
	db := pg.Connect(&pg.Options{
		User:     "postgres",
		Database: "test",
	})
	// Release the connection when the walkthrough is done.
	defer db.Close()

	dbs := []*pg.DB{db} // list of physical PostgreSQL servers
	nshards := 2        // 2 logical shards

	// Create cluster with 1 physical server and 2 logical shards.
	cluster := sharding.NewCluster(dbs, nshards)

	// Create database schema for our logical shards.
	for i := 0; i < nshards; i++ {
		if err := createShard(cluster.Shard(int64(i))); err != nil {
			fmt.Println(err)
		}
	}

	// parrot1 will be created in shard1 because AccountId % nshards = shard1.
	parrot1 := &Parrot{
		Name:      "parrot1",
		AccountId: 1,
		Emails:    []string{"parrot1@domain"},
	}
	err := CreateParrot(cluster, parrot1)
	if err != nil {
		fmt.Println(err)
	}

	// parrot2 will be created in shard1 too because AccountId is the same.
	parrot2 := &Parrot{
		Name:      "parrot2",
		AccountId: 1,
		Emails:    []string{"parrot2@domain"},
	}
	err = CreateParrot(cluster, parrot2)
	if err != nil {
		fmt.Println(err)
	}

	// parrot3 will be created in shard0 because AccountId % nshards = shard0.
	parrot3 := &Parrot{
		Name:      "parrot3",
		AccountId: 2,
		Emails:    []string{"parrot3@domain"},
	}
	err = CreateParrot(cluster, parrot3)
	if err != nil {
		fmt.Println(err)
	}

	// tele3 shares parrot3's account id and therefore its shard.
	tele3 := &Tele{AccountId: 2, ParrotId: parrot3.Id, Number: 98776655}
	err = CreateTele(cluster, tele3)
	if err != nil {
		fmt.Println(err)
	}

	parrot, err := GetParrot(cluster, parrot3.Id)
	if err != nil {
		fmt.Println(err)
	}

	parrots, err := GetParrots(cluster, 2)
	if err != nil {
		fmt.Println(err)
	}

	fmt.Println(parrot)
	// Guard the index: parrots is empty when the query failed or matched
	// nothing.
	if len(parrots) > 0 {
		fmt.Println(parrots[0].Tele)
	}
	// NOTE(review): this function lives in package main, so an "// Output:"
	// annotation would not be checked here; the lines printed above are the
	// parrot fetched by parrot3's id and the Tele field of the first parrot
	// of account 2.

	joined, err := GetShardedParrots(db)
	if err != nil {
		fmt.Println(err)
	}
	// Same guard for the raw join result.
	if len(joined) > 0 {
		fmt.Println(joined[0])
	}
}
// sqlFuncs is the per-shard SQL that createShard executes right after
// creating the schema. It defines:
//
//   - ?shard.id_seq, a sequence;
//   - ?shard._next_id(tm, shard_id, seq_id), which combines the milliseconds
//     between ?epoch and tm (shifted left by 23 bits), shard_id modulo 2048
//     (shifted left by 12 bits) and seq_id modulo 4096 into one bigint;
//   - ?shard.next_id(), which calls _next_id with clock_timestamp(),
//     ?shard_id and the next value of ?shard.id_seq.
//
// The ?shard, ?shard_id and ?epoch placeholders are not defined in this file;
// presumably the sharding package supplies them when the query runs on a
// shard — TODO confirm.
const sqlFuncs = `
CREATE SEQUENCE ?shard.id_seq;
-- _next_id returns unique sortable id.
CREATE FUNCTION ?shard._next_id(tm timestamptz, shard_id int, seq_id bigint)
RETURNS bigint AS $$
DECLARE
max_shard_id CONSTANT bigint := 2048;
max_seq_id CONSTANT bigint := 4096;
id bigint;
BEGIN
shard_id := shard_id % max_shard_id;
seq_id := seq_id % max_seq_id;
id := (floor(extract(epoch FROM tm) * 1000)::bigint - ?epoch) << 23;
id := id | (shard_id << 12);
id := id | seq_id;
RETURN id;
END;
$$
LANGUAGE plpgsql
IMMUTABLE;
CREATE FUNCTION ?shard.next_id()
RETURNS bigint AS $$
BEGIN
RETURN ?shard._next_id(clock_timestamp(), ?shard_id, nextval('?shard.id_seq'));
END;
$$
LANGUAGE plpgsql;
`
// main runs the ExampleCluster walkthrough.
func main() {
// ExampleCluster connects as user "postgres" to database "test" and
// createShard drops and creates schemas there, so that user needs the
// corresponding permissions on the server.
ExampleCluster()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment