Skip to content

Instantly share code, notes, and snippets.

@kshvakov
Created May 5, 2016 08:34
Show Gist options
  • Save kshvakov/5c7f3c405fc41d122841b7ebff200e2b to your computer and use it in GitHub Desktop.
Save kshvakov/5c7f3c405fc41d122841b7ebff200e2b to your computer and use it in GitHub Desktop.
$ time ./try_advisory_lock
Waiting
[5] Done, processed 1274 rows.
[7] Done, processed 1236 rows.
[2] Done, processed 1252 rows.
[4] Done, processed 1237 rows.
[8] Done, processed 1257 rows.
[6] Done, processed 1257 rows.
[3] Done, processed 1244 rows.
[1] Done, processed 1243 rows.
Done!
real 0m1.228s
user 0m1.104s
sys 0m1.024s
$ time ./skip_locked
Waiting
[5] Done, processed 1233 rows.
[7] Done, processed 1273 rows.
[8] Done, processed 1260 rows.
[6] Done, processed 1249 rows.
[1] Done, processed 1228 rows.
[4] Done, processed 1306 rows.
[2] Done, processed 1220 rows.
[3] Done, processed 1231 rows.
Done!
real 0m1.146s
user 0m1.008s
sys 0m1.072s
package main
import (
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"crypto/md5"
"database/sql"
"fmt"
"log"
"sync"
"time"
)
// Item is the destination struct for one row selected from the "queue"
// table; the `db` tags name the columns sqlx scans into each field.
type Item struct {
// Job is the job payload; processItem hashes it.
Job string `db:"job"`
// Priority is the first sort key of the dequeue query (descending).
Priority int16 `db:"priority"`
// AddedOn is the second sort key of the dequeue query (ascending).
AddedOn time.Time `db:"added_on"`
}
// query claims the next job: the row with the highest priority and, within
// that priority, the earliest added_on. "for update skip locked" row-locks the
// selected row for the current transaction and skips rows already locked by
// other transactions instead of waiting for them, so concurrent workers each
// get a different row.
// NOTE(review): "select *" is scanned into Item, so this presumably relies on
// the table having only the job, priority and added_on columns — confirm
// against the schema.
const query = `
select *
from queue
order by priority desc, added_on asc limit 1 for update skip locked
`
// main drains the "queue" table using eight concurrent workers.
//
// Each worker repeatedly opens a transaction, claims one row with query
// (FOR UPDATE SKIP LOCKED), processes it, deletes it and commits. A worker
// stops when the query returns no row, then prints how many rows it handled.
// Any database error other than "no rows" terminates the whole program via
// log.Fatal.
func main() {
	connect := sqlx.MustConnect("postgres", "host=127.0.0.1 user=kshvakov dbname=postgres")
	defer connect.Close()

	const workers = 8

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 1; i <= workers; i++ {
		go func(num int) {
			// Deferred so the WaitGroup is released on every exit path.
			defer wg.Done()

			var processed int
			for {
				tx, err := connect.Beginx()
				if err != nil {
					// Without this check a failed Begin would leave tx nil
					// and the next call would panic.
					log.Fatal(err)
				}

				var item Item
				if err := tx.Get(&item, query); err != nil {
					if err != sql.ErrNoRows {
						log.Fatal(err)
					}
					// Nothing left to claim. End the transaction explicitly;
					// otherwise its connection stays checked out of the pool
					// with an open transaction.
					if err := tx.Rollback(); err != nil {
						log.Printf("[%d] rollback: %v", num, err)
					}
					break
				}

				processItem(&item)

				// NOTE(review): the delete is keyed on (priority, added_on);
				// it removes every row sharing that pair — confirm the pair
				// is unique in the table.
				if _, err := tx.Exec("delete from queue where priority = $1 and added_on = $2", item.Priority, item.AddedOn); err != nil {
					log.Fatal(err)
				}
				if err := tx.Commit(); err != nil {
					log.Fatal(err)
				}
				// Count the row only once its removal has been committed.
				processed++
			}
			fmt.Printf("[%d] Done, processed %d rows.\n", num, processed)
		}(i)
	}

	fmt.Println("Waiting")
	wg.Wait()
	fmt.Println("Done!")
}
// processItem simulates the work done for a job: it returns the MD5 digest
// of item.Job rendered as a lowercase hexadecimal string.
func processItem(item *Item) string {
	hasher := md5.New()
	// hash.Hash.Write is documented never to return an error.
	hasher.Write([]byte(item.Job))
	digest := hasher.Sum(nil)
	return fmt.Sprintf("%x", digest)
}
package main
import (
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"crypto/md5"
"database/sql"
"fmt"
"log"
"sync"
"time"
)
// Item is the destination struct for one row selected from the "queue"
// table; the `db` tags name the columns sqlx scans into each field.
type Item struct {
// Job is the job payload; processItem hashes it.
Job string `db:"job"`
// Priority is the first sort key of the dequeue query (descending) and part
// of the advisory-lock key.
Priority int16 `db:"priority"`
// AddedOn is the second sort key of the dequeue query (ascending) and part
// of the advisory-lock key.
AddedOn time.Time `db:"added_on"`
}
// query claims the next job: the row with the highest priority and, within
// that priority, the earliest added_on, restricted to rows for which this
// transaction can take a transaction-level advisory lock. The lock key is the
// pair (123, hashtext(priority || added_on)); pg_try_advisory_xact_lock does
// not wait — it returns false when another session already holds the key, so
// that row is filtered out. The selected row is additionally row-locked by
// "for update".
// NOTE(review): PostgreSQL documents that with ORDER BY/LIMIT the lock
// function may be evaluated (and locks taken) for more rows than are returned,
// depending on the plan — verify the plan reads rows in index order.
// NOTE(review): hashtext collisions make distinct rows share a lock key;
// presumably acceptable here since a collision only delays a row — confirm.
const query = `
select *
from queue
where pg_try_advisory_xact_lock(123, hashtext( priority::text || added_on::text ) )
order by priority desc, added_on asc limit 1 for update
`
// main drains the "queue" table using eight concurrent workers.
//
// Each worker repeatedly opens a transaction, claims one row with query
// (guarded by a transaction-level advisory lock), processes it, deletes it
// and commits. A worker stops when the query returns no row, then prints how
// many rows it handled. Any database error other than "no rows" terminates
// the whole program via log.Fatal.
func main() {
	connect := sqlx.MustConnect("postgres", "host=127.0.0.1 user=kshvakov dbname=postgres")
	defer connect.Close()

	const workers = 8

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 1; i <= workers; i++ {
		go func(num int) {
			// Deferred so the WaitGroup is released on every exit path.
			defer wg.Done()

			var processed int
			for {
				tx, err := connect.Beginx()
				if err != nil {
					// Without this check a failed Begin would leave tx nil
					// and the next call would panic.
					log.Fatal(err)
				}

				var item Item
				if err := tx.Get(&item, query); err != nil {
					if err != sql.ErrNoRows {
						log.Fatal(err)
					}
					// Nothing left to claim. End the transaction explicitly:
					// this returns its connection to the pool and releases
					// any advisory locks the query took while scanning.
					if err := tx.Rollback(); err != nil {
						log.Printf("[%d] rollback: %v", num, err)
					}
					break
				}

				processItem(&item)

				// NOTE(review): the delete is keyed on (priority, added_on);
				// it removes every row sharing that pair — confirm the pair
				// is unique in the table.
				if _, err := tx.Exec("delete from queue where priority = $1 and added_on = $2", item.Priority, item.AddedOn); err != nil {
					log.Fatal(err)
				}
				if err := tx.Commit(); err != nil {
					log.Fatal(err)
				}
				// Count the row only once its removal has been committed.
				processed++
			}
			fmt.Printf("[%d] Done, processed %d rows.\n", num, processed)
		}(i)
	}

	fmt.Println("Waiting")
	wg.Wait()
	fmt.Println("Done!")
}
// processItem simulates the work done for a job: it returns the MD5 digest
// of item.Job rendered as a lowercase hexadecimal string.
func processItem(item *Item) string {
	hasher := md5.New()
	// hash.Hash.Write is documented never to return an error.
	hasher.Write([]byte(item.Job))
	digest := hasher.Sum(nil)
	return fmt.Sprintf("%x", digest)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment