Skip to content

Instantly share code, notes, and snippets.

@odinserj
Last active March 14, 2024 14:42
Show Gist options
  • Star 42 You must be signed in to star a gist
  • Fork 10 You must be signed in to fork a gist
  • Save odinserj/a8332a3f486773baa009 to your computer and use it in GitHub Desktop.
Save odinserj/a8332a3f486773baa009 to your computer and use it in GitHub Desktop.
// Zero-Clause BSD (more permissive than MIT, doesn't require copyright notice)
//
// Permission to use, copy, modify, and/or distribute this software for any purpose
// with or without fee is hereby granted.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
// OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
// TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
// THIS SOFTWARE.
public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IServerFilter
{
private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(1);
public void OnCreating(CreatingContext filterContext)
{
if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
{
filterContext.Canceled = true;
}
}
public void OnPerformed(PerformedContext filterContext)
{
RemoveFingerprint(filterContext.Connection, filterContext.Job);
}
private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
{
using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
{
var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(job));
DateTimeOffset timestamp;
if (fingerprint != null &&
fingerprint.ContainsKey("Timestamp") &&
DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out timestamp) &&
DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
{
// Actual fingerprint found, returning.
return false;
}
// Fingerprint does not exist, it is invalid (no `Timestamp` key),
// or it is not actual (timeout expired).
connection.SetRangeInHash(GetFingerprintKey(job), new Dictionary<string, string>
{
{ "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
});
return true;
}
}
private static void RemoveFingerprint(IStorageConnection connection, Job job)
{
using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
using (var transaction = connection.CreateWriteTransaction())
{
transaction.RemoveHash(GetFingerprintKey(job));
transaction.Commit();
}
}
private static string GetFingerprintLockKey(Job job)
{
return String.Format("{0}:lock", GetFingerprintKey(job));
}
private static string GetFingerprintKey(Job job)
{
return String.Format("fingerprint:{0}", GetFingerprint(job));
}
private static string GetFingerprint(Job job)
{
string parameters = string.Empty;
if (job.Arguments != null)
{
parameters = string.Join(".", job.Arguments);
}
if (job.Type == null || job.Method == null)
{
return string.Empty;
}
var fingerprint = String.Format(
"{0}.{1}.{2}",
job.Type.FullName,
job.Method.Name, parameters);
return fingerprint;
}
void IClientFilter.OnCreated(CreatedContext filterContext)
{
}
void IServerFilter.OnPerforming(PerformingContext filterContext)
{
}
}
@mack0196
Copy link

mack0196 commented Feb 2, 2022

Thanks erwin-faceit. Updated.

using Hangfire.Client;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Security.Cryptography;

namespace Hangfire.Filters
{
    public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IApplyStateFilter
    {
        private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
        private static readonly CultureInfo EnUs = new CultureInfo("en-US");

        /// <summary>
        /// Convert arguments into a culture-aware string
        /// </summary>
        ///<see cref="https://gist.github.com/odinserj/a8332a3f486773baa009?permalink_comment_id=4048344#gistcomment-4048344"/><see>
        private static string ConvertArgument(object obj)
        {
            switch (obj)
            {
                case null:
                    return string.Empty;
                case string s:
                    return s;
                case DateTime dt:
                    return dt.ToString("o"); // ISO8601 date format
                default:
                    return (string)Convert.ChangeType(obj, typeof(string), EnUs); // And force the rest to US English
            }
        }

        private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
        {
            var fingerprintKey = GetFingerprintKey(job);
            var finterprintLockKey = GetFingerprintLockKey(fingerprintKey);
            var distributedLock = connection.AcquireDistributedLock(finterprintLockKey, LockTimeout);
            using (distributedLock)
            {
                var fingerprint = connection.GetAllEntriesFromHash(fingerprintKey);

                if (fingerprint != null)
                {
                    // Actual fingerprint found, returning.
                    return false;
                }

                // Fingerprint does not exist, it is invalid (no `Timestamp` key),
                // or it is not actual (timeout expired).
                connection.SetRangeInHash(fingerprintKey, new Dictionary<string, string>
                    {
                        { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
                    });

                return true;
            }
        }

        private static void RemoveFingerprint(IStorageConnection connection, Job job)
        {
            var fingerprintKey = GetFingerprintKey(job);
            var finterprintLockKey = GetFingerprintLockKey(fingerprintKey);
            using (connection.AcquireDistributedLock(finterprintLockKey, LockTimeout))
            using (var transaction = connection.CreateWriteTransaction())
            {
                transaction.RemoveHash(fingerprintKey);
                transaction.Commit();
            }
        }

        private static string GetFingerprintLockKey(string fingerprintKey)
        {
            return string.Format("{0}:lock", fingerprintKey);
        }

        private static string GetFingerprintKey(Job job)
        {
            return string.Format("fingerprint:{0}", GetFingerprint(job));
        }

        private static string GetFingerprint(Job job)
        {
            string parameters = string.Empty;
            if (job?.Args != null)
            {
                parameters = string.Join(".", job.Args.Select(ConvertArgument));
            }
            if (job?.Type == null || job.Method == null)
            {
                return string.Empty;
            }

            //https://gist.github.com/odinserj/a8332a3f486773baa009#gistcomment-1898401
            var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
            var hash = SHA256.Create().ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
            var fingerprint = Convert.ToBase64String(hash);
            return fingerprint;
        }

        public void OnCreating(CreatingContext filterContext)
        {
            if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
            {
                filterContext.Canceled = true;
            }
        }

        public void OnCreated(CreatedContext filterContext)
        {
            //do nothing
        }


        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.NewState.Name.Equals(Hangfire.States.SucceededState.StateName)
                || context.NewState.Name.Equals(Hangfire.States.FailedState.StateName))
            {
                RemoveFingerprint(context.Connection, context.BackgroundJob.Job);
            }
        }

        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            // do nothing
        }
    }
}

@mack0196
Copy link

mack0196 commented Feb 2, 2022

⚠️⚠️⚠️ This works for the Enqueue only; Scheduled jobs will duplicate.

⚠️⚠️⚠️ This filter will break ContinueWith scenarios that expect a jobId string returned from BackgroundJobClient.Enqueue.

@mcastellano
Copy link

@mack0196, I believe you should check for the transition to Deleted state as well for fingerprint removal.

@uciprian
Copy link

uciprian commented Sep 1, 2022

This solution sometimes is generating RedisTimeoutException upon fingerprint removal and in the error message it is something about
SUBSCRIBE fingerprint:60684f8fd3ad130e8c60210b41706448:lock:ev

@mack0196
Copy link

Thanks mcastellano

@afelinczak
Copy link

Hello,
We experienced problem when passing CancellationToken into a job.
https://docs.hangfire.io/en/latest/background-methods/using-cancellation-tokens.html
In our case, when job was completed calculated hash was incorrect as Hangfire replaced it with null in Parameters.
To fix it we changed ConvertArgument method to ignore token.

@HenrikHoyer
Copy link

@afelinczak

To fix it we changed ConvertArgument method to ignore token.

Please share your code changes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment