Last active
July 31, 2023 10:51
-
-
Save AndrMoiseev/6fc3d065172493d5ba6edea3e54252b5 to your computer and use it in GitHub Desktop.
Problematic Workflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@WorkflowInterface | |
public interface ReconcileFilesMetadataWorkflow { | |
String ID = "ReconcileFilesMetadata"; | |
@WorkflowMethod(name = ID) | |
long reconcileFilesMetadata(ReconcileRequest request); | |
@QueryMethod | |
Stat getStat(); | |
class Impl implements ReconcileFilesMetadataWorkflow { | |
private static final int BATCH_SIZE = 1000; | |
private static final int MAX_HISTORY_LENGTH = 500; | |
private EnumerateFilesActivity enumerateFiles = Workflow.newActivityStub( | |
EnumerateFilesActivity.class, | |
ActivityOptions.newBuilder() | |
.setStartToCloseTimeout(Duration.ofSeconds(10)) | |
.validateAndBuildWithDefaults()); | |
private static final ReconcileFilesMetadataActivity reconcileMetadata = Workflow.newActivityStub( | |
ReconcileFilesMetadataActivity.class, | |
ActivityOptions.newBuilder() | |
.setStartToCloseTimeout(Duration.ofMinutes(10)) | |
.setHeartbeatTimeout(Duration.ofSeconds(30)) | |
.validateAndBuildWithDefaults()); | |
private Stat stat; | |
private PageToken nextPageToken; | |
@Override | |
public long reconcileFilesMetadata(ReconcileRequest request) { | |
nextPageToken = request.getToken(); | |
stat = request.getStat(); | |
var totalCount = enumerateFiles.getFilesWithContentCount(); | |
if (totalCount == 0) | |
return 0; | |
stat = stat.setTotalFiles(totalCount); | |
while (true) { | |
var response = reconcileMetadata.reconcileMetadata(nextPageToken, BATCH_SIZE); | |
if (response.getNextPageToken() == null) | |
return stat.getFixedFiles(); | |
stat = stat.update(response.getProcessedCount(), response.getFixedCount()); | |
nextPageToken = response.getNextPageToken(); | |
if (Workflow.getInfo().getHistoryLength() > MAX_HISTORY_LENGTH) | |
Workflow.continueAsNew(new ReconcileRequest(nextPageToken, stat)); | |
} | |
} | |
@Override | |
public Stat getStat() { | |
return stat; | |
} | |
} | |
@Value | |
@With(AccessLevel.PRIVATE) | |
class Stat { | |
public static Stat start() { | |
return new Stat(0, 0, 0); | |
} | |
private long totalFiles; | |
private long processedFiles; | |
private long fixedFiles; | |
public Stat setTotalFiles(long count) { | |
return withTotalFiles(count); | |
} | |
public Stat update(long processedCount, long fixedCount) { | |
return withProcessedFiles(processedFiles + processedCount) | |
.withFixedFiles(fixedFiles + fixedCount); | |
} | |
} | |
@Value | |
class ReconcileRequest { | |
public static ReconcileRequest scanAllFiles() { | |
return new ReconcileRequest(null, Stat.start()); | |
} | |
@Nullable | |
PageToken token; | |
Stat stat; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment