Skip to content

Instantly share code, notes, and snippets.

@AndrMoiseev
Last active July 31, 2023 10:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AndrMoiseev/6fc3d065172493d5ba6edea3e54252b5 to your computer and use it in GitHub Desktop.
Save AndrMoiseev/6fc3d065172493d5ba6edea3e54252b5 to your computer and use it in GitHub Desktop.
Problematic Workflow
@WorkflowInterface
public interface ReconcileFilesMetadataWorkflow {
String ID = "ReconcileFilesMetadata";
@WorkflowMethod(name = ID)
long reconcileFilesMetadata(ReconcileRequest request);
@QueryMethod
Stat getStat();
class Impl implements ReconcileFilesMetadataWorkflow {
private static final int BATCH_SIZE = 1000;
private static final int MAX_HISTORY_LENGTH = 500;
private EnumerateFilesActivity enumerateFiles = Workflow.newActivityStub(
EnumerateFilesActivity.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(10))
.validateAndBuildWithDefaults());
private static final ReconcileFilesMetadataActivity reconcileMetadata = Workflow.newActivityStub(
ReconcileFilesMetadataActivity.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofMinutes(10))
.setHeartbeatTimeout(Duration.ofSeconds(30))
.validateAndBuildWithDefaults());
private Stat stat;
private PageToken nextPageToken;
@Override
public long reconcileFilesMetadata(ReconcileRequest request) {
nextPageToken = request.getToken();
stat = request.getStat();
var totalCount = enumerateFiles.getFilesWithContentCount();
if (totalCount == 0)
return 0;
stat = stat.setTotalFiles(totalCount);
while (true) {
var response = reconcileMetadata.reconcileMetadata(nextPageToken, BATCH_SIZE);
if (response.getNextPageToken() == null)
return stat.getFixedFiles();
stat = stat.update(response.getProcessedCount(), response.getFixedCount());
nextPageToken = response.getNextPageToken();
if (Workflow.getInfo().getHistoryLength() > MAX_HISTORY_LENGTH)
Workflow.continueAsNew(new ReconcileRequest(nextPageToken, stat));
}
}
@Override
public Stat getStat() {
return stat;
}
}
@Value
@With(AccessLevel.PRIVATE)
class Stat {
public static Stat start() {
return new Stat(0, 0, 0);
}
private long totalFiles;
private long processedFiles;
private long fixedFiles;
public Stat setTotalFiles(long count) {
return withTotalFiles(count);
}
public Stat update(long processedCount, long fixedCount) {
return withProcessedFiles(processedFiles + processedCount)
.withFixedFiles(fixedFiles + fixedCount);
}
}
@Value
class ReconcileRequest {
public static ReconcileRequest scanAllFiles() {
return new ReconcileRequest(null, Stat.start());
}
@Nullable
PageToken token;
Stat stat;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment