Skip to content

Instantly share code, notes, and snippets.

@lestrrat
Created November 1, 2013 03:37
Show Gist options
  • Save lestrrat/7260649 to your computer and use it in GitHub Desktop.
Save lestrrat/7260649 to your computer and use it in GitHub Desktop.
Practicing Go. Enqueues all objects in STF storage (https://github.com/stf/stf-storage) into the RepairObject queue without disrupting services.
package main
import (
"database/sql"
"fmt"
"log"
"math"
"math/rand"
"time"
"runtime"
_ "github.com/go-sql-driver/mysql"
)
// MAX_CONCURRENCY is the capacity of the semaphore channel created in main,
// i.e. the maximum number of processObjects goroutines allowed in flight at once.
const MAX_CONCURRENCY int = 5
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
db := conenctDB("root:@tcp(127.0.0.1:3306)/stf")
defer db.Close()
var min_object_id int64 = 0
var max_object_id int64 = 0
stmt, err := db.Prepare("SELECT id FROM object WHERE id > ? LIMIT 1 OFFSET 9999")
if err != nil {
log.Fatal(fmt.Sprintf("Error preparing statement: %s", err))
}
defer stmt.Close()
// Find the current state of the queue_repair_object
queue_db := connectDB("root:@tcp(127.0.0.1:3306)/stf_queue")
defer queue_db.Close()
initial_size := getQueueSize(queue_db)
allowed_delta := int(float64(initial_size) * 0.05)
// Only allow this many goroutines to be executed at once
s := make(chan int, MAX_CONCURRENCY)
for {
rows, err := stmt.Query(min_object_id)
if err != nil {
log.Fatal(fmt.Sprintf("Error executing prepared statement: %s", err))
}
count := 0
for rows.Next() {
err := rows.Scan(&max_object_id)
if err != nil {
log.Fatal(err)
}
count++
}
if count == 0 {
break
}
// Only allow
s <- 1
// Process objects in a separate thread
go processObjects(s, min_object_id, max_object_id)
min_object_id = max_object_id
// Wait till our queue has been depleted
for {
size := getQueueSize(queue_db)
if size < initial_size || initial_size - size < allowed_delta {
break
}
log.Printf("Waiting for our queue to deplete... (size = %d, initial = %d, delta = %d)", size, initial_size, allowed_delta)
time.Sleep( time.Duration(rand.Int63n(60)) * time.Second )
}
}
}
// getQueueSize returns the number of rows currently in queue_repair_object,
// queried through db.
//
// Any failure to run the query or read its result is fatal (log.Fatal), so a
// broken read can never be mistaken for an empty queue by the caller.
func getQueueSize(db *sql.DB) int {
	var size int
	// Row.Scan reports query, iteration and conversion errors in one place.
	if err := db.QueryRow("SELECT COUNT(*) FROM queue_repair_object").Scan(&size); err != nil {
		log.Fatal(err)
	}
	return size
}
// connectDB opens a database/sql handle for dsn using the "mysql" driver and
// returns it. If sql.Open reports an error, the error is logged together with
// the DSN and the program exits via log.Fatal.
func connectDB(dsn string) *sql.DB {
	handle, openErr := sql.Open("mysql", dsn)
	if openErr == nil {
		return handle
	}

	msg := fmt.Sprintf("Error opening %s: %s", dsn, openErr)
	log.Fatal(msg)
	return nil // not reached: log.Fatal terminates the process
}
func processObjects(s chan int, min_object_id int64, max_object_id int64) {
log.Printf("Processing %d -> %d\n", min_object_id, max_object_id)
// Release semaphore
defer func () { <- s }()
queue_db := connectDB("root:@tcp(127.0.0.1:3306)/stf_queue")
defer queue_db.Close()
db := connectDB("root:@tcp(127.0.0.1:3306)/stf")
defer db.Close()
stmt, err := db.Prepare("SELECT id FROM object WHERE id > ? AND id <= ? LIMIT 1000")
if err != nil {
log.Fatal(err)
}
queue_stmt, err := queue_db.Prepare("INSERT INTO queue_repair_object (args) VALUES (?)")
if err != nil {
log.Fatal(err)
}
for {
rows, err := stmt.Query(min_object_id, max_object_id)
if err != nil {
log.Fatal(err)
}
count := 0
for rows.Next() {
err := rows.Scan(&min_object_id)
if err != nil {
log.Fatal(err)
}
_, err = queue_stmt.Exec(min_object_id)
if err != nil {
log.Fatal(err)
}
}
if count == 0 {
break
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment