Created
November 1, 2013 03:37
-
-
Save lestrrat/7260649 to your computer and use it in GitHub Desktop.
Practicing go. Enqueues all objects in STF storage (https://github.com/stf/stf-storage) to RepairObject queue, w/o disrupting services
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"database/sql" | |
"fmt" | |
"log" | |
"math" | |
"math/rand" | |
"time" | |
"runtime" | |
_ "github.com/go-sql-driver/mysql" | |
) | |
const MAX_CONCURRENCY int = 5 | |
func main() { | |
runtime.GOMAXPROCS(runtime.NumCPU()) | |
db := conenctDB("root:@tcp(127.0.0.1:3306)/stf") | |
defer db.Close() | |
var min_object_id int64 = 0 | |
var max_object_id int64 = 0 | |
stmt, err := db.Prepare("SELECT id FROM object WHERE id > ? LIMIT 1 OFFSET 9999") | |
if err != nil { | |
log.Fatal(fmt.Sprintf("Error preparing statement: %s", err)) | |
} | |
defer stmt.Close() | |
// Find the current state of the queue_repair_object | |
queue_db := connectDB("root:@tcp(127.0.0.1:3306)/stf_queue") | |
defer queue_db.Close() | |
initial_size := getQueueSize(queue_db) | |
allowed_delta := int(float64(initial_size) * 0.05) | |
// Only allow this many goroutines to be executed at once | |
s := make(chan int, MAX_CONCURRENCY) | |
for { | |
rows, err := stmt.Query(min_object_id) | |
if err != nil { | |
log.Fatal(fmt.Sprintf("Error executing prepared statement: %s", err)) | |
} | |
count := 0 | |
for rows.Next() { | |
err := rows.Scan(&max_object_id) | |
if err != nil { | |
log.Fatal(err) | |
} | |
count++ | |
} | |
if count == 0 { | |
break | |
} | |
// Only allow | |
s <- 1 | |
// Process objects in a separate thread | |
go processObjects(s, min_object_id, max_object_id) | |
min_object_id = max_object_id | |
// Wait till our queue has been depleted | |
for { | |
size := getQueueSize(queue_db) | |
if size < initial_size || initial_size - size < allowed_delta { | |
break | |
} | |
log.Printf("Waiting for our queue to deplete... (size = %d, initial = %d, delta = %d)", size, initial_size, allowed_delta) | |
time.Sleep( time.Duration(rand.Int63n(60)) * time.Second ) | |
} | |
} | |
} | |
func getQueueSize(db *sql.DB) int { | |
rows, err := db.Query("SELECT COUNT(*) FROM queue_repair_object") | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer rows.Close() | |
count := 0 | |
for rows.Next() { | |
err := rows.Scan(&count) | |
if err != nil { | |
log.Fatal(err) | |
} | |
} | |
return count | |
} | |
func connectDB(dsn string) *sql.DB { | |
db, err := sql.Open("mysql", dsn) | |
if err != nil { | |
log.Fatal(fmt.Sprintf("Error opening %s: %s", dsn, err)) | |
} | |
return db | |
} | |
func processObjects(s chan int, min_object_id int64, max_object_id int64) { | |
log.Printf("Processing %d -> %d\n", min_object_id, max_object_id) | |
// Release semaphore | |
defer func () { <- s }() | |
queue_db := connectDB("root:@tcp(127.0.0.1:3306)/stf_queue") | |
defer queue_db.Close() | |
db := connectDB("root:@tcp(127.0.0.1:3306)/stf") | |
defer db.Close() | |
stmt, err := db.Prepare("SELECT id FROM object WHERE id > ? AND id <= ? LIMIT 1000") | |
if err != nil { | |
log.Fatal(err) | |
} | |
queue_stmt, err := queue_db.Prepare("INSERT INTO queue_repair_object (args) VALUES (?)") | |
if err != nil { | |
log.Fatal(err) | |
} | |
for { | |
rows, err := stmt.Query(min_object_id, max_object_id) | |
if err != nil { | |
log.Fatal(err) | |
} | |
count := 0 | |
for rows.Next() { | |
err := rows.Scan(&min_object_id) | |
if err != nil { | |
log.Fatal(err) | |
} | |
_, err = queue_stmt.Exec(min_object_id) | |
if err != nil { | |
log.Fatal(err) | |
} | |
} | |
if count == 0 { | |
break | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment