How to Schedule Delayed Jobs on a Specific Queue with Hangfire
We use Hangfire at Assetbots to manage and coordinate all our background processing and event handling. While Hangfire comes with a lot of great features out of the box, it lacks the ability to schedule delayed jobs on a specific queue. Luckily for us, Hangfire’s architecture is extremely simple and extensible, so with just a little bit of custom code we can implement this feature ourselves.
At its core, Hangfire treats jobs as individual state machines. There are a number of built-in states that the system ships with, along with their corresponding handlers. Each state’s handler is responsible for transitioning jobs to and from that state in storage.
For example, the EnqueuedState handler adds jobs to their corresponding queue in storage. Compare that to the ScheduledState handler, which sets a timestamp on a custom scheduled metadata key in storage that indicates when the job should be enqueued.
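To make these two built-in states concrete, here is a minimal sketch that creates jobs with an explicit initial state through IBackgroundJobClient.Create, which, as far as we can tell, is roughly what the stock Enqueue and Schedule helpers do for you. The class name ExplicitStateExamples, its method names, and the critical queue name are placeholders invented for this illustration.

```csharp
using System;
using Hangfire;
using Hangfire.States;

// Hypothetical helper class, for illustration only.
public static class ExplicitStateExamples
{
    // Creates a job directly in the Enqueued state on the "critical" queue
    // (an assumed queue name; a server must be listening on it).
    public static string EnqueueOnCritical(IBackgroundJobClient client)
    {
        return client.Create(
            () => Console.WriteLine("run now"),
            new EnqueuedState("critical"));
    }

    // Creates a job in the Scheduled state. Note that ScheduledState only
    // carries a time, not a queue.
    public static string RunInFiveMinutes(IBackgroundJobClient client)
    {
        return client.Create(
            () => Console.WriteLine("run later"),
            new ScheduledState(TimeSpan.FromMinutes(5)));
    }
}
```

EnqueuedState accepts a queue name while ScheduledState does not, which is exactly the gap the rest of this post fills.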
Another core feature of Hangfire’s architecture is the chain-of-responsibility pipeline. This processing pipeline has a number of stages that can be intercepted using job filters. Each filter can operate on and change the job’s behavior at that point in the pipeline. There can be multiple filters applied, each operating independently, and each applied at different levels of granularity (e.g., at the job or method level, at the class level, or system-wide).
Using filters, you can extend Hangfire to implement things like logging each state transition or reporting unhandled errors to Bugsnag or Sentry.
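As a rough illustration of the logging case, a state-transition filter might look like the sketch below. It relies on Hangfire's JobFilterAttribute and IApplyStateFilter types; the LogStateTransitionsAttribute name, the Describe helper, and the choice to write to the console are our own assumptions for the example rather than anything Hangfire prescribes.

```csharp
using System;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;

// Hypothetical filter that logs every state change of a job.
public sealed class LogStateTransitionsAttribute : JobFilterAttribute, IApplyStateFilter
{
    // Called once the new state has been applied to the job.
    public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        Console.WriteLine(Describe(
            context.BackgroundJob.Id,
            context.OldStateName,
            context.NewState.Name));
    }

    // Called when the previous state is being removed; nothing to log here.
    public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
    }

    // Formats the log line; kept separate so it can be checked without storage.
    public static string Describe(string jobId, string oldState, string newState)
    {
        return $"Job {jobId}: {oldState ?? "(none)"} -> {newState}";
    }
}
```

Because it derives from JobFilterAttribute, the same filter can be placed on a single job method or class as [LogStateTransitions], or registered system-wide with GlobalJobFilters.Filters.Add(new LogStateTransitionsAttribute()).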
As you might imagine, there are a lot of possibilities afforded by these two simple primitives. In any case, let's use a custom job state and custom queue filter to enable us to schedule delayed jobs on the queue of our choice.
We need to accomplish two things:
- Add a custom piece of metadata to the scheduled job indicating which queue we want it to be enqueued onto when it transitions to the EnqueuedState.
- Read our custom queue metadata when transitioning from the ScheduledState to the EnqueuedState and use it to enqueue the job to the correct queue.
Hangfire ships with an unsealed state representing delayed, scheduled jobs. And since states in Hangfire are persisted as JSON using Json.NET, we can simply extend this class and add a custom property to it. This property will be serialized and deserialized using the default JSON serialization infrastructure.
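using System;
using Hangfire.States;
using Newtonsoft.Json;

public sealed class ScheduledQueueState : ScheduledState
{
    // Relative delay, no explicit queue.
    public ScheduledQueueState(TimeSpan enqueueIn)
        : this(DateTime.UtcNow.Add(enqueueIn), null)
    {
    }

    // Absolute time, no explicit queue.
    public ScheduledQueueState(DateTime enqueueAt)
        : this(enqueueAt, null)
    {
    }

    // Absolute time plus target queue; also the constructor used for JSON deserialization.
    [JsonConstructor]
    public ScheduledQueueState(DateTime enqueueAt, string queue)
        : base(enqueueAt)
    {
        this.Queue = queue?.Trim();
    }

    // Queue the job should be enqueued onto once its delay elapses (null means the default queue).
    public string Queue { get; }
}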
This class is extremely straightforward: extend ScheduledState and add a Queue property while maintaining JSON serialization compatibility. Next, we need to take advantage of this new property when moving into the EnqueuedState.
Our QueueFilter class will do two things:
- Watch for jobs being created in either the EnqueuedState or the ScheduledQueueState and grab their Queue property to store as a custom job parameter, and
- Use our custom Queue job parameter when transitioning to (or electing) the EnqueuedState.
using System;
using Hangfire.Client;
using Hangfire.States;

public sealed class QueueFilter : IClientFilter, IElectStateFilter
{
    public const string QueueParameterName = "Queue";

    public void OnCreated(CreatedContext filterContext)
    {
        // Nothing to do after creation.
    }

    public void OnCreating(CreatingContext filterContext)
    {
        // Capture the queue from the job's initial state, if it has one.
        string queue = null;
        switch (filterContext.InitialState)
        {
            case EnqueuedState es:
                queue = es.Queue;
                break;
            case ScheduledQueueState sqs:
                queue = sqs.Queue;
                break;
            default:
                break;
        }

        // Store it as a job parameter so it travels with the job.
        if (!string.IsNullOrWhiteSpace(queue))
        {
            filterContext.SetJobParameter(QueueFilter.QueueParameterName, queue);
        }
    }

    public void OnStateElection(ElectStateContext context)
    {
        if (context.CandidateState.Name == EnqueuedState.StateName)
        {
            // Fall back to the default queue when no parameter was stored.
            string queue = context.GetJobParameter<string>(QueueFilter.QueueParameterName)?.Trim();
            if (string.IsNullOrWhiteSpace(queue))
            {
                queue = EnqueuedState.DefaultQueue;
            }

            // Replace the candidate with an EnqueuedState targeting our queue.
            context.CandidateState = new EnqueuedState(queue);
        }
    }
}
Again, this class is pretty simple. We’re using C#’s pattern matching to check if the job is being created with either of the two states we care about. If it is, we are setting a custom parameter that will travel with the job through the pipeline.
Then, when the system is transitioning the job to a new state, we check to see if the state being transitioned to is the EnqueuedState. If it is, we look for our previously set custom parameter and use it to create a new version of the EnqueuedState to transition into instead.
Now that we have all the code we need in place, how do we actually wire up the filter and take advantage of our new ScheduledQueueState? First, we need to register our QueueFilter into Hangfire’s processing pipeline:
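// The lambda's first parameter is named "provider" so that it does not clash with the outer "services" variable.
services.AddHangfire(configuration: (provider, config) =>
{
    config.UseFilter(new QueueFilter());
});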
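One thing to keep in mind while configuring Hangfire: a server only fetches jobs from the queues it has been told to listen on, and by default that is just the default queue. A job that our filter moves onto another queue, such as the scheduled queue used in the example later in this post, will sit there until some server includes that queue. Here is a minimal sketch, assuming an ASP.NET Core application using the AddHangfireServer extension; the HangfireServerSetup class and AddQueueAwareServer method are names made up for this example.

```csharp
using Hangfire;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical setup helper, for illustration only.
public static class HangfireServerSetup
{
    // Queues this server will process. "scheduled" is the custom queue
    // assumed in this post; "default" keeps ordinary jobs working.
    public static readonly string[] Queues = { "scheduled", "default" };

    public static IServiceCollection AddQueueAwareServer(this IServiceCollection services)
    {
        return services.AddHangfireServer(options => options.Queues = Queues);
    }
}
```

Calling services.AddQueueAwareServer() next to the AddHangfire call above would then start a server that picks up jobs from both queues.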
Finally, we can create extension methods corresponding to the Schedule overloads that we need. For example:
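using System;
using System.Linq.Expressions;
using Hangfire;
using Hangfire.Annotations;

// Extension methods must live in a static class; this class name is only a placeholder.
public static class BackgroundJobClientQueueExtensions
{
    public static string Schedule(
        [NotNull] this IBackgroundJobClient client,
        [NotNull, InstantHandle] Expression<Action> methodCall,
        TimeSpan delay,
        string queue)
    {
        if (client == null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        // Convert the relative delay to an absolute UTC time for the (DateTime, string) constructor.
        return client.Create(methodCall, new ScheduledQueueState(DateTime.UtcNow.Add(delay), queue));
    }
}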
The extension above lets us schedule delayed jobs that don’t return a Task and don’t take a parameter. For example, to place one on the scheduled queue:
// Acquire a reference to IBackgroundJobClient via dependency injection.
private IBackgroundJobClient client;

// When you want to schedule your job.
this.client.Schedule(
    () => Console.WriteLine("Hello, world!"),
    TimeSpan.FromDays(1),
    "scheduled");
From here, adding any other overloads you may need is straightforward.
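For instance, two more overloads might look like the sketch below: one for jobs that return a Task, and one for jobs invoked on a type that Hangfire resolves at run time, scheduled for an absolute time. It assumes the ScheduledQueueState class defined earlier in this post is in scope; the QueueScheduleOverloads class name is a placeholder.

```csharp
using System;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Hangfire;

// Hypothetical container for additional overloads; relies on the
// ScheduledQueueState class defined earlier in this post.
public static class QueueScheduleOverloads
{
    // Async job, relative delay.
    public static string Schedule(
        this IBackgroundJobClient client,
        Expression<Func<Task>> methodCall,
        TimeSpan delay,
        string queue)
    {
        if (client == null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        return client.Create(methodCall, new ScheduledQueueState(DateTime.UtcNow.Add(delay), queue));
    }

    // Job invoked on an instance of T, absolute time.
    public static string Schedule<T>(
        this IBackgroundJobClient client,
        Expression<Action<T>> methodCall,
        DateTimeOffset enqueueAt,
        string queue)
    {
        if (client == null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        return client.Create(methodCall, new ScheduledQueueState(enqueueAt.UtcDateTime, queue));
    }
}
```

Both follow the same pattern as the original extension: build a ScheduledQueueState carrying the queue, then hand it to the matching Create overload.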
We have been happy with Hangfire’s combination of easy setup and rich extensibility while building Assetbots. It has become an important part of our internal infrastructure as we scale our background processing needs.