
@kdgregory
Last active June 11, 2023 19:38
Attempts to stress CloudWatch Logs in order to replicate a failure of the logging integration test.
example

519> java -jar target/cloudwatch-1.0-SNAPSHOT.jar 10 8 2
2023-06-11 07:24:52,595 INFO c.k.e.LogStressor_v2 [main] - deleting log group if it exists: com.kdgregory.example.LogStressor_v2-1686482692166
2023-06-11 07:24:52,993 INFO c.k.e.LogStressor_v2 [main] - rep 0 starting threads
2023-06-11 07:24:52,993 INFO c.k.e.LogStressor_v2 [Thread-0] - thread starting
2023-06-11 07:24:52,993 INFO c.k.e.LogStressor_v2 [Thread-0] - attempting to create log group: com.kdgregory.example.LogStressor_v2-1686482692166
2023-06-11 07:24:52,993 INFO c.k.e.LogStressor_v2 [Thread-1] - thread starting
2023-06-11 07:24:52,993 INFO c.k.e.LogStressor_v2 [Thread-2] - thread starting
2023-06-11 07:24:52,993 INFO c.k.e.LogStressor_v2 [Thread-1] - attempting to create log group: com.kdgregory.example.LogStressor_v2-1686482692166
2023-06-11 07:24:52,993 INFO c.k.e.LogStressor_v2 [Thread-2] - attempting to create log group: com.kdgregory.example.LogStressor_v2-1686482692166
2023-06-11 07:24:52,993 INFO c.k.e.LogStressor_v2 [Thread-3] - thread starting
2023-06-11 07:24:52,994 INFO c.k.e.LogStressor_v2 [Thread-3] - attempting to create log group: com.kdgregory.example.LogStressor_v2-1686482692166
2023-06-11 07:24:52,994 INFO c.k.e.LogStressor_v2 [Thread-4] - thread starting
2023-06-11 07:24:52,994 INFO c.k.e.LogStressor_v2 [Thread-4] - attempting to create log group: com.kdgregory.example.LogStressor_v2-1686482692166
2023-06-11 07:24:52,994 INFO c.k.e.LogStressor_v2 [Thread-5] - thread starting
2023-06-11 07:24:52,994 INFO c.k.e.LogStressor_v2 [Thread-5] - attempting to create log group: com.kdgregory.example.LogStressor_v2-1686482692166
2023-06-11 07:24:52,994 INFO c.k.e.LogStressor_v2 [Thread-6] - thread starting
2023-06-11 07:24:52,994 INFO c.k.e.LogStressor_v2 [main] - rep 0 waiting for threads to finish
2023-06-11 07:24:52,994 INFO c.k.e.LogStressor_v2 [Thread-6] - attempting to create log group: com.kdgregory.example.LogStressor_v2-1686482692166
2023-06-11 07:24:52,994 INFO c.k.e.LogStressor_v2 [Thread-7] - thread starting
2023-06-11 07:24:52,994 INFO c.k.e.LogStressor_v2 [Thread-7] - attempting to create log group: com.kdgregory.example.LogStressor_v2-1686482692166
2023-06-11 07:24:53,118 WARN c.k.e.LogStressor_v2 [Thread-1] - caught OperationAbortedException
2023-06-11 07:24:53,118 WARN c.k.e.LogStressor_v2 [Thread-6] - caught OperationAbortedException
2023-06-11 07:24:53,174 INFO c.k.e.LogStressor_v2 [Thread-1] - attempting to create log stream: shared_stream
2023-06-11 07:24:53,211 INFO c.k.e.LogStressor_v2 [Thread-5] - attempting to create log stream: shared_stream
2023-06-11 07:24:53,211 INFO c.k.e.LogStressor_v2 [Thread-0] - attempting to create log stream: shared_stream
2023-06-11 07:24:53,219 INFO c.k.e.LogStressor_v2 [Thread-7] - attempting to create log stream: shared_stream
2023-06-11 07:24:53,220 INFO c.k.e.LogStressor_v2 [Thread-3] - attempting to create log stream: shared_stream
2023-06-11 07:24:53,229 INFO c.k.e.LogStressor_v2 [Thread-6] - attempting to create log stream: shared_stream
2023-06-11 07:24:53,229 INFO c.k.e.LogStressor_v2 [Thread-2] - attempting to create log stream: shared_stream
2023-06-11 07:24:53,263 INFO c.k.e.LogStressor_v2 [Thread-1] - sending batch 0
2023-06-11 07:24:53,264 INFO c.k.e.LogStressor_v2 [Thread-4] - attempting to create log stream: shared_stream
2023-06-11 07:24:53,322 INFO c.k.e.LogStressor_v2 [Thread-4] - sending batch 0
2023-06-11 07:24:53,328 INFO c.k.e.LogStressor_v2 [Thread-0] - sending batch 0
2023-06-11 07:24:53,335 INFO c.k.e.LogStressor_v2 [Thread-5] - sending batch 0
2023-06-11 07:24:53,337 INFO c.k.e.LogStressor_v2 [Thread-7] - sending batch 0
2023-06-11 07:24:53,343 INFO c.k.e.LogStressor_v2 [Thread-3] - sending batch 0
2023-06-11 07:24:53,354 INFO c.k.e.LogStressor_v2 [Thread-2] - sending batch 0
2023-06-11 07:24:53,362 INFO c.k.e.LogStressor_v2 [Thread-6] - sending batch 0
2023-06-11 07:24:53,753 INFO c.k.e.LogStressor_v2 [Thread-1] - sent batch 0; rejected event info: null
2023-06-11 07:24:53,753 INFO c.k.e.LogStressor_v2 [Thread-1] - sending batch 1
2023-06-11 07:24:53,788 INFO c.k.e.LogStressor_v2 [Thread-4] - sent batch 0; rejected event info: null
2023-06-11 07:24:53,788 INFO c.k.e.LogStressor_v2 [Thread-4] - sending batch 1
2023-06-11 07:24:53,820 INFO c.k.e.LogStressor_v2 [Thread-3] - sent batch 0; rejected event info: null
2023-06-11 07:24:53,821 INFO c.k.e.LogStressor_v2 [Thread-3] - sending batch 1
2023-06-11 07:24:53,840 INFO c.k.e.LogStressor_v2 [Thread-2] - sent batch 0; rejected event info: null
2023-06-11 07:24:53,840 INFO c.k.e.LogStressor_v2 [Thread-2] - sending batch 1
2023-06-11 07:24:53,854 INFO c.k.e.LogStressor_v2 [Thread-6] - sent batch 0; rejected event info: null
2023-06-11 07:24:53,854 INFO c.k.e.LogStressor_v2 [Thread-0] - sent batch 0; rejected event info: null
2023-06-11 07:24:53,854 INFO c.k.e.LogStressor_v2 [Thread-6] - sending batch 1
2023-06-11 07:24:53,854 INFO c.k.e.LogStressor_v2 [Thread-0] - sending batch 1
2023-06-11 07:24:53,893 INFO c.k.e.LogStressor_v2 [Thread-7] - sent batch 0; rejected event info: null
2023-06-11 07:24:53,893 INFO c.k.e.LogStressor_v2 [Thread-7] - sending batch 1
2023-06-11 07:24:54,115 INFO c.k.e.LogStressor_v2 [Thread-1] - sent batch 1; rejected event info: null
2023-06-11 07:24:54,115 INFO c.k.e.LogStressor_v2 [Thread-1] - thread finished
2023-06-11 07:24:54,152 INFO c.k.e.LogStressor_v2 [Thread-4] - sent batch 1; rejected event info: null
2023-06-11 07:24:54,152 INFO c.k.e.LogStressor_v2 [Thread-4] - thread finished
2023-06-11 07:24:54,157 INFO c.k.e.LogStressor_v2 [Thread-6] - sent batch 1; rejected event info: null
2023-06-11 07:24:54,157 INFO c.k.e.LogStressor_v2 [Thread-6] - thread finished
2023-06-11 07:24:54,175 INFO c.k.e.LogStressor_v2 [Thread-3] - sent batch 1; rejected event info: null
2023-06-11 07:24:54,175 INFO c.k.e.LogStressor_v2 [Thread-3] - thread finished
2023-06-11 07:24:54,188 INFO c.k.e.LogStressor_v2 [Thread-0] - sent batch 1; rejected event info: null
2023-06-11 07:24:54,188 INFO c.k.e.LogStressor_v2 [Thread-0] - thread finished
2023-06-11 07:24:54,258 INFO c.k.e.LogStressor_v2 [Thread-2] - sent batch 1; rejected event info: null
2023-06-11 07:24:54,259 INFO c.k.e.LogStressor_v2 [Thread-2] - thread finished
2023-06-11 07:24:54,278 INFO c.k.e.LogStressor_v2 [Thread-7] - sent batch 1; rejected event info: null
2023-06-11 07:24:54,278 INFO c.k.e.LogStressor_v2 [Thread-7] - thread finished
2023-06-11 07:24:54,297 INFO c.k.e.LogStressor_v2 [Thread-5] - sent batch 0; rejected event info: null
2023-06-11 07:24:54,297 INFO c.k.e.LogStressor_v2 [Thread-5] - sending batch 1
2023-06-11 07:24:54,644 INFO c.k.e.LogStressor_v2 [Thread-5] - sent batch 1; rejected event info: null
2023-06-11 07:24:54,644 INFO c.k.e.LogStressor_v2 [Thread-5] - thread finished
2023-06-11 07:24:54,644 INFO c.k.e.LogStressor_v2 [main] - retrieving messages from com.kdgregory.example.LogStressor_v2-1686482692166 / shared_stream
2023-06-11 07:24:58,764 DEBUG c.k.e.LogStressor_v2 [main] - retrieved 59827 events
2023-06-11 07:25:13,700 DEBUG c.k.e.LogStressor_v2 [main] - retrieved 63926 events
2023-06-11 07:25:27,767 DEBUG c.k.e.LogStressor_v2 [main] - retrieved 63926 events
2023-06-11 07:25:41,165 DEBUG c.k.e.LogStressor_v2 [main] - retrieved 63926 events
2023-06-11 07:25:55,239 DEBUG c.k.e.LogStressor_v2 [main] - retrieved 63926 events
2023-06-11 07:26:05,239 ERROR c.k.e.LogStressor_v2 [main] - failed to retrieve all messages
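For reference, the invocation above (`10 8 2`) means that each repetition should produce 8 threads × 2 batches × 4,000 events (`BATCH_SIZE`). The retrieved count stopped growing at 63,926, so 74 events were still missing when `retrieveAllMessages()` gave up after its 60-second timeout. The last line is a rough size check against the documented PutLogEvents limits: 1,048,576 bytes per batch, counted as the UTF-8 message bytes plus 26 bytes per event, and at most 10,000 events. It assumes messages of at most about 95 bytes, as produced by `writeBatch()` (a 60-byte chunk plus the thread name, batch and message numbers, and separators). On that estimate the batches sit well under the limit, which fits with every batch reporting `rejected event info: null`.

```latex
\begin{aligned}
\text{expected events} &= 8 \times 2 \times 4000 = 64000 \\
\text{missing events}  &= 64000 - 63926 = 74 \\
\text{bytes per batch} &\lesssim 4000 \times (95 + 26) = 484000 < 1048576
\end{aligned}
```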
<configuration debug="false">

    <jmxConfigurator />

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{ISO8601} %-5level %logger{24} [%thread] - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="warn">
        <appender-ref ref="CONSOLE" />
    </root>

    <logger name="com.kdgregory" level="debug">
    </logger>

</configuration>
package com.kdgregory.example;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.*;


public class LogStressor_v2
{
    private final static int BATCH_SIZE = 4000;

    private static Logger logger = LoggerFactory.getLogger(LogStressor_v2.class);


    public static void main(String[] argv)
    throws Exception
    {
        if (argv.length != 3)
        {
            System.err.println("invocation: LogStressor_v2 NUM_REPS NUM_THREADS NUM_BATCHES");
            System.exit(1);
        }

        int numReps = Integer.parseInt(argv[0]);
        int numThreads = Integer.parseInt(argv[1]);
        int numBatches = Integer.parseInt(argv[2]);
        int expectedMessageCount = numThreads * numBatches * BATCH_SIZE;

        String logGroupName = LogStressor_v2.class.getName() + "-" + System.currentTimeMillis();
        String logStreamName = "shared_stream";

        CloudWatchLogsClient client = CloudWatchLogsClient.builder().build();

        for (int repNumber = 0 ; repNumber < numReps ; repNumber++)
        {
            deleteLogGroupIfExists(client, logGroupName);

            logger.info("rep {} starting threads", repNumber);
            List<Thread> threads = new ArrayList<Thread>();
            for (int jj = 0 ; jj < numThreads ; jj++)
            {
                Thread thread = new Thread(new LogWriter(client, logGroupName, logStreamName, numBatches));
                threads.add(thread);
                thread.start();
            }

            logger.info("rep {} waiting for threads to finish", repNumber);
            for (Thread thread : threads)
            {
                thread.join();
            }

            List<OutputLogEvent> events = retrieveAllMessages(client, logGroupName, logStreamName, expectedMessageCount);
            if (events.size() != expectedMessageCount)
            {
                logger.error("failed to retrieve all messages");
                System.exit(1);
            }

            deleteLogGroupIfExists(client, logGroupName);
            logger.info("rep {} finished", repNumber);
        }
    }

    //----------------------------------------------------------------------------
    //  This is the log-writer, which is run on a background thread
    //----------------------------------------------------------------------------

    private static class LogWriter
    implements Runnable
    {
        private CloudWatchLogsClient client;
        private String logGroupName;
        private String logStreamName;
        private int numBatches;

        public LogWriter(CloudWatchLogsClient client, String logGroupName, String logStreamName, int numBatches)
        {
            this.client = client;
            this.logGroupName = logGroupName;
            this.logStreamName = logStreamName;
            this.numBatches = numBatches;
        }

        @Override
        public void run()
        {
            logger.info("thread starting");

            tryCreateLogGroup(client, logGroupName);
            sleepQuietly(100);  // works around a known issue with Insights not reading messages before the group creation timestamp
            long baseTimestamp = System.currentTimeMillis();

            tryCreateLogStream(client, logGroupName, logStreamName);

            for (int ii = 0 ; ii < numBatches ; ) // increment only on success
            {
                try
                {
                    logger.info("sending batch {}", ii);
                    PutLogEventsResponse response = writeBatch(client, logGroupName, logStreamName, ii, baseTimestamp);
                    logger.info("sent batch {}; rejected event info: {}", ii, response.rejectedLogEventsInfo());
                    ii++;
                }
                catch (Exception ex)
                {
                    logger.error("exception sending batch", ex);
                }
            }

            logger.info("thread finished");
        }
    }

    //----------------------------------------------------------------------------
    //  Static utility methods for interacting with the SDK
    //----------------------------------------------------------------------------

    private static LogGroup describeLogGroup(CloudWatchLogsClient client, String logGroupName)
    {
        DescribeLogGroupsRequest request = DescribeLogGroupsRequest.builder()
            .logGroupNamePrefix(logGroupName)
            .build();
        DescribeLogGroupsResponse response = client.describeLogGroups(request);

        // the name should be unique, so we won't bother with pagination
        for (LogGroup logGroup : response.logGroups())
        {
            if (logGroup.logGroupName().equals(logGroupName))
                return logGroup;
        }
        return null;
    }


    private static void tryCreateLogGroup(CloudWatchLogsClient client, String logGroupName)
    {
        logger.info("attempting to create log group: {}", logGroupName);
        try
        {
            CreateLogGroupRequest request = CreateLogGroupRequest.builder()
                .logGroupName(logGroupName)
                .build();
            client.createLogGroup(request);
        }
        catch (OperationAbortedException ex)
        {
            // this means that someone is trying to create the group concurrently
            // fall through to wait for it to be ready
            logger.warn("caught OperationAbortedException");
        }
        catch (ResourceAlreadyExistsException ex)
        {
            // this is OK, means another thread already created the group
        }

        // must wait for group to become ready before we can create stream
        long timeoutAt = System.currentTimeMillis() + 30000;
        while (System.currentTimeMillis() < timeoutAt)
        {
            LogGroup group = describeLogGroup(client, logGroupName);
            if (group != null)
                return;
        }
        throw new RuntimeException("timed out waiting for log group to become ready");
    }

    private static void deleteLogGroupIfExists(CloudWatchLogsClient client, String logGroupName)
    {
        logger.info("deleting log group if it exists: {}", logGroupName);
        try
        {
            DeleteLogGroupRequest request = DeleteLogGroupRequest.builder()
                .logGroupName(logGroupName)
                .build();
            client.deleteLogGroup(request);
        }
        catch (ResourceNotFoundException ex)
        {
            // doesn't exist; no problem
            return;
        }

        // wait for it to be gone
        long timeoutAt = System.currentTimeMillis() + 30000;
        while (System.currentTimeMillis() < timeoutAt)
        {
            LogGroup group = describeLogGroup(client, logGroupName);
            if (group == null)
                return;
        }
        throw new RuntimeException("timed out waiting for log group deletion");
    }

    private static LogStream describeLogStream(CloudWatchLogsClient client, String logGroupName, String logStreamName)
    {
        DescribeLogStreamsRequest request = DescribeLogStreamsRequest.builder()
            .logGroupName(logGroupName)
            .logStreamNamePrefix(logStreamName)
            .build();
        DescribeLogStreamsResponse response = client.describeLogStreams(request);

        // should be only one, no need for pagination
        for (LogStream logStream : response.logStreams())
        {
            if (logStream.logStreamName().equals(logStreamName))
                return logStream;
        }
        return null;
    }


    private static void tryCreateLogStream(CloudWatchLogsClient client, String logGroupName, String logStreamName)
    {
        logger.info("attempting to create log stream: {}", logStreamName);
        try
        {
            CreateLogStreamRequest request = CreateLogStreamRequest.builder()
                .logGroupName(logGroupName)
                .logStreamName(logStreamName)
                .build();
            client.createLogStream(request);
        }
        catch (OperationAbortedException ex)
        {
            // this means that someone is trying to create the stream concurrently
            // fall through to wait for it to be ready
            logger.warn("caught OperationAbortedException");
        }
        catch (ResourceAlreadyExistsException ex)
        {
            // this is OK, means another thread already created the stream
        }

        // must wait for stream to become ready before we can write
        long timeoutAt = System.currentTimeMillis() + 30000;
        while (System.currentTimeMillis() < timeoutAt)
        {
            if (describeLogStream(client, logGroupName, logStreamName) != null)
                return;
        }
        throw new RuntimeException("timed out waiting for log stream to become ready");
    }

    private static PutLogEventsResponse writeBatch(CloudWatchLogsClient client, String logGroupName, String logStreamName, int batchNum, long baseTimestamp)
    {
        String chunk = "123456789+123456789+123456789+123456789+123456789+123456789+";
        String threadName = Thread.currentThread().getName();

        // size the list for the full batch so it never has to grow
        List<InputLogEvent> logEvents = new ArrayList<>(BATCH_SIZE);
        for (int ii = 0 ; ii < BATCH_SIZE ; ii++)
        {
            logEvents.add(InputLogEvent.builder()
                .timestamp(baseTimestamp + ii)
                .message(String.format("%s, batch %d, message %d, %s", threadName, batchNum, ii, chunk))
                .build());
        }

        PutLogEventsRequest request = PutLogEventsRequest.builder()
            .logGroupName(logGroupName)
            .logStreamName(logStreamName)
            .logEvents(logEvents)
            .build();
        return client.putLogEvents(request);
    }

    private static List<OutputLogEvent> retrieveAllMessages(CloudWatchLogsClient client, String logGroupName, String logStreamName, int expectedMessageCount)
    {
        logger.info("retrieving messages from {} / {}", logGroupName, logStreamName);

        List<OutputLogEvent> result = new ArrayList<OutputLogEvent>();
        long timeout = System.currentTimeMillis() + 60000;
        while (System.currentTimeMillis() < timeout)
        {
            result.clear();
            GetLogEventsRequest request = GetLogEventsRequest.builder()
                .logGroupName(logGroupName)
                .logStreamName(logStreamName)
                .startFromHead(Boolean.TRUE)
                .startTime(Long.valueOf(0))
                .build();
            for (OutputLogEvent event : client.getLogEventsPaginator(request).events())
            {
                result.add(event);
            }

            logger.debug("retrieved {} events", result.size());
            if ((expectedMessageCount == 0) || (result.size() >= expectedMessageCount))
                return result;

            sleepQuietly(10000);
        }
        return result;
    }

    private static void sleepQuietly(long millis)
    {
        try
        {
            Thread.sleep(millis);
        }
        catch (InterruptedException ex)
        {
            logger.warn("sleep interrupted");
        }
    }
}
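The wait loops in `tryCreateLogGroup()`, `deleteLogGroupIfExists()`, and `tryCreateLogStream()` call the describe APIs back to back with no pause, so every waiting thread sends requests as fast as the SDK returns responses. If that polling ever needs to be ruled out as a factor, the sketch below shows one way to pause between attempts. `Polling` and `pollUntil()` are hypothetical names that belong neither to the SDK nor to the original code, and the interval is an arbitrary choice.

```java
import java.util.function.BooleanSupplier;

public final class Polling
{
    /**
     * Hypothetical helper: evaluates condition until it returns true or
     * timeoutMillis elapses, sleeping intervalMillis between attempts rather
     * than spinning. Returns false on timeout or if the thread is interrupted.
     */
    public static boolean pollUntil(BooleanSupplier condition, long timeoutMillis, long intervalMillis)
    {
        long timeoutAt = System.currentTimeMillis() + timeoutMillis;
        while (true)
        {
            if (condition.getAsBoolean())
                return true;

            long remaining = timeoutAt - System.currentTimeMillis();
            if (remaining <= 0)
                return false;

            try
            {
                // never sleep past the deadline
                Thread.sleep(Math.min(intervalMillis, remaining));
            }
            catch (InterruptedException ex)
            {
                // preserve the interrupt for the caller and give up
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] argv)
    {
        // stand-in for "describeLogGroup(...) != null": becomes true on the third poll
        int[] calls = {0};
        boolean ready = pollUntil(() -> ++calls[0] >= 3, 5000, 100);
        System.out.println("ready=" + ready + ", polls=" + calls[0]);   // prints ready=true, polls=3
    }
}
```

In the stressor, the group wait might then become `if (! Polling.pollUntil(() -> describeLogGroup(client, logGroupName) != null, 30000, 250)) throw new RuntimeException(...)`. Adding a pause changes the timing of a test that is meant to provoke a race, so it may also make the failure harder to reproduce.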
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kdgregory.example</groupId>
    <artifactId>cloudwatch</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>CloudWatch Logs Stressor</name>

    <description>
        Dumps messages into CloudWatch Logs using multiple threads, to verify that
        all messages were written.
    </description>

    <properties>
        <!-- build configuration props, because Maven can't pick a sensible default -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <aws-sdk-v2.version>2.20.67</aws-sdk-v2.version>
        <logback.version>1.2.11</logback.version>
        <slf4j.version>1.7.32</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>cloudwatchlogs</artifactId>
            <version>${aws-sdk-v2.version}</version>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>apache-client</artifactId>
            <version>${aws-sdk-v2.version}</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <!-- use the SLF4J bridge for commons-logging in case it's pulled in transitively -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <debug>true</debug>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.kdgregory.example.LogStressor_v2</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

kdgregory commented Jun 11, 2023

Instructions

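These assume that you're running on Linux, have Maven and a Java JDK (8 or later) installed, and have AWS credentials and a default region configured. The program builds its client with the SDK's default credential and region provider chains.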

  1. Create a working directory and switch to it.

    mkdir /tmp/example
    cd /tmp/example
    
  2. Save pom.xml in this directory using your favorite editor.

    vi pom.xml
    
  3. Create the resources directory, and save logback.xml in it.

    mkdir -p src/main/resources
    vi src/main/resources/logback.xml
    
  4. Create the source directory, and save LogStressor_v2.java in it.

    mkdir -p src/main/java/com/kdgregory/example
    vi src/main/java/com/kdgregory/example/LogStressor_v2.java
    
  5. Use Maven to build the executable JAR.

    mvn clean package
    
  6. Run the JAR. The arguments are NUM_REPS, NUM_THREADS, and NUM_BATCHES, and each batch holds 4,000 events (BATCH_SIZE). In this case I tell it to perform 20 repetitions, with 8 threads that each write two batches.

    java -jar target/cloudwatch-1.0-SNAPSHOT.jar 20 8 2
    

If you do not see an OperationAbortedException in the logs, re-run and possibly increase the number of reps.

This example will delete the log group that it creates, unless the retrieval fails. In that case, the log group is left in place for forensics.
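When the retrieval fails and the log group is left in place, it may help to know *which* events are missing, not just how many. The sketch below assumes the message format that `writeBatch()` produces (`<thread>, batch <n>, message <n>, <chunk>`). `MissingEventFinder` and `findMissing()` are hypothetical names. The input is a plain list of message strings, for example the `message()` values of the retrieved `OutputLogEvent`s. The finder reports gaps only for threads that appear at least once, so a thread whose events all vanished would not show up.

```java
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class MissingEventFinder
{
    // matches the prefix written by writeBatch(): "<thread>, batch <n>, message <n>, "
    private static final Pattern MESSAGE = Pattern.compile("^(\\S+), batch (\\d+), message (\\d+), ");

    /**
     * Returns "thread/batch/message" keys for every expected event that is absent,
     * considering only threads that appear at least once in the retrieved messages.
     */
    public static List<String> findMissing(Collection<String> messages, int numBatches, int batchSize)
    {
        // thread name -> set of (batch * batchSize + index) positions that were seen
        Map<String,Set<Integer>> seen = new TreeMap<>();
        for (String message : messages)
        {
            Matcher m = MESSAGE.matcher(message);
            if (! m.find())
                continue;   // not a stressor message; ignore it
            int batch = Integer.parseInt(m.group(2));
            int index = Integer.parseInt(m.group(3));
            seen.computeIfAbsent(m.group(1), k -> new HashSet<>()).add(batch * batchSize + index);
        }

        List<String> missing = new ArrayList<>();
        for (Map.Entry<String,Set<Integer>> entry : seen.entrySet())
        {
            for (int batch = 0 ; batch < numBatches ; batch++)
            {
                for (int index = 0 ; index < batchSize ; index++)
                {
                    if (! entry.getValue().contains(batch * batchSize + index))
                        missing.add(entry.getKey() + "/" + batch + "/" + index);
                }
            }
        }
        return missing;
    }

    public static void main(String[] argv)
    {
        // synthetic example: one thread, 2 batches of 3 events, with batch 1 message 2 lost
        List<String> messages = Arrays.asList(
            "Thread-0, batch 0, message 0, xyz",
            "Thread-0, batch 0, message 1, xyz",
            "Thread-0, batch 0, message 2, xyz",
            "Thread-0, batch 1, message 0, xyz",
            "Thread-0, batch 1, message 1, xyz");
        System.out.println(findMissing(messages, 2, 3));   // prints [Thread-0/1/2]
    }
}
```

Against a real run, you would pass the messages from `retrieveAllMessages()` with `numBatches` from the command line and `BATCH_SIZE` (4,000), for example just before the `System.exit(1)` in `main()`.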
