Skip to content

Instantly share code, notes, and snippets.

@giuliohome
Last active April 20, 2024 22:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save giuliohome/7ab1e57bd510b72f14faf0d95488a8de to your computer and use it in GitHub Desktop.
Save giuliohome/7ab1e57bd510b72f14faf0d95488a8de to your computer and use it in GitHub Desktop.
Connection count while concurrent goroutines run inside a DB connection pool
open System.Threading.Tasks
open Npgsql.FSharp
open Npgsql
/// Connection string for the demo PostgreSQL database.
/// NOTE(review): host and credentials are hard-coded for the demo; do not
/// reuse this pattern outside a throwaway test setup.
let connectionString : string =
    let settings =
        Sql.host "myhome"
        |> Sql.database "test_db"
        |> Sql.username "test_user"
        |> Sql.password "test123"
        |> Sql.port 5432
    Sql.formatConnectionString settings
// Build the single shared connection and open it immediately, so every
// query in this script can reuse it.
let singleton =
    let conn = new NpgsqlConnection(connectionString)
    conn.Open()
    conn
/// Runs a `count(*)` over `pg_stat_activity` for user `test_user` on the
/// given, already-open connection. The result is a task yielding one int
/// per returned row.
let checkConnectionPool (singleton: NpgsqlConnection) : Task<int list> =
    let countQuery =
        "select count(*) as conn_num from pg_stat_activity where usename='test_user';"
    Sql.existingConnection singleton
    |> Sql.query countQuery
    |> Sql.executeAsync (fun row -> row.int "conn_num")
/// One row of the `distributors` table as read by `getDistributors`.
type Distributor =
    {
        /// Value of the `did` column.
        Id: int
        /// Value of the `name` column.
        Name: string
    }
/// Fetches the distributors whose `did` equals `myid` over the shared
/// connection, then prints the connection count reported by
/// `checkConnectionPool`, and finally returns the fetched rows.
let getDistributors (singleton: NpgsqlConnection) (myid: int): Async<Distributor list> =
    async {
        // The query is started inside the async block on purpose: nothing
        // touches the shared connection until this workflow is actually run.
        let queryTask =
            Sql.existingConnection singleton
            |> Sql.query "SELECT * FROM distributors WHERE did = @id"
            |> Sql.parameters [ "@id", Sql.int myid ]
            |> Sql.executeAsync (fun row ->
                { Id = row.int "did"; Name = row.text "name" })
        let! distributors = Async.AwaitTask queryTask

        // Report the connection count once the distributor query has finished.
        let! counts = Async.AwaitTask (checkConnectionPool singleton)
        printfn "connections: %d" (List.head counts)

        return distributors
    }
// Demo driver: builds 30 distributor lookups (ids 1..30), runs them one after
// another on the shared connection, and prints every row that came back.
// The shared connection is disposed when the run ends, whether it succeeded
// or raised, so the script no longer leaves it open.
try
    async {
        printfn "start"
        // Creating the workflows does not execute any query yet.
        let lookups = Array.map (getDistributors singleton) [| 1 .. 30 |]
        printfn "tasks ready"
        // Sequential composition: each lookup starts only after the previous
        // one has completed, so the single connection is never used concurrently.
        let sequenced = Async.Sequential lookups
        printfn "tasks sequentiated"
        let! results = sequenced
        for distributors in results do
            for d in distributors do
                printfn "name : %s id %d" d.Name d.Id
        printfn "end"
    }
    |> Async.RunSynchronously
finally
    // Release the connection opened at the top of the script.
    singleton.Dispose()
package main
import (
"database/sql"
"fmt"
"log"
"sync"
"time"
_ "github.com/lib/pq"
)
// numGoroutines is how many concurrent transaction goroutines main launches.
const numGoroutines = 100

// connectionURL is the PostgreSQL URL passed to sql.Open for the demo database.
const connectionURL = "postgres://test_user:test123@127.0.0.1:5432/test_db?sslmode=disable"
// main opens a database handle limited to 10 open and 5 idle connections,
// launches numGoroutines goroutines that each run executeTransaction, and
// prints the number of pg_stat_activity rows for test_user before the run,
// every 100ms while it is in progress, and once more after it has finished.
func main() {
	db, err := sql.Open("postgres", connectionURL)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Set maximum open connections and idle connections
	db.SetMaxOpenConns(10) // Maximum open connections
	db.SetMaxIdleConns(5)  // Maximum idle connections

	// Wait group to wait for all goroutines to finish
	var wg sync.WaitGroup

	// tickerDone is closed by the sampling goroutine when it exits;
	// done is closed by main to ask the sampling goroutine to stop.
	tickerDone := make(chan struct{})
	done := make(chan struct{})

	// Start ticker to periodically fetch connection count
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	go func() {
		defer close(tickerDone)
		for {
			select {
			case <-ticker.C:
				connectionCount, err := countConnections(db)
				if err != nil {
					// A failed sample ends the sampling goroutine.
					log.Printf("Error fetching connection count: %s\n", err)
					return
				}
				fmt.Printf("Connection count: %d\n", connectionCount)
			case <-done:
				return
			}
		}
	}()

	// Fetch initial connection count
	initialConnectionCount, err := countConnections(db)
	if err != nil {
		log.Printf("Error fetching initial connection count: %s\n", err)
		return
	}
	fmt.Printf("Initial connection count: %d\n", initialConnectionCount)

	// Start goroutines to execute transactions
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := executeTransaction(db, id); err != nil {
				log.Printf("Error executing transaction in goroutine %d: %s\n", id, err)
			}
		}(i)
	}

	// Wait for all goroutines to finish
	wg.Wait()
	fmt.Println("All goroutines finished execution")

	// Signal the sampling goroutine to stop and wait until it has really
	// exited before reporting it as stopped.
	close(done)
	<-tickerDone
	fmt.Println("Ticker stopped")

	// Fetch final connection count
	finalConnectionCount, err := countConnections(db)
	if err != nil {
		log.Printf("Error fetching final connection count: %s\n", err)
		return
	}
	fmt.Printf("Final connection count: %d\n", finalConnectionCount)
}

// countConnections returns the number of pg_stat_activity rows whose usename
// is 'test_user', queried through db.
func countConnections(db *sql.DB) (int, error) {
	var count int
	err := db.QueryRow("SELECT COUNT(*) FROM pg_stat_activity WHERE usename = 'test_user'").Scan(&count)
	return count, err
}
// executeTransaction begins a transaction on db, sleeps for one second while
// the transaction is open, prints the number of pg_stat_activity rows for
// test_user as seen from inside the transaction, and commits.
// It returns the first error from Begin, the count query, or Commit.
func executeTransaction(db *sql.DB, id int) error {
	const activeSessionsQuery = "SELECT COUNT(*) AS connection_count FROM pg_stat_activity WHERE usename = 'test_user'"

	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// Runs on every return path; its result is intentionally discarded.
	defer tx.Rollback()

	// Sleep to simulate work while the transaction is open.
	time.Sleep(time.Second)
	fmt.Printf("Goroutine %d: Transaction started\n", id)

	var active int
	if err := tx.QueryRow(activeSessionsQuery).Scan(&active); err != nil {
		return err
	}
	fmt.Printf("Goroutine %d: Connection count: %d\n", id, active)

	return tx.Commit()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment