20
Pipeline pattern - intermittent flow
Having implementation of pipeline pattern with error handling in place, let's start working on more sophisticated pipeline solution.
Let’s define additional requirements for pipeline implementation:
- it allows to define an intermittent flow,
- it allows to define step to read a message from the queue,
- it allows to define step to send a message to a different queue,
- it is terminated after sending out a message,
- it allows to continue from a certain step after receiving response.
On the picture below you can find desired flow diagram:
The goal of ‘In Message’ step is to read a message from the queue. It means, message has to be delivered into this specific pipe step. In other words, we must be sure that message from the queue is part of defined pipe model. To ensure this, let’s create an interface which will contain such property:
public interface IInQueuePipeModel
{
string InMessage { get; set; }
}
As you can see on flow diagram, it is possible to define multiple ‘In Message’ steps for reading messages from different queues. How does pipeline know which step should be invoked for a particular queue?
To receive a message from the queue or send out a message to some queue, it is required to know queue name. The queue name is key here and this is exactly what is going to be used to distinguish between different ‘In Message’ steps. So there are two things needed:
- queue name present in pipe model to have information from which queue message was received,
- queue name defined for specific 'In Message' step, so pipeline can figure out where is starting point for pipe execution.
To force queue name definition in pipe model, let’s define QueueName
property in IInQueuePipeModel
interface:
public interface IInQueuePipeModel
{
string QueueName { get; set; }
string InMessage { get; set; }
}
Additionally, separate interface for ‘In Message’ step is needed. It will ensure queue name presence for appropriate step:
public interface IInQueuePipeStep<TPipeModel> : IPipeStep<TPipeModel> where
TPipeModel : IInQueuePipeModel
{
string InQueueName { get; }
}
Notice I added generic type constraint on TPipeModel
, meaning that the type parameter TPipeModel
implements the IInQueuePipeModel
interface.
Last part is ‘Out Message’ step. This step is going to be used for preparing request message for some external queue. Additionally, after this step, pipe is going to be terminated until receiving response.
To achieve that I need specific interface for ‘Out Message’ step which will indicate breaking point in the pipeline:
public interface IOutQueuePipeStep<TPipeModel> : IPipeStep<TPipeModel> where TPipeModel : IInQueuePipeModel
{
}
Also, I introduced yet another interface, so when needed, pipe model will implement property for outgoing messages:
public interface IOutQueuePipeModel
{
string OutMessage { get; set; }
}
IOutQueuePipeModel
interface is not required. Since there are different ways to implement sending out messages, such property is not always needed in pipe model. Add it when you are sure you will make use of it.
Even though whole pipeline is defined as one piece, in runtime it will be split into multiple executions, as flow diagram shows. In other words there is going to be a mechanism in place that will extract only steps that are required for a particular execution. How to achieve that? Let's make use of interfaces we've already defined: IInQueuePipeStep
and IOutQueuePipeStep
. Additionally, new QueuePipeService
will be introduced and it's going to be responsible for queue related pipe execution.
Below you can find implementation of QueuePipeService
. The only difference comparing to the previous implementation is inside ExecuteAsync
method:
public class QueuePipeService<TPipeModel> : IPipeService<TPipeModel>
{
protected readonly IList<Func<IPipeStep<TPipeModel>>> _pipeSteps;
protected QueuePipeService()
{
_pipeSteps = new List<Func<IPipeStep<TPipeModel>>>();
}
public IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep)
{
_pipeSteps.Add(pipeStep);
return this;
}
public async Task ExecuteAsync(TPipeModel pipeModel)
{
var steps = GetInitializedSteps().SkipUntil(s => !(s is IInQueuePipeStep<TPipeModel> && (s as IInQueuePipeStep<TPipeModel>).InQueueName == pipeModel.QueueName))
.TakeUntil(s => !(s is IOutQueuePipeStep<TPipeModel>));
foreach (var pipeStep in steps)
{
await pipeStep.ExecuteAsync(pipeModel);
}
}
private IEnumerable<IPipeStep<TPipeModel>> GetInitializedSteps()
{
return _pipeSteps.Select(s => s.Invoke());
}
}
ExecuteAsync
works in a way that until it finds step which implements IInQueuePipeStep
interface and queue name for such step is equal to queue name passed in pipe model it will be skipping steps. From the matching step it gets all next steps but if one of them implements IOutQueuePipeStep
, it is going to be an indication that after this step some request message is going to be sent out and pipeline will be terminated until receiving response. To achieve this functionality I created two IEnumerable
extension methods: SkipUntil
and TakeUntil
.
As I mentioned earlier, there is a big chunk of code which is common for both: QueuePipeService
and PipeService
. To follow DRY (don’t repeat yourself) principle, under this link, you can find final implementation with extracted PipeServiceBase
abstract class which contains common implementation for both services.
Based on the final implementation, PipelinePattern NuGet package is generated. It is ready to be downloaded and used.
Using proposed pipe implementation, whole business process can be defined in a single place. Moreover, implementation of complex business flow will have clean, SOLID, testable and easy to understand code. Without knowing implementation details, looking just on pipe steps, at the first glance it is quite obvious what is going to happen in specific pipeline. It is just a matter of digging into specific step to find out everything. Additionally, if you integrate logging into your pipeline, issues can be tracked easily.
I hope you will make use of it in your project.
You can find it in NuGet package manager by typing ‘PipelinePattern’.
There are more things that could be incorporated into pipeline solution, like for instance:
- parallel execution of a series of steps,
- reporting of pipeline progress with SignalR.
Stay tuned!
20