Skip to content

Instantly share code, notes, and snippets.

@robstradling
Last active December 11, 2023 20:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robstradling/7046c09dbd173756ce9c6e642c2b2050 to your computer and use it in GitHub Desktop.
Save robstradling/7046c09dbd173756ce9c6e642c2b2050 to your computer and use it in GitHub Desktop.
go-ora: Regression since v2.7.23 when using DBMS_AQ.DEQUEUE_ARRAY
package main
import (
"database/sql"
"flag"
"fmt"
"os"
"time"
go_ora "github.com/sijms/go-ora/v2"
)
// test1 is the Go counterpart of the Oracle object type "test1" created by
// createTypes; it is registered with go-ora in main and used as the queue
// payload. The udt tag ties the Name field to the object's "name" attribute.
type test1 struct {
Name string `udt:"name"`
}
// createTypes creates the Oracle object type test1 and then the nested-table
// type test1collection built on it. Statements run in order; the first error
// is returned unchanged and later statements are not attempted.
func createTypes(conn *sql.DB) error {
	ddl := []string{
		`CREATE TYPE test1 AS OBJECT ( name varchar2(256) )`,
		`CREATE OR REPLACE TYPE test1collection AS TABLE OF test1`,
	}
	for _, stmt := range ddl {
		if _, err := conn.Exec(stmt); err != nil {
			return err
		}
	}
	return nil
}
// dropTypes removes the types made by createTypes, collection first and then
// the object type it is built on. It stops at, and returns, the first error.
func dropTypes(conn *sql.DB) error {
	ddl := []string{
		`DROP TYPE test1collection`,
		`DROP TYPE test1`,
	}
	for _, stmt := range ddl {
		if _, err := conn.Exec(stmt); err != nil {
			return err
		}
	}
	return nil
}
// setupQueue creates the queue table test1table (payload type test1), creates
// the queue test1queue on that table, and starts the queue with enqueueing
// enabled. The three PL/SQL blocks run in order; the first error aborts the
// sequence and is returned as-is.
func setupQueue(conn *sql.DB) error {
	steps := []string{
		`BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'test1table', queue_payload_type => 'test1' ); END;`,
		`BEGIN DBMS_AQADM.CREATE_QUEUE ( queue_name => 'test1queue', queue_table => 'test1table' ); END;`,
		`BEGIN DBMS_AQADM.START_QUEUE ( queue_name => 'test1queue', enqueue => TRUE ); END;`,
	}
	for _, block := range steps {
		if _, err := conn.Exec(block); err != nil {
			return err
		}
	}
	return nil
}
// stopQueue tears down what setupQueue built: it stops test1queue, drops the
// queue, and finally drops the queue table test1table. Execution halts at the
// first failing step, whose error is returned unchanged.
func stopQueue(conn *sql.DB) error {
	steps := []string{
		`BEGIN DBMS_AQADM.STOP_QUEUE(queue_name => 'test1queue'); END;`,
		`BEGIN DBMS_AQADM.DROP_QUEUE(queue_name => 'test1queue'); END;`,
		`BEGIN DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'test1table'); END;`,
	}
	for _, block := range steps {
		if _, err := conn.Exec(block); err != nil {
			return err
		}
	}
	return nil
}
// enqueue places a single message on test1queue by calling DBMS_AQ.ENQUEUE
// with default enqueue options and message properties. The message is bound
// as the payload (:1); the generated message id is discarded inside the
// PL/SQL block. Any error from executing the block is returned.
func enqueue(conn *sql.DB, message test1) error {
	const plsql = `
DECLARE
enqueueOptions DBMS_AQ.enqueue_options_t;
messageProperties DBMS_AQ.message_properties_t;
msgID_raw RAW(100);
BEGIN
DBMS_AQ.ENQUEUE(
queue_name => 'test1queue',
enqueue_options => enqueueOptions,
message_properties => messageProperties,
payload => :1,
msgid => msgID_raw
);
END;`
	_, err := conn.Exec(plsql, message)
	return err
}
// dequeueArray calls DBMS_AQ.DEQUEUE_ARRAY on test1queue.
//
// arraySize is written into the PL/SQL text as the array_size argument and
// waitTime, truncated to whole seconds, as dequeueOptions.WAIT. The function's
// return value is bound to output parameter :1 and the payload array to :2.
// Only the payload slice is handed back to the caller; on error the slice is
// nil and the error from Exec is returned unchanged.
func dequeueArray(conn *sql.DB, arraySize int, waitTime time.Duration) ([]test1, error) {
	const template = `
DECLARE
dequeueOptions DBMS_AQ.dequeue_options_t;
messagePropertiesArray DBMS_AQ.message_properties_array_t;
msgIDArray DBMS_AQ.msgid_array_t;
BEGIN
dequeueOptions.WAIT := %d;
:1 := DBMS_AQ.DEQUEUE_ARRAY(
queue_name => 'test1queue',
dequeue_options => dequeueOptions,
array_size => %d,
message_properties_array => messagePropertiesArray,
payload_array => :2,
msgid_array => msgIDArray
);
END;`

	// Integer division drops any sub-second remainder of waitTime.
	waitSeconds := int64(waitTime / time.Second)
	plsql := fmt.Sprintf(template, waitSeconds, arraySize)

	var (
		count   sql.NullInt64 // receives :1; not inspected afterwards
		payload []test1       // receives :2
	)
	if _, err := conn.Exec(plsql, sql.Out{Dest: &count}, sql.Out{Dest: &payload}); err != nil {
		return nil, err
	}
	return payload, nil
}
// usage prints the command-line help: a header and usage line on standard
// output, the registered flag defaults via flag.PrintDefaults, and then an
// example invocation on standard output.
func usage() {
	header := []string{
		"",
		"dequeue_array",
		"",
		"Usage:",
		` dequeue_array -server server_url`,
	}
	for _, line := range header {
		fmt.Println(line)
	}

	flag.PrintDefaults()

	footer := []string{
		"",
		"Example:",
		` dequeue_array -server "oracle://user:pass@server/service_name"`,
		"",
	}
	for _, line := range footer {
		fmt.Println(line)
	}
}
// main runs the reproduction and exits with the status reported by run.
// os.Exit is called only here, after run has returned, so every cleanup
// deferred inside run has already executed.
func main() {
	os.Exit(run())
}

// run executes the whole scenario: parse the -server flag, connect, create
// the Oracle types, register them with go-ora, set up the queue, enqueue one
// message and dequeue it with DBMS_AQ.DEQUEUE_ARRAY.
//
// It returns 0 on success and 1 when any step fails. Cleanup (stop/drop the
// queue, drop the types, close the connection) is deferred as soon as the
// corresponding resource exists, so it runs in reverse order on every exit
// path. Cleanup failures are reported but do not change the exit status.
func run() int {
	var server string
	flag.StringVar(&server, "server", "", "Server's URL, oracle://user:pass@server/service_name")
	flag.Parse()

	connStr := os.ExpandEnv(server)
	if connStr == "" {
		fmt.Println("Missing -server option")
		usage()
		return 1
	}

	conn, err := sql.Open("oracle", connStr)
	if err != nil {
		fmt.Println("Can't create connection: ", err)
		return 1
	}
	defer func() {
		if err := conn.Close(); err != nil {
			fmt.Println("Can't close connection: ", err)
		}
	}()

	if err := conn.Ping(); err != nil {
		fmt.Println("Can't ping connection: ", err)
		return 1
	}
	// Report the connection only once the server has actually answered.
	fmt.Println("Connected")

	if err := createTypes(conn); err != nil {
		fmt.Println("Can't create types: ", err)
		return 1
	}
	defer func() {
		if err := dropTypes(conn); err != nil {
			fmt.Println("Can't drop types: ", err)
		}
	}()

	if err := go_ora.RegisterType(conn, "test1", "test1collection", test1{}); err != nil {
		fmt.Println("Failed to register type", err)
		return 1
	}

	if err := setupQueue(conn); err != nil {
		fmt.Println("Can't setup queue: ", err)
		return 1
	}
	// Deferred after dropTypes, so the queue is removed before the types
	// it depends on.
	defer func() {
		if err := stopQueue(conn); err != nil {
			fmt.Println("Can't stop queue: ", err)
		}
	}()

	if err := enqueue(conn, test1{Name: "Test 1"}); err != nil {
		fmt.Println("Can't enqueue message: ", err)
		return 1
	}

	messages, err := dequeueArray(conn, 2, 5*time.Second)
	if err != nil {
		fmt.Println("Failed to dequeue array", err)
		return 1
	}

	fmt.Printf("Dequeued %d messages:\n", len(messages))
	for _, msg := range messages {
		fmt.Printf(" Message: %+v\n", msg)
	}
	return 0
}
module gist.github.com/robstradling/7046c09dbd173756ce9c6e642c2b2050
go 1.20
require github.com/sijms/go-ora/v2 v2.7.23
Connected
Dequeued 1 messages:
Message: {Name:Test 1}
Connected
Failed to dequeue array ORA-06502: PL/SQL: numeric or value error
ORA-06512: at line 8
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment