Pipeline pattern - intermittent flow

With the pipeline pattern and its error handling in place, let's start working on a more sophisticated pipeline solution.

Let’s define additional requirements for the pipeline implementation:

  • it allows defining an intermittent flow,
  • it allows defining a step that reads a message from a queue,
  • it allows defining a step that sends a message to a different queue,
  • it is terminated after sending out a message,
  • it can continue from a certain step after receiving a response.

The diagram below shows the desired flow:

[Image: pipeline flow diagram]

The goal of the ‘In Message’ step is to read a message from the queue. This means the message has to be delivered to this specific pipe step. In other words, we must be sure that the message from the queue is part of the defined pipe model. To ensure this, let’s create an interface that contains such a property:

public interface IInQueuePipeModel
{
    string InMessage { get; set; }
}

As you can see in the flow diagram, it is possible to define multiple ‘In Message’ steps for reading messages from different queues. How does the pipeline know which step should be invoked for a particular queue?

To receive a message from a queue or send a message to a queue, you need to know the queue name. The queue name is the key here, and it is exactly what will be used to distinguish between different ‘In Message’ steps. So two things are needed:

  • the queue name present in the pipe model, to record which queue the message was received from,
  • the queue name defined for a specific 'In Message' step, so the pipeline can figure out the starting point for pipe execution.

To force the queue name definition in the pipe model, let’s add a QueueName property to the IInQueuePipeModel interface:

public interface IInQueuePipeModel
{
    string QueueName { get; set; }
    string InMessage { get; set; }
}

Additionally, a separate interface for the ‘In Message’ step is needed. It ensures that the queue name is present on the appropriate step:

public interface IInQueuePipeStep<TPipeModel> : IPipeStep<TPipeModel>
    where TPipeModel : IInQueuePipeModel
{
    string InQueueName { get; }
}

Notice that I added a generic type constraint on TPipeModel, which means that the type argument for TPipeModel must implement the IInQueuePipeModel interface.
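
To illustrate how these interfaces fit together, here is a minimal sketch of a pipe model and an ‘In Message’ step. IPipeStep<TPipeModel> comes from the earlier part of this series; its shape below (a single ExecuteAsync method) is assumed from how the service calls it. OrderPipeModel, ReadOrderStep, the OrderId property and the "orders" queue name are hypothetical names used only for illustration.

```csharp
using System.Threading.Tasks;

// Assumed shape of the step interface from the previous article.
public interface IPipeStep<TPipeModel>
{
    Task ExecuteAsync(TPipeModel pipeModel);
}

public interface IInQueuePipeModel
{
    string QueueName { get; set; }
    string InMessage { get; set; }
}

public interface IInQueuePipeStep<TPipeModel> : IPipeStep<TPipeModel>
    where TPipeModel : IInQueuePipeModel
{
    string InQueueName { get; }
}

// Hypothetical model: carries the queue data plus one business field.
public class OrderPipeModel : IInQueuePipeModel
{
    public string QueueName { get; set; }
    public string InMessage { get; set; }
    public string OrderId { get; set; }
}

// Hypothetical 'In Message' step: the entry point for the "orders" queue.
public class ReadOrderStep : IInQueuePipeStep<OrderPipeModel>
{
    public string InQueueName => "orders";

    public Task ExecuteAsync(OrderPipeModel pipeModel)
    {
        // Treat the raw message as the order id; real code would deserialize it.
        pipeModel.OrderId = pipeModel.InMessage.Trim();
        return Task.CompletedTask;
    }
}
```

Because of the constraint, a model that does not implement IInQueuePipeModel cannot be used with ReadOrderStep; the compiler rejects it.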

The last part is the ‘Out Message’ step. This step is used for preparing a request message for some external queue. Additionally, after this step, the pipe is terminated until a response is received.

To achieve that, I need a specific interface for the ‘Out Message’ step that indicates a breaking point in the pipeline:

public interface IOutQueuePipeStep<TPipeModel> : IPipeStep<TPipeModel> where TPipeModel : IInQueuePipeModel
{ 
}

I also introduced yet another interface so that, when needed, the pipe model can implement a property for outgoing messages:

public interface IOutQueuePipeModel
{
    string OutMessage { get; set; }
}

Implementing the IOutQueuePipeModel interface is optional. Since there are different ways to implement sending out messages, such a property is not always needed in the pipe model. Add it when you are sure you will make use of it.
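
As one possible way of using these two interfaces, the sketch below shows an ‘Out Message’ step that only prepares the request and stores it in OutMessage, leaving the actual send to whatever code hosts the pipeline. That division of work is an assumption, not something the interfaces require. The IPipeStep<TPipeModel> shape is assumed from the earlier article, and PaymentPipeModel, RequestPaymentStep and the "charge:" message format are hypothetical.

```csharp
using System.Threading.Tasks;

// Assumed shape of the step interface from the previous article.
public interface IPipeStep<TPipeModel>
{
    Task ExecuteAsync(TPipeModel pipeModel);
}

public interface IInQueuePipeModel
{
    string QueueName { get; set; }
    string InMessage { get; set; }
}

public interface IOutQueuePipeModel
{
    string OutMessage { get; set; }
}

public interface IOutQueuePipeStep<TPipeModel> : IPipeStep<TPipeModel>
    where TPipeModel : IInQueuePipeModel
{
}

// Hypothetical model that opts in to the outgoing message property.
public class PaymentPipeModel : IInQueuePipeModel, IOutQueuePipeModel
{
    public string QueueName { get; set; }
    public string InMessage { get; set; }
    public string OutMessage { get; set; }
}

// Hypothetical 'Out Message' step: marks the breaking point of the pipeline.
public class RequestPaymentStep : IOutQueuePipeStep<PaymentPipeModel>
{
    public Task ExecuteAsync(PaymentPipeModel pipeModel)
    {
        // Only prepare the request; the hosting code is assumed to send it.
        pipeModel.OutMessage = $"charge:{pipeModel.InMessage}";
        return Task.CompletedTask;
    }
}
```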

Even though the whole pipeline is defined as one piece, at runtime it is split into multiple executions, as the flow diagram shows. In other words, there is a mechanism in place that extracts only the steps required for a particular execution. How to achieve that? Let's make use of the interfaces we've already defined: IInQueuePipeStep and IOutQueuePipeStep. Additionally, a new QueuePipeService is introduced, which is responsible for queue-related pipe execution.

Below you can find the implementation of QueuePipeService. The main difference compared to the previous implementation is inside the ExecuteAsync method:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class QueuePipeService<TPipeModel> : IPipeService<TPipeModel>
    where TPipeModel : IInQueuePipeModel // needed to read pipeModel.QueueName
{
    protected readonly IList<Func<IPipeStep<TPipeModel>>> _pipeSteps;

    public QueuePipeService()
    {
        _pipeSteps = new List<Func<IPipeStep<TPipeModel>>>();
    }

    public IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep)
    {
        _pipeSteps.Add(pipeStep);
        return this;
    }

    public async Task ExecuteAsync(TPipeModel pipeModel)
    {
        // Start at the 'In Message' step bound to the incoming queue and
        // stop at the next 'Out Message' step.
        var steps = GetInitializedSteps()
            .SkipUntil(s => !(s is IInQueuePipeStep<TPipeModel> inStep && inStep.InQueueName == pipeModel.QueueName))
            .TakeUntil(s => !(s is IOutQueuePipeStep<TPipeModel>));

        foreach (var pipeStep in steps)
        {
            await pipeStep.ExecuteAsync(pipeModel);
        }
    }

    // Creates a fresh instance of every step from its factory.
    private IEnumerable<IPipeStep<TPipeModel>> GetInitializedSteps()
    {
        return _pipeSteps.Select(s => s.Invoke());
    }
}

ExecuteAsync skips steps until it finds a step that implements the IInQueuePipeStep interface and whose queue name equals the queue name passed in the pipe model. From the matching step on, it takes all subsequent steps, but if one of them implements IOutQueuePipeStep, this indicates that a request message is sent out after this step and the pipeline is terminated until a response is received. To achieve this, I created two IEnumerable extension methods: SkipUntil and TakeUntil.
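
The two extension methods are not listed in this post, so the following is only a sketch of how they could look. Their behavior is inferred from the way ExecuteAsync calls them and from the description above: SkipUntil is assumed to skip items for as long as the predicate holds, and TakeUntil is assumed to return items for as long as the predicate holds, plus the first item that fails it (the ‘Out Message’ step, which still has to run). The EnumerableExtensions class name is hypothetical, and the actual package implementation may differ.

```csharp
using System;
using System.Collections.Generic;

public static class EnumerableExtensions
{
    // Skips items while the predicate holds, then yields the first item
    // that fails it and every item after it.
    public static IEnumerable<T> SkipUntil<T>(this IEnumerable<T> source, Func<T, bool> predicate)
    {
        var skipping = true;
        foreach (var item in source)
        {
            if (skipping && predicate(item))
            {
                continue;
            }

            skipping = false;
            yield return item;
        }
    }

    // Yields items while the predicate holds and also yields the first
    // item that fails it, then stops.
    public static IEnumerable<T> TakeUntil<T>(this IEnumerable<T> source, Func<T, bool> predicate)
    {
        foreach (var item in source)
        {
            yield return item;

            if (!predicate(item))
            {
                yield break;
            }
        }
    }
}
```

Using plain strings in place of steps, new[] { "a", "in:q1", "b", "out", "in:q2", "c" }.SkipUntil(s => s != "in:q1").TakeUntil(s => s != "out") returns "in:q1", "b", "out". Starting from "in:q2" instead returns "in:q2", "c", because no further ‘out’ item follows.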

As I mentioned earlier, a big chunk of code is common to both QueuePipeService and PipeService. To follow the DRY (don’t repeat yourself) principle, the final implementation, which you can find under this link, extracts a PipeServiceBase abstract class containing the implementation shared by both services.

The PipelinePattern NuGet package is built from the final implementation. It is ready to be downloaded and used.

Summary

With the proposed pipe implementation, a whole business process can be defined in a single place. Moreover, the implementation of a complex business flow will have clean, SOLID, testable and easy-to-understand code. Without knowing implementation details, just by looking at the pipe steps, it is quite obvious at first glance what is going to happen in a specific pipeline. Finding out the details is just a matter of digging into a specific step. Additionally, if you integrate logging into your pipeline, issues can be tracked easily.
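
To make the "single place" idea concrete, here is a condensed, self-contained sketch of an intermittent pipeline defined once and executed separately per incoming queue. It is not the PipelinePattern package API: DemoQueuePipe is a simplified stand-in for QueuePipeService that uses LINQ's SkipWhile and a break in the loop in place of SkipUntil and TakeUntil, and DemoModel, LogStep, InStep, OutStep, Demo and the queue names are all hypothetical. The IPipeStep<T> shape is assumed from the earlier article.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public interface IPipeStep<T> { Task ExecuteAsync(T pipeModel); }
public interface IInQueuePipeModel { string QueueName { get; set; } string InMessage { get; set; } }
public interface IInQueuePipeStep<T> : IPipeStep<T> where T : IInQueuePipeModel { string InQueueName { get; } }
public interface IOutQueuePipeStep<T> : IPipeStep<T> where T : IInQueuePipeModel { }

public class DemoModel : IInQueuePipeModel
{
    public string QueueName { get; set; }
    public string InMessage { get; set; }
    public List<string> Log { get; } = new List<string>();
}

// Hypothetical steps; each one only records that it ran.
public class LogStep : IPipeStep<DemoModel>
{
    private readonly string _name;
    public LogStep(string name) { _name = name; }
    public Task ExecuteAsync(DemoModel pipeModel) { pipeModel.Log.Add(_name); return Task.CompletedTask; }
}

public class InStep : LogStep, IInQueuePipeStep<DemoModel>
{
    public InStep(string queue) : base("in:" + queue) { InQueueName = queue; }
    public string InQueueName { get; }
}

public class OutStep : LogStep, IOutQueuePipeStep<DemoModel>
{
    public OutStep(string name) : base(name) { }
}

// Simplified stand-in for QueuePipeService.
public class DemoQueuePipe
{
    private readonly List<Func<IPipeStep<DemoModel>>> _steps = new List<Func<IPipeStep<DemoModel>>>();

    public DemoQueuePipe Add(Func<IPipeStep<DemoModel>> step) { _steps.Add(step); return this; }

    public async Task ExecuteAsync(DemoModel pipeModel)
    {
        // Skip everything before the 'In Message' step for the incoming queue.
        var steps = _steps.Select(s => s())
            .SkipWhile(s => !(s is IInQueuePipeStep<DemoModel> i && i.InQueueName == pipeModel.QueueName));

        foreach (var step in steps)
        {
            await step.ExecuteAsync(pipeModel);
            // Stop after the 'Out Message' step; the next run resumes later.
            if (step is IOutQueuePipeStep<DemoModel>) break;
        }
    }
}

public static class Demo
{
    public static async Task<List<string>> RunAsync(string queueName)
    {
        // The whole business flow, defined once.
        var pipe = new DemoQueuePipe()
            .Add(() => new InStep("orders"))
            .Add(() => new LogStep("validate"))
            .Add(() => new OutStep("request-payment"))
            .Add(() => new InStep("payments"))
            .Add(() => new LogStep("confirm"));

        var model = new DemoModel { QueueName = queueName, InMessage = "{}" };
        await pipe.ExecuteAsync(model);
        return model.Log;
    }
}
```

Calling Demo.RunAsync("orders") runs in:orders, validate and request-payment, then stops. Calling Demo.RunAsync("payments") later picks up at in:payments and runs confirm.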

I hope you will make use of it in your project.

You can find it in NuGet package manager by typing ‘PipelinePattern’.

Future enhancement

There are more things that could be incorporated into the pipeline solution, for instance:

  • parallel execution of a series of steps,
  • reporting of pipeline progress with SignalR.

Stay tuned!
