Skip to content

Instantly share code, notes, and snippets.

@distantcam
Last active August 17, 2023 15:54
Show Gist options
  • Star 22 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save distantcam/64cf44d84441e5c45e197f7d90c6df3e to your computer and use it in GitHub Desktop.
Save distantcam/64cf44d84441e5c45e197f7d90c6df3e to your computer and use it in GitHub Desktop.
Unity Job system with async

This is some simple helper code to run Unity jobs in an async context.

To use, run your async function using JobHelper.RunAsync(() => RunJob());

Inside your async method you can either await a JobHandle directly or call await job.ScheduleAsync(...):

await job.Schedule(length, batchSize);

await job.ScheduleAsync(length, batchSize);

If you use ScheduleAsync, the job is chained to the previously scheduled job, so

await job.ScheduleAsync(length, batchSize);

// Is actually

await job.Schedule(length, batchSize, previousJobHandle);
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Unity.Jobs;
/// <summary>
/// Helpers for running Unity jobs from async/await code. <c>RunAsync</c> installs a
/// private, single-threaded <see cref="SynchronizationContext"/> on the calling thread,
/// runs the supplied async method, and pumps its continuations on that same thread until
/// the method has finished. Inside that method a <see cref="JobHandle"/> can be awaited
/// directly, or a job can be scheduled and awaited with <c>ScheduleAsync</c>.
/// </summary>
public static class JobHelper
{
    /// <summary>
    /// Runs an <c>async void</c> style method and blocks the calling thread until every
    /// operation registered with the synchronization context has completed.
    /// </summary>
    /// <param name="asyncMethod">The method to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="asyncMethod"/> is null.</exception>
    public static void RunAsync(Action asyncMethod)
    {
        if (asyncMethod == null) throw new ArgumentNullException(nameof(asyncMethod));

        var prevCtx = SynchronizationContext.Current;
        try
        {
            // Operation tracking is enabled: the pump ends when the outstanding
            // operation count drops back to zero.
            var syncCtx = new UnitySynchronizationContext(true);
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            // Hold one operation open around the call so the count cannot reach zero
            // while the synchronous part of asyncMethod is still executing.
            syncCtx.OperationStarted();
            asyncMethod();
            syncCtx.OperationCompleted();

            syncCtx.RunOnCurrentThread();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }

    /// <summary>
    /// Runs a task-returning async method and blocks the calling thread until the task
    /// has completed, pumping its continuations on the calling thread.
    /// </summary>
    /// <param name="asyncMethod">The method to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="asyncMethod"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><paramref name="asyncMethod"/> returned null.</exception>
    public static void RunAsync(Func<Task> asyncMethod)
    {
        if (asyncMethod == null) throw new ArgumentNullException(nameof(asyncMethod));

        // GetResult() rethrows the task's original exception rather than an AggregateException.
        PumpUntilCompleted(asyncMethod).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Runs a task-returning async method, blocks the calling thread until the task has
    /// completed, and returns the task's result.
    /// </summary>
    /// <typeparam name="T">Result type of the task.</typeparam>
    /// <param name="asyncMethod">The method to run.</param>
    /// <returns>The result produced by the task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="asyncMethod"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><paramref name="asyncMethod"/> returned null.</exception>
    public static T RunAsync<T>(Func<Task<T>> asyncMethod)
    {
        if (asyncMethod == null) throw new ArgumentNullException(nameof(asyncMethod));

        return PumpUntilCompleted(asyncMethod).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Allows <c>await jobHandle;</c> by wrapping the handle in a <see cref="JobHandleAwaiter"/>.
    /// </summary>
    public static JobHandleAwaiter GetAwaiter(this JobHandle jobHandle) => new JobHandleAwaiter(jobHandle);

    /// <summary>
    /// Schedules <paramref name="job"/> with a dependency on the handle most recently
    /// stored in the current context, stores the new handle there, and returns an awaiter for it.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The current synchronization context is not the one installed by <c>RunAsync</c>.
    /// </exception>
    public static JobHandleAwaiter ScheduleAsync<T>(this T job) where T : struct, IJob
    {
        var unitySC = GetRequiredContext();
        var handle = job.Schedule(unitySC.CurrentHandle);
        unitySC.CurrentHandle = handle;
        return new JobHandleAwaiter(handle);
    }

    /// <summary>
    /// Schedules the parallel-for <paramref name="job"/> with a dependency on the handle most
    /// recently stored in the current context, stores the new handle there, and returns an
    /// awaiter for it.
    /// </summary>
    /// <param name="job">The job to schedule.</param>
    /// <param name="arrayLength">Forwarded unchanged to <c>Schedule</c>.</param>
    /// <param name="innerloopBatchCount">Forwarded unchanged to <c>Schedule</c>.</param>
    /// <exception cref="InvalidOperationException">
    /// The current synchronization context is not the one installed by <c>RunAsync</c>.
    /// </exception>
    public static JobHandleAwaiter ScheduleAsync<T>(this T job, int arrayLength, int innerloopBatchCount) where T : struct, IJobParallelFor
    {
        var unitySC = GetRequiredContext();
        var handle = job.Schedule(arrayLength, innerloopBatchCount, unitySC.CurrentHandle);
        unitySC.CurrentHandle = handle;
        return new JobHandleAwaiter(handle);
    }

    // Shared body of the Task-returning RunAsync overloads: installs the context, starts the
    // task, pumps posted continuations until the task finishes, then restores the old context.
    private static TTask PumpUntilCompleted<TTask>(Func<TTask> asyncMethod) where TTask : Task
    {
        var prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new UnitySynchronizationContext(false);
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            var task = asyncMethod();
            if (task == null) throw new InvalidOperationException("No task provided.");

            // When the task finishes (in any state), close the queue so the pump below returns.
            task.ContinueWith(delegate { syncCtx.Complete(); }, TaskScheduler.Default);
            syncCtx.RunOnCurrentThread();
            return task;
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }

    // Returns the current context if it is the one installed by RunAsync; throws otherwise.
    private static UnitySynchronizationContext GetRequiredContext()
    {
        var unitySC = SynchronizationContext.Current as UnitySynchronizationContext;
        if (unitySC == null)
            throw new InvalidOperationException("Awaiting jobs must be done in the UnitySynchronizationContext");
        return unitySC;
    }

    /// <summary>
    /// Synchronization context that queues posted callbacks and executes them on whichever
    /// thread calls <see cref="RunOnCurrentThread"/>.
    /// </summary>
    private sealed class UnitySynchronizationContext : SynchronizationContext
    {
        // Pending callbacks paired with their state argument.
        private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> m_queue =
            new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

        // Outstanding operation count; only maintained when m_trackOperations is true.
        private int m_operationCount = 0;
        private readonly bool m_trackOperations;

        /// <param name="trackOperations">
        /// When true, the queue is closed automatically once OperationCompleted brings the
        /// outstanding operation count back to zero.
        /// </param>
        public UnitySynchronizationContext(bool trackOperations)
        {
            m_trackOperations = trackOperations;
        }

        /// <summary>Handle of the job most recently scheduled through ScheduleAsync.</summary>
        public JobHandle CurrentHandle { get; set; } = default(JobHandle);

        /// <summary>Queues a callback for execution by the pumping thread.</summary>
        public override void Post(SendOrPostCallback d, object state)
        {
            m_queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
        }

        /// <summary>Not supported; always throws.</summary>
        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotSupportedException("Synchronously sending is not supported.");
        }

        /// <summary>
        /// Executes queued callbacks on the calling thread, blocking while the queue is empty,
        /// until <see cref="Complete"/> has been called and the queue has drained.
        /// </summary>
        public void RunOnCurrentThread()
        {
            foreach (var workItem in m_queue.GetConsumingEnumerable())
                workItem.Key(workItem.Value);
        }

        /// <summary>Marks the queue as closed so that RunOnCurrentThread can return.</summary>
        public void Complete() { m_queue.CompleteAdding(); }

        /// <summary>Increments the outstanding operation count when tracking is enabled.</summary>
        public override void OperationStarted()
        {
            if (m_trackOperations)
                Interlocked.Increment(ref m_operationCount);
        }

        /// <summary>
        /// Decrements the outstanding operation count when tracking is enabled and closes
        /// the queue once the count reaches zero.
        /// </summary>
        public override void OperationCompleted()
        {
            if (m_trackOperations &&
                Interlocked.Decrement(ref m_operationCount) == 0)
                Complete();
        }
    }

    /// <summary>
    /// Awaiter over a <see cref="JobHandle"/>. Awaiting does not yield the thread: if the job
    /// is still running, <see cref="OnCompleted"/> waits for it via <c>JobHandle.Complete()</c>
    /// and then runs the continuation inline.
    /// </summary>
    public struct JobHandleAwaiter : INotifyCompletion
    {
        readonly JobHandle jobHandle;

        public JobHandleAwaiter(JobHandle jobHandle)
        {
            this.jobHandle = jobHandle;
        }

        /// <summary>Returns this awaiter so the awaiter itself can be awaited.</summary>
        public JobHandleAwaiter GetAwaiter() => this;

        /// <summary>Whether the wrapped handle already reports completion.</summary>
        public bool IsCompleted => jobHandle.IsCompleted;

        /// <summary>
        /// Completes the job on the calling thread, then invokes <paramref name="continuation"/>
        /// synchronously.
        /// </summary>
        public void OnCompleted(Action continuation)
        {
            jobHandle.Complete();
            continuation();
        }

        /// <summary>
        /// Ensures the job has been completed before the awaiting code resumes.
        /// </summary>
        public void GetResult()
        {
            // When IsCompleted is already true the await skips OnCompleted and calls only
            // GetResult, so Complete() must also happen here. Unity's job system documentation
            // requires Complete() before the main thread accesses the job's data again;
            // calling it a second time after OnCompleted is harmless.
            jobHandle.Complete();
        }
    }
}
@rickomax
Copy link

I've been doing some tests, and looks like JobHelper.RunAsync(() => RunJob()); still awaits until the job completion to run the next code-line on the caller thread...

@Sky-Fleet-Developer
Copy link

Вроде работает, спасибо большое!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment