
How to implement service resiliency on EMA Consumer application

Veerapath Rungruengrayubkul
Developer Advocate

Enterprise Message API (EMA) currently supports connection failover via the ChannelSet configuration parameter. When the connected Channel fails, EMA tries to reconnect to the next Channel in the ChannelSet.
But what if the service goes down while the connection stays up? In that case, EMA keeps the connection open and receives no data from the service. The current solution is to implement additional logic at the application level.
This article demonstrates how to implement service resiliency in an EMA C++ Consumer application. The implementation lets the application continue to receive data from another ADS server when the subscribed service or the connection goes down.

Features overview

In addition to service resiliency, we also use the concept of Primary and Backup servers, where the Primary server has higher priority. The application should receive data from the Primary server whenever possible.
Here are the detailed scenarios we are going to implement.

  • Initial state

At startup, the application connects to both servers and receives a Refresh message from each. Update messages are received only from the Primary server. After that, the application handles the following scenarios to keep receiving data.

  • Failover

If the subscribed service on the Primary server goes down, or the connection to the Primary server fails, the application continues receiving updates from the Backup server.

  • Discover

Once the Primary server becomes available again, the application reconnects, re-subscribes to the data and then receives Update messages from the Primary server instead.

What is Pause and Resume feature?

The Pause and Resume feature optimizes network bandwidth. You can use Pause/Resume to reduce the amount of data flowing across the network for a single item or for many items that might already be openly streaming data to a client.

This feature requires support on the provider side to control data flow, so please ensure that the ADS in your environment supports it.

To pause and resume data, the client first sends a request to pause an item to the ADS. The ADS receives the pause request and stops sending new data to the client for that item, though the item remains open and in the ADS Cache. The ADS continues to receive messages from the upstream device (or feed) and continues to update the item in its cache (but because of the client’s pause request, it does not send the new data to the client).

When the client wants to start receiving messages for the item again, the client sends a resume to the ADS, which then responds by sending an aggregated update or a refresh (a current image) to the client. After the ADS resumes sending data, the ADS sends all subsequent messages.

Please be aware that the Pause and Resume feature on ADS is only supported for data sent in Marketfeed format and for RDM Level 1 Market Price items.
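The pause and resume flow described above can be illustrated with a small, library-free model. This is only a sketch under stated assumptions: `CachedItemStream`, `FieldMap` and their methods are hypothetical names invented for illustration, not ADS or EMA API, and the model shows the "aggregated update" variant of the resume response rather than a full refresh.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for one item stream held in a server-side cache.
// This is not ADS or EMA code; it only mirrors the behaviour described above.
using FieldMap = std::map<int, std::string>;  // field id -> latest value

class CachedItemStream {
public:
    // Upstream data always refreshes the cache. It is forwarded to the
    // client only while the stream is not paused.
    void onUpstreamUpdate(const FieldMap& fields) {
        for (const auto& f : fields) {
            cache_[f.first] = f.second;
            if (paused_) pending_[f.first] = f.second;  // what the client missed
        }
        if (!paused_) sent_.push_back(fields);
    }

    void pause() { paused_ = true; }

    // On resume, send one aggregated update with the latest value of
    // every field that changed while the stream was paused.
    void resume() {
        paused_ = false;
        if (!pending_.empty()) {
            sent_.push_back(pending_);
            pending_.clear();
        }
    }

    const std::vector<FieldMap>& sentToClient() const { return sent_; }
    const FieldMap& cache() const { return cache_; }

private:
    bool paused_ = false;
    FieldMap cache_;
    FieldMap pending_;
    std::vector<FieldMap> sent_;
};

// Usage: two ticks arrive while paused, the client sees one message on resume.
void pauseResumeExample() {
    CachedItemStream stream;
    stream.onUpstreamUpdate(FieldMap{{22, "1.10"}});  // forwarded
    stream.pause();
    stream.onUpstreamUpdate(FieldMap{{22, "1.11"}});  // cached only
    stream.onUpstreamUpdate(FieldMap{{22, "1.12"}});  // cached only
    assert(stream.sentToClient().size() == 1);
    stream.resume();
    assert(stream.sentToClient().size() == 2);
    assert(stream.sentToClient().back().at(22) == "1.12");
}
```

Note that the intermediate value 1.11 never reaches the client in this model, which is the bandwidth saving and also the reason individual ticks can be missed while a stream is paused.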

How could Pause and Resume feature help?

The Primary and Backup design described above keeps the data gap during failover as small as possible, but it consumes the most bandwidth because the same RICs are open on both servers. To reduce the bandwidth, we use the Pause and Resume feature to pause the item streams opened on the inactive server.

Application implementation

To implement these features, I created a “SessionManager” class that monitors the service status and decides which server should be active at any given time. It also manages the pause and resume requests to ensure that updates are received only from the active server.

For service status monitoring, the application needs to detect both service status changes and connection-down events. The implementation is described in detail below.

  • Status of a service

This information can be retrieved via the Source Directory domain model. At startup, the application requests the Source Directory from both servers.

    	
            

if (primaryConsumer.loginHandle != NULL && backupConsumer.loginHandle != NULL)
{
    //Open Directory streams for both Consumers
    primaryConsumer.dirHandle = primaryConsumer.ommConsumer->registerClient(ReqMsg().domainType(MMT_DIRECTORY).serviceName(_serviceName), *this, &primaryConsumer);
    backupConsumer.dirHandle = backupConsumer.ommConsumer->registerClient(ReqMsg().domainType(MMT_DIRECTORY).serviceName(_serviceName), *this, &backupConsumer);
}

The status is carried in the payload of the Refresh and Update messages. The payload is a Map with one entry per service; each entry holds a FilterList, and the Service State filter entry holds an ElementList. The application normally follows this structure to decode the service name from the “Name” element entry and the service status from the “ServiceState” element entry. Because the application has requested the Source Directory with a specific serviceName, we can assume that it receives the Source Directory for the requested service only and can skip decoding the service name.

Below is the code snippet for Directory decoding.

    	
            

if (msg.getDomainType() == MMT_DIRECTORY)
{
    if (msg.getPayload().getDataType() == DataType::MapEnum)
    {
        //Get Map from Payload
        const Map& map = msg.getPayload().getMap();
        while (map.forth())
        {
            const MapEntry& me = map.getEntry();
            if (me.getLoadType() == DataType::FilterListEnum)
            {
                const FilterList& ftl = me.getFilterList();
                while (ftl.forth())
                {
                    const FilterEntry& fe = ftl.getEntry();
                    //Decode "SERVICE_STATE_FILTER" Filter Entry.
                    if (fe.getFilterId() == refinitiv::ema::rdm::SERVICE_STATE_FILTER)
                    {
                        //Get Element List from the Filter Entry.
                        const ElementList& el = fe.getElementList();
                        while (el.forth())
                        {
                            const ElementEntry& ee = el.getEntry();
                            EmaString name = ee.getName();

                            //Decode "ServiceState" Element Entry
                            if (name == refinitiv::ema::rdm::ENAME_SVC_STATE)
                                serviceState = ee.getUInt() == 1;
                            //if (name == refinitiv::ema::rdm::ENAME_ACCEPTING_REQS)
                            //     acceptingReq = ee.getUInt() == 1;
                        }

                        //Record the decoded state for this server
                        info->isServiceUp = serviceState;
                    }
                }
            }
        }
    }
}
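To make the nesting of the Source Directory payload easier to see without the EMA headers, here is a minimal plain-data sketch of the same walk. All type and function names (`Element`, `FilterItem`, `ServiceRecord`, `findServiceState`) are hypothetical stand-ins, and the filter id value 2 for the state filter is an assumption taken from the RDM Source Directory model rather than from the application's source.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical plain-data mirror of the payload nesting:
// Map (one entry per service) -> FilterList -> ElementList.
// These structs are illustrations, not EMA classes.
struct Element       { std::string name; std::uint64_t value; };
struct FilterItem    { int filterId; std::vector<Element> elements; };
struct ServiceRecord { std::vector<FilterItem> filters; };
using DirectoryPayload = std::map<int, ServiceRecord>;  // service id -> record

// Assumed id of the state filter entry (2 in the RDM Source Directory model).
const int kStateFilterId = 2;

// Walks the three levels and reports ServiceState when present.
// Returns false if no ServiceState element was found, so the caller can
// keep its previous value instead of overwriting it.
bool findServiceState(const DirectoryPayload& payload, bool& serviceUp) {
    for (const auto& service : payload) {
        for (const auto& filter : service.second.filters) {
            if (filter.filterId != kStateFilterId) continue;
            for (const auto& element : filter.elements) {
                if (element.name == "ServiceState") {
                    serviceUp = (element.value == 1);
                    return true;
                }
            }
        }
    }
    return false;
}

// Usage: one service carrying an (empty) info filter and a state filter.
void directoryExample() {
    DirectoryPayload payload;
    payload[1].filters.push_back(FilterItem{1, {}});
    payload[1].filters.push_back(FilterItem{2, {Element{"ServiceState", 1}}});
    bool up = false;
    bool found = findServiceState(payload, up);
    assert(found);
    assert(up);
}
```

Returning a "found" flag is a design choice of this sketch: a Directory Update that carries no state filter then leaves the stored service status unchanged.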

  • Status of Connection

In general, the application can rely on the Source Directory response to verify the status of the service. For example, when the connection comes up and the service becomes available, the application receives a Refresh message that contains Service State = 1.

However, when the connection goes down, the EMA application does not receive any Directory response stating that the service has become unavailable. We need to use the Login status message instead to detect the connection down, and internally mark the service status as down.

The Login status that indicates a connection down is:

  • StreamState = OmmState.Open
  • DataState = OmmState.Suspect

Below is the code snippet for the Login status check.

    	
            

if (state.getStreamState() == OmmState::OpenEnum)
{
    if (state.getDataState() == OmmState::SuspectEnum)
        info->isServiceUp = false;
}
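The two status sources described above (Directory responses and the Login status) both feed the same per-server flag. The following is a minimal sketch of that idea, assuming hypothetical stand-in enums and a simplified `ServerInfo` record instead of the EMA `OmmState` class and the application's own structure.

```cpp
#include <cassert>

// Hypothetical stand-ins for the EMA state enums and the per-server record.
enum class StreamState { Open, Closed };
enum class DataState   { Ok, Suspect };

struct ServerInfo {
    bool isServiceUp = false;
};

// Directory responses are the normal source of the service state.
void onDirectoryState(ServerInfo& info, bool serviceState) {
    info.isServiceUp = serviceState;
}

// A Login status of Open / Suspect is treated as "connection down", so the
// service is marked down. Any other Login state leaves the flag untouched
// and the next Directory response decides.
void onLoginStatus(ServerInfo& info, StreamState stream, DataState data) {
    if (stream == StreamState::Open && data == DataState::Suspect)
        info.isServiceUp = false;
}

// Usage: connection drops, then recovers.
void loginStatusExample() {
    ServerInfo primary;
    onDirectoryState(primary, true);                                // service reported up
    assert(primary.isServiceUp);
    onLoginStatus(primary, StreamState::Open, DataState::Suspect);  // connection lost
    assert(!primary.isServiceUp);
    onLoginStatus(primary, StreamState::Open, DataState::Ok);       // login recovered
    assert(!primary.isServiceUp);                                   // still waiting for Directory
    onDirectoryState(primary, true);                                // Refresh with Service State = 1
    assert(primary.isServiceUp);
}
```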

At this point, the application has enough information to handle almost all service status change scenarios. Next, the application implements internal logic to decide which server should be active, and then uses the Pause and Resume feature to control which server provides Updates.

Handle change in Service status event

    	
            

void SessionManager::handleEvent()
{
    //Discover to main Consumer
    if (primaryConsumer.isServiceUp == true && isActive(&backupConsumer))
    {
        cout << "Discover to main Consumer" << endl;
        ...
    }
    //failover to backup Consumer
    else if ((primaryConsumer.isServiceUp == false && backupConsumer.isServiceUp == true) && isActive(&primaryConsumer))
    {
        cout << "Failover to backup Consumer" << endl;
        ...
    }
}

Define new Active server

    	
            

void SessionManager::defineActiveConsumer()
{
    if (primaryConsumer.isServiceUp)
    {
        //Discover case
        activeConsumer = &primaryConsumer;
        inactiveConsumer = &backupConsumer;
    }
    else if (backupConsumer.isServiceUp)
    {
        //Failover case
        activeConsumer = &backupConsumer;
        inactiveConsumer = &primaryConsumer;
    }
    else
    {
        //Neither service is up: Primary stays active by default.
        activeConsumer = &primaryConsumer;
        inactiveConsumer = &backupConsumer;
    }
}
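The decision rules in handleEvent() and defineActiveConsumer() can be exercised without any EMA dependency. Below is a minimal sketch, assuming hypothetical names (`Role`, `Action`, `chooseActive`, `decide`) and a reduced `ServerInfo` record; it reproduces only the conditions shown above, not the rest of the SessionManager class.

```cpp
#include <cassert>

// Hypothetical reduced per-server record.
struct ServerInfo {
    bool isServiceUp = false;
};

enum class Role   { Primary, Backup };
enum class Action { None, Discover, Failover };

// Same priority rule as defineActiveConsumer(): Primary wins whenever its
// service is up, Backup is used only when Primary is down and Backup is up,
// and Primary is the default when both are down.
Role chooseActive(const ServerInfo& primary, const ServerInfo& backup) {
    if (primary.isServiceUp) return Role::Primary;
    if (backup.isServiceUp)  return Role::Backup;
    return Role::Primary;
}

// Same conditions as handleEvent(): a switch is needed only when the
// currently active server is no longer the right one.
Action decide(const ServerInfo& primary, const ServerInfo& backup, Role active) {
    if (primary.isServiceUp && active == Role::Backup)
        return Action::Discover;
    if (!primary.isServiceUp && backup.isServiceUp && active == Role::Primary)
        return Action::Failover;
    return Action::None;
}

// Usage: Primary goes down and comes back.
void failoverExample() {
    ServerInfo primary, backup;
    primary.isServiceUp = true;
    backup.isServiceUp = true;
    Role active = chooseActive(primary, backup);
    assert(active == Role::Primary);
    assert(decide(primary, backup, active) == Action::None);

    primary.isServiceUp = false;                 // Primary service goes down
    assert(decide(primary, backup, active) == Action::Failover);
    active = chooseActive(primary, backup);
    assert(active == Role::Backup);

    primary.isServiceUp = true;                  // Primary service comes back
    assert(decide(primary, backup, active) == Action::Discover);
    active = chooseActive(primary, backup);
    assert(active == Role::Primary);
}
```

Keeping the decision as a pure function of the two flags and the current role makes it straightforward to unit test every combination, including the case where both services are down and no switch should happen.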

Pause and Resume usage

An EMA consumer application can pause an individual item stream by issuing a request message (ReqMsg) with the pause flag set. It can also pause all item streams opened by an OmmConsumer instance by issuing the same request message on the Login stream's handle.

To resume data flow on a stream, the application issues a subsequent request message with the interestAfterRefresh flag set.

In this article, we use Pause all and Resume all to control data flow when the active server becomes inactive and vice versa. The application issues Pause all on the previously active server, and then Resume all on the new active server so that it starts providing updates.

Please note that with this method, tick-by-tick updates can be lost while the servers are being switched.

    	
            

void SessionManager::handleEvent()
{
    //Discover to main Consumer
    if (primaryConsumer.isServiceUp == true && isActive(&backupConsumer))
    {
        cout << "Discover to main Consumer" << endl;
        //Pause Backup's stream and resume Primary's Stream
        backupConsumer.ommConsumer->reissue(ReqMsg().domainType(MMT_LOGIN).initialImage(false).pause(true).name("user"), backupConsumer.loginHandle);
        primaryConsumer.ommConsumer->reissue(ReqMsg().domainType(MMT_LOGIN).initialImage(false).interestAfterRefresh(true).name("user"), primaryConsumer.loginHandle);
    }
    //failover to backup Consumer
    else if ((primaryConsumer.isServiceUp == false && backupConsumer.isServiceUp == true) && isActive(&primaryConsumer))
    {
        cout << "Failover to backup Consumer" << endl;
        //Pause Primary's stream and resume Backup's Stream
        primaryConsumer.ommConsumer->reissue(ReqMsg().domainType(MMT_LOGIN).initialImage(false).pause(true).name("user"), primaryConsumer.loginHandle);
        backupConsumer.ommConsumer->reissue(ReqMsg().domainType(MMT_LOGIN).initialImage(false).interestAfterRefresh(true).name("user"), backupConsumer.loginHandle);
    }
}
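The ordering used in both branches above (Pause all on the previously active server, then Resume all on the new one) can be checked with a small recording stand-in. This is a sketch only: `ConsumerControl` and `switchActive` are hypothetical names, and in the real application the two methods would perform the Login reissue calls shown above instead of writing to a log.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical wrapper around one consumer. pauseAll()/resumeAll() only
// record the call here so that the ordering can be inspected.
class ConsumerControl {
public:
    ConsumerControl(std::string name, std::vector<std::string>& log)
        : name_(std::move(name)), log_(log) {}

    void pauseAll()  { paused_ = true;  log_.push_back(name_ + ":pause");  }
    void resumeAll() { paused_ = false; log_.push_back(name_ + ":resume"); }
    bool paused() const { return paused_; }

private:
    std::string name_;
    std::vector<std::string>& log_;  // shared call log, owned by the caller
    bool paused_ = false;
};

// Pause the previously active server first, then resume the new one.
void switchActive(ConsumerControl& previousActive, ConsumerControl& newActive) {
    previousActive.pauseAll();
    newActive.resumeAll();
}

// Usage: one failover followed by one discover.
void switchExample() {
    std::vector<std::string> log;
    ConsumerControl primary("primary", log);
    ConsumerControl backup("backup", log);

    switchActive(primary, backup);   // failover
    assert(primary.paused() && !backup.paused());
    switchActive(backup, primary);   // discover
    assert(!primary.paused() && backup.paused());
    assert(log.size() == 4);
    assert(log[0] == "primary:pause" && log[1] == "backup:resume");
}
```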

We also send an individual Pause with each new item request to the inactive server, so that no updates are received from it.

    	
            

UInt64 SessionManager::registerClient(ReqMsg& reqMsg, OmmConsumerClient& client, void* closure)
{
    UInt64 handle1, handle2, keyHandle;

    //Normal request for Active Consumer
    handle1 = activeConsumer->ommConsumer->registerClient(reqMsg.pause(false), client, closure);
    //Pause request for Inactive Consumer
    handle2 = inactiveConsumer->ommConsumer->registerClient(reqMsg.pause(true), client, closure);
    //Keep Item handle for further usage (i.e. unregisterClient call)
    if (activeConsumer == &primaryConsumer)
    {
        itemHandleList.insert(ItemHandleList::value_type(handle1, handle2));
        keyHandle = handle1;
    }
    else
    {
        itemHandleList.insert(ItemHandleList::value_type(handle2, handle1));
        keyHandle = handle2;
    }
    return keyHandle;
}
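In the snippet above, the key stored in itemHandleList always ends up being the handle obtained from the Primary consumer, whichever server is active at request time. The following sketch shows how such a pair could be looked up again when the item is closed. `HandlePairs` and its methods are hypothetical; the article does not show the application's own unregisterClient code, so this is an illustration of the bookkeeping only.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>

using Handle = std::uint64_t;

// Hypothetical bookkeeping for the two handles that belong to one item.
class HandlePairs {
public:
    // Store the pair keyed by the Primary-side handle and return the key
    // that is given back to the caller.
    Handle add(Handle primaryHandle, Handle backupHandle) {
        pairs_[primaryHandle] = backupHandle;
        return primaryHandle;
    }

    // Look up and remove a pair so that both streams can be closed.
    // Returns false when the key is unknown.
    bool take(Handle key, Handle& primaryHandle, Handle& backupHandle) {
        auto it = pairs_.find(key);
        if (it == pairs_.end()) return false;
        primaryHandle = it->first;
        backupHandle = it->second;
        pairs_.erase(it);
        return true;
    }

    std::size_t size() const { return pairs_.size(); }

private:
    std::map<Handle, Handle> pairs_;  // Primary handle -> Backup handle
};

// Usage: register one item, then close it.
void handlePairsExample() {
    HandlePairs pairs;
    Handle key = pairs.add(100, 200);   // 100 on Primary, 200 on Backup
    assert(key == 100);

    Handle p = 0, b = 0;
    bool found = pairs.take(key, p, b);
    assert(found);
    assert(p == 100 && b == 200);

    found = pairs.take(key, p, b);      // already removed
    assert(!found);
}
```

Because the caller only ever sees one key, it can treat the pair of streams as a single item and does not need to know which server is currently active.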

Environment Setup

The following configuration is required on the ADS servers.

  • Both the Primary and Backup ADS servers must provide the same service name and support the Market Price domain. The user must also have the same item permissions on both servers.
  • Enable the Pause and Resume feature in ADS by setting the supportPauseResume parameter to True and the domainsCached parameter to MARKET_PRICE.
    	
            

*ads*supportPauseResume : True
*<ServiceName>*domainsCached : MARKET_PRICE

How to run the application

The application was built and tested with Visual Studio 2015 and Refinitiv Real-Time SDK C++ 1.3.1.L1.

  1. Download the application from GitHub and the Refinitiv Real-Time SDK C++ package from the downloads page.
  2. Extract the application and Enterprise Message API C++ packages.
  3. Create an "EMAInstall" variable in the Windows environment variables and set its value to the installation path of the Refinitiv Real-Time SDK package (e.g. C:\RTSDK-2.0.1.L1.win.rrg).
  4. The user name and service name used in the application are hard-coded to "user" and "DIRECT_FEED" respectively. You may need to change them to work with your environment.
  5. Compile the application.
    • The RDMFieldDictionary and enumtype.def files will be copied from "$(EmaInstall)\Cpp-C\etc\" to the executable application's folder.
    • The EmaConfig.xml will be copied to the executable application's folder.
  6. In the EmaConfig.xml file in the executable application's folder, configure two Consumers, “Consumer_1” and “Consumer_2”, under ConsumerList, and two Channels pointing to the ADS servers under ChannelList.
    	
            

<ChannelGroup>
    <ChannelList>
        <Channel>
            <Name value="Channel_1"/>
            <ChannelType value="ChannelType::RSSL_SOCKET"/>
            <CompressionType value="CompressionType::None"/>
            <GuaranteedOutputBuffers value="5000"/>
            <Host value="<Primary Server IP>"/>
            <Port value="14002"/>
        </Channel>
        <Channel>
            <Name value="Channel_2"/>
            <ChannelType value="ChannelType::RSSL_SOCKET"/>
            <CompressionType value="CompressionType::None"/>
            <GuaranteedOutputBuffers value="5000"/>
            <Host value="<Backup Server IP>"/>
            <Port value="14002"/>
        </Channel>
    </ChannelList>
</ChannelGroup>

7. Run the application. You will see that it receives updates only from Consumer_1, which is connected to the primary server. (In this demo, the IP addresses of the primary and backup ADS servers are 192.168.27.53 and 192.168.27.57 respectively.)

8. Once the subscribed service on the primary server goes down, the application fails over and receives updates from the backup ADS server.

    	
            

Update

Host: 192.168.27.53

Handle: 86666624 Closure: 0000000000000000

Item Name: EUR=

Service Name: DIRECT_FEED

Fid: 22 Name: BID value: 0.01

 

Host: 192.168.27.53

Handle: 86666624 Closure: 0000000000000000

Item Name: EUR=

Service Name: DIRECT_FEED

Item State: Open / Suspect / None / 'A23: Service has gone down. Will recall when service becomes available.'

 

active consumer: Consumer_1_1

Failover to backup Consumer

 

Update

Host: 192.168.27.57

Handle: 86666928 Closure: 0000000000000000

Item Name: EUR=

Service Name: DIRECT_FEED

Fid: 22 Name: BID value: 0.01

 

Update

Host: 192.168.27.57

Handle: 86666928 Closure: 0000000000000000

Item Name: EUR=

Service Name: DIRECT_FEED

Fid: 22 Name: BID value: 0.01

9. Once the service on the Primary ADS server comes back up, the application switches back (the Discover case) and receives updates from the Primary ADS server again.

    	
            

Received directory

new Service status: 1

 

 

active consumer: Consumer_2_2

Discover to main Consumer

 

Refresh

Host: 192.168.27.53

Handle: 86666624 Closure: 0000000000000000

Item Name: EUR=

Service Name: DIRECT_FEED

Item State: Open / Ok / None / 'All is well'

Fid: 22 Name: BID value: 0.01

 

Update

Host: 192.168.27.53

Handle: 86666624 Closure: 0000000000000000

Item Name: EUR=

Service Name: DIRECT_FEED

Fid: 22 Name: BID value: 0.01

Conclusion

By the end of this article, you should understand how to monitor service and connection status and how to control data flow with the Pause and Resume feature. You can apply this concept to your own EMA application to improve the resiliency of its streaming connection or to implement your own failover logic. Although the code is written for EMA C++, the same concept can be applied to an EMA Java application.

Reference