Last active
January 29, 2019 09:41
-
-
Save klion26/eafa6174df361d3bb0447c2e7681db0f to your computer and use it in GitHub Desktop.
testCheckpointRecoveryFailure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void testRecoveryFailureLog() throws Exception { | |
final Time timeout = Time.seconds(30L); | |
final File zookeeperStoragePath = temporaryFolder.newFolder(); | |
// Config | |
final int numberOfJobManagers = 2; | |
final int numberOfTaskManagers = 2; | |
final int numberOfSlotsPerTaskManager = 2; | |
assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager); | |
// Job managers | |
final DispatcherProcess[] dispatcherProcesses = new DispatcherProcess[numberOfJobManagers]; | |
// Task managers | |
TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numberOfTaskManagers]; | |
HighAvailabilityServices highAvailabilityServices = null; | |
LeaderRetrievalService leaderRetrievalService = null; | |
// Coordination between the processes goes through a directory | |
File coordinateTempDir = null; | |
// Cluster config | |
Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( | |
zooKeeper.getConnectString(), zookeeperStoragePath.getPath()); | |
// Task manager configuration | |
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); | |
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); | |
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); | |
final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config); | |
try { | |
final Deadline deadline = TestTimeOut.fromNow(); | |
// Coordination directory | |
coordinateTempDir = temporaryFolder.newFolder(); | |
// Start first process | |
dispatcherProcesses[0] = new DispatcherProcess(0, config); | |
dispatcherProcesses[0].startProcess(); | |
highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( | |
config, | |
TestingUtils.defaultExecutor()); | |
// Start the task manager process | |
for (int i = 0; i < numberOfTaskManagers; i++) { | |
taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate()); | |
taskManagerRunners[i].start(); | |
} | |
// Leader listener | |
TestingListener leaderListener = new TestingListener(); | |
leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever(); | |
leaderRetrievalService.start(leaderListener); | |
// Initial submission | |
leaderListener.waitForNewLeader(deadline.timeLeft().toMillis()); | |
String leaderAddress = leaderListener.getAddress(); | |
UUID leaderId = leaderListener.getLeaderSessionID(); | |
final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = rpcService.connect( | |
leaderAddress, | |
DispatcherId.fromUuid(leaderId), | |
DispatcherGateway.class); | |
final DispatcherGateway dispatcherGateway = dispatcherGatewayFuture.get(); | |
// Wait for all task managers to connect to the leading job manager | |
waitForTaskManagers(numberOfTaskManagers, dispatcherGateway, deadline.timeLeft()); | |
final File coordinateDirClosure = coordinateTempDir; | |
final Throwable[] errorRef = new Throwable[1]; | |
// we trigger program execution in a separate thread | |
Thread programTrigger = new Thread("Program Trigger") { | |
@Override | |
public void run() { | |
try { | |
testJobManagerFailure(zooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath); | |
} | |
catch (Throwable t) { | |
t.printStackTrace(); | |
errorRef[0] = t; | |
} | |
} | |
}; | |
//start the test program | |
programTrigger.start(); | |
// wait until all marker files are in place, indicating that all tasks have started | |
AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir, | |
READY_MARKER_FILE_PREFIX, PARALLELISM, deadline.timeLeft().toMillis()); | |
// delete the coordinate Temp Directory | |
FileUtils.deleteDirectory(coordinateTempDir); | |
// Kill one of the job managers and trigger recovery | |
dispatcherProcesses[0].destroy(); | |
dispatcherProcesses[1] = new DispatcherProcess(1, config); | |
dispatcherProcesses[1].startProcess(); | |
FiniteDuration testTimeOut = new FiniteDuration(5, TimeUnit.MINUTES); | |
Deadline testDeadline = testTimeOut.fromNow(); | |
// test that the excepted log will be printed. | |
boolean success = false; | |
while (testDeadline.hasTimeLeft()) { | |
String output = dispatcherProcesses[1].getProcessOutput(); | |
if (output != null) { | |
if (output.contains("Could not restart the job") && | |
output.contains("java.io.FileNotFoundException")) { | |
success = true; | |
break; | |
} | |
} | |
else { | |
log.warn("No process output available."); | |
} | |
Thread.sleep(500); | |
} | |
assertTrue("Did not find expected output in logs.", success); | |
} | |
catch (Throwable t) { | |
// Print early (in some situations the process logs get too big | |
// for Travis and the root problem is not shown) | |
t.printStackTrace(); | |
for (DispatcherProcess p : dispatcherProcesses) { | |
if (p != null) { | |
p.printProcessLog(); | |
} | |
} | |
throw t; | |
} | |
finally { | |
for (int i = 0; i < numberOfTaskManagers; i++) { | |
if (taskManagerRunners[i] != null) { | |
taskManagerRunners[i].close(); | |
} | |
} | |
if (leaderRetrievalService != null) { | |
leaderRetrievalService.stop(); | |
} | |
for (DispatcherProcess dispatcherProcess : dispatcherProcesses) { | |
if (dispatcherProcess != null) { | |
dispatcherProcess.destroy(); | |
} | |
} | |
if (highAvailabilityServices != null) { | |
highAvailabilityServices.closeAndCleanupAllData(); | |
} | |
RpcUtils.terminateRpcService(rpcService, timeout); | |
// Delete coordination directory | |
if (coordinateTempDir != null) { | |
try { | |
FileUtils.deleteDirectory(coordinateTempDir); | |
} | |
catch (Throwable ignored) { | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment