27
Pipeline pattern - intermittent flow code examples
Let’s imagine a request to translate header, body and footer of some article. But before marking translation as accepted, it has to be approved by some external unit as well. To get approval, request message for text acceptance must be placed on some external queue. When approval response is received, pipeline can be finalized.
Working example can be found here. To run it, all you have to do is to define your ServiceBus connection string and create three queues:
- translate-text
- approve-text-out
- approve-text-in
Using for instance ServiceBusExplorer tool you can put a messages on the queues.
Couple words about implementation.
TranslateTextPipeModel
implements two interfaces:
IInQueuePipeModel
IOutQueuePipeModel
There are two ‘In Message’ steps:
DeserializeTranslateTextRequestStep
DeserializeApprovalTextResponseStep
and one ‘Out Message’ step:
PrepareApprovalTextRequestStep
I have defined TranslateTextPipeFactory
which is used inside TranslateTextService
to create complete pipeline. Moreover, in service, pipe model with queue name and input message is initiated:
var pipeModel = new TranslateTextPipeModel
{
QueueName = queueName,
InMessage = message
};
Service also invokes pipeline execution.
Two Azure Functions are defined:
TranslateTextFunction
TextApprovalFunction
Both functions make use of TranslateTextService
. First function receives a message from “translate-text” service bus and at the end it puts a request for text approval on “approve-text-out” queue. With Azure Functions that return value, you can bind a function output binding to the return value. So if you want return value to be put on service bus, you have to specify following attribute:
[return: ServiceBus("approve-text-out", Connection = "ServiceBus")]
Second function is invoked when text approval response is delivered to “approve-text-in” queue. This function finalizes the pipeline flow.
Simple, isn’t it?
Example used for WebJob is exactly the same example as for Azure Functions – request for header, body and footer translation of some article.
Solution for Azure Web Job is a bit different. First of all a message must be put on external queue by ourselves. Using Azure Functions it was possible to apply the output binding attribute.
When it comes to receiving messages, it can be done by simply defining functions desired for it or by making use of IHostedService
interface to implement some background queue related tasks.
I will present you second approach. Using IHostedService
interface, ServiceBusListener
for pipeline will be implemented. For each pipeline queue that delivers messages, message handler is going to be registered automatically. Thanks to that it's possible to define general flow for receiving messages and passing them into pipeline execution.
If by any mean it is required to extend pipeline and add some additional external call, all what is needed is just to define additional queue name and pipeline steps to handle this specific case.
As I've already mentioned, sending message to external queue must be handled by ourselves. For that let’s define generic OutServiceBusPipeStep
which will implement IOutQueuePipeStep
interface. Generic step implementation will contain three abstractions required to be overridden by the inheriting class:
-
ServiceBusConnectionConfigKey
(property) -
OutQueueName
(property) -
CreateOutQueueMessage(TPipeModel pipeModel)
(function)
ExecuteAsync
function from OutServiceBusPipeStep
will prepare message, create QueueClient
and based on the configuration passed through properties it will put a message on the queue.
From now on this generic implementation can be used for all WebJob pipe steps that are responsible for sending message to external queue.
Next part that differs from Azure Functions solution is the way of handling incoming messages.
For that I defined abstract ServiceBusListenerBase<TPipeModel, TInQueueEnum>
class which implements IHostedService
interface. Implementing such interface it is required to implement following interface members:
Task StartAsync(CancellationToken cancellationToken)
Task StopAsync(CancellationToken cancellationToken)
StartAsync
method will iterate through all queues that deliver messages for pipeline and for each and every queue, message handler will be registered. Message handler does nothing but receiving message, preparing pipe model, creating and executing pipeline.
Has the second generic type parameter for ServiceBusListenerBase
caught your attention?
To have single source of truth for all queues related to receiving or sending messages for specific pipeline I created two enumeration types:
TranslateTextPipeInQueue
TranslateTextPipeOutQueue
Enumeration type which defines ‘InQueue’ values - TranslateTextPipeInQueue
, will be passed as TInQueueEnum
type into ServiceBusListenerBase
. Thanks to that, I’m able to iterate through its values and register message handler for defined queues.
Another benefit of introducing an enumeration type is that this is the only place to be modified if new queue will become part of existing process.
Final solution can be found under this link.
27