Skip to content

Instantly share code, notes, and snippets.

@sachinsu
Last active February 13, 2020 08:26
Show Gist options
  • Save sachinsu/bc1742a2a0cd060239473331b65e6f65 to your computer and use it in GitHub Desktop.
Save sachinsu/bc1742a2a0cd060239473331b65e6f65 to your computer and use it in GitHub Desktop.
Using Go Routines to evaluate Oracle Advance Queuing Support using godror library
package main
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
godror "github.com/godror/godror"
"golang.org/x/sync/errgroup"
)
func main() {
ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)
// Open database connection
db, err := sql.Open("godror", "dummy/dummy@orclcdb")
if err != nil {
fmt.Println(err)
return
}
defer db.Close()
var user string
if err = db.QueryRowContext(ctx, "SELECT USER FROM DUAL").Scan(&user); err != nil {
fmt.Println("Error running user query")
fmt.Println(err)
return
}
// setup Queue & Queue table
const qName = "TEST_Q"
const qTblName = qName + "_TBL"
qry := `DECLARE
tbl CONSTANT VARCHAR2(61) := '` + user + "." + qTblName + `';
q CONSTANT VARCHAR2(61) := '` + user + "." + qName + `';
BEGIN
BEGIN SYS.DBMS_AQADM.stop_queue(q); EXCEPTION WHEN OTHERS THEN NULL; END;
BEGIN SYS.DBMS_AQADM.drop_queue(q); EXCEPTION WHEN OTHERS THEN NULL; END;
BEGIN SYS.DBMS_AQADM.drop_queue_table(tbl); EXCEPTION WHEN OTHERS THEN NULL; END;
SYS.DBMS_AQADM.CREATE_QUEUE_TABLE(tbl, 'RAW');
SYS.DBMS_AQADM.CREATE_QUEUE(q, tbl);
SYS.DBMS_AQADM.grant_queue_privilege('ENQUEUE', q, '` + user + `');
SYS.DBMS_AQADM.grant_queue_privilege('DEQUEUE', q, '` + user + `');
SYS.DBMS_AQADM.start_queue(q);
END;`
if _, err = db.ExecContext(ctx, qry); err != nil {
if !strings.Contains(err.Error(), "PLS-00201: 'SYS.DBMS_AQADM'") {
fmt.Println(err)
return
}
}
defer func() {
// Drop Queue
db.ExecContext(
context.Background(),
`DECLARE
tbl CONSTANT VARCHAR2(61) := USER||'.'||:1;
q CONSTANT VARCHAR2(61) := USER||'.'||:2;
BEGIN
BEGIN SYS.DBMS_AQADM.stop_queue(q); EXCEPTION WHEN OTHERS THEN NULL; END;
BEGIN SYS.DBMS_AQADM.drop_queue(q); EXCEPTION WHEN OTHERS THEN NULL; END;
BEGIN SYS.DBMS_AQADM.drop_queue_table(tbl); EXCEPTION WHEN OTHERS THEN NULL;
END;`,
qTblName, qName,
)
}()
// Create Error Group
g, gctx := errgroup.WithContext(ctx)
q, _ := godror.NewQueue(gctx, db, qName, "")
defer q.Close()
// Goroutine for Producer
g.Go(func() error {
ticker := time.NewTicker(50 * time.Millisecond)
counter := 1
for {
select {
case <-ticker.C:
tx, _ := db.Begin()
fmt.Printf("%d\n", counter)
message := godror.Message{Raw: []byte(fmt.Sprintf("Message %d", counter))}
counter += 1
q.Enqueue([]godror.Message{message})
tx.Commit()
case <-gctx.Done():
fmt.Printf("closing writer goroutine\n")
return gctx.Err()
}
}
})
// Goroutine for Consumer
g.Go(func() error {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
tx, _ := db.Begin()
messages := make([]godror.Message, 30)
n, _ := q.Dequeue(messages)
tx.Commit()
for i := 0; i < n; i++ {
fmt.Printf("Received message: %s\n", string(messages[i].Raw))
}
case <-gctx.Done():
fmt.Printf("closing reader goroutine\n")
return gctx.Err()
}
}
})
if err := g.Wait(); err == nil {
fmt.Println("finished clean")
} else {
fmt.Printf("received error: %v", err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment