Skip to content

Instantly share code, notes, and snippets.

@henvic
Created October 15, 2021 08:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save henvic/3a4873734c4ffd1ff066f797280d0f41 to your computer and use it in GitHub Desktop.
Save henvic/3a4873734c4ffd1ff066f797280d0f41 to your computer and use it in GitHub Desktop.
search test infrastructure with Replica still using testing.TB (to be replaced)
// Package searchtest can be used to write integration tests with OpenSearch and search-replica.
//
// You can set the following environment variables to control logging:
// VERBOSE_OPENSEARCH=true to enable logging of requests and responses made with the OpenSearch client.
// VERBOSE_SEARCH_REPLICA=true to print verbose output from the search-replica process.
//
// Use If search-replica isn't on your $PATH or to run a different command, you can set the environment variable:
// SEARCH_REPLICA_COMMAND=search-replica
//
// search-replica works subscribing to a PostgreSQL publication using logical replication.
// https://www.postgresql.org/docs/current/logical-replication.html
//
// The easiest way to have OpenSearch running on your machine is with Docker composer.
// For more information, see https://opensearch.org/downloads.html
//
// Install search-replica with:
// $ go install github.com/hatch-studio/search-replica
package searchtest
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"strings"
"syscall"
"testing"
"time"
"github.com/opensearch-project/opensearch-go"
"github.com/opensearch-project/opensearch-go/opensearchapi"
"github.com/opensearch-project/opensearch-go/opensearchtransport"
)
var (
// IndexPrefix for the search used for integration tests.
// In practice, it has the same value as sqltest.DatabasePrefix.
IndexPrefix = "test"
// The following unexported variables are controlled by environment variables.
// They're unexported as providing multiple ways to set them is a source of confusion.
verboseOpenSearch bool
verboseSearchReplica bool
)
func init() {
if v, ok := os.LookupEnv("VERBOSE_OPENSEARCH"); ok && v != "false" {
verboseOpenSearch = true
}
if v, ok := os.LookupEnv("VERBOSE_SEARCH_REPLICA"); ok && v != "false" {
verboseSearchReplica = true
}
}
// DefaultClient for OpenSearch.
// Not safe for use on production code due to allowing insecure TLS connections.
func DefaultClient(t testing.TB) *opensearch.Client {
cfg := opensearch.Config{
Addresses: []string{os.Getenv("OPENSEARCH_URL")},
Username: os.Getenv("OPENSEARCH_USERNAME"),
Password: os.Getenv("OPENSEARCH_PASSWORD"),
Transport: searchTransport,
}
if verboseOpenSearch {
cfg.Logger = &opensearchtransport.CurlLogger{
Output: Logger(t),
EnableRequestBody: true,
EnableResponseBody: true,
}
}
search, err := opensearch.NewClient(cfg)
if err != nil {
t.Fatalf("Error creating the client: %s", err)
}
return search
}
// searchTransport is almost the same as http.DefaultTransport
// except for not verifying HTTPS connections, so that the developer and CI might use OpenSearch right away without configuring certificates.
var searchTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
// Replica manages the search-replica process that is called when tests are executed.
type Replica struct {
// Publication is essentially a group of PostgreSQL tables whose data changes are intended to be replicated through logical replication.
Publication string
// Slot that search-replica should use.
Slot string
// GracefulStop can be set to call t.Cleanup automatically once tests are done.
// If this field is not set, you must call Stop() manually.
// If search-replica cannot be stopped gracefully within this duration, it's killed.
GracefulStop time.Duration
// cmd is the search-replica process.
cmd *exec.Cmd
// test controlling the replica.
t testing.TB
// stopCtx manages graceful shutdown of the search-replica process.
stopCtx context.Context
}
// Start search-replica.
// It waits until receiving an indication that that streaming replication has started before returning.
//
// The field StopTimeout can be set to stop execution automatically once tests are done.
func (r *Replica) Start(ctx context.Context, t testing.TB) {
if r.t != nil {
panic("search-replica runner already started")
}
command := os.Getenv("SEARCH_REPLICA_COMMAND")
if command == "" {
command = "search-replica"
}
var stopCancel context.CancelFunc
r.stopCtx, stopCancel = context.WithCancel(ctx)
p, err := exec.LookPath(command)
if err != nil {
t.Fatalf("cannot find search-replica on your system: %v", err)
}
r.cmd = exec.CommandContext(r.stopCtx, p, "-recreate")
r.cmd.Env = append(
os.Environ(),
"SEARCH_PUSH_PERIOD=1s",
fmt.Sprintf("PG_PUBLICATION=%s", r.Publication),
fmt.Sprintf("PG_SLOT=%s", r.Slot),
)
// Wait until search-replica announces it's ready.
var success bool
done := make(chan struct{}, 1)
ew := &expectWriter{
find: []byte("Started streaming replication"),
ready: func() {
success = true
done <- struct{}{}
},
}
if verboseSearchReplica {
r.cmd.Stdout = os.Stdout
r.cmd.Stderr = io.MultiWriter(os.Stderr, ew)
} else {
r.cmd.Stderr = ew
}
go func() {
defer stopCancel()
t.Log("starting up search-replica")
if err := r.cmd.Run(); err != nil && !strings.Contains(err.Error(), "signal: terminated") {
t.Errorf("search-replica died: %v", err)
done <- struct{}{}
}
}()
if r.GracefulStop != 0 {
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), r.GracefulStop)
defer cancel()
r.Stop(ctx)
})
}
<-done
if !success {
t.Fatal("search-replica failed to start")
}
}
// Stop tries to gracefully shutdown search-replica.
// Should be called with t.Cleanup.
//
// If it doesn't terminate gracefully before the received context expires,
// it kills the process.
func (r *Replica) Stop(ctx context.Context) {
if r.cmd == nil || r.cmd.Process == nil || r.cmd.Process.Pid == 0 {
return
}
r.cmd.Process.Signal(syscall.SIGTERM)
select {
case <-r.stopCtx.Done():
return
case <-ctx.Done():
r.t.Error("failed to shutdown search-replica gracefully")
}
}
// expectWriter implements a io.Writer that checks if a message appears in a stream.
type expectWriter struct {
find []byte
success bool
ready func()
memory []byte
}
func (w *expectWriter) Write(p []byte) (n int, err error) {
if !w.success && bytes.Contains(append(w.memory, p...), w.find) {
w.success = true
w.ready()
}
switch {
case len(p) > len(w.find):
w.memory = p
default:
w.memory = append(w.memory[len(w.find)-len(p):], p...)
}
return len(p), nil
}
// NewInfrastructure creates a new search engine infrastructure.
func NewInfrastructure(t testing.TB, c *opensearch.Client) *Infrastructure {
return &Infrastructure{
t: t,
client: c,
}
}
// Infrastructure for automating integration testing with OpenSearch.
type Infrastructure struct {
t testing.TB
client *opensearch.Client
}
// Reindex recreates the search engine index.
//
// You probably want to use the database name as the index name.
// To make development easier, the index isn't deleted after the tests are done so you can manually check them easily.
// You can use the OpenSearch Dashboard or CLI tool for that.
//
// After table schema changes, the settings or mapping might get outdated.
// If this happens, you might find the following command useful to understand what is going on:
// $ opensearch-cli curl get --path "/<index>" | jq --indent 4
func (i *Infrastructure) Reindex(ctx context.Context, index string, body []byte) {
i.deleteIndex(ctx, index)
i.createIndex(ctx, index, body)
}
// createdIndex creates the search index.
func (i *Infrastructure) createIndex(ctx context.Context, index string, body []byte) {
i.verifyIndex(index)
req := opensearchapi.IndicesCreateRequest{
Index: index,
Body: bytes.NewBuffer(body),
}
resp, err := req.Do(ctx, i.client)
if err != nil {
i.t.Fatalf("map index: %v", err)
}
defer resp.Body.Close() // nolint: errcheck
if resp.IsError() {
i.t.Fatalf("search: mapping response with status code %v", resp.StatusCode)
}
}
// deleteIndex deletes the search index.
// If the index doesn't exist, it does nothing.
func (i *Infrastructure) deleteIndex(ctx context.Context, index string) {
i.verifyIndex(index)
req := opensearchapi.IndicesDeleteRequest{
Index: []string{index},
}
resp, err := req.Do(ctx, i.client)
if err != nil {
i.t.Fatalf("cleanup index: %v", err)
}
defer resp.Body.Close() // nolint: errcheck
// Ignore index_not_found_exception error.
if resp.IsError() && resp.StatusCode != 404 {
i.t.Fatalf("search: cleanup response with status code %v", resp.StatusCode)
}
}
// verifyIndex checks if the index starts with "test" with the purpose of making this fail safely if called on a production environment by mistake.
func (i *Infrastructure) verifyIndex(index string) {
if !strings.HasPrefix(index, IndexPrefix) {
i.t.Fatalf(`refusing to run integration tests: search engine index name is %q (%q prefix is required)`, index, IndexPrefix)
}
}
// Logger for OpenSearch requests executed during tests.
//
// It only prints when verbose mode is used, and its output is controlled by
// setting the environment variable VERBOSE_OPENSEARCH=true.
func Logger(t testing.TB) io.Writer {
return &testingLogger{
t: t,
}
}
type testingLogger struct {
t testing.TB
}
func (l *testingLogger) Write(p []byte) (int, error) {
l.t.Log(string(p))
return 0, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment