Skip to content

Instantly share code, notes, and snippets.

@odinserj
Last active January 15, 2022 12:56
Show Gist options
  • Save odinserj/4a3bf40606c4da9183588a5a325dfb99 to your computer and use it in GitHub Desktop.
Save odinserj/4a3bf40606c4da9183588a5a325dfb99 to your computer and use it in GitHub Desktop.
MutexAttribute.cs
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;
namespace Hangfire.Pro
{
    /// <summary>
    /// Represents a background job filter that helps to disable concurrent execution
    /// without causing worker to wait as in <see cref="Hangfire.DisableConcurrentExecutionAttribute"/>.
    /// </summary>
    /// <remarks>
    /// The resource passed to the constructor is a composite format string that is expanded
    /// with the background job arguments (for example <c>"orders:{0}"</c>), so the mutex can be
    /// scoped per argument value. A blocked job is moved to the Scheduled state (and retried
    /// after <see cref="RetryInSeconds"/>) or, once <see cref="MaxAttempts"/> is exhausted,
    /// to the Deleted state.
    /// </remarks>
    public class MutexAttribute : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
    {
        private static readonly TimeSpan DistributedLockTimeout = TimeSpan.FromMinutes(1);

        // Job parameter that counts how many times the job was blocked by the mutex.
        private const string MutexAttemptParameterName = "MutexAttempt";

        private readonly string _resource;

        /// <summary>
        /// Creates a mutex filter for the given resource.
        /// </summary>
        /// <param name="resource">
        /// Composite format string identifying the mutex; it is expanded with the job
        /// arguments using <see cref="String.Format(IFormatProvider, string, object[])"/>.
        /// </param>
        public MutexAttribute(string resource)
        {
            _resource = resource;
            RetryInSeconds = 15;
        }

        /// <summary>
        /// Delay, in seconds, before a blocked background job is attempted again. Defaults to 15.
        /// </summary>
        public int RetryInSeconds { get; set; }

        /// <summary>
        /// Maximum number of retry attempts for a blocked background job before it is moved to
        /// the Deleted state. Zero or a negative value means the job is retried indefinitely.
        /// </summary>
        public int MaxAttempts { get; set; }

        /// <summary>
        /// Intercepts the transition to the Processing state and either acquires the mutex for
        /// the current job, or replaces the candidate state with Scheduled/Deleted when the
        /// mutex is owned by another background job.
        /// </summary>
        /// <exception cref="NotSupportedException">
        /// The storage connection is not a <see cref="JobStorageConnection"/>.
        /// </exception>
        public void OnStateElection(ElectStateContext context)
        {
            // We are intercepting transitions to the Processing state, that is performed by
            // a worker just before processing a job. During the state election phase we can
            // change the target state to another one, causing a worker not to process the
            // background job.
            if (context.CandidateState.Name != ProcessingState.StateName ||
                context.BackgroundJob.Job == null)
            {
                return;
            }

            // This filter requires an extended set of storage operations. It's supported
            // by all the official storages, and many of the community-based ones.
            var storageConnection = context.Connection as JobStorageConnection;
            if (storageConnection == null)
            {
                throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version.");
            }

            string blockedBy;

            try
            {
                // Distributed lock is needed here only to prevent a race condition, when another
                // worker picks up a background job with the same resource between GET and SET
                // operations.
                // There will be no race condition, when two or more workers pick up background job
                // with the same id, because state transitions are protected with distributed lock
                // themselves.
                using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
                {
                    // Resource set contains a background job id that acquired a mutex for the resource.
                    // We are getting only one element to see what background job blocked the invocation.
                    var range = storageConnection.GetRangeFromSet(
                        GetResourceKey(context.BackgroundJob.Job.Args),
                        0,
                        0);

                    blockedBy = range.Count > 0 ? range[0] : null;

                    // We should permit an invocation only when the set is empty, or if the current
                    // background job already owns the resource. This may happen when the local
                    // transaction succeeded, but the outer transaction failed.
                    if (blockedBy == null || blockedBy == context.BackgroundJob.Id)
                    {
                        // We need to commit the changes inside a distributed lock, otherwise it's
                        // useless. So we create a local transaction instead of using the
                        // context.Transaction property. The transaction is disposed so that
                        // storage resources are released even when Commit throws.
                        using (var localTransaction = context.Connection.CreateWriteTransaction())
                        {
                            // Add the current background job identifier to a resource set. This means
                            // that resource is owned by the current background job. Identifier is
                            // removed in OnStateApplied when the job leaves the Processing state.
                            localTransaction.AddToSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id);
                            localTransaction.Commit();
                        }

                        // Invocation is permitted, and we did all the required things.
                        return;
                    }
                }
            }
            catch (DistributedLockTimeoutException)
            {
                // We weren't able to acquire a distributed lock within a specified window. This may
                // be caused by network delays, storage outages or abandoned locks in some storages.
                // Since it is required to expire abandoned locks after some time, we can simply
                // postpone the invocation.
                context.CandidateState = new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
                {
                    Reason = "Couldn't acquire a distributed lock for mutex: timeout exceeded"
                };

                return;
            }

            // Background job execution is blocked. We should change the target state either to
            // the Scheduled or to the Deleted one, depending on current retry attempt number.
            var currentAttempt = context.GetJobParameter<int>(MutexAttemptParameterName) + 1;
            context.SetJobParameter(MutexAttemptParameterName, currentAttempt);

            // A non-positive MaxAttempts means "unlimited", consistent with CreateScheduledState.
            context.CandidateState = MaxAttempts <= 0 || currentAttempt <= MaxAttempts
                ? CreateScheduledState(blockedBy, currentAttempt)
                : CreateDeletedState(blockedBy);
        }

        /// <summary>
        /// Releases the mutex owned by the background job when it leaves the Processing state.
        /// </summary>
        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.BackgroundJob.Job == null) return;

            if (context.OldStateName == ProcessingState.StateName)
            {
                using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
                using (var localTransaction = context.Connection.CreateWriteTransaction())
                {
                    localTransaction.RemoveFromSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id);
                    localTransaction.Commit();
                }
            }
        }

        /// <summary>
        /// Intentionally does nothing; the mutex is released in <see cref="OnStateApplied"/>.
        /// </summary>
        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
        }

        // Builds the terminal state used once all retry attempts are exhausted.
        private static DeletedState CreateDeletedState(string blockedBy)
        {
            return new DeletedState
            {
                Reason = $"Execution was blocked by background job {blockedBy}, all attempts exhausted"
            };
        }

        // Builds the retry state; the reason includes "/MaxAttempts" only when attempts are limited.
        private IState CreateScheduledState(string blockedBy, int currentAttempt)
        {
            var reason = $"Execution is blocked by background job {blockedBy}, retry attempt: {currentAttempt}";

            if (MaxAttempts > 0)
            {
                reason += $"/{MaxAttempts}";
            }

            return new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
            {
                Reason = reason
            };
        }

        // Acquires the storage-level lock that guards reads/writes of the resource set.
        private IDisposable AcquireDistributedSetLock(IStorageConnection connection, IEnumerable<object> args)
        {
            return connection.AcquireDistributedLock(GetDistributedLockKey(args), DistributedLockTimeout);
        }

        private string GetDistributedLockKey(IEnumerable<object> args)
        {
            return $"extension:job-mutex:lock:{GetKeyFormat(args, _resource)}";
        }

        private string GetResourceKey(IEnumerable<object> args)
        {
            return $"extension:job-mutex:set:{GetKeyFormat(args, _resource)}";
        }

        // Expands the resource format with job arguments. Keys are shared by all servers, so
        // they must be formatted invariantly; otherwise servers with different cultures would
        // produce different keys for culture-sensitive arguments (dates, decimals) and the
        // mutex would no longer be mutually exclusive.
        private static string GetKeyFormat(IEnumerable<object> args, string keyFormat)
        {
            return String.Format(CultureInfo.InvariantCulture, keyFormat, args.ToArray());
        }
    }
}
@agrath
Copy link

agrath commented Dec 21, 2017

Can you please explain the expected behaviour when using this attribute with a long-running task that is also scheduled as a recurring job? I understand that the automatically scheduled instance should block and retry until it can obtain a lock, but what will we see in the dashboard?

@agrath
Copy link

agrath commented Dec 21, 2017

Never mind, I just tested it :) The long-running (existing) instance remains in Processing, while the new instance (which was scheduled to run) sits in the Scheduled queue with a message indicating that it is blocked by the job in processing:
blocked

@MaxMelcher
Copy link

This is currently in Hangfire.Pro — will this be ported to Hangfire, @odinserj?

@bbakermmc
Copy link

@odinserj Can we use this in the non pro version? Is there something that is specific to pro only?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment