Pipeline pattern - intermittent flow code examples

Pipeline example using Azure Functions

Let’s imagine a request to translate header, body and footer of some article. But before marking translation as accepted, it has to be approved by some external unit as well. To get approval, request message for text acceptance must be placed on some external queue. When approval response is received, pipeline can be finalized.

Working example can be found here. To run it, all you have to do is to define your ServiceBus connection string and create three queues:

  • translate-text
  • approve-text-out
  • approve-text-in

Using for instance ServiceBusExplorer tool you can put a messages on the queues.

Couple words about implementation.

TranslateTextPipeModel implements two interfaces:

  • IInQueuePipeModel
  • IOutQueuePipeModel

There are two ‘In Message’ steps:

  • DeserializeTranslateTextRequestStep
  • DeserializeApprovalTextResponseStep

and one ‘Out Message’ step:

  • PrepareApprovalTextRequestStep

I have defined TranslateTextPipeFactory which is used inside TranslateTextService to create complete pipeline. Moreover, in service, pipe model with queue name and input message is initiated:

var pipeModel = new TranslateTextPipeModel
{
    QueueName = queueName,
    InMessage = message
};

Service also invokes pipeline execution.

Two Azure Functions are defined:

  • TranslateTextFunction
  • TextApprovalFunction

Both functions make use of TranslateTextService. First function receives a message from “translate-text” service bus and at the end it puts a request for text approval on “approve-text-out” queue. With Azure Functions that return value, you can bind a function output binding to the return value. So if you want return value to be put on service bus, you have to specify following attribute:

[return: ServiceBus("approve-text-out", Connection = "ServiceBus")]

Second function is invoked when text approval response is delivered to “approve-text-in” queue. This function finalizes the pipeline flow.

Simple, isn’t it?

Pipeline example using Azure WebJob

Example used for WebJob is exactly the same example as for Azure Functions – request for header, body and footer translation of some article.

Solution for Azure Web Job is a bit different. First of all a message must be put on external queue by ourselves. Using Azure Functions it was possible to apply the output binding attribute.

When it comes to receiving messages, it can be done by simply defining functions desired for it or by making use of IHostedService interface to implement some background queue related tasks.

I will present you second approach. Using IHostedService interface, ServiceBusListener for pipeline will be implemented. For each pipeline queue that delivers messages, message handler is going to be registered automatically. Thanks to that it's possible to define general flow for receiving messages and passing them into pipeline execution.

If by any mean it is required to extend pipeline and add some additional external call, all what is needed is just to define additional queue name and pipeline steps to handle this specific case.

As I've already mentioned, sending message to external queue must be handled by ourselves. For that let’s define generic OutServiceBusPipeStep which will implement IOutQueuePipeStep interface. Generic step implementation will contain three abstractions required to be overridden by the inheriting class:

  • ServiceBusConnectionConfigKey (property)
  • OutQueueName (property)
  • CreateOutQueueMessage(TPipeModel pipeModel) (function)

ExecuteAsync function from OutServiceBusPipeStep will prepare message, create QueueClient and based on the configuration passed through properties it will put a message on the queue.

From now on this generic implementation can be used for all WebJob pipe steps that are responsible for sending message to external queue.

Next part that differs from Azure Functions solution is the way of handling incoming messages.

For that I defined abstract ServiceBusListenerBase<TPipeModel, TInQueueEnum> class which implements IHostedService interface. Implementing such interface it is required to implement following interface members:

  • Task StartAsync(CancellationToken cancellationToken)
  • Task StopAsync(CancellationToken cancellationToken)

StartAsync method will iterate through all queues that deliver messages for pipeline and for each and every queue, message handler will be registered. Message handler does nothing but receiving message, preparing pipe model, creating and executing pipeline.

Has the second generic type parameter for ServiceBusListenerBase caught your attention?

To have single source of truth for all queues related to receiving or sending messages for specific pipeline I created two enumeration types:

  • TranslateTextPipeInQueue
  • TranslateTextPipeOutQueue

Enumeration type which defines ‘InQueue’ values - TranslateTextPipeInQueue, will be passed as TInQueueEnum type into ServiceBusListenerBase. Thanks to that, I’m able to iterate through its values and register message handler for defined queues.

Another benefit of introducing an enumeration type is that this is the only place to be modified if new queue will become part of existing process.

Final solution can be found under this link.

27