30
How to Schedule Delayed Jobs on a Specific Queue with Hangfire
We use Hangfire at Assetbots to manage and coordinate all our background processing and event handling. While Hangfire comes with a lot of great features out of the box, it lacks the ability to schedule delayed jobs on a specific queue. Luckily for us, Hangfire’s architecture is extremely simple and extensible, so with just a little bit of custom code we can implement this feature ourselves.
At its core, Hangfire treats jobs as individual state machines. There are a number of built-in states that the system ships with, along with their corresponding handlers. Each state’s handler is responsible for transitioning jobs to and from that state in storage.
Another core feature of Hangfire’s architecture is the chain-of-responsibility pipeline. This processing pipeline has a number of stages that can be intercepted using job filters. Each filter can operate on and change the job’s behavior at that point in the pipeline. There can be multiple filters applied, each operating independently, and each applied at different levels of granularity (e.g., at the job or method level, at the class level, or system-wide).
As you might imagine, there are a lot of possibilities afforded by these two simple primitives. In any case, let's use a custom job state and custom queue filter to enable us to schedule delayed jobs on the queue of our choice.
We need to accomplish two things:
EnqueuedState
.ScheduledState
to the EnqueuedState
and using it to enqueue the job to the correct queue.Hangfire ships with an un-sealed state representing delayed, scheduled jobs. And since states in Hangfire are persisted as JSON using JSON.NET, we can simply extend this class and add a custom property to it. This property will be serialized and de-serialized using the default JSON serialization infrastructure.
using System;
using Hangfire.States;
using Newtonsoft.Json;
public sealed class ScheduledQueueState : ScheduledState
{
public ScheduledQueueState(TimeSpan enqueueIn)
: this(DateTime.UtcNow.Add(enqueueIn), null)
{
}
public ScheduledQueueState(DateTime enqueueAt)
: this(enqueueAt, null)
{
}
[JsonConstructor]
public ScheduledQueueState(DateTime enqueueAt, string queue)
: base(enqueueAt)
{
this.Queue = queue?.Trim();
}
public string Queue { get; }
}
This class is extremely straightforward: extend
ScheduledState
and add a Queue
property while maintaining JSON serialization compatibility. Next, we need to take advantage of this new property when moving into the EnqueuedState
.Our
QueueFilter
class will do two things:EnqueuedState
or the ScheduledQueueState
and grab their Queue
property to store as a custom job parameter, andQueue
job parameter when transitioning to (or electing) the EnqueuedState
.
using System;
using Hangfire.Client;
using Hangfire.States;
public sealed class QueueFilter : IClientFilter, IElectStateFilter
{
public const string QueueParameterName = "Queue";
public void OnCreated(CreatedContext filterContext)
{
}
public void OnCreating(CreatingContext filterContext)
{
string queue = null;
switch (filterContext.InitialState)
{
case EnqueuedState es:
queue = es.Queue;
break;
case ScheduledQueueState sqs:
queue = sqs.Queue;
break;
default:
break;
}
if (!string.IsNullOrWhiteSpace(queue))
{
filterContext.SetJobParameter(QueueFilter.QueueParameterName, queue);
}
}
public void OnStateElection(ElectStateContext context)
{
if (context.CandidateState.Name == EnqueuedState.StateName)
{
string queue = context.GetJobParameter<string>(QueueFilter.QueueParameterName)?.Trim();
if (string.IsNullOrWhiteSpace(queue))
{
queue = EnqueuedState.DefaultQueue;
}
context.CandidateState = new EnqueuedState(queue);
}
}
}
Again, this class is pretty simple. We’re using C#’s pattern matching to check if the job is being created with either of the two states we care about. If it is, we are setting a custom parameter that will travel with the job through the pipeline.
Then, when the system is transitioning the job to a new state, we check to see if the state being transitioned to is the
EnqueuedState
. If it is, we look for our previously set custom parameter and use it to create a new version of the EnqueuedState
to transition into instead.Now that we have all the code we need in place, how do we actually wire up the filter and take advantage of our new
ScheduledQueueState
? First, we need to register our QueueFilter
into Hangfire’s processing pipeline:services.AddHangfire(configuration: (services, config) =>
{
config.UseFilter(new QueueFilter());
});
Finally, we can create extension methods corresponding to the
Schedule
overloads that we need. For example:public static string Schedule(
[NotNull] this IBackgroundJobClient client,
[NotNull, InstantHandle] Expression<Action> methodCall,
TimeSpan delay,
string queue)
{
if (client == null)
{
throw new ArgumentNullException(nameof(client));
}
return client.Create(methodCall, new ScheduledQueueState(delay, queue));
}
The above extension will let us schedule delayed jobs that don’t return a
Task
and don’t take a parameter onto the scheduled
queue as follows:// Acquire a reference to IBackgroundJobClient via dependency injection.
private IBackgroundJobClient client;
// When you want to schedule your job.
this.client.Schedule(
() => Console.Log("Hello, world!"),
TimeSpan.FromDays(1),
"scheduled");
It is straightforward from here how to add any other overloads you may need.
We have been happy with Hangfire’s combination of easy setup and rich extensibility while building Assetbots. It has become an important part of our internal infrastructure as we scale our background processing needs.