Skip to content

Instantly share code, notes, and snippets.

@klion26
Last active January 29, 2019 09:41
Show Gist options
  • Save klion26/eafa6174df361d3bb0447c2e7681db0f to your computer and use it in GitHub Desktop.
Save klion26/eafa6174df361d3bb0447c2e7681db0f to your computer and use it in GitHub Desktop.
testCheckpointRecoveryFailure
/**
 * Verifies that a standby dispatcher which takes over after the leading dispatcher is killed
 * logs a job-recovery failure.
 *
 * <p>The test starts one dispatcher process and {@code numberOfTaskManagers} task managers that
 * share a ZooKeeper HA configuration. It submits a job from a background thread and waits until
 * {@code PARALLELISM} ready-marker files exist. It then deletes the coordination directory, kills
 * the first dispatcher and starts a second one. Finally it polls the second dispatcher's output
 * for up to five minutes and succeeds once that output contains both
 * {@code "Could not restart the job"} and {@code "java.io.FileNotFoundException"}.
 *
 * <p>NOTE(review): deleting the coordination directory appears to be what provokes the recovery
 * failure — confirm against {@code testJobManagerFailure}.
 *
 * @throws Exception if cluster setup fails, the expected output does not appear before the
 *     deadline, or a teardown step fails
 */
public void testRecoveryFailureLog() throws Exception {
	final Time timeout = Time.seconds(30L);
	final File zookeeperStoragePath = temporaryFolder.newFolder();

	// Config
	final int numberOfJobManagers = 2;
	final int numberOfTaskManagers = 2;
	final int numberOfSlotsPerTaskManager = 2;

	assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);

	// Job managers
	final DispatcherProcess[] dispatcherProcesses = new DispatcherProcess[numberOfJobManagers];

	// Task managers
	TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numberOfTaskManagers];

	HighAvailabilityServices highAvailabilityServices = null;
	LeaderRetrievalService leaderRetrievalService = null;

	// Background thread that submits the job; declared here so cleanup can stop it.
	Thread programTrigger = null;

	// Coordination between the processes goes through a directory
	File coordinateTempDir = null;

	// Cluster config
	Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
		zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
	// Task manager configuration
	config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
	config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
	config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);

	final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);

	try {
		final Deadline deadline = TestTimeOut.fromNow();

		// Coordination directory
		coordinateTempDir = temporaryFolder.newFolder();

		// Start first process
		dispatcherProcesses[0] = new DispatcherProcess(0, config);
		dispatcherProcesses[0].startProcess();

		highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
			config,
			TestingUtils.defaultExecutor());

		// Start the task manager process
		for (int i = 0; i < numberOfTaskManagers; i++) {
			taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate());
			taskManagerRunners[i].start();
		}

		// Leader listener
		TestingListener leaderListener = new TestingListener();
		leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
		leaderRetrievalService.start(leaderListener);

		// Initial submission
		leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());

		String leaderAddress = leaderListener.getAddress();
		UUID leaderId = leaderListener.getLeaderSessionID();

		final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = rpcService.connect(
			leaderAddress,
			DispatcherId.fromUuid(leaderId),
			DispatcherGateway.class);
		final DispatcherGateway dispatcherGateway = dispatcherGatewayFuture.get();

		// Wait for all task managers to connect to the leading job manager
		waitForTaskManagers(numberOfTaskManagers, dispatcherGateway, deadline.timeLeft());

		final File coordinateDirClosure = coordinateTempDir;
		// NOTE(review): errorRef is written by the submission thread but never read here;
		// presumably intentional since the job is expected not to recover — confirm.
		final Throwable[] errorRef = new Throwable[1];

		// we trigger program execution in a separate thread
		programTrigger = new Thread("Program Trigger") {
			@Override
			public void run() {
				try {
					testJobManagerFailure(zooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
				}
				catch (Throwable t) {
					t.printStackTrace();
					errorRef[0] = t;
				}
			}
		};
		// Daemon, so a submission that is still blocked after the test cannot keep the JVM alive.
		programTrigger.setDaemon(true);

		//start the test program
		programTrigger.start();

		// wait until all marker files are in place, indicating that all tasks have started
		AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir,
			READY_MARKER_FILE_PREFIX, PARALLELISM, deadline.timeLeft().toMillis());

		// delete the coordinate Temp Directory
		FileUtils.deleteDirectory(coordinateTempDir);

		// Kill one of the job managers and trigger recovery
		dispatcherProcesses[0].destroy();

		dispatcherProcesses[1] = new DispatcherProcess(1, config);
		dispatcherProcesses[1].startProcess();

		FiniteDuration testTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
		Deadline testDeadline = testTimeOut.fromNow();

		// test that the expected log will be printed.
		boolean success = false;

		while (testDeadline.hasTimeLeft()) {
			String output = dispatcherProcesses[1].getProcessOutput();

			if (output != null) {
				if (output.contains("Could not restart the job") &&
						output.contains("java.io.FileNotFoundException")) {
					success = true;
					break;
				}
			}
			else {
				log.warn("No process output available.");
			}

			Thread.sleep(500);
		}

		assertTrue("Did not find expected output in logs.", success);
	}
	catch (Throwable t) {
		// Print early (in some situations the process logs get too big
		// for Travis and the root problem is not shown)
		t.printStackTrace();

		for (DispatcherProcess p : dispatcherProcesses) {
			if (p != null) {
				p.printProcessLog();
			}
		}

		throw t;
	}
	finally {
		// Teardown is staged with nested try/finally so that a failure in an earlier step
		// (e.g. closing a task manager) never skips destroying the dispatcher processes or
		// terminating the RPC service.
		try {
			// Stop the submission thread first so it does not keep working against a
			// cluster that is being torn down.
			if (programTrigger != null) {
				programTrigger.interrupt();
			}

			for (TaskManagerRunner taskManagerRunner : taskManagerRunners) {
				if (taskManagerRunner != null) {
					taskManagerRunner.close();
				}
			}

			if (leaderRetrievalService != null) {
				leaderRetrievalService.stop();
			}
		}
		finally {
			try {
				for (DispatcherProcess dispatcherProcess : dispatcherProcesses) {
					if (dispatcherProcess != null) {
						dispatcherProcess.destroy();
					}
				}

				if (highAvailabilityServices != null) {
					highAvailabilityServices.closeAndCleanupAllData();
				}
			}
			finally {
				try {
					RpcUtils.terminateRpcService(rpcService, timeout);
				}
				finally {
					// Delete coordination directory (on the normal path it was already
					// deleted inside the try block above).
					if (coordinateTempDir != null) {
						try {
							FileUtils.deleteDirectory(coordinateTempDir);
						}
						catch (Throwable ignored) {
							// Best effort: a leftover temp directory must not fail the test.
						}
					}
				}
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment