Implementing the Correlation Identifier Pattern on Stateful Logic Apps using the Webhook Action

Introduction

In many business scenarios, there is a need to implement long-running processes which first send a message to a second process and then pause and wait for an asynchronous response before continuing. Because this communication is asynchronous, the challenge is to correlate the response with the original request. The Correlation Identifier enterprise integration pattern targets this scenario.

Azure Logic Apps provides a stateful workflow engine that allows us to implement robust integration workflows quite easily. One of the workflow actions in Logic Apps is the webhook action, which can be used to implement the Correlation Identifier pattern. One typical scenario in which this pattern can be used is when an approval step with a custom API (with a behaviour similar to the Send Approval Email connector) is required in a workflow.

In this post, I will show how to implement the Correlation Identifier enterprise integration pattern on Logic Apps leveraging the webhook action.

Some background information

The Correlation Identifier Pattern

Adapted from Enterprise Integration Patterns

The Correlation Identifier enterprise integration pattern proposes to add a unique id to the request message on the requestor end and return it as the correlation identifier in the asynchronous response message. This way, when the requestor receives the asynchronous response, it knows which request that response corresponds to. Depending on the functional and non-functional requirements, this pattern can be implemented in a stateless or stateful manner.

Understanding webhooks

A webhook is a service that is triggered on a particular event and results in an Http call to a RESTful subscriber. A much more comprehensive definition can be found here. You might be familiar with the configuration of webhooks with static subscribers. In a previous post, I showed how to trigger a Logic App by an SMS message with a Twilio webhook. That webhook sends all events to the same Http endpoint, i.e. a static subscriber.

The Correlation Identifier pattern on Logic Apps

If you have used the Send Approval Email Logic App Connector, this implements the Correlation Identifier pattern out-of-the-box in a stateful manner. When this connector is used in a Logic App workflow, an email is sent and the workflow instance waits for a response. Once the email recipient clicks a button in the email, the particular workflow instance receives an asynchronous callback with a payload containing the user selection, and it continues to the next step. This approval email comes in very handy in many cases; however, a custom implementation of this pattern might be required in different business scenarios. The webhook action allows us to have a custom implementation of the Correlation Identifier pattern.

The Logic Apps Webhook Action

To implement the Correlation Identifier pattern, it’s important that you have a basic understanding of the Logic Apps webhook action. Justin wrote some handy notes about it here. The webhook action of Logic Apps works with an instance-based, i.e. dynamic, webhook subscription. Once executed, the webhook action generates an instance-based callback URL for the dynamic subscription. This URL is used to send a correlated response that triggers the continuation of the corresponding workflow. This applies the Return Address integration pattern.

We can implement the Correlation Identifier pattern by building a Custom API Connector for Logic Apps following the webhook subscribe and unsubscribe pattern of Logic Apps. However, it’s also possible to implement this pattern without writing a Custom API Connector, as I’ll show below.

Scenario

To illustrate the pattern, I’ll be using a fictitious company called FarmToTable. FarmToTable provides delivery of fresh produce by drone. Consumers subscribe to the delivery service by creating their personalised list of produce to be delivered on a weekly basis. FarmToTable needs to implement an SMS confirmation service so that an SMS message is sent to each consumer the day before the scheduled delivery date. After receiving the text message, the customer must confirm within 12 hours whether they want the delivery or not, so that the delivery can be arranged.

The Solution Architecture

As mentioned above, the scenario requires sending an SMS text message and waiting for an SMS response. For sending and receiving the SMS, we will be using Twilio. More details on working with Logic Apps and Twilio can be found in one of my previous posts. Twilio provides webhooks that are triggered when SMS messages are received. The Twilio webhooks only allow static subscriptions, i.e. calling one single Http endpoint. However, the webhook action of Logic Apps requires the webhook subscribe and unsubscribe pattern, which works with an instance-based subscription. Thus, we need to implement a wrapper for the required subscribe/unsubscribe pattern.

The architecture of this pattern is shown in the figure below and explained after it.

Components of the solution:

  1. Long-running stateful workflow. This is the Logic App that controls the main workflow; it sends a request, then pauses and waits for an asynchronous response. This is implemented by using the webhook action.
  2. Subscribe/Unsubscribe Webhook Wrapper. In our scenario, we are working with a third-party service (Twilio) that only supports webhooks with static subscriptions; thus, we need to create this wrapper. This wrapper is composed of four parts.
  • Subscription store: A database to store the unique message Id and the instance-based callback URL provided by the webhook action. In my implementation, I’m using Azure Cosmos DB for this; nevertheless, you can use any other suitable alternative. Because the only message id we can send to Twilio and get back is the phone number, I’m using it as my correlation identifier (a sample subscription document is shown after this list). We can assume that, for this scenario, the phone number is unique during the day.
  • Subscribe and Start Request Processing API: this is a RESTful API that is in charge of starting the processing of the request and storing the subscription. I’m implementing this API with a Logic App, but you could use an Azure Function, an API App or a custom API connector for Logic Apps.
  • Unsubscribe and Cancel Request Processing API: this is another RESTful API that is only going to be called if the webhook action on the main workflow times out. This API is in charge of cancelling the processing and deleting the subscription from the store. The unsubscribe step has a similar purpose to the CancellationToken structure used in C# async programming. In our scenario, there is nothing to cancel though. Like the previous API, I’m implementing this with a Logic App, but you can use different technologies.
  • Instance-based webhook: this webhook is to be triggered by the third-party webhook with a static subscription. Once triggered, this Logic App is in charge of getting the instance-based callback URL from the store and invoking it. After making the call back to the main workflow instance, the subscription is to be deleted.
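For reference, a subscription document in the store can be as simple as the one below. The exact shape is up to your implementation; here the phone number is used as the document id, and the callback URL placeholder follows the same format used later in this post:

{
    "id": "+61000000000",
    "callbackUrl": "https://prod-00.australiasoutheast.logic.azure.com/workflows/guid/runs/guid/actions/action/run?params"
}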

The actual solution

To implement this solution, I’m going to follow the steps described below:

1. Configure my Twilio account to be able to send and receive SMS messages. More details here.

2. Create a Service Bus Namespace and 2 queues. For my scenario, I’m using one inbound queue (ScheduledDeliveriesToConfirm) and one outbound queue (ConfirmedScheduledDeliveries). For your own scenarios, you can use other triggers and outbound protocols.

3. Create a Cosmos Db collection to store the instance-based webhook subscriptions. More details on how to work with Cosmos Db here.

  • Create Cosmos Db account (with the Document DB API).
  • Create database
  • Create collection.

4. Create the “Subscribe and Start Request Processing API”. I’m using a Logic App workflow to implement this API as shown below. I hope the steps with their comments are self-explanatory.

  • The workflow is Http triggered. It expects, as the request body, the scheduled delivery details and the instance-based callback URL of the calling webhook action.
  • The provided Http trigger URL is to be configured later in the webhook action subscribe Uri of the main Logic App.
  • It stores the correlation on Cosmos Db. More information on the Cosmos Db connector here.
  • It starts the request processing by calling the Twilio connector to send the SMS message.

The expected payload for this API is like the one below. This payload is to be sent by the webhook action subscribe call on the main Logic App:

{
    "callbackUrl": "https://prod-00.australiasoutheast.logic.azure.com/workflows/guid/runs/guid/actions/action/run?params",
    "scheduledDelivery": {
        "deliveryId": "2c5c8390-b6c8-4274-b785-33121b01e219",
        "customer": "Paco de la Cruz",
        "customerPreferredName": "Paco",
        "phone": "+61000000000",
        "orderName": "Seasonal leafy greens and fruits",
        "deliveryAddressName": "Home",
        "deliveryDate": "2017-07-20",
        "deliveryTime": "07:30",
        "createdDateTime": "2017-07-19T09:10:03.209"
    }
} 

You can have a look at the code behind here. Please use it just as a reference, as it hasn’t been refactored for deployment.

5. Create the “Unsubscribe and Cancel Request Processing API”. I used another Logic App workflow to implement this API. This API is only going to be called if the webhook action on the main workflow times out. The workflow is shown below.

  • The workflow is Http triggered. It expects as the request body the message id so the corresponding subscription can be deleted.
  • The provided Http trigger URL is to be configured later in the webhook action unsubscribe Uri of the main Logic App.
  • It deletes the subscription from Cosmos Db. More information on the Cosmos Db connector here.

The expected payload for this API is quite simple, as shown below. This payload is to be sent by the webhook action unsubscribe call on the main Logic App:

{
    "id": "+61000000000"
}

The code behind is published here. Please use it just as a reference, as it hasn’t been refactored to be deployed.

6. Create the Instance-based Webhook. I’m using another Logic App to implement the instance-based webhook as shown below.

  • The workflow is Http triggered. It’s to be triggered by the Twilio webhook.
  • The provided Http trigger URL is to be configured later in the Twilio webhook.
  • It gets the message Id (phone number) from the Twilio message.
  • It then gets the instance-based subscription (callback URL) from Cosmos Db.
  • Then, it posts the received message to the corresponding instance of the main Logic App workflow by using the correlated callback URL (a sketch of this call is shown after this list).
  • After making the callback, it deletes the subscription from Cosmos Db.
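As a rough sketch, and assuming a preceding action called 'Get_Subscription' that reads the subscription document from Cosmos Db, the callback could be implemented with an HTTP action similar to the following (the action names and body properties are illustrative only):

"Post_Response_to_Main_Workflow": {
    "type": "Http",
    "inputs": {
        "method": "POST",
        "uri": "@{body('Get_Subscription')?['callbackUrl']}",
        "body": {
            "phone": "@{triggerFormDataValue('From')}",
            "response": "@{triggerFormDataValue('Body')}"
        }
    },
    "runAfter": {
        "Get_Subscription": [
            "Succeeded"
        ]
    }
}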

The code behind for this workflow is here. Please use it just as a reference, as it is not ready to be deployed.

7. Configure the Twilio static webhook. Now, we have to configure the Twilio webhook to call the Logic App created above when an SMS message is received. Detailed instructions in my previous post.

8. Create the long-running stateful workflow. Once we have implemented the subscribe/unsubscribe webhook wrapper required for the Logic App webhook action, we can start creating the long-running stateful workflow. This is shown below.

In order to trigger the Unsubscription API, the timeout property of the webhook action must be configured. This can be specified under the settings of the action. The duration is to be configured in the ISO 8601 duration format. If you don’t want to resend the request after the timeout, you should turn off the retry policy.

  • The workflow is triggered by messages on the ScheduledDeliveriesToConfirm Service Bus queue.
  • Then the webhook action:
    • Sends the scheduled delivery message and the corresponding instance-based callback URL to the Subscribe and Start Request Processing Logic App.
    • Waits for the callback from the Instance-based webhook, which delivers, as an Http post, the response sent by the customer. If a response is received before the timeout limit, the action succeeds and the workflow continues to the next action.
    • If the webhook action times out, it calls the Unsubscribe and Cancel Request Processing Logic App and sends the message id (phone number); and the action fails so the workflow does not continue. However, if required, you could continue the workflow by configuring the RunAfter property of the subsequent action.
  • If a response is received, the workflow continues assessing the response. If the response is ‘YES’, it sends the original message to the ConfirmedScheduledDeliveries queue.
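To tie these pieces together, below is a simplified sketch of what the webhook action definition could look like in code view. The action name, URIs and body properties are illustrative only; the 12-hour timeout and the disabled retry policy correspond to the settings discussed above:

"Send_SMS_and_Wait_for_Confirmation": {
    "type": "HttpWebhook",
    "inputs": {
        "subscribe": {
            "method": "POST",
            "uri": "https://<subscribe-api-url>",
            "body": {
                "callbackUrl": "@{listCallbackUrl()}",
                "scheduledDelivery": "@triggerBody()?['scheduledDelivery']"
            },
            "retryPolicy": {
                "type": "none"
            }
        },
        "unsubscribe": {
            "method": "POST",
            "uri": "https://<unsubscribe-api-url>",
            "body": {
                "id": "@{triggerBody()?['scheduledDelivery']?['phone']}"
            }
        }
    },
    "limit": {
        "timeout": "PT12H"
    },
    "runAfter": {}
}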

The code behind of this workflow is available here. Please use it as a reference only, as it hasn’t been refactored for deployment.

Now, we have finished implementing the whole solution! 🙂 You can have a look at all the Logic Apps JSON definitions in this repository.

Conclusion

In this post, I’ve shown how to implement the Correlation Identifier pattern using a stateful Logic App. To illustrate the pattern, I implemented an approval step in a Logic App workflow with a custom API. For this, I used Twilio, a third-party service that offers a webhook with a static subscription, and created a wrapper to implement the subscribe/unsubscribe pattern, including an instance-based webhook, to meet the Logic Apps webhook action requirements.

I hope you find this post useful whenever you have to add a custom approval step or correlate asynchronous messages using Logic Apps, and that it has given you an overview of how to enable the correlation of asynchronous messages in your own workflow or integration scenarios.

Feel free to add your comments or questions below, and happy clouding!

Cross-posted on Mexia Blog.
Follow me on @pacodelacruz

Triggering an Azure Logic App by SMS messages with Twilio

Introduction

SMS messaging has been a widely adopted way of communication over the last decades, not only for people but for organisations as well. Even though nowadays there are many messaging apps that are more popular and flexible than plain SMS, there are still scenarios in which businesses find SMS messaging a valuable way to communicate with their customers, employees or business partners. For instance, there are food chains that allow you to place your favourite order via SMS, some businesses provide order and shipment tracking via SMS, and there are many confirmation systems that utilise SMS so users can confirm their attendance to a meeting or appointment via a text message.

Twilio is a flexible and extensible PaaS offering that provides, among other communication services, that of SMS. Twilio allows you to send and receive SMS messages and integrate them into your business processes via APIs. Meanwhile, Logic Apps offers the Twilio connector, which provides actions to send, list and get messages from Twilio. However, at the time of writing, this connector does not include a trigger to initiate a Logic App instance when an SMS is received on a phone number of a Twilio account.

In this post, I will show how to trigger an Azure Logic App when an SMS message is received on a Twilio phone number.

Prerequisites

If you want to implement this scenario, you would need the following:

  1. A Twilio Account
  2. A Twilio Phone Number (you have to buy a phone number that can send and receive SMS messages, which has a monthly cost)
  3. Balance on your Twilio Account to allow for the Programmable SMS costs (receiving and sending SMS messages)

Implementing an SMS triggered Azure Logic App

The steps to implement an SMS triggered Logic App are described below:

1. Create a Logic App with an Http Request Trigger

As mentioned before, the Twilio connector does not offer a trigger to initiate a Logic App instance. However, Twilio provides a Webhook that allows you to subscribe your Http endpoint so that Twilio makes an Http Post whenever an SMS message is received. Therefore, we can create an Http triggered Logic App to receive these calls from the webhook. The quickest way to do this is to start with the corresponding HTTP Triggered template.

Once the Logic App has been created, we should make sure that POST is selected as the method to trigger the workflow, so Twilio can call the Logic App endpoint and send the corresponding payload.
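In code view, a Request trigger restricted to the POST method would look something like this minimal sketch (the trigger name and the empty schema are placeholders):

"triggers": {
    "manual": {
        "type": "Request",
        "kind": "Http",
        "inputs": {
            "method": "POST",
            "schema": {}
        }
    }
}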

2. Getting the Http Url

After we save the Logic App containing the Http Request trigger, we get the workflow endpoint Url, including the corresponding SAS token. For your own solution, you might want to implement an intermediate layer and abstract the actual Logic App Url using Azure API Management or an Azure Functions Proxy. For this exercise, I will continue using the generated Url.

3. Configuring the Twilio Webhook to Call the Http endpoint when a message comes in

Once we have the Logic App endpoint Url, on the Twilio Portal we configure the Twilio Phone Number to use a Webhook when a message comes in to call the Logic App endpoint with an Http Post. The Webhook will send the SMS details as form data to the Logic App.

Now, we are all set to trigger our Logic App when an SMS is received.

4. Getting the SMS Message Details

You can inspect the message sent by the Twilio Webhook by sending an SMS to trigger your Logic App, and checking the output body of your run as shown below.

The Twilio Webhook sends Url encoded form data ("$content-type": "application/x-www-form-urlencoded"). You should expect a request output body similar to the one shown below.
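In essence, Twilio posts form fields like the following (shown here as key/value pairs with simplified, illustrative values; the actual payload includes additional fields):

{
    "MessageSid": "SM00000000000000000000000000000000",
    "AccountSid": "AC00000000000000000000000000000000",
    "From": "+61000000000",
    "To": "+61111111111",
    "Body": "Hello"
}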

Because this is a form data POST request, we can use the function @triggerFormDataValue() to get each of the properties sent by Twilio, e.g. @triggerFormDataValue('Body') and @triggerFormDataValue('From').

5. Sending an SMS Response

To make the Logic App a bit more fun, and show more of the Twilio connector capabilities, I’ll add one action to the workflow to reply back to the sender via SMS.

When we add the Twilio – Send Text Message action, we first have to configure the API Connection. We have to give the connection a name and provide the Account Id and Access Token. We can get these details from the Twilio Dashboard: the Account Id is the Account SID, and the Access Token is the Auth Token.

Once we have configured the API Connection, we are ready to continue configuring the actual action. We should be able to select from the drop-down list the From Phone Number in our Twilio account that we want to use to send the response message. In our scenario, we get the To Phone Number using @triggerFormDataValue('From'). For this exercise, I’m adding the original Text as part of the Reply Text as shown below.

And, that’s pretty much it. We have implemented a Logic App that is triggered by an SMS and replies back to the sender. In case you are interested in what the code behind looks like, a simplified sketch is shown below:
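This is a simplified sketch only; the Twilio connector path and parameter names are indicative, and the actual values come from your API connection:

{
    "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
    "contentVersion": "1.0.0.0",
    "triggers": {
        "manual": {
            "type": "Request",
            "kind": "Http",
            "inputs": {
                "method": "POST"
            }
        }
    },
    "actions": {
        "Send_Text_Message_(SMS)": {
            "type": "ApiConnection",
            "inputs": {
                "host": {
                    "connection": {
                        "name": "@parameters('$connections')['twilio']['connectionId']"
                    }
                },
                "method": "post",
                "path": "/Messages.json",
                "body": {
                    "from": "+61111111111",
                    "to": "@triggerFormDataValue('From')",
                    "body": "You said: @{triggerFormDataValue('Body')}"
                }
            },
            "runAfter": {}
        }
    },
    "outputs": {}
}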

Conclusion

In this post, I’ve shown how to trigger a Logic App workflow by an SMS message using a Twilio account, how to get the message details, and how to reply back to the sender using the Twilio connector. Thanks to the Logic Apps capabilities, this implementation is quite easy and straightforward. Of course, you can extend and enrich your workflow making use of the many different connectors to get or push data from and to diverse apps and LOB systems. I hope you have found this post useful and it comes in handy when you are implementing solutions that require SMS integration. Feel free to add your comments or queries below.

Happy clouding!

Cross-posted on Mexia Blog

Transforming JSON Objects in Logic Apps

Introduction

Many integration scenarios require translating messages from one data model to another. This is described in the Message Translator Enterprise Integration Pattern. Some of these might be:

  • Translation between two different proprietary data models
  • Translation between a proprietary data model and an industry standard specification, and vice-versa
  • Translation between a proprietary data model and a Canonical Model, and vice-versa
  • Normalisation from different third-party formats to an internal Canonical Model
  • Content Filtering to remove unnecessary or sensitive information
  • Content Enrichment which can include different input messages from diverse sources
  • Applying an Envelope Wrapper to add metadata into the message

Logic Apps, as an Integration Platform as a Service (iPaaS), offers different capabilities that allow us to transform messages flowing through. The Enterprise Integration Transform Connector allows us to use XSLT-based, graphically-designed maps to convert XML messages from one XML format to another. This connector can be used together with the flat file decoder and encoder to transform flat files to XML and vice-versa, and with the EDIFACT and X12 encoders and decoders to translate EDI formats to XML and vice-versa. Even though flat files, EDI and XML are quite common in legacy integrations, JSON is now the de facto standard for data interchange.

While there are clear transformation tools for these legacy formats, I have heard more than a couple of times the question of how to transform JSON objects to different data models within a Logic App. In this blog post, I’ll share some tips on how to leverage Logic Apps capabilities to implement JSON transformations in integration workflows.

Scenario

To show how to transform JSON objects, I’ll be working with an imagined scenario. The Keep Yourself Active Company is organising a corporate Step Challenge across their multiple locations. In addition to the step competition, the company wants to collect data about workout distances and energy burned to show them in the dashboards. Due to the large scale of the competition, they chose to use more than one fitness tracking app and they need to support different units of measure, e.g. calories and kilojoules for energy burned, and kilometres and miles for distances. The integration team has started architecting the solution and have designed a Canonical Model to represent each participant’s data. They have decided to implement the Normalisation pattern to standardise the diverse formats coming from the different apps into the Canonical Model. In addition, the team has implemented a routing and tracking framework that requires an Envelope Wrapper to include some metadata. Sample input messages, including one from one particular fitness tracking app, and a canonical sample message are shown below.

Input 1: Employee Details coming from the HR System

{
   "firstName": "Paco",
   "lastName": "de la Cruz", 
   "location": "Melbourne",
   "country": "Australia",
   "department": "Information Technologies",
   "email": "paco@keepyourselfactive.com.au"
}

Input 2: Tracked Activities from the Fitness Tracking App 1

{
   "user": "paco@keepyourselfactive.com.au",
   "workouts": [
      {
         "date": "2017-05-22",
         "type": "run",
         "distanceInMiles": 3.73,
         "time": "31:21",
         "energyInCalories": 533,
         "elevationInFeet": 119
      },
      {
         "date": "2017-05-24",
         "type": "run",
         "distanceInMiles": 3.74,
         "time": "32:05",
         "energyInCalories": 529,
         "elevationInFeet": 121
      },
      {
         "date": "2017-05-27",
         "type": "run",
         "distanceInMiles": 3.73,
         "time": "31:12",
         "energyInCalories": 534,
         "elevationInFeet": 118
      }
   ]
}
			

Input 3: Step Count from the Fitness Tracking App 1

{
   "user": "paco@keepyourselfactive.com.au",
   "steps": [
      {
         "date": "2017-05-22",
         "steps": 11813
      },
      {
         "date": "2017-05-23",
         "steps": 8340
      },
      {
         "date": "2017-05-24",
         "steps": 10980
      },
      {
         "date": "2017-05-25",
         "steps": 9753
      },
      {
         "date": "2017-05-26",
         "steps": 8798
      },
      {
         "date": "2017-05-27",
         "steps": 12531
      },
      {
         "date": "2017-05-28",
         "steps": 7689
      }
   ]
}

			

Expected output in the Canonical Model, including an Envelope with metadata.

{
   "metadata": {
      "messageId": "6468f980-a167-4307-888e-874a843aebe4", 
      "timestamp": "2017-05-29T01:00:00:000Z", 
      "entityType": "ActiveChallengeParticipantWeekRecords",
      "version": "2017-04-01"
   },
   "payload": {
      "participant": {
        "givenName": "Paco",
        "familyName": "de la Cruz", 
        "office": "Melbourne",
        "country": "Australia",
        "department": "Information Technologies",
        "email": "paco@keepyourselfactive.com.au"
      },
      "steps": [
         {
            "date": "2017-05-22",
            "steps": 11813
         },
         {
            "date": "2017-05-23",
            "steps": 8340
         },
         {
            "date": "2017-05-24",
            "steps": 10980
         },
         {
            "date": "2017-05-25",
            "steps": 9753
         },
         {
            "date": "2017-05-26",
            "steps": 8798
         },
         {
            "date": "2017-05-27",
            "steps": 12531
         },
         {
            "date": "2017-05-28",
            "steps": 7689
         }
      ],
      "workouts": [
         {
            "date": "2017-05-22",
            "type": "run",
            "distanceInKms": 6.00,
            "time": "31:21",
            "energyInKJ": 2230
         },
         {
            "date": "2017-05-24",
            "type": "run",
            "distanceInKms": 6.02,
            "time": "32:05",
            "energyInKJ": 2213
         },
         {
            "date": "2017-05-27",
            "type": "run",
            "distanceInKms": 6.00,
            "time": "31:12",
            "energyInKJ": 2234
         }
      ]
   }
}

As we can see, this scenario includes several Enterprise Messaging Patterns, including Message Translator, Enrichment (assuming the information is coming from different APIs), Normalisation, Canonical Model and Envelope Wrapper. Let’s have a look at how to implement this transformation in Logic Apps.

Preparing my scenario in a Logic App

For demonstration purposes, I’m implementing this scenario using a simple Logic App with Data Operations – Compose actions to create the input JSON messages, as shown below. However, in real-life scenarios, you would expect to receive these payloads from other APIs.

Once we have the JSON messages in the Logic App, I’m using the Data Operations – Parse Action to be able to use Dynamic Content tokens from these JSON objects later. I’ve used the same payload as a sample message to generate the JSON schema.

Mapping a flat JSON object

The easiest part of the mapping is to map a flat JSON object to another one. So let’s start with the participant object within the payload. To transform one JSON object in a particular data model to a different one, we can leverage the Data Operations – Compose action to create an object with the required data model. We can use the dynamic content tokens from the previous Data Operations – Parse actions.

And below is the code behind. There, you can see that we are constructing a JSON object with the required properties and we are using the properties of the parsed input payload to do so.

"Transform_Participant_by_Using_Compose": {
   "inputs": {
      "country": "@{body('Parse_Input_Employee_Details')?['country']}",
      "department": "@{body('Parse_Input_Employee_Details')?['department']}",
      "email": "@{body('Parse_Input_Employee_Details')?['email']}",
      "familyName": "@{body('Parse_Input_Employee_Details')?['lastName']}",
      "givenName": "@{body('Parse_Input_Employee_Details')?['firstName']}",
      "office": "@{body('Parse_Input_Employee_Details')?['location']}"
   },
   "runAfter": {
      "Parse_Input_Steps": [
         "Succeeded"
       ]
   },
   "type": "Compose"
}

			

That was too easy! This is how you map a JSON object with a flat structure in Logic Apps. Of course, you can use any of the functions available in the Workflow Definition Language.

Mapping repeating records or an array in a JSON object

It’s a common scenario that we have repeating records, or an array of objects in our messages and we need to translate them to a different data model.

To do so, we can use the For Each Loop available in Logic Apps, and use a Compose action inside. I’m showing how to transform the workouts array to the required data model using this approach. I’m utilising the @mul (multiply) function to calculate the distance in Kms and energy in KJ. As mentioned before, you can use any of the available functions in your mappings.

The code behind is shown as follows.

"For_each_Workout": {
    "actions": {
        "Transform_Workouts_by_Using_Compose": {
            "inputs": {
                "date": "@{item()?['date']}",
                "distanceInKms": "@mul(item()?['distanceInMiles'], 1.60934)",
                "energyInKJ": "@mul(item()?['energyInCalories'], 4.184)",
                "time": "@{item()?['time']}",
                "type": "@{item()?['type']}"
            },
            "runAfter": {},
            "type": "Compose"
        }
    },
    "foreach": "@body('Parse_Input_Workouts')?['workouts']",
    "runAfter": {
        "Transform_Participant": [
            "Succeeded"
        ]
    },
    "type": "Foreach"
}
			

Another way to map an array of objects is by using the Data Operations – Select action. The main advantage of this approach is that we don’t need to explicitly implement a For Each loop. Designer and code views of this action are shown below.


"Transform_Workouts_by_Using_Select": {
    "inputs": {
        "from": "@body('Parse_Input_Workouts')?['workouts']",
        "select": {
            "date": "@item()?['date']",
            "distanceInKms": "@mul(item()?['distanceInMiles'],1.60934)",
            "energyInKJ": "@mul(item()?['energyInCalories'],4.184)",
            "time": "@item()?['time']",
            "type": "@item()?['type']"
        }
    },
    "runAfter": {
        "For_each_Workout": [
            "Succeeded"
        ]
    },
    "type": "Select"
}
			

Including an array of objects in the Compose action

To create the JSON object in the Canonical Model, as detailed at the beginning of this post, we need to create the participant object and insert two arrays of objects (steps and workouts) while creating the JSON message. Again, this is possible by using the Data Operations – Compose action. I’m creating the Canonical Model without the Envelope first, for demonstration purposes. The Canonical Model includes the transformed participant, the steps array and the translated workouts array. I’m composing the participant again, inserting the steps array as it comes from the inputs, and inserting the workouts array as the output from the Select action.

This is the code behind. As you can see, we can insert arrays of objects while using the Data Operations – Compose action.

"Transform_Payload_to_Canonical_Model": {
   "inputs": {
        "participant": {
            "country": "@{body('Parse_Input_Employee_Details')?['country']}",
            "department": "@{body('Parse_Input_Employee_Details')?['department']}",
            "email": "@{body('Parse_Input_Employee_Details')?['email']}",
            "familyName": "@{body('Parse_Input_Employee_Details')?['lastName']}",
            "givenName": "@{body('Parse_Input_Employee_Details')?['firstName']}",
            "office": "@{body('Parse_Input_Employee_Details')?['location']}"
        },
        "steps": "@body('Parse_Input_Steps')?['steps']",
        "workouts": "@outputs(Transform_Workouts_by_Using_Select)"
    },
    "runAfter": {
        "For_each_Workout": [
            "Succeeded"
            ]
        },
    "type": "Compose"
}

Adding an Envelope Wrapper to the JSON Payload

We can use a Compose Action also for adding the Envelope Wrapper as shown below. We just need to insert the payload within the composed JSON as follows.

And this is the code behind.

{
  "metadata": {
    "entityType": "ActiveChallengeParticipantWeekRecords",
    "messageId": "@{guid()}",
    "timestamp": "@{utcnow('o')}",
    "version": "2017-04-01"
  },
  "payload": "@outputs('Transform_Payload_to_Canonical_Model')"
}
			

Other mapping scenarios

There are other JSON mapping scenarios which can easily be solved using the same actions, such as:

  • Sending more than one payload to an Http triggered Azure Function. You might have some scenarios in which you need to send more than one JSON payload to an Http triggered Azure Function. To do so, you can use a variation of the Wrapper pattern, inserting the different payloads into a wrapping request body (see the sketch after this list).
  • Transforming a payload which includes an array of objects or repeating records. In the scenario above, I showed how to include an array into a new message. However, you can apply the same principles to map a JSON object which includes an array of objects into a different data model.
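As a sketch of the first case, assuming two earlier Compose actions called 'Compose_Employee_Details' and 'Compose_Tracked_Activities' (hypothetical names), the wrapping request body could be built like this:

"Compose_Function_Request_Body": {
    "type": "Compose",
    "inputs": {
        "employee": "@outputs('Compose_Employee_Details')",
        "trackedActivities": "@outputs('Compose_Tracked_Activities')"
    },
    "runAfter": {
        "Compose_Tracked_Activities": [
            "Succeeded"
        ]
    }
}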

More complex scenarios?

Chances are that you would have transformation requirements that go beyond the current capabilities in Logic Apps. For those cases, you have two main alternatives:

  • Create an Azure Function that handles the transformation using code, or
  • Transform the JSON object into XML using the @xml() function; then implement the mapping using an XML map; and finally transform the XML back to JSON using the @json() function. I’m not a big fan of this approach, as transforming JSON into XML and back might have an impact on your data model due to the restrictions of each standard.
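For illustration, the conversions could look like the sketch below, with the XML map (Transform XML action) sitting between the two Compose actions in a real workflow. The action names are hypothetical, and bear in mind that the JSON object passed to @xml() needs a single root property to produce valid XML:

"Convert_Json_to_Xml": {
    "type": "Compose",
    "inputs": "@xml(outputs('Wrap_Payload_With_Root'))",
    "runAfter": {}
},
"Convert_Xml_Back_to_Json": {
    "type": "Compose",
    "inputs": "@json(outputs('Convert_Json_to_Xml'))",
    "runAfter": {
        "Convert_Json_to_Xml": [
            "Succeeded"
        ]
    }
}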

Conclusion

In this post, I’ve shown how to use Logic Apps capabilities to transform JSON objects to a different data model. We can leverage Data Operation actions to do so. The Parse action allows us to use the different properties of the JSON object as dynamic content tokens in subsequent actions. The Compose action allows us to create a new JSON message, and it can be used within a For Each loop to work with arrays. And the Select action is quite handy to map arrays of objects into a different model. Furthermore, for more complex scenarios you can always make use of Azure Functions or, if you are familiar with BizTalk Maps, XML Transforms.

How are you implementing your JSON object transformations in Logic Apps? Feel free to share your ideas or post your questions below.

HTH and happy clouding!

Cross-posted on Mexia Blog

Implementing the Polling Consumer Pattern using Azure Logic Apps

Introduction

When implementing integration projects, it’s quite common that upstream systems don’t have the capabilities to push messages to downstream systems, or that due to different constraints or non-functional requirements, the receivers are required to pull messages from those systems. Gregor Hohpe describes in his book “Enterprise Integration Patterns” the Polling Consumer Pattern, in which a receiver is in charge of polling for messages from a source system. In this pattern, the receiver usually polls for messages with an incremental approach, i.e. polling only for changes from the source system, as opposed to getting a full extract. In most of these scenarios, the provider system does not keep any state on behalf of the receiver; thus, it is up to the receiver to keep a state which allows it to get changes since the previous successful poll.

Azure Logic Apps provides many trigger connectors which already implement the Polling Consumer Pattern out-of-the-box. For example, the Salesforce adapter can trigger a workflow when a record is created or modified; the SharePoint adapter can initiate a workflow when a file or an item is created or modified; and the Gmail adapter can start a workflow when an email arrives. All these triggers work on a recurrent basis, e.g. every 5 minutes. For all these triggers, the Logic App connector has to keep a trigger state or polling watermark which allows it to get only changes since the last poll. For example, the Gmail connector has to know whether there are new emails since the last time it executed a poll. This trigger state or polling watermark should work even if we temporarily disable the Logic App. Even though there are many trigger connectors that make our life very easy, there might be scenarios in which a polling trigger connector is not available for a particular API or system. For example, what if we need to poll for changes from an Oracle database, from a custom API, or from an Atom feed? In this blog, I will show how to implement a custom Polling Consumer Pattern using Azure Logic Apps for those scenarios in which a trigger connector is not yet available.

Scenario

To illustrate the implementation of the Polling Consumer pattern, I will use a fictitious scenario in which ‘New Hire’ events in an HR System are used to trigger other processes, e.g. identity and workstation provisioning. You might imagine that the end-to-end scenario could be implemented using the Publish-Subscribe pattern. That’s true; however, in this post I will focus only on the Polling Consumer interface on the publisher side.

The HR System of my scenario provides an Atom feed for polling for updates. This Atom feed is exposed as a RESTful API endpoint and requires two query parameters: ‘updated-min’ and ‘updated-max’. Both parameters are date-time in an ISO 8601 UTC format (e.g. yyyy-MM-ddTHH:mm:ss.fffZ). The lower bound (updated-min) is inclusive, whereas the upper bound (updated-max) is exclusive.

Even though my scenario is using a date-time polling watermark, the same principles can be used for other types of watermarks, such as Int64, base64 tokens, and hashes.

Coming back to my scenario, let’s imagine that my Logic App is to run every 10 minutes, and that I start it on the 1st of May at 8 AM UTC; I would expect my Logic App to send Http requests to the HR System like the following:
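For example, assuming a hypothetical HR System endpoint, the first two requests would be:

[GET] https://hrsystem.example.com/api/newhires?updated-min=2017-05-01T08:00:00.000Z&updated-max=2017-05-01T08:10:00.000Z

[GET] https://hrsystem.example.com/api/newhires?updated-min=2017-05-01T08:10:00.000Z&updated-max=2017-05-01T08:20:00.000Z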

The first request would return New Hire events that occurred between 8:00 AM and just before 8:10 AM. The next one from 8:10 to just before 8:20, and so on.

Components

To implement this pattern, I will use:

  1. A Logic App, to implement a custom Polling Consumer Pattern workflow.
  2. An Azure Storage Table to persist the polling watermark.
  3. An Azure Function to extract the current polling watermark.
  4. An Azure Function to update the polling watermark after the poll.

Show me the code!

After describing the scenario, let’s start having fun with the implementation. To implement the pattern, I will follow the steps below:

  1. Create an Azure Resource Group
  2. Create an Azure Storage Account, create a Table, and populate the Table with my Polling Watermark
  3. Create an Azure Function App
  4. Develop and deploy the Azure Functions which will allow me to retrieve and update the Polling Watermark
  5. Develop the Azure Logic App

Each step is described in detail as follows.

1. Create an Azure Resource Group

You might already have an Azure Resource Group which contains other resources for your solution. If that’s the case, you can skip this step. I will start by creating a new Resource Group for all the resources I will be using for this demo. I’ll name it ‘pacopollingconsumer-rgrp’.


2. Create an Azure Storage Account, create a Table, and populate the Table with the Polling Watermark

Once I have my Azure Resource Group, I’m going to create an Azure Storage Account. I’ll use this Storage Account to create a Table to persist my polling watermark. I want to create a framework that can be used for more than one scenario, whether it’s polling changes from different entities in the same source system, or from more than one source system. So, I’ll prepare my table to handle more than one entity and more than one source system. I’ll name the storage account ‘pacopollingconsumerstrg’ and use the default settings.


Once I’ve created the Storage Account, I’ll create a table. I’ll use the Azure Storage Explorer for this. Once I’ve downloaded it, I will open it and add my Azure Account by signing in to Azure.


After signing in, I select my subscription. Then, I should be able to see all my existing Storage Accounts.


I create a new Table by right-clicking on the Tables branch.


I’ll name the new Table ‘PollingWatermark’.


Once the Table has been created, I’ll add a new Entity.


As mentioned above, I want to be able to use this table to handle more than one entity and more than one source system. I’ll use the Table PartitionKey to store the Source System, which for this demo will be ‘HRSystem’, and the RowKey to store the Entity, which will be ‘EmployeeNewHire’. I will create a new column of type DateTime to store my Watermark, and I will set its initial value. Bear in mind that the Azure Storage Explorer works with local time; however, the value will be stored in UTC.
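Represented as JSON, the entity looks roughly like this (the watermark column name and the initial value shown here are illustrative):

{
    "PartitionKey": "HRSystem",
    "RowKey": "EmployeeNewHire",
    "Watermark": "2017-05-01T08:00:00.000Z"
}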


Cool, now we have our Azure Table Storage ready 🙂

3. Create the Azure Function App

At the time of writing this post, there is no Logic App connector for Azure Table Storage. There is already a user voice request for it here, and if you want it, I would suggest you vote for it (I already did 😉). In the absence of a Logic App connector for Azure Table Storage, we will be using an Azure Function App instead. I’ll create an Azure Function App called ‘pacopollingconsumer-func’ in my resource group, using the recently created storage account for its logs, and I’ll use the consumption plan option, which is priced based on execution, as explained here.


Once I’ve created my Function App, I’ll download the publish profile which I’ll be using later to publish my functions.


4. Develop and Deploy the Azure Functions

Even though you can author your Azure Functions from the portal, I really like the option of building and testing my code locally with all the advantages of using Visual Studio. At the time of writing, Azure Functions tooling on Visual Studio supports creating C# functions as scripts (.csx files) on VS 2015 as described here, and creating class libraries on Visual Studio 2015 and 2017 as shown here. The use of compiled libraries brings some performance benefits, and will also make it easier to transition to the planned Visual Studio 2017 tools for Azure Functions as discussed here. So, based on the recommendations, I’ve decided to use a class library project. I’ve already developed an Azure Functions class library project to implement this pattern, and made it available on GitHub, at https://github.com/pacodelacruz/PollingConsumer. Even though you might want to reuse what I have developed, I strongly suggest you get familiar with developing Azure Function Apps using class library projects, as described here.

Some notes in regard to my ‘PacodelaCruz.PollingConsumer.FunctionApp’ project

  • In order to run Azure Functions locally, you have to install the Azure Functions CLI available here.
  • You will need to update the External program and Working Directory paths in the Project Properties / Web as described here.


  • You might need to get the already referenced NuGet packages:
    • Microsoft.WindowsAzure.ConfigurationManager
    • Microsoft.AspNet.WebApi
    • Microsoft.Azure.WebJobs.Host

NOTE: I have found that the Newtonsoft.Json.JsonSerializerSettings in Newtonsoft.Json version 10.0.2 does not work properly with Azure Functions. So, I’m using Newtonsoft.Json version 9.0.1. I recommend not updating it for the time being.

  • You might want to update the host.json file according to your needs, instructions here.
  • You will need to update the appsettings.json file on your project to use your Azure Storage connection strings.

Now, let’s explore the different components of the project:

  • The PollingWatermarkEntity class is used to handle the entities in the Azure Table Storage table called ‘PollingWatermark’. If you are not familiar with working with Azure Storage Tables in C#, I would recommend having a read through the corresponding documentation.

  • The PollingWatermark class helps us to wrap the PollingWatermarkEntity and make it more user-friendly. By using the constructor, we are naming the PartitionKey as SourceSystem, and the RowKey as Entity. Additionally, we are returning another property called NextWatermark that is going to be used as the upper bound when querying the source system and when updating the Polling Watermark after we have successfully polled the source system.

Now, let’s have a look at the Functions code:

  • GetPollingWatermark function. This Http triggered function returns a JSON object containing a DateTime Polling Watermark which is stored on an Azure Storage Table based on the provided ‘sourceSystem’ and ‘entity’ query parameters in the GET request. The function bindings are defined in the corresponding function.json file.

  • UpdatePollingWatermark function. This Http triggered function updates a DateTime Polling Watermark stored on an Azure Storage Table based on the JSON payload sent as the body of the PATCH request. The JSON payload is the one returned by the GetPollingWatermark function; it uses the ‘NextWatermark’ property as the new value. The function bindings are defined in the corresponding function.json file.

Once you have updated the appsettings.json file with your own connection strings, you can test your functions locally. You can use Postman for this. In Visual Studio, hit F5, and wait until the Azure Functions CLI starts. You should see the URLs to run the functions as shown below.


Once your project is running, you can call the functions from Postman by adding the corresponding query parameters. By calling the GetPollingWatermark function hosted locally, you should get the PollingWatermark, as previously set, as a JSON object.

[GET] http://localhost:7071/api/GetPollingWatermark?sourceSystem=HRSystem&entity=EmployeeNewHire
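You should get a response similar to the one below (the property names depend on the PollingWatermark class implementation, and the values are illustrative):

{
    "sourceSystem": "HRSystem",
    "entity": "EmployeeNewHire",
    "watermark": "2017-05-01T08:00:00.000Z",
    "nextWatermark": "2017-05-01T08:10:00.000Z"
}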


To call the UpdatePollingWatermark function, you need to use the PATCH method, specify that the body is of type application/json, and add as the request body the JSON object obtained in the previous call. After calling it, you should see that the PollingWatermark has been updated based on the sent value. So far so good 🙂

[PATCH] http://localhost:7071/api/UpdatePollingWatermark


After having successfully tested both functions, we are ready to publish our Function App project. To do so, we right-click on the project and then click Publish. This allows us to import the Publish Profile that we downloaded in one of the first steps described above.


After having successfully published the Azure Function, now we need to configure the connection strings on the Azure Function app settings. You will need to add a new app setting called PollingWatermarkStorage and set the value to the connection string of the storage account containing the PollingWatermark table.


Now you should be able to test the functions hosted on Azure. You need to go to your Function App, navigate to the corresponding function, and get the function URL. As I set the authentication level to function, a function key will be contained in the URL.


Bear in mind that we need to add the corresponding query params or request body. Your URLs should be something like:

[GET] https://pacopollingconsumer-func.azurewebsites.net/api/GetPollingWatermark?code=[functionKey]&sourceSystem=HRSystem&entity=EmployeeNewHire

[PATCH] https://pacopollingconsumer-func.azurewebsites.net/api/UpdatePollingWatermark?code=[functionKey]

5. Develop the Logic App implementing the Polling Consumer workflow.

Now that we have implemented the required Azure Function, we are ready to build our Logic App. Below you can see the implemented workflow. I added the following steps:

  • Recurrent trigger
  • Function action, to call the GetPollingWatermark function with the GET method and passing the query parameters.
  • Parse JSON to parse the response body.
  • Http Request action to call the HR System endpoint passing the updated-min and updated-max parameters using the PollingWatermark and NextPollingWatermark properties of the parsed JSON.
  • Call a nested Logic App. Because the Atom feed is a batch of entries, on the nested Logic App I will be implementing debatching using SplitOn (a sketch of such a trigger is shown after this list). You can also use a ForEach loop and send each entry to an Azure Service Bus queue or topic.
  • Function action to update the polling watermark for the next poll by calling the UpdatePollingWatermark function with the PATCH method and passing as request body the response obtained from the previous function call.
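For reference, the nested Logic App’s trigger could implement the debatching with SplitOn along these lines, assuming the batch is posted as a JSON object with an 'entries' array (the trigger name and property names are illustrative):

"triggers": {
    "manual": {
        "type": "Request",
        "kind": "Http",
        "inputs": {
            "method": "POST",
            "schema": {}
        },
        "splitOn": "@triggerBody()?['entries']"
    }
}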


In case it’s of help, you can have a look at the code view of the Logic App. Just bear in mind that I removed some sensitive information.

Other potential scenarios

Now that you have seen how I have implemented the custom Polling Consumer pattern on Azure Logic Apps, supported by Azure Functions and Azure Storage Tables, you should be able to implement the same pattern with slight variations for your own scenarios. You might need to use a numeric or a hash polling watermark; or, instead of passing it to an API, you might need to pass it to a Function that in turn queries a database for which a connector with a trigger is not yet available.

Conclusion

The Polling Consumer pattern is quite common in integration projects. Logic Apps provides different trigger connectors which implement the Polling Consumer pattern out-of-the-box. However, there are scenarios in which we need to connect to a system for which there is no trigger connector available yet. In these scenarios, we need to implement a custom Polling Consumer pattern. Through this post, we have seen how to implement a custom Polling Consumer pattern using Azure Logic Apps together with Azure Functions and Azure Storage. To do so, we created an Azure Resource Group, an Azure Storage Table and an Azure Function App; developed the Azure Functions using a C# class library project; tested the functions locally; deployed them to Azure; and configured and tested them on Azure as well. Finally, we developed a Logic App to implement the Polling Consumer pattern.

As discussed above, you can implement the same pattern with slight variations to accommodate your own scenarios with different requirements; and I really hope this post will make your implementation easier, faster, and more fun. Do you have any other ideas or suggestions on how to implement a custom Polling Consumer pattern? Feel free to share them below 🙂

I hope this has been of help, and happy clouding!

Cross posted on Mexia’s Blog

My presentation at the Global Integration Bootcamp 2017 Melbourne about Logic Apps with Azure Functions and the Magic of “Serverless” Integration

I participated as a speaker in the Global Integration Bootcamp 2017 in Melbourne. The bootcamp was very hands-on, and participants had the chance to work with Azure API Management, Logic Apps, Azure Functions, BizTalk Server, Service Bus, and API Apps. My session was about using Logic Apps and Azure Functions to build serverless integration solutions. In case you are interested, here are my slides. I hope they are helpful and you enjoy them.

 

Happy Clouding!