Skip to content

Instantly share code, notes, and snippets.

@fukajun
Last active July 15, 2020 08:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fukajun/d3e4f9701133bfc612553846aaa94697 to your computer and use it in GitHub Desktop.
Save fukajun/d3e4f9701133bfc612553846aaa94697 to your computer and use it in GitHub Desktop.

DigDagでタスクが実行されるときの流れを少し調べたのでメモ 理解までは至ってないが、通知がretryされる部分に関しては見つけることができた

タスクが実行される直前

実行時に Starting a new session project のメッセージをログとして出力している部分

https://github.com/treasure-data/digdag/blob/ca85577adbaf95d55bf37a32f2628c1603efd126/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java#L294-L299

                    logger.info("Starting a new session project id={} workflow name={} session_time={}",
                            projId, ar.getWorkflowName(), SESSION_TIME_FORMATTER.withZone(ar.getTimeZone()).format(ar.getSessionTime()));

                    StoredSessionAttemptWithSession storedAttemptWithSession =
                        StoredSessionAttemptWithSession.of(siteId, storedSession, storedAttempt);

通知タスクを呼び出している部分

通知もnotifyというタスクぽいので、通常のタスクが実行されるのと変わらない流れをとおる

https://github.com/treasure-data/digdag/blob/ca85577adbaf95d55bf37a32f2628c1603efd126/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java#L257-L257

        TaskResult result = callExecutor(projectPath, type, mergedRequest);

通知を送信しているところ部分

実際の通知の送信処理を呼ぶ部分、RetryExecutorを使って失敗した場合に指定の間隔を開けて再送信を行っている

https://github.com/treasure-data/digdag/blob/ca85577adbaf95d55bf37a32f2628c1603efd126/digdag-core/src/main/java/io/digdag/core/notification/DefaultNotifier.java#L56-L87

    @Override
    public void sendNotification(Notification notification)
            throws NotificationException
    {
        logger.debug("Notification: {}", notification);

        if (sender == null) {
            return;
        }

        RetryExecutor retryExecutor = retryExecutor()
                .retryIf(exception -> true)
                .withInitialRetryWait(minRetryWait)
                .withMaxRetryWait(maxRetryWait)
                .onRetry((exception, retryCount, retryLimit, retryWait) -> logger.warn("Sending notification failed: retry {} of {}", retryCount, retryLimit, exception))
                .withRetryLimit(retries);

        try {
            retryExecutor.run(() -> {
                try {
                    sender.sendNotification(notification);
                }
                catch (NotificationException e) {
                    throw Throwables.propagate(e);
                }
            });
        }
        catch (RetryExecutor.RetryGiveupException e) {
            throw new NotificationException("Sending notification failed", e);
        }
    }
}

タスク失敗時のスタックトレースからおってみる

呼び出されている順番が非常にわかりやすい タスクが失敗したときのログ↓OperatorManager.callExecutor あたり気になるな

	at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
	at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
	at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:312)
	at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:254)
	at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
	at io.digdag.core.agent.ExtractArchiveWorkspaceManager.withExtractedArchive(ExtractArchiveWorkspaceManager.java:77)
	at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
	at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
	at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

タスクが失敗したときはココらへんを通る

https://github.com/treasure-data/digdag/blob/ca85577adbaf95d55bf37a32f2628c1603efd126/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java#L161-L180

                catch (RuntimeException | AssertionError ex) { // Avoid infinite task retry cause of AssertionError by Operators
                    if (ex instanceof ConfigException) {
                        logger.error("Configuration error at task {}: {}", request.getTaskName(), formatExceptionMessage(ex));
                    }
                    else {
                        logger.error("Task failed with unexpected error: {}", ex.getMessage(), ex);
                    }
                    callback.taskFailed(request.getSiteId(),
                            request.getTaskId(), request.getLockId(), agentId,
                            buildExceptionErrorConfig(ex).toConfig(cf));  // no retry
                }
                return true;
            });
        }
        catch (RuntimeException | IOException ex) {
            // exception happened in workspaceManager
            logger.error("Task failed with unexpected error: {}", ex.getMessage(), ex);
            callback.taskFailed(request.getSiteId(),
                    request.getTaskId(), request.getLockId(), agentId,
                    buildExceptionErrorConfig(ex).toConfig(cf));

タスク失敗したときの通知タスクはどのようにして追加されるか?

失敗したときに、taskFailedがよばれるようになっている。

https://github.com/treasure-data/digdag/blob/ca85577adbaf95d55bf37a32f2628c1603efd126/digdag-core/src/main/java/io/digdag/core/agent/InProcessTaskCallbackApi.java#L140-L146

    @Override
    public void taskFailed(int siteId,
            long taskId, String lockId, AgentId agentId,
            Config error)
    {
        tm.begin(() -> exec.taskFailed(siteId, taskId, lockId, agentId, error));
    }

execは WorkflowExecutor

失敗通知用のタスクが設定刷るかどうかの分岐

⭐retry failedするとルートからの実行ではないので、ココがtrueにならない気がする

https://github.com/treasure-data/digdag/blob/ca85577adbaf95d55bf37a32f2628c1603efd126/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java#L641-L643

                if (isRootTask) {
                    errorTaskIds.add(addAttemptFailureAlertTask(lockedTask));
                }

失敗要タスクを追加するコード

https://github.com/treasure-data/digdag/blob/ca85577adbaf95d55bf37a32f2628c1603efd126/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java#L1317-L1325

    private long addAttemptFailureAlertTask(TaskControl rootTask)
    {
        Config config = cf.create();
        config.set("_type", "notify");
        config.set("_command", "Workflow session attempt failed");
        WorkflowTaskList tasks = compiler.compileTasks(rootTask.get().getFullName(), "^failure-alert", config);
        return rootTask.addGeneratedSubtasksWithoutLimit(tasks, ImmutableList.of(), false);
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment