Skip to content

Instantly share code, notes, and snippets.

@snaury
Created September 4, 2023 13:56
Show Gist options
  • Save snaury/a138b6f91234b46a71d3c2d1540842d8 to your computer and use it in GitHub Desktop.
Save snaury/a138b6f91234b46a71d3c2d1540842d8 to your computer and use it in GitHub Desktop.
diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp
index e210fbb534b..bc2799e27a0 100644
--- a/stdlib/public/Concurrency/Actor.cpp
+++ b/stdlib/public/Concurrency/Actor.cpp
@@ -1052,6 +1052,12 @@ static JobRef getNextJobInQueue(Job *job) {
static void setNextJobInQueue(Job *job, JobRef next) {
*reinterpret_cast<JobRef*>(job->SchedulerPrivate) = next;
}
+static Job* getLastJobInQueue(Job *job) {
+ return *reinterpret_cast<Job**>(job->SchedulerPrivate + 1);
+}
+static void setLastJobInQueue(Job *job, Job *last) {
+ *reinterpret_cast<Job**>(job->SchedulerPrivate + 1) = last;
+}
namespace {
@@ -1062,6 +1068,12 @@ struct JobQueueTraits {
static void setNext(Job *job, Job *next) {
setNextJobInQueue(job, JobRef::getPreprocessed(next));
}
+ static Job *getLast(Job *job) {
+ return getLastJobInQueue(job);
+ }
+ static void setLast(Job *job, Job *last) {
+ setLastJobInQueue(job, last);
+ }
static int compare(Job *lhs, Job *rhs) {
return descendingPriorityOrder(lhs->getPriority(), rhs->getPriority());
}
@@ -1089,8 +1101,10 @@ preprocessQueue(JobRef unprocessedStart, JobRef unprocessedEnd, Job *existingPro
assert(unprocessedStart.getAsJob() != unprocessedEnd.getAsJob());
// Build up a list of jobs we need to preprocess
- using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
- ListMerger jobsToProcess;
+ // using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
+ // ListMerger jobsToProcess;
+ Job *newFirst = NULL;
+ Job *newLast = NULL;
// Get just the prefix list of unprocessed jobs
auto current = unprocessedStart;
@@ -1100,16 +1114,26 @@ preprocessQueue(JobRef unprocessedStart, JobRef unprocessedEnd, Job *existingPro
auto job = current.getAsJob();
current = getNextJobInQueue(job);
- jobsToProcess.insertAtFront(job);
+ // jobsToProcess.insertAtFront(job);
+ JobQueueTraits::setNext(job, newFirst);
+ newFirst = job;
+ if (!newLast) {
+ newLast = job;
+ }
}
// Finish processing the unprocessed jobs
- Job *newProcessedJobs = jobsToProcess.release();
- assert(newProcessedJobs);
-
- ListMerger mergedList(existingProcessedJobsToMergeInto);
- mergedList.merge(newProcessedJobs);
- return mergedList.release();
+ // Job *newProcessedJobs = jobsToProcess.release();
+ // assert(newProcessedJobs);
+ assert(newFirst);
+
+ // ListMerger mergedList(existingProcessedJobsToMergeInto);
+ // mergedList.merge(newProcessedJobs);
+ // return mergedList.release();
+ Job *existingLast = JobQueueTraits::getLast(existingProcessedJobsToMergeInto);
+ JobQueueTraits::setNext(existingLast, newFirst);
+ JobQueueTraits::setLast(existingProcessedJobsToMergeInto, newLast);
+ return existingProcessedJobsToMergeInto;
}
// Called with the actor drain lock held.
@@ -1129,8 +1153,10 @@ preprocessQueue(JobRef start) {
// There exist some jobs which haven't been preprocessed
// Build up a list of jobs we need to preprocess
- using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
- ListMerger jobsToProcess;
+ // using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
+ // ListMerger jobsToProcess;
+ Job *newFirst = NULL;
+ Job *newLast = NULL;
Job *wellFormedListStart = NULL;
@@ -1148,23 +1174,34 @@ preprocessQueue(JobRef start) {
auto job = current.getAsJob();
current = getNextJobInQueue(job);
- jobsToProcess.insertAtFront(job);
+ // jobsToProcess.insertAtFront(job);
+ JobQueueTraits::setNext(job, newFirst);
+ newFirst = job;
+ if (!newLast) {
+ newLast = job;
+ }
}
// Finish processing the unprocessed jobs
- auto processedJobHead = jobsToProcess.release();
- assert(processedJobHead);
+ // auto processedJobHead = jobsToProcess.release();
+ // assert(processedJobHead);
+ assert(newFirst);
Job *firstJob = NULL;
if (wellFormedListStart) {
// Merge it with already known well formed list if we have one.
- ListMerger mergedList(wellFormedListStart);
- mergedList.merge(processedJobHead);
- firstJob = mergedList.release();
+ // ListMerger mergedList(wellFormedListStart);
+ // mergedList.merge(processedJobHead);
+ // firstJob = mergedList.release();
+ firstJob = wellFormedListStart;
+ Job *existingLast = JobQueueTraits::getLast(firstJob);
+ JobQueueTraits::setNext(existingLast, newFirst);
} else {
// Nothing to merge with, just return the head we already have
- firstJob = processedJobHead;
+ // firstJob = processedJobHead;
+ firstJob = newFirst;
}
+ JobQueueTraits::setLast(firstJob, newLast);
return firstJob;
}
@@ -1365,7 +1402,16 @@ Job * DefaultActorImpl::drainOne() {
auto newState = oldState;
// Dequeue the first job and set up a new head
- newState = newState.withFirstJob(getNextJobInQueue(firstJob));
+ JobRef nextRef = getNextJobInQueue(firstJob);
+ if (nextRef) {
+ assert(!nextRef.needsPreprocessing());
+ Job* next = nextRef.getAsPreprocessedJob();
+ Job* last = JobQueueTraits::getLast(firstJob);
+ assert(last != firstJob);
+ JobQueueTraits::setLast(next, last);
+ }
+
+ newState = newState.withFirstJob(nextRef);
if (_status().compare_exchange_weak(oldState, newState,
/* success */ std::memory_order_relaxed,
/* failure */ std::memory_order_relaxed)) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment