Skip to content

Instantly share code, notes, and snippets.

@AshwinAmbal
Created June 24, 2024 22:21
Show Gist options
  • Save AshwinAmbal/7d336821e8c83875a52a4bb8f2f40f26 to your computer and use it in GitHub Desktop.
Save AshwinAmbal/7d336821e8c83875a52a4bb8f2f40f26 to your computer and use it in GitHub Desktop.
package xk6_byom
import (
"context"
"fmt"
"log"
"math/rand"
"time"
byomConfig "xk6-byom/config"
msg "xk6-byom/message"
fileutils "xk6-byom/utils"
"go.k6.io/k6/js/modules"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/xds"
)
//go:generate protoc --go_out=message --proto_path=./message --go-grpc_out=message ./message/model_config.proto
//go:generate protoc --go_out=message --proto_path=./message --go-grpc_out=message ./message/grpc_service.proto
func init() {
modules.Register("k6/x/byom", new(BYOM))
}
type BYOM struct {
conn *grpc.ClientConn
client msg.GRPCInferenceServiceClient
requests []*msg.ModelInferRequest
config byomConfig.Configuration
}
func setupConnAndClient(hostname string) (*grpc.ClientConn, msg.GRPCInferenceServiceClient, error) {
if hostname == "" {
return nil, nil, fmt.Errorf("hostname is nil")
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
opts := []grpc.DialOption{
grpc.WithInitialWindowSize(1 << 20),
grpc.WithInitialConnWindowSize(1 << 20),
grpc.WithReadBufferSize(256 * 1024),
grpc.WithWriteBufferSize(256 * 1024),
grpc.FailOnNonTempDialError(true),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
conn, err := grpc.DialContext(ctx, hostname, opts...)
if err != nil {
return nil, nil, fmt.Errorf("failed to start gRPC connection: %v", err)
}
client := msg.NewGRPCInferenceServiceClient(conn)
return conn, client, nil
}
func (byom *BYOM) InitializeBYOM(configPath string) error {
config, err := byomConfig.LoadConfiguration(configPath)
if err != nil {
return err
}
byom.config = config
byom.conn, byom.client, err = setupConnAndClient(config.Hostname)
if err != nil {
return err
}
return nil
}
func setupRequestsForInference(path string, pattern string, region string, maxS3FileDownload int64) ([]*msg.ModelInferRequest, error) {
requests, err := fileutils.ReadBYOMRequestFromTextFiles(path, pattern, region, maxS3FileDownload)
return requests, err
}
func (byom *BYOM) SetupInferRequests() (err error) {
config := byom.config
byom.requests, err = setupRequestsForInference(config.ProtoDirPath, config.Pattern, config.Region, config.MaxS3FileDownload)
if err != nil {
return err
}
return nil
}
func (byom *BYOM) SendAndReceiveRequest() int {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
var err error
rIndex := rand.Intn(len(byom.requests))
_, err = byom.client.ModelInfer(ctx, byom.requests[rIndex])
if err != nil {
log.Print(err)
return 1
}
return 0
}
func (byom *BYOM) CloseConnection() {
if byom.conn != nil {
byom.conn.Close()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment