Skip to content

Instantly share code, notes, and snippets.

@odinserj
Last active October 25, 2023 13:26
  • Star 42 You must be signed in to star a gist
  • Fork 10 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save odinserj/a8332a3f486773baa009 to your computer and use it in GitHub Desktop.
// Zero-Clause BSD (more permissive than MIT, doesn't require copyright notice)
//
// Permission to use, copy, modify, and/or distribute this software for any purpose
// with or without fee is hereby granted.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
// OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
// TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
// THIS SOFTWARE.
public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IServerFilter
{
private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(1);
public void OnCreating(CreatingContext filterContext)
{
if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
{
filterContext.Canceled = true;
}
}
public void OnPerformed(PerformedContext filterContext)
{
RemoveFingerprint(filterContext.Connection, filterContext.Job);
}
private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
{
using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
{
var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(job));
DateTimeOffset timestamp;
if (fingerprint != null &&
fingerprint.ContainsKey("Timestamp") &&
DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out timestamp) &&
DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
{
// Actual fingerprint found, returning.
return false;
}
// Fingerprint does not exist, it is invalid (no `Timestamp` key),
// or it is not actual (timeout expired).
connection.SetRangeInHash(GetFingerprintKey(job), new Dictionary<string, string>
{
{ "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
});
return true;
}
}
private static void RemoveFingerprint(IStorageConnection connection, Job job)
{
using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
using (var transaction = connection.CreateWriteTransaction())
{
transaction.RemoveHash(GetFingerprintKey(job));
transaction.Commit();
}
}
private static string GetFingerprintLockKey(Job job)
{
return String.Format("{0}:lock", GetFingerprintKey(job));
}
private static string GetFingerprintKey(Job job)
{
return String.Format("fingerprint:{0}", GetFingerprint(job));
}
private static string GetFingerprint(Job job)
{
string parameters = string.Empty;
if (job.Arguments != null)
{
parameters = string.Join(".", job.Arguments);
}
if (job.Type == null || job.Method == null)
{
return string.Empty;
}
var fingerprint = String.Format(
"{0}.{1}.{2}",
job.Type.FullName,
job.Method.Name, parameters);
return fingerprint;
}
void IClientFilter.OnCreated(CreatedContext filterContext)
{
}
void IServerFilter.OnPerforming(PerformingContext filterContext)
{
}
}
@dmitry-zaets
Copy link

If I'm right - this part will also remove fingerprint if unhanded exception occurred in job.
In case of unhanded exception the job will be rescheduled. And in result you will have jobs duplications.

public void OnPerformed(PerformedContext filterContext)
{
    RemoveFingerprint(filterContext.Connection, filterContext.Job);
}

To avoid this we can check for exception:

public void OnPerformed(PerformedContext filterContext)
{
    if (filterContext.Exception == null || filterContext.ExceptionHandled)
    {
        RemoveFingerprint(filterContext.Connection, filterContext.Job);
    }
}

@dukefama
Copy link

Hi,
Does this mean if I decorate a method I call severally with this, it will stop concurrent execution?

@meywd
Copy link

meywd commented Oct 7, 2016

This causes the exception "String or binary data would be truncated.\r\nThe statement has been terminated" the cause is the output from GetFingerprintKey can be longer than 100 chars which is the field size in the Hash table:

CREATE TABLE [HangFire].[Hash]( [Id] [int] IDENTITY(1,1) NOT NULL, [Key] [nvarchar](100) NOT NULL, [Field] [nvarchar](100) NOT NULL, [Value] [nvarchar](max) NULL, [ExpireAt] [datetime2](7) NULL, CONSTRAINT [PK_HangFire_Hash] PRIMARY KEY CLUSTERED ( [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) )

@Korayem
Copy link

Korayem commented Oct 7, 2016

As @meywd mentioned, and it's caused specifically by this line

            // Fingerprint does not exist, it is invalid (no `Timestamp` key),
            // or it is not actual (timeout expired).
            connection.SetRangeInHash(GetFingerprintKey(job), new Dictionary<string, string>
            {
                { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
            });

@mcarter101
Copy link

I modified the GetFingerprint method to create an SHA256 Hash (converted to Base64). This ensures that no matter how long the method call/parameters are, it will result in a key that is less than 100 characters. Although you most likely won't get collisions, you may want to change the Hangfire.Hash.Key column collation to SQL_Latin1_General_CP1_CS_AS (note CS for case-sensitive).

The fingerprint looks something like: fingerprint:kgmWXBmtpJJk/HKyr5c96H9FbPETduk/5doad/4rOYg=

    private static string GetFingerprint(Job job)
    {
        var parameters = string.Empty;
        if (job?.Arguments != null)
        {
            parameters = string.Join(".", job.Arguments);
        }
        if (job?.Type == null || job.Method == null)
        {
            return string.Empty;
        }
        var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
        var hash = SHA256.Create().ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
        var fingerprint = Convert.ToBase64String(hash);
        return fingerprint;
    }

@marcselman
Copy link

@dmitry-zaets I don't think that will work. If you have set retry attempt to 0 the fingerprint will never be removed. And if you use CancellationTokens when cancelling a job this will throw an Exception and not retry the job so the fingerprint will also not be removed.

I think the original code works just fine:
With the original code when an unhandled exception occurs OnPerformed will be called and the fingerprint will be removed. Then (if there are retry attempts left) a new job will be created and in OnCreating a new fingerprint will be created immediately.

@domagojmedo
Copy link

@marcselman but a new job is not created on retry so fingerprint won't be inserted. I moved fingerprint creation to OnPerforming so fingerprint gets created when job is about to be executed and removed right after, no matter exception or not

@c5racing
Copy link

This is working for me; however, if the service that Hangfire is running under stops, the fingerprint is never removed. Anyone else have this issue or workaround?

@durisv
Copy link

durisv commented Jun 17, 2019

My solution:

class DisableMultipleQueuedItemsFilterAttribute : JobFilterAttribute, IElectStateFilter
    {
        private readonly string _jobName;

        public DisableMultipleQueuedItemsFilterAttribute(string jobName)
        {
            _jobName = jobName;
        }

        public void OnStateElection(ElectStateContext context)
        {
            var processingJobs = context.Storage.GetMonitoringApi().ProcessingJobs(0, 2000);

            foreach (var processingJob in processingJobs)
            {
                if (processingJob.Value.Job.Type.Name.Equals(_jobName, StringComparison.InvariantCultureIgnoreCase) && !context.CandidateState.IsFinal)
                {
                    context.CandidateState = new DeletedState
                    {
                        Reason = $"It is not allowed to perform multiple same tasks."
                    };
                    return;
                }
            }
        }
    }

@sabiland
Copy link

sabiland commented Jan 14, 2020

Do upper solutions works in latest Hangfire 1.7.8 ? How performant (and safe) they are (removing fingerprint, etc.) ?

@durisv are you sure your solution works? All you do is comparing Job's name. What about method input-parameters ? Also your solution fetches always latest 2000 jobs for every new enqueue operation...

@jasenf
Copy link

jasenf commented Jan 18, 2020

Trying this with 1.7.8 and it does not seem to work.

var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(job));

always seems to return null.

@bokmadsen
Copy link

In addition to @dmitry-zaets's change, I added the IApplyStateFilter interface and added this code, to cleanup failed and deleted jobs. This should handle both retried jobs with 0 attempts and restarted servers that interrupts the job

public void OnStateApplied(ApplyStateContext filterContext, IWriteOnlyTransaction transaction)
{
    try
    {
        var failedState = filterContext.NewState is FailedState;
        var deletedState = filterContext.NewState is DeletedState;
        if (failedState || deletedState)
        {
            RemoveFingerprint(filterContext.Connection, filterContext.BackgroundJob.Job);
        }
    }
    catch (Exception)
    {
        // Unhandled exceptions can cause an endless loop.
        // Therefore, catch and ignore them all.
    }
}

The FingerprintTimeout of one hour does however seem counterproductive, as this filter should prevent concurrent jobs. If a job is running for more than an hour, it will be allowed to start. It's however easy to fix, but mayby it shouldn't check the time?

@suarezafelipe
Copy link

Original GIST should be updated with the latest comments, maybe this should be even added in the official documentation?

Several people seem to need this solution, I am having this problem in an application with hangfire 1.7.x and I'm not sure if this is going to work in production. Reading the comments it seems the original code is not yet production ready

@bradleyuk
Copy link

@bokmadsen have you come up with a solution for the one hour FingerprintTimeout limitation (besides simply increasing the TimeSpan :)

Or anyone else have any suggestions / recommendations... it works fine, except like you say the job runs for more than the fingerprint timeout, at which point a duplicate is scheduled...

@fpodrimqaku
Copy link

@mcarter101 ta ha zemren

@mack0196
Copy link

My use case is that I don't want to enqueue a second job (with same parameter values) until the enqueued job goes to Succeeded or Failed.
From initial testing I am getting the results I want (block successive enqueue until job succeeds or fails) removing the fingerprint in IApplyStateFilter.OnStateApplied. Any feedback?

 public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IApplyStateFilter
    {
        private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);

        private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
        {
            using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
            {
                var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(job));

                if (fingerprint != null)
                {
                    // Actual fingerprint found, returning.
                    return false;
                }

                // Fingerprint does not exist, it is invalid (no `Timestamp` key),
                // or it is not actual (timeout expired).
                connection.SetRangeInHash(GetFingerprintKey(job), new Dictionary<string, string>
                {
                    { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
                });

                return true;
            }
        }

        private static void RemoveFingerprint(IStorageConnection connection, Job job)
        {
            using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
            using (var transaction = connection.CreateWriteTransaction())
            {
                transaction.RemoveHash(GetFingerprintKey(job));
                transaction.Commit();
            }
        }

        private static string GetFingerprintLockKey(Job job)
        {
            return string.Format("{0}:lock", GetFingerprintKey(job));
        }

        private static string GetFingerprintKey(Job job)
        {
            return string.Format("fingerprint:{0}", GetFingerprint(job));
        }

        private static string GetFingerprint(Job job)
        {
            string parameters = string.Empty;
            if (job?.Args != null)
            {
                parameters = string.Join(".", job.Args);
            }
            if (job?.Type == null || job.Method == null)
            {
                return string.Empty;
            }

            //https://gist.github.com/odinserj/a8332a3f486773baa009#gistcomment-1898401
            var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
            var hash = SHA256.Create().ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
            var fingerprint = Convert.ToBase64String(hash);
            return fingerprint;
        }

        public void OnCreating(CreatingContext filterContext)
        {
            if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
            {
                filterContext.Canceled = true;
            }
        }

        public void OnCreated(CreatedContext filterContext)
        {
            //do nothing
        }


        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.NewState.Name.Equals(Hangfire.States.SucceededState.StateName)
                || context.NewState.Name.Equals(Hangfire.States.FailedState.StateName))
            {
                RemoveFingerprint(context.Connection, context.BackgroundJob.Job);
            }
        }

        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            // do nothing
        }
    }

@justinbhopper
Copy link

@mack0196 Your solution is working great for me. I don't have any particular feedback. Thanks for pulling together the final solution from all these comments!

@erwin-faceit
Copy link

Took me a while to figure out an issue I was having with the above code. When running multiple servers with different cultures AND having jobs with either doubles or dates in the arguments, this is going to generate different fingerprints. The main issue could be that not everybody sets the current culture, but the line

parameters = string.Join(".", job.Args);

will invoke ToString() on all arguments, resulting in this behavior.

So, either always set the CurrentCulture, or use something like below:

private static readonly CultureInfo EnUs = new CultureInfo("en-US");

private static string ConvertArgument(object obj)
{
        switch (obj)
        {
                case null:
                        return string.Empty;
                case string s:
                        return s;
                case DateTime dt:
                        return dt.ToString("o"); // ISO8601 date format
                default:
                        return (string) Convert.ChangeType(obj, typeof(string), EnUs); // And force the rest to US English
        }
}

and change the line above to

parameters = string.Join(".", job.Args.Select(ConvertArgument));

@mack0196
Copy link

mack0196 commented Feb 2, 2022

Thanks erwin-faceit. Updated.

using Hangfire.Client;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Security.Cryptography;

namespace Hangfire.Filters
{
    public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IApplyStateFilter
    {
        private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
        private static readonly CultureInfo EnUs = new CultureInfo("en-US");

        /// <summary>
        /// Convert arguments into a culture-aware string
        /// </summary>
        ///<see cref="https://gist.github.com/odinserj/a8332a3f486773baa009?permalink_comment_id=4048344#gistcomment-4048344"/><see>
        private static string ConvertArgument(object obj)
        {
            switch (obj)
            {
                case null:
                    return string.Empty;
                case string s:
                    return s;
                case DateTime dt:
                    return dt.ToString("o"); // ISO8601 date format
                default:
                    return (string)Convert.ChangeType(obj, typeof(string), EnUs); // And force the rest to US English
            }
        }

        private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
        {
            var fingerprintKey = GetFingerprintKey(job);
            var finterprintLockKey = GetFingerprintLockKey(fingerprintKey);
            var distributedLock = connection.AcquireDistributedLock(finterprintLockKey, LockTimeout);
            using (distributedLock)
            {
                var fingerprint = connection.GetAllEntriesFromHash(fingerprintKey);

                if (fingerprint != null)
                {
                    // Actual fingerprint found, returning.
                    return false;
                }

                // Fingerprint does not exist, it is invalid (no `Timestamp` key),
                // or it is not actual (timeout expired).
                connection.SetRangeInHash(fingerprintKey, new Dictionary<string, string>
                    {
                        { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
                    });

                return true;
            }
        }

        private static void RemoveFingerprint(IStorageConnection connection, Job job)
        {
            var fingerprintKey = GetFingerprintKey(job);
            var finterprintLockKey = GetFingerprintLockKey(fingerprintKey);
            using (connection.AcquireDistributedLock(finterprintLockKey, LockTimeout))
            using (var transaction = connection.CreateWriteTransaction())
            {
                transaction.RemoveHash(fingerprintKey);
                transaction.Commit();
            }
        }

        private static string GetFingerprintLockKey(string fingerprintKey)
        {
            return string.Format("{0}:lock", fingerprintKey);
        }

        private static string GetFingerprintKey(Job job)
        {
            return string.Format("fingerprint:{0}", GetFingerprint(job));
        }

        private static string GetFingerprint(Job job)
        {
            string parameters = string.Empty;
            if (job?.Args != null)
            {
                parameters = string.Join(".", job.Args.Select(ConvertArgument));
            }
            if (job?.Type == null || job.Method == null)
            {
                return string.Empty;
            }

            //https://gist.github.com/odinserj/a8332a3f486773baa009#gistcomment-1898401
            var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
            var hash = SHA256.Create().ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
            var fingerprint = Convert.ToBase64String(hash);
            return fingerprint;
        }

        public void OnCreating(CreatingContext filterContext)
        {
            if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
            {
                filterContext.Canceled = true;
            }
        }

        public void OnCreated(CreatedContext filterContext)
        {
            //do nothing
        }


        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.NewState.Name.Equals(Hangfire.States.SucceededState.StateName)
                || context.NewState.Name.Equals(Hangfire.States.FailedState.StateName))
            {
                RemoveFingerprint(context.Connection, context.BackgroundJob.Job);
            }
        }

        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            // do nothing
        }
    }
}

@mack0196
Copy link

mack0196 commented Feb 2, 2022

⚠️⚠️⚠️ This works for the Enqueue only; Scheduled jobs will duplicate.

⚠️⚠️⚠️ This filter will break ContinueWith scenarios that expect a jobId string returned from BackgroundJobClient.Enqueue.

@mcastellano
Copy link

@mack0196, I believe you should check for the transition to Deleted state as well for fingerprint removal.

@uciprian
Copy link

uciprian commented Sep 1, 2022

This solution sometimes is generating RedisTimeoutException upon fingerprint removal and in the error message it is something about
SUBSCRIBE fingerprint:60684f8fd3ad130e8c60210b41706448:lock:ev

@mack0196
Copy link

Thanks mcastellano

@afelinczak
Copy link

Hello,
We experienced problem when passing CancellationToken into a job.
https://docs.hangfire.io/en/latest/background-methods/using-cancellation-tokens.html
In our case, when job was completed calculated hash was incorrect as Hangfire replaced it with null in Parameters.
To fix it we changed ConvertArgument method to ignore token.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment