@odinserj
Last active March 14, 2024 14:42
// Zero-Clause BSD (more permissive than MIT, doesn't require copyright notice)
//
// Permission to use, copy, modify, and/or distribute this software for any purpose
// with or without fee is hereby granted.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
// OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
// TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
// THIS SOFTWARE.
using System;
using System.Collections.Generic;
using System.Globalization;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.Server;
using Hangfire.Storage;

public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IServerFilter
{
    private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
    private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(1);

    // Client side: cancel job creation when an identical job is already queued.
    public void OnCreating(CreatingContext filterContext)
    {
        if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
        {
            filterContext.Canceled = true;
        }
    }

    // Server side: release the fingerprint once the job has been performed.
    public void OnPerformed(PerformedContext filterContext)
    {
        RemoveFingerprint(filterContext.Connection, filterContext.Job);
    }

    private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
    {
        using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
        {
            var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(job));
            DateTimeOffset timestamp;

            if (fingerprint != null &&
                fingerprint.ContainsKey("Timestamp") &&
                DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out timestamp) &&
                DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
            {
                // A current (unexpired) fingerprint was found, so the job is a duplicate.
                return false;
            }

            // The fingerprint does not exist, is invalid (no `Timestamp` key),
            // or has expired (timeout elapsed).
            connection.SetRangeInHash(GetFingerprintKey(job), new Dictionary<string, string>
            {
                { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
            });

            return true;
        }
    }

    private static void RemoveFingerprint(IStorageConnection connection, Job job)
    {
        using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
        using (var transaction = connection.CreateWriteTransaction())
        {
            transaction.RemoveHash(GetFingerprintKey(job));
            transaction.Commit();
        }
    }

    private static string GetFingerprintLockKey(Job job)
    {
        return String.Format("{0}:lock", GetFingerprintKey(job));
    }

    private static string GetFingerprintKey(Job job)
    {
        return String.Format("fingerprint:{0}", GetFingerprint(job));
    }

    // Builds "<type>.<method>.<arg1>.<arg2>..." as the identity of a job.
    private static string GetFingerprint(Job job)
    {
        string parameters = string.Empty;
        if (job.Arguments != null)
        {
            parameters = string.Join(".", job.Arguments);
        }
        if (job.Type == null || job.Method == null)
        {
            return string.Empty;
        }

        var fingerprint = String.Format(
            "{0}.{1}.{2}",
            job.Type.FullName,
            job.Method.Name, parameters);
        return fingerprint;
    }

    void IClientFilter.OnCreated(CreatedContext filterContext)
    {
    }

    void IServerFilter.OnPerforming(PerformingContext filterContext)
    {
    }
}
@mack0196

My use case is that I don't want to enqueue a second job (with the same parameter values) until the enqueued job goes to Succeeded or Failed.
From initial testing, I am getting the results I want (successive enqueues are blocked until the job succeeds or fails) by removing the fingerprint in IApplyStateFilter.OnStateApplied. Any feedback?

    public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IApplyStateFilter
    {
        private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);

        private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
        {
            using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
            {
                var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(job));

                if (fingerprint != null)
                {
                    // A fingerprint already exists: an identical job is still pending.
                    return false;
                }

                // No fingerprint exists yet, so record one for this job.
                connection.SetRangeInHash(GetFingerprintKey(job), new Dictionary<string, string>
                {
                    { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
                });

                return true;
            }
        }

        private static void RemoveFingerprint(IStorageConnection connection, Job job)
        {
            using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
            using (var transaction = connection.CreateWriteTransaction())
            {
                transaction.RemoveHash(GetFingerprintKey(job));
                transaction.Commit();
            }
        }

        private static string GetFingerprintLockKey(Job job)
        {
            return string.Format("{0}:lock", GetFingerprintKey(job));
        }

        private static string GetFingerprintKey(Job job)
        {
            return string.Format("fingerprint:{0}", GetFingerprint(job));
        }

        private static string GetFingerprint(Job job)
        {
            string parameters = string.Empty;
            if (job?.Args != null)
            {
                parameters = string.Join(".", job.Args);
            }
            if (job?.Type == null || job.Method == null)
            {
                return string.Empty;
            }

            //https://gist.github.com/odinserj/a8332a3f486773baa009#gistcomment-1898401
            var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
            // Dispose the hash algorithm instead of leaking one instance per call.
            using (var sha256 = SHA256.Create())
            {
                var hash = sha256.ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
                return Convert.ToBase64String(hash);
            }
        }

        public void OnCreating(CreatingContext filterContext)
        {
            if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
            {
                filterContext.Canceled = true;
            }
        }

        public void OnCreated(CreatedContext filterContext)
        {
            //do nothing
        }


        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.NewState.Name.Equals(Hangfire.States.SucceededState.StateName)
                || context.NewState.Name.Equals(Hangfire.States.FailedState.StateName))
            {
                RemoveFingerprint(context.Connection, context.BackgroundJob.Job);
            }
        }

        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            // do nothing
        }
    }

@justinbhopper

@mack0196 Your solution is working great for me. I don't have any particular feedback. Thanks for pulling together the final solution from all these comments!

@erwin-faceit

It took me a while to figure out an issue I was having with the above code. When multiple servers run under different cultures and jobs have doubles or dates among their arguments, the servers generate different fingerprints for the same job. Part of the problem is that not everybody sets the current culture, but the root cause is that the line

parameters = string.Join(".", job.Args);

invokes ToString() on every argument, and ToString() formats with the current culture.

So, either always set the CurrentCulture, or use something like below:

private static readonly CultureInfo EnUs = new CultureInfo("en-US");

private static string ConvertArgument(object obj)
{
    switch (obj)
    {
        case null:
            return string.Empty;
        case string s:
            return s;
        case DateTime dt:
            return dt.ToString("o"); // ISO8601 date format
        default:
            return (string) Convert.ChangeType(obj, typeof(string), EnUs); // And force the rest to US English
    }
}

and change the line above to

parameters = string.Join(".", job.Args.Select(ConvertArgument));
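To see the effect in isolation, here is a minimal sketch that uses only the base class library. The names `CultureDemo`, `FormatUnder` and `FormatInvariant` are made up for illustration and are not part of the gist. It also swaps in `Convert.ToString(value, CultureInfo.InvariantCulture)` as one possible alternative to pinning `en-US`.

```csharp
using System;
using System.Globalization;

public static class CultureDemo
{
    // Formats a value the way a server running under the given culture would.
    // On a typical installation, 1.5 becomes "1,5" under "de-DE" and "1.5" under "en-US",
    // so the same job argument yields two different fingerprint payloads.
    public static string FormatUnder(object value, string cultureName)
    {
        var culture = CultureInfo.GetCultureInfo(cultureName);
        return Convert.ToString(value, culture);
    }

    // Formats a value independently of the server's current culture.
    public static string FormatInvariant(object value)
    {
        return Convert.ToString(value, CultureInfo.InvariantCulture);
    }
}
```

Comparing `CultureDemo.FormatUnder(1.5, "de-DE")` with `CultureDemo.FormatInvariant(1.5)` on your own servers should show whether they are affected.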

@mack0196

mack0196 commented Feb 2, 2022

Thanks erwin-faceit. Updated.

using Hangfire.Client;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Security.Cryptography;

namespace Hangfire.Filters
{
    public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IApplyStateFilter
    {
        private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
        private static readonly CultureInfo EnUs = new CultureInfo("en-US");

        /// <summary>
        /// Converts an argument into a culture-independent string.
        /// </summary>
        /// <seealso href="https://gist.github.com/odinserj/a8332a3f486773baa009?permalink_comment_id=4048344#gistcomment-4048344"/>
        private static string ConvertArgument(object obj)
        {
            switch (obj)
            {
                case null:
                    return string.Empty;
                case string s:
                    return s;
                case DateTime dt:
                    return dt.ToString("o"); // ISO8601 date format
                default:
                    return (string)Convert.ChangeType(obj, typeof(string), EnUs); // And force the rest to US English
            }
        }

        private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
        {
            var fingerprintKey = GetFingerprintKey(job);
            var fingerprintLockKey = GetFingerprintLockKey(fingerprintKey);
            var distributedLock = connection.AcquireDistributedLock(fingerprintLockKey, LockTimeout);
            using (distributedLock)
            {
                var fingerprint = connection.GetAllEntriesFromHash(fingerprintKey);

                if (fingerprint != null)
                {
                    // A fingerprint already exists: an identical job is still pending.
                    return false;
                }

                // No fingerprint exists yet, so record one for this job.
                connection.SetRangeInHash(fingerprintKey, new Dictionary<string, string>
                    {
                        { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
                    });

                return true;
            }
        }

        private static void RemoveFingerprint(IStorageConnection connection, Job job)
        {
            var fingerprintKey = GetFingerprintKey(job);
            var fingerprintLockKey = GetFingerprintLockKey(fingerprintKey);
            using (connection.AcquireDistributedLock(fingerprintLockKey, LockTimeout))
            using (var transaction = connection.CreateWriteTransaction())
            {
                transaction.RemoveHash(fingerprintKey);
                transaction.Commit();
            }
        }

        private static string GetFingerprintLockKey(string fingerprintKey)
        {
            return string.Format("{0}:lock", fingerprintKey);
        }

        private static string GetFingerprintKey(Job job)
        {
            return string.Format("fingerprint:{0}", GetFingerprint(job));
        }

        private static string GetFingerprint(Job job)
        {
            string parameters = string.Empty;
            if (job?.Args != null)
            {
                parameters = string.Join(".", job.Args.Select(ConvertArgument));
            }
            if (job?.Type == null || job.Method == null)
            {
                return string.Empty;
            }

            //https://gist.github.com/odinserj/a8332a3f486773baa009#gistcomment-1898401
            var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
            // Dispose the hash algorithm instead of leaking one instance per call.
            using (var sha256 = SHA256.Create())
            {
                var hash = sha256.ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
                return Convert.ToBase64String(hash);
            }
        }

        public void OnCreating(CreatingContext filterContext)
        {
            if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
            {
                filterContext.Canceled = true;
            }
        }

        public void OnCreated(CreatedContext filterContext)
        {
            //do nothing
        }


        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.NewState.Name.Equals(Hangfire.States.SucceededState.StateName)
                || context.NewState.Name.Equals(Hangfire.States.FailedState.StateName))
            {
                RemoveFingerprint(context.Connection, context.BackgroundJob.Job);
            }
        }

        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            // do nothing
        }
    }
}

@mack0196

mack0196 commented Feb 2, 2022

⚠️⚠️⚠️ This works for Enqueue only; scheduled jobs will still be duplicated.

⚠️⚠️⚠️ This filter will break ContinueWith scenarios that expect BackgroundJobClient.Enqueue to return a jobId string.
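One way to guard a continuation against this is sketched below. It assumes that a canceled creation surfaces as a `null` job id. `GuardedContinuation` and `EnqueueThenContinue` are hypothetical names, and the two delegates stand in for the real Hangfire enqueue and continuation calls so the sketch runs without Hangfire.

```csharp
using System;

public static class GuardedContinuation
{
    // enqueue:      stand-in for the call that creates the parent job and returns its id.
    // continueWith: stand-in for the call that registers a continuation on that id.
    // Returns the continuation's id, or null when the parent was not created.
    public static string EnqueueThenContinue(Func<string> enqueue, Func<string, string> continueWith)
    {
        var parentId = enqueue();
        if (parentId == null)
        {
            // Assumption: the filter set Canceled = true, so there is no parent to continue from.
            return null;
        }

        return continueWith(parentId);
    }
}
```

The caller still has to decide what a skipped continuation means for its workflow; this only avoids passing a missing id along.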

@mcastellano

@mack0196, I believe you should check for the transition to Deleted state as well for fingerprint removal.
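A minimal sketch of such a check, kept free of Hangfire references so it can run on its own. `FingerprintRelease` and `ShouldRelease` are hypothetical names. The string literals are assumed to match `SucceededState.StateName`, `FailedState.StateName` and `DeletedState.StateName`, which should be used instead inside the real filter.

```csharp
using System;
using System.Collections.Generic;

public static class FingerprintRelease
{
    // States after which the fingerprint should be removed.
    // Assumed to equal the StateName constants of the corresponding Hangfire state classes.
    private static readonly HashSet<string> ReleasingStates =
        new HashSet<string>(StringComparer.Ordinal) { "Succeeded", "Failed", "Deleted" };

    // True when a transition to the given state should release the fingerprint.
    public static bool ShouldRelease(string newStateName)
    {
        return newStateName != null && ReleasingStates.Contains(newStateName);
    }
}
```

In `OnStateApplied` the condition would then become something like `FingerprintRelease.ShouldRelease(context.NewState.Name)`.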

@uciprian

uciprian commented Sep 1, 2022

This solution sometimes throws a RedisTimeoutException on fingerprint removal, and the error message mentions:
SUBSCRIBE fingerprint:60684f8fd3ad130e8c60210b41706448:lock:ev

@mack0196

Thanks mcastellano

@afelinczak

Hello,
We experienced a problem when passing a CancellationToken into a job.
https://docs.hangfire.io/en/latest/background-methods/using-cancellation-tokens.html
In our case, when the job completed, the calculated hash was incorrect because Hangfire had replaced the token with null in the parameters.
To fix it, we changed the ConvertArgument method to ignore the token.
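The change itself is not shown in this thread. The following is only a guess at what ignoring the token could look like, not the commenter's actual code. `FingerprintArguments` is a hypothetical wrapper class, and `Convert.ToString` with `CultureInfo.InvariantCulture` is swapped in for the `Convert.ChangeType` call with `en-US` used earlier.

```csharp
using System;
using System.Globalization;
using System.Threading;

public static class FingerprintArguments
{
    public static string ConvertArgument(object obj)
    {
        switch (obj)
        {
            case null:
                return string.Empty;
            case CancellationToken _:
                // Treat the token like null so the fingerprint is the same
                // at enqueue time and after Hangfire has replaced it with null.
                return string.Empty;
            case string s:
                return s;
            case DateTime dt:
                return dt.ToString("o", CultureInfo.InvariantCulture); // ISO 8601
            default:
                return Convert.ToString(obj, CultureInfo.InvariantCulture);
        }
    }
}
```

With this, `ConvertArgument(CancellationToken.None)` and `ConvertArgument(null)` both contribute an empty string to the payload.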

@HenrikHoyer

@afelinczak

To fix it we changed ConvertArgument method to ignore token.

Please share your code changes
