Skip to content

Instantly share code, notes, and snippets.

Created Oct 15, 2021
What would you like to do?
search test infrastructure with Replica still using testing.TB (to be replaced)
// Package searchtest can be used to write integration tests with OpenSearch and search-replica.
// You can set the following environment variables to control logging:
// VERBOSE_OPENSEARCH=true to enable logging of requests and responses made with the OpenSearch client.
// VERBOSE_SEARCH_REPLICA=true to print verbose output from the search-replica process.
// If search-replica isn't on your $PATH, or if you want to run a different command, you can set the environment variable:
// SEARCH_REPLICA_COMMAND=search-replica
// search-replica works by subscribing to a PostgreSQL publication using logical replication.
// The easiest way to have OpenSearch running on your machine is with Docker Compose.
// For more information, see
// Install search-replica with:
// $ go install
package searchtest
import (
var (
// IndexPrefix for the search used for integration tests.
// verifyIndex refuses any index whose name does not start with this prefix.
// In practice, it has the same value as sqltest.DatabasePrefix.
IndexPrefix = "test"
// The following unexported variables are controlled by environment variables.
// They're unexported as providing multiple ways to set them is a source of confusion.
// verboseOpenSearch is set by init from VERBOSE_OPENSEARCH; DefaultClient attaches a request/response logger to the client when it is true.
verboseOpenSearch bool
// verboseSearchReplica is set by init from VERBOSE_SEARCH_REPLICA; Start forwards the process output to os.Stdout and os.Stderr when it is true.
verboseSearchReplica bool
// init reads the verbosity environment variables once, when the package is loaded.
// A flag is enabled when its variable is set to any value other than the literal "false":
// an unset variable leaves the flag off, while an empty value, "0" or "FALSE" still turn it on.
func init() {
if v, ok := os.LookupEnv("VERBOSE_OPENSEARCH"); ok && v != "false" {
verboseOpenSearch = true
if v, ok := os.LookupEnv("VERBOSE_SEARCH_REPLICA"); ok && v != "false" {
verboseSearchReplica = true
// DefaultClient for OpenSearch.
// Not safe for use on production code due to allowing insecure TLS connections.
func DefaultClient(t testing.TB) *opensearch.Client {
cfg := opensearch.Config{
Addresses: []string{os.Getenv("OPENSEARCH_URL")},
Username: os.Getenv("OPENSEARCH_USERNAME"),
Password: os.Getenv("OPENSEARCH_PASSWORD"),
Transport: searchTransport,
if verboseOpenSearch {
cfg.Logger = &opensearchtransport.CurlLogger{
Output: Logger(t),
EnableRequestBody: true,
EnableResponseBody: true,
search, err := opensearch.NewClient(cfg)
if err != nil {
t.Fatalf("Error creating the client: %s", err)
return search
// searchTransport is almost the same as http.DefaultTransport
// except for not verifying HTTPS connections, so that the developer and CI might use OpenSearch right away without configuring certificates.
// DefaultClient installs it as the Transport of the OpenSearch client it builds.
var searchTransport = &http.Transport{
// Honor the proxy settings found in the environment.
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
// Certificate verification is switched off on purpose; this is why the transport must never be used outside tests.
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
// Replica manages the search-replica process that is called when tests are executed.
// Set the exported fields, then call Start; Start panics if the Replica already has a controlling test.
type Replica struct {
// Publication is essentially a group of PostgreSQL tables whose data changes are intended to be replicated through logical replication.
// Start passes it to the process through the PG_PUBLICATION environment variable.
Publication string
// Slot that search-replica should use.
// Start passes it to the process through the PG_SLOT environment variable.
Slot string
// GracefulStop can be set to call t.Cleanup automatically once tests are done.
// If this field is not set, you must call Stop() manually.
// If search-replica cannot be stopped gracefully within this duration, it's killed.
// When non-zero, Start registers a t.Cleanup callback whose context expires after this duration.
GracefulStop time.Duration
// cmd is the search-replica process.
// Start creates it with exec.CommandContext bound to stopCtx.
cmd *exec.Cmd
// test controlling the replica.
// Stop reports a failed graceful shutdown through it.
t testing.TB
// stopCtx manages graceful shutdown of the search-replica process.
// It is derived from the context given to Start and is cancelled when the goroutine running the process returns; Stop waits on it.
stopCtx context.Context
// Start search-replica.
// It waits until receiving an indication that that streaming replication has started before returning.
// The field StopTimeout can be set to stop execution automatically once tests are done.
func (r *Replica) Start(ctx context.Context, t testing.TB) {
if r.t != nil {
panic("search-replica runner already started")
command := os.Getenv("SEARCH_REPLICA_COMMAND")
if command == "" {
command = "search-replica"
var stopCancel context.CancelFunc
r.stopCtx, stopCancel = context.WithCancel(ctx)
p, err := exec.LookPath(command)
if err != nil {
t.Fatalf("cannot find search-replica on your system: %v", err)
r.cmd = exec.CommandContext(r.stopCtx, p, "-recreate")
r.cmd.Env = append(
fmt.Sprintf("PG_PUBLICATION=%s", r.Publication),
fmt.Sprintf("PG_SLOT=%s", r.Slot),
// Wait until search-replica announces it's ready.
var success bool
done := make(chan struct{}, 1)
ew := &expectWriter{
find: []byte("Started streaming replication"),
ready: func() {
success = true
done <- struct{}{}
if verboseSearchReplica {
r.cmd.Stdout = os.Stdout
r.cmd.Stderr = io.MultiWriter(os.Stderr, ew)
} else {
r.cmd.Stderr = ew
go func() {
defer stopCancel()
t.Log("starting up search-replica")
if err := r.cmd.Run(); err != nil && !strings.Contains(err.Error(), "signal: terminated") {
t.Errorf("search-replica died: %v", err)
done <- struct{}{}
if r.GracefulStop != 0 {
t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), r.GracefulStop)
defer cancel()
if !success {
t.Fatal("search-replica failed to start")
// Stop tries to gracefully shutdown search-replica.
// Should be called with t.Cleanup.
// If it doesn't terminate gracefully before the received context expires,
// it kills the process.
// It blocks until either stopCtx is done (the goroutine running the process has returned)
// or ctx is done, in which case an error is reported on the controlling test.
// NOTE(review): the early return for a process that was never started, and the calls that signal/kill the process, are not visible in this listing — confirm against the full source.
func (r *Replica) Stop(ctx context.Context) {
// Guard for a process that was never started (presumably returns early — TODO confirm).
if r.cmd == nil || r.cmd.Process == nil || r.cmd.Process.Pid == 0 {
select {
// The process goroutine returned and cancelled stopCtx.
case <-r.stopCtx.Done():
// The caller's context ended first.
case <-ctx.Done():
r.t.Error("failed to shutdown search-replica gracefully")
// expectWriter implements a io.Writer that checks if a message appears in a stream.
// The message may be split across any number of Write calls.
type expectWriter struct {
	// find is the byte sequence to look for.
	find []byte
	// success records that find has already been seen; ready is called at most once.
	success bool
	// ready is invoked the first time find is seen. It may be nil.
	ready func()
	// memory holds the tail of the data written so far (at most len(find)-1
	// bytes), which is all that is needed to detect a match spanning writes.
	memory []byte
}

// Write scans p, together with the remembered tail of earlier writes, for the
// expected message and calls ready the first time it is found.
// It always reports len(p) bytes written and a nil error, and never retains p.
func (w *expectWriter) Write(p []byte) (n int, err error) {
	if w.success {
		return len(p), nil
	}

	// Build the search window in a buffer we own: the caller is free to reuse
	// p as soon as Write returns, so it must not be aliased by w.memory.
	window := make([]byte, 0, len(w.memory)+len(p))
	window = append(window, w.memory...)
	window = append(window, p...)

	if bytes.Contains(window, w.find) {
		w.success = true
		w.memory = nil
		if w.ready != nil {
			w.ready()
		}
		return len(p), nil
	}

	// No match yet, so find is not empty (an empty find always matches).
	// A future match can reuse at most the last len(find)-1 bytes seen so far.
	if keep := len(w.find) - 1; len(window) > keep {
		window = window[len(window)-keep:]
	}
	w.memory = window
	return len(p), nil
}
// NewInfrastructure creates a new search engine infrastructure.
// t is the test that failures are reported to, and c is the OpenSearch client used to send the index requests.
func NewInfrastructure(t testing.TB, c *opensearch.Client) *Infrastructure {
return &Infrastructure{
t: t,
client: c,
// Infrastructure for automating integration testing with OpenSearch.
// Create it with NewInfrastructure.
type Infrastructure struct {
// t is the test that index operations fail (via Fatalf) when something goes wrong.
t testing.TB
// client is the OpenSearch client the index requests are sent with.
client *opensearch.Client
// Reindex recreates the search engine index.
// You probably want to use the database name as the index name.
// To make development easier, the index isn't deleted after the tests are done so you can manually check them easily.
// You can use the OpenSearch Dashboard or CLI tool for that.
// After table schema changes, the settings or mapping might get outdated.
// If this happens, you might find the following command useful to understand what is going on:
// $ opensearch-cli curl get --path "/<index>" | jq --indent 4
func (i *Infrastructure) Reindex(ctx context.Context, index string, body []byte) {
i.deleteIndex(ctx, index)
i.createIndex(ctx, index, body)
// createIndex creates the search index named index, sending body as the body of the create request
// (presumably the index settings and mappings — TODO confirm against callers).
// A transport error or an error response fails the test immediately.
func (i *Infrastructure) createIndex(ctx context.Context, index string, body []byte) {
req := opensearchapi.IndicesCreateRequest{
Index: index,
Body: bytes.NewBuffer(body),
resp, err := req.Do(ctx, i.client)
// The request could not be performed at all.
if err != nil {
i.t.Fatalf("map index: %v", err)
defer resp.Body.Close() // nolint: errcheck
// A response was received, but it reports an error; only its status code is included in the failure message.
if resp.IsError() {
i.t.Fatalf("search: mapping response with status code %v", resp.StatusCode)
// deleteIndex deletes the search index.
// If the index doesn't exist, it does nothing.
// A transport error, or an error response other than 404, fails the test immediately.
func (i *Infrastructure) deleteIndex(ctx context.Context, index string) {
req := opensearchapi.IndicesDeleteRequest{
Index: []string{index},
resp, err := req.Do(ctx, i.client)
// The request could not be performed at all.
if err != nil {
i.t.Fatalf("cleanup index: %v", err)
defer resp.Body.Close() // nolint: errcheck
// Ignore index_not_found_exception error.
if resp.IsError() && resp.StatusCode != 404 {
i.t.Fatalf("search: cleanup response with status code %v", resp.StatusCode)
// verifyIndex checks if the index starts with "test" with the purpose of making this fail safely if called on a production environment by mistake.
// The required prefix is the value of IndexPrefix; when index does not start with it, the test is failed immediately via Fatalf.
func (i *Infrastructure) verifyIndex(index string) {
if !strings.HasPrefix(index, IndexPrefix) {
i.t.Fatalf(`refusing to run integration tests: search engine index name is %q (%q prefix is required)`, index, IndexPrefix)
// Logger for OpenSearch requests executed during tests.
// It returns an io.Writer that forwards everything written to it to the log of t.
// DefaultClient only installs it when verbose mode is used, which is controlled by
// setting the environment variable VERBOSE_OPENSEARCH=true.
func Logger(t testing.TB) io.Writer {
	return &testingLogger{t: t}
}

// testingLogger adapts a testing.TB to io.Writer.
type testingLogger struct {
	// t receives one log entry per Write call.
	t testing.TB
}

// Write logs p through the test and reports all of p as written.
// Returning len(p) is required by the io.Writer contract: a short count with a
// nil error makes wrappers such as io.MultiWriter fail with io.ErrShortWrite.
func (l *testingLogger) Write(p []byte) (int, error) {
	l.t.Helper()
	l.t.Log(string(p))
	return len(p), nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment