Skip to content

Instantly share code, notes, and snippets.

@serihiro
Last active January 17, 2019 02:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save serihiro/0bb10d2a9ddcbfde17dbd09bdd4cb2f3 to your computer and use it in GitHub Desktop.
Save serihiro/0bb10d2a9ddcbfde17dbd09bdd4cb2f3 to your computer and use it in GitHub Desktop.
digdagのUTからtaskが実際に実行されるまでの流れ

これは何か

  • treasure-data/digdag#701 このバグを自分で報告したので自分で直せるかなと思ってdigdagのretry機構を調べようした
  • そしたら、そもそもdigdagの全体像が全く分からないことに気付いた
  • テストコードを適当に見ていったらWorkflowExecutorTest.retryOnGroupingTask()のテストケースが、実際にdigファイルを渡してworkflowを実行するテストケースだったので、コレを使ってどのようにdigdagがworkflowを実行しているのかをまず調べることにした <- イマココ

雑に当たりをつけて見つけたoperatorにおけるretry実装箇所

  • 全てのoperatorはBaseOperatorrun methodから実行されるrunTask methodにそのOperatorの処理本体が実装されている
  • この構成はプラグインの参考実装としてリポジトリに含まれているExampleOperatorの実装を読むと分かる
  • 雑にgrepかけたら、実際のretryに関する制御っぽい部分がBaseOperator.run()にあった。

実際のrun() methodにおけるretry機構部分のコードを見てみる

    @Override
    public TaskResult run()
    {
        RetryControl retry = RetryControl.prepare(request.getConfig(), request.getLastStateParams(), false);
        try {
            try {
                return runTask();
            }
            finally {
                workspace.close();
            }
        }
        catch (RuntimeException ex) {
            // Propagate polling TaskExecutionException instances
            if (ex instanceof TaskExecutionException) {
                TaskExecutionException tex = (TaskExecutionException) ex;
                boolean isPolling = !tex.isError();
                if (isPolling) {
                    // TODO: reset retry state params
                    throw tex;
                }
            }

            boolean doRetry = retry.evaluate();
            if (doRetry) {
                throw TaskExecutionException.ofNextPollingWithCause(ex,
                        retry.getNextRetryInterval(),
                        ConfigElement.copyOf(retry.getNextRetryStateParams()));
            }
            else {
                throw ex;
            }
        }
    }
  • 至ってシンプルである
  • runTask 実行時になんらかの例外が投げられた場合、それが TaskExecutionException クラスの例外でなければ(この辺の処理はあとでしらべる)RetryControl クラスのオブジェクトからevaluate() メソッドの返り値のbooleanを見て、trueならなんかそれっぽい例外クラスをthrowしている
  • TaskExecutionException クラスの例外であってもその例外オブジェクトの isError() がfalseならばそれを上にもっかいthrowする。どうやらこれはpollingのためにretry残数に関係なくretryさせるための機構のようだ。
  • ところでこの辺がnestされたtaskのretry挙動に関係ありそうだな。。

Throwされた TaskExecutionException はどこへいった?

                catch (TaskExecutionException ex) {
                    if (ex.getRetryInterval().isPresent()) {
                        if (!ex.getError(cf).isPresent()) {
                            logger.debug("Retrying task {}", ex.toString());
                        }
                        else {
                            logger.error("Task failed, retrying", ex);
                        }
                        callback.retryTask(request.getSiteId(),
                                request.getTaskId(), request.getLockId(), agentId,
                                ex.getRetryInterval().get(), ex.getStateParams(cf).get(),
                                ex.getError(cf));
                    }
                    else {
                        logger.error("Task {} failed.\n{}", request.getTaskName(), formatExceptionMessage(ex));
                        logger.debug("", ex);
                        // TODO use debug to log stacktrace here
                        callback.taskFailed(request.getSiteId(),
                                request.getTaskId(), request.getLockId(), agentId,
                                ex.getError(cf).get());  // TODO is error set?
                    }
                }
  • ex.getRetryInterval().isPresent() がtrueならばretryされるように読める
  • ということはretryしない時は ex.getRetryInterval() がNoneになる?
  • ところで、しれっと isPresent() とか Of()とか、scalaのOptionalクラスみたいなことしてんなと思ったら guava というgoogleが提供するjavaの便利libraryに入っているらしい
  • これがfalseになる条件ってなんだろう?と思ったら TaskExecutionException のprivateなコンストラクタにretryIntervalにOptional.absent()を代入するやつが存在する
  • コレを使って TaskExecutionException のinstanceを作ってthrowする時がretryが終わる時なのだろうか?
  • ちなみにこのコンストラクタはpublicなコンストラクタの一つである これから実行されている
  • このpublicなコンストラクタは例えばこのようなところで使われていることから、やはりretryを抜けるのに使われていると思われる
        catch (Exception e) {
            String formattedErrorMessage = String.format(errorMessage, errorMessageParameters);

            if (!retry(e)) {
                logger.warn("{}: giving up", formattedErrorMessage, e);
                throw new TaskExecutionException(e);
            }

            int retryIteration = retryState.params().get(RETRY, int.class, 0);
            retryState.params().set(RETRY, retryIteration + 1);
            int interval = (int) Math.min(retryInterval.min().getSeconds() * Math.pow(2, retryIteration), retryInterval.max().getSeconds());
            logger.warn("{}: retrying in {} seconds", formattedErrorMessage, interval, e);
            throw state.pollingTaskExecutionException(interval);
        }

大体わかったけど、やっぱり呼び出し元から追ってかないとtaskがどういう風に管理されてどういう風に実行されてるか分からないじゃない

+first:
  echo>: ""
  append_file: out
+doit:
  _retry: 3
  +task1:
    echo>: "try"
    append_file: out
  +task2:
    fail>: task failed expectedly
  • というわけでWorkflowExecutorTestのretryOnGroupingTask() から読んでいく
  • 追ってくと分かるのだが、いきなりTransactionManagerとか出てきてビビるのだが、digdagもDBをjob storeとしたJob Queueのようなアーキテクチャになってるっぽいことが分かる
  • で、jobをDBに登録してattemptIdを生成してWorkflowExecuterのrunUntilDone(long attemptId) に渡されて、ここでなんか終わるまで監視してるっぽい
  • ここにこそ秘密があるに違いないということでここを頑張って読んでいく
  • と思ったがやっていることはWorkflowからtaskを生成してEnqueueするところまでっぽいかった

どこでWorkerが動いてるか分からないのでもっかい調べる

  • 少なくともOperatorManager.callExecutorでoperator.run()が呼ばれているのでここにブレークポイント貼ってtestをdebug実行してみる
  • stack traceを調べたら、main threadは一旦ここで止まった
  • ブレークポイントから辿っていくとMultiThreadAgent.run()が別Threadで動作している
    @Override
    public void run()
    {
        while (!stop) {
            try {
                synchronized (addActiveTaskLock) {
                    if (executor.isShutdown()) {
                        break;
                    }
                    // Because addActiveTaskLock is locked, no one increases activeTaskCount in this synchronized block. Now get the maximum count.
                    int maximumActiveTasks = activeTaskCount.get();
                    // Because the maximum count doesn't increase, here can know that at least N number of threads are idling.
                    int guaranteedAvaialbleThreads = executor.getMaximumPoolSize() - maximumActiveTasks;
                    // Acquire at most guaranteedAvaialbleThreads or 10. This guarantees that all tasks start immediately.
                    int maxAcquire = Math.min(guaranteedAvaialbleThreads, 10);
                    if (maxAcquire > 0) {
                        transactionManager.begin(() -> {
                            List<TaskRequest> reqs = taskServer.lockSharedAgentTasks(maxAcquire, agentId, config.getLockRetentionTime(), 1000);
                            for (TaskRequest req : reqs) {
                                executor.submit(() -> {
                                    try {
                                        runner.run(req);
                                    }
                                    catch (Throwable t) {
                                        logger.error("Uncaught exception. Task queue will detect this failure and this task will be retried later.", t);
                                        errorReporter.reportUncaughtError(t);
                                    }
                                    finally {
                                        activeTaskCount.decrementAndGet();
                                    }
                                });
                                activeTaskCount.incrementAndGet();
                            }
                            return null;
                        });
                    }
                    else {
                        // no executor thread is available. sleep for a while until a task execution finishes
                        addActiveTaskLock.wait(500);
                    }
                }
            }
            catch (Throwable t) {
                logger.error("Uncaught exception during acquiring tasks from a server. Ignoring. Agent thread will be retried.", t);
                errorReporter.reportUncaughtError(t);
                try {
                    // sleep before retrying
                    Thread.sleep(1000);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
  • ここでrunner オブジェクトは OperatorManager インスタンスである
  • request数分だけrun(TaskRequest request) -> runWithHeartbeat(request) -> workspaceManager.withExtractedArchive(request) -> runWithWorkspace(projectPath, request)
  • runWithWorkspace(projectPath, request)の中で callExecutor(projectPath, type, mergedRequest) が実行される
        TaskRequest mergedRequest = TaskRequest.builder()
            .from(request)
            .localConfig(new CheckedConfig(localConfig, usedKeys))
            .config(new CheckedConfig(config, usedKeys))
            .build();

        TaskResult result = callExecutor(projectPath, type, mergedRequest);
  • callExecutorの最後のほうで以下のようにoperatorに引数渡して実行して結果を返してるっぽい
        OperatorContext context = new DefaultOperatorContext(
                projectPath, mergedRequest, secretProvider, privilegedVariables);

        Operator operator = factory.newOperator(context);

        return operator.run();
    @Inject
    public LocalAgentManager(
            AgentConfig config,
            AgentId agentId,
            TaskServerApi taskServer,
            OperatorManager operatorManager,
            TransactionManager transactionManager)
    {
        if (config.getEnabled()) {
            this.agentFactory =
                    () -> new MultiThreadAgent(config, agentId, taskServer, operatorManager, transactionManager, errorReporter);
        }
        else {
            this.agentFactory = null;
        }
    }

    @PostConstruct
    public synchronized void start()
    {
        if (agentFactory != null && thread == null) {
            agent = agentFactory.get();
            Thread thread = new ThreadFactoryBuilder()
                .setDaemon(false)  // tasks taken from the queue should be certainly processed or callbacked to the server
                .setNameFormat("local-agent-%d")
                .build()
                .newThread(agent);
            thread.start();
            this.thread = thread;
        }
    }

改めてSequence図にまとめるとこうなる

(調査中)待てよ、retryはどこでやってんだ?

(調査中)一旦の結論

  • enqueue時にretry分のtaskを個別に追加して実行しているだけ
  • だとするとgroup taskのretryの挙動がおかしいのはこの時に追加されたretry用のtaskがおかしい?
  • taskをretryのたびに全部取ってきて標準出力に吐かせてしらべたら、retry用に複製されたtaskにはupstreamsがセットされていない。このせいでErrorが起きても止まらない。。のか?

調査メモ

  • ⇡のsequense図で WorkflowExecutorrunWhile() の中で enqueueReadyTasks() -> taskIdでぐるぐる -> enqueueTask(dispatcher, taskId)の中に意味深な箇所がある
            if (task.getTaskType().isGroupingOnly()) {
                return retryGroupingTask(lockedTask);
            }
  • debuggerで追うと、 +retry_on_group+doit のtaskだけがこのifでtrueになる。どうやらこれはtaskの直下にtaskの定義しかない場合は isGroupingOnly() がtrue評価される模様
  • retryGroupingTask は以下のように定義されている
    private boolean retryGroupingTask(TaskControl lockedTask)
    {
        // rest task state of subtasks
        StoredTask task = lockedTask.get();

        TaskTree tree = new TaskTree(sm.getTaskRelations(task.getAttemptId()));
        List<Long> childrenIdList = tree.getRecursiveChildrenIdList(task.getId());
        lockedTask.copyInitialTasksForRetry(childrenIdList);

        lockedTask.setGroupRetryReadyToPlanned();

        return true;
    }
  • +retry_on_group+doit taskの中で定義されている子taskのidをリストで取り出して、それをcopyしている
  • lockedTask.copyInitialTasksForRetry(childrenIdList);は以下の通り
        @Override
        public boolean copyInitialTasksForRetry(List<Long> recursiveChildrenIdList)
        {
            List<StoredTask> tasks = handle.createQuery(
                    selectTaskDetailsQuery() + " where t.id " + inLargeIdListExpression(recursiveChildrenIdList) +
                    " and " + bitAnd("t.state_flags", Integer.toString(TaskStateFlags.``)) + " != 0"  // only initial tasks
                )
                .map(stm)
                .list();
            if (tasks.isEmpty()) {
                return false;
            }
            for (StoredTask task : tasks) {
                Task newTask = Task.taskBuilder()
                    .from(task)
                    .state(TaskStateCode.BLOCKED)
                    .stateFlags(TaskStateFlags.empty())
                    .build();
                addSubtask(tasks.get(0).getAttemptId(), newTask);
            }
            return true;
        }
  • 文字通りtaskをcopyして新規taskとして追加している(DBにinsertしている)

  • なお、retryの度に recursiveChildrenIdList は当然増えていくのだが、複製元のtaskは state_flagsINITIAL_TASKとのANDを取ってtrueになるものだけに絞り込んでいるので、1回目に複製されたtaskだけがretryの度に複製される

  • 試しに毎回の retryGroupingTask() が実行される度にその時のtask一覧を全部取ってきて見てみる

+        public List<StoredTask> getAllTasks(){
+            return handle.createQuery(selectTaskDetailsQuery()).map(stm).list();
+        }
+         System.out.println("");
+         lockedTask.getAllTasks().forEach(System.out::println);
StoredTask{id=1, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, fullName=+retry_on_group, config={}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=2, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:34.665Z, startedAt=2018-01-01T12:29:30.740Z, stateParams={}, retryCount=0, parentId=1, fullName=+retry_on_group+first, config={"echo>":"","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=3, attemptId=1, upstreams=[2], updatedAt=2018-01-01T12:29:36.514Z, stateParams={"retry_count":1}, retryCount=1, parentId=1, fullName=+retry_on_group+doit, config={"_retry":3}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=4, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:35.807Z, startedAt=2018-01-01T12:29:34.824Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=5, attemptId=1, upstreams=[4], updatedAt=2018-01-01T12:29:36.469Z, startedAt=2018-01-01T12:29:35.821Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=6, attemptId=1, upstreams=[5], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=7, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:36.514Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=8, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:36.514Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=9, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:36.514Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}

StoredTask{id=1, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, fullName=+retry_on_group, config={}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=2, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:34.665Z, startedAt=2018-01-01T12:29:30.740Z, stateParams={}, retryCount=0, parentId=1, fullName=+retry_on_group+first, config={"echo>":"","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=3, attemptId=1, upstreams=[2], updatedAt=2018-01-01T12:29:37.916Z, stateParams={"retry_count":2}, retryCount=2, parentId=1, fullName=+retry_on_group+doit, config={"_retry":3}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=4, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:35.807Z, startedAt=2018-01-01T12:29:34.824Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=5, attemptId=1, upstreams=[4], updatedAt=2018-01-01T12:29:36.469Z, startedAt=2018-01-01T12:29:35.821Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=6, attemptId=1, upstreams=[5], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=7, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.888Z, startedAt=2018-01-01T12:29:36.654Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=8, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.834Z, startedAt=2018-01-01T12:29:36.659Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{}}
StoredTask{id=9, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.860Z, startedAt=2018-01-01T12:29:36.664Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=10, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.916Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=11, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.916Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=12, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.916Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}

StoredTask{id=1, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, fullName=+retry_on_group, config={}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=2, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:34.665Z, startedAt=2018-01-01T12:29:30.740Z, stateParams={}, retryCount=0, parentId=1, fullName=+retry_on_group+first, config={"echo>":"","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=3, attemptId=1, upstreams=[2], updatedAt=2018-01-01T12:29:38.975Z, stateParams={"retry_count":3}, retryCount=3, parentId=1, fullName=+retry_on_group+doit, config={"_retry":3}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=4, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:35.807Z, startedAt=2018-01-01T12:29:34.824Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=5, attemptId=1, upstreams=[4], updatedAt=2018-01-01T12:29:36.469Z, startedAt=2018-01-01T12:29:35.821Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=6, attemptId=1, upstreams=[5], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=7, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.888Z, startedAt=2018-01-01T12:29:36.654Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=8, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.834Z, startedAt=2018-01-01T12:29:36.659Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{}}
StoredTask{id=9, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.860Z, startedAt=2018-01-01T12:29:36.664Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=10, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.952Z, startedAt=2018-01-01T12:29:38.045Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=11, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.946Z, startedAt=2018-01-01T12:29:38.049Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{}}
StoredTask{id=12, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.922Z, startedAt=2018-01-01T12:29:38.054Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=13, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.975Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=14, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.975Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=15, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.975Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
  • 毎回taskが3つずつ増えていく様子が見えるが、stateを見るとやはり初回と結果が違う
  • よく見たら、retry用にcopyされたtaskにはupstreamsがセットされていなかったことに気づいた
  • 仮説だが、task実行時にupstreamsがセットされていたら、upstreamsにセットされているtaskIdのtaskのstateがすべて success になっていないと実行されないとかそういう挙動をするのではないだろうか
  • なんかWorkflowExecutorこの辺 がその辺やってそうな名前のメソッドを呼んでいるのだが、あとで読む
  • copyしたときに直前のtaskに依存するようにupstreamsを設定するようにしてみた
  • 修正したのは DatabaseSessionStoreManager#copyInitialTasksForRetryこの辺
+            DatabaseTaskControlStore store = new DatabaseTaskControlStore(handle);
+            int index = 0;
             for (StoredTask task : tasks) {
                 Task newTask = Task.taskBuilder()
-                    .from(task)
-                    .state(TaskStateCode.BLOCKED)
-                    .stateFlags(TaskStateFlags.empty())
-                    .build();
-                addSubtask(tasks.get(0).getAttemptId(), newTask);
+                        .from(task)
+                        .state(TaskStateCode.BLOCKED)
+                        .stateFlags(TaskStateFlags.empty())
+                        .build();
+
+                long newTaskId = addSubtask(tasks.get(0).getAttemptId(), newTask);
+                if (index > 0) {
+                    store.addDependencies(newTaskId, Arrays.asList(newTaskId - 1));
+                }
+                ++index;
  • これで以下のworkflowを実行しても、failした後のtaskである +doit+task3が実行されなくなった
+first:
  echo>: ""
  append_file: out
+doit:
  _retry: 3
  +task1:
    echo>: "try"
    append_file: out
  +task2:
    fail>: task failed expectedly
  +task3:
    echo>: "not executed"
    append_file: out
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment