How to implement a custom domain for binary blobs

Sometimes the existing Thomson Reuters Domain Message Models are not sufficient to support a new requirement: they are predefined, out-of-the-box models, and none of them may fit the data you need to carry.

If you are wondering whether there is a solution for this situation, this article demonstrates one: it shows how to define your own domain and use EMA to fulfill your specific business logic.

Objective

We demonstrate how to create applications that support a new custom domain. The provider maintains a list of files, and the domain allows a consumer to list the available files and retrieve any one of them. Access to the files is controlled by DACS via the TREP components.

Prerequisite

This topic requires familiarity with EMA Interactive Provider and Consumer communication via TREP, the message encoding/decoding procedure, the Reuters Domain Models (RDMs), especially Symbol List, and DACS. We start by reviewing some background concepts.

Step by step

This article consists of three major steps:

  • Step 1: Define Domain Model – This first step explains information structures that are used to encapsulate data and content across the system
  • Step 2: Implement applications – The next step moves to the end-to-end communication between the Provider and the Consumer including source code walkthrough
  • Step 3: Set up DACS and TREP with Permissioning Configurations – This last step focuses on the intermediate components that control access to content

Step 1: Define Domain Model

Elektron SDK and TREP APIs use Open Message Model (OMM) as logical representations of data constructs or messages in communication or transport between Consumer and Provider. The layout and interpretation of any specific arrangement of data structures are referred to as a Domain Message Model (DMM). There are two types of DMM as follows:

  1. Reuters Domain Models (RDMs) are defined by Thomson Reuters and used within Thomson Reuters products, such as the Login, Source Directory, Dictionary, and MarketPrice domains
  2. User Defined Domain Model (a.k.a. a Custom Domain) is defined by parties other than Thomson Reuters

Normally, users should consider using the existing RDMs to express their data rather than defining a new custom DMM. When the RDMs cannot represent the data, users can create their own custom domain model. In this article, we demonstrate how to define a new custom domain and implement applications (Interactive Provider and Consumer) that support it. The applications in this article provide and consume two types of content:

  1. List of files available on the provider

The consumer can send a request to the provider to get a list of the available files maintained by the provider. We apply the existing SymbolList Reuters domain model to support this content. Typically, the payload of the SymbolList domain is a Map of FieldList, where each MapEntry’s key holds a symbol and the FieldList in the MapEntry’s payload provides additional information about that symbol. In this article, the provider adapts the SymbolList domain by encoding the file name in the MapEntry’s key and using the PARCL_SIZE field for the file size. The following figure represents a sample of a response message and payload.

  2. Binary file data

The current RDMs are not designed for binary data because they do not support large payloads. Specifically, existing RDMs deliver data in OMM structures such as Map of FieldList (MarketByPrice, MarketByOrder) or FieldList (MarketPrice), whose entries only support data sizes up to 65535 bytes. Therefore, we create a new custom domain to support large binary data.

 

Below are the details of the new Custom Domain Model.

  • Domain type – 129

Domain type allows a range from 0 to 255, where Thomson Reuters-defined values are between 0 and 127 and user-defined values are between 128 and 255.
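The range rule above can be checked in a few lines of standard C++. This is only a minimal sketch: the function isUserDefinedDomain and the two constants are hypothetical helpers introduced for illustration and are not part of EMA.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical constants, taken from the ranges described in the text.
constexpr std::uint16_t kMaxReservedDomain = 127;  // 0-127: Thomson Reuters-defined
constexpr std::uint16_t kMaxDomain = 255;          // 128-255: user-defined

// Returns true when domainType falls in the user-defined range.
constexpr bool isUserDefinedDomain(std::uint16_t domainType)
{
    return domainType > kMaxReservedDomain && domainType <= kMaxDomain;
}

// Compile-time check that the value chosen in this article is valid.
static_assert(isUserDefinedDomain(129), "129 must be a user-defined domain type");
```

A compile-time check like this is one way to catch a custom domain value that accidentally collides with the reserved range.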

  • Message type – Request, Refresh, and Status

Request Message - Consumers use a request message (ReqMsg in EMA) to express interest in a new stream, which typically results in the delivery of a RefreshMsg or StatusMsg.

Refresh Message - The Interactive Provider can use this message to respond to a consumer’s request.

Status Message - A provider uses a StatusMsg to close a stream and to indicate successful establishment of a stream when there is no data to convey.

  • Request type – Snapshot

Content in files is static, so a snapshot request (a.k.a. a non-streaming request) is suitable for this scenario. With a snapshot request, the response contains all current information, possibly delivered in multiple parts, and the data stream is considered complete and closed as soon as the full response has been sent to the consumer.

  • Multi-part message

RefreshMsg supports splitting payload content across multiple message parts. Splitting the data into smaller pieces avoids message size limitations; for example, TREP currently only supports message sizes up to 64 KB. The consumer application is responsible for combining the payloads of the multi-part refresh messages.
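To get a feel for the arithmetic, the sketch below computes how many parts a file needs for a given per-part payload size. The helper partsNeeded is hypothetical, and the part size is an application-chosen assumption; it has to leave room for the message header within the platform's message size limit.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: minimum number of refresh parts needed to carry
// fileSize bytes when each part carries at most partSize bytes of payload.
// An empty file still needs one (empty) final refresh to complete the stream.
std::size_t partsNeeded(std::size_t fileSize, std::size_t partSize)
{
    assert(partSize > 0);
    if (fileSize == 0)
        return 1;
    return (fileSize + partSize - 1) / partSize;  // ceiling division
}
```

For instance, under the assumed part size of 16384 bytes, a 103579-byte file would need partsNeeded(103579, 16384) parts, which evaluates to 7.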

  • Payload data Type – Opaque data

This data type is used for opaque content. In this scenario, it is the binary content of the requested file, and the consumer application simply writes the opaque data to a file.

Step 2: Implement applications

After defining how the data should be structured, we create the EMA Interactive Provider and EMA Consumer applications to demonstrate the scenario. Both applications connect to each other via the TREP components to publish and consume the following data types defined in the first step:

  1. Symbol List to list all available files
  2. The binary blob content

Interactive Provider

The Provider application’s workflow is similar to the basic Interactive Provider example provided in the EMA package. The Interactive Provider generally needs to open a port to accept connection from an ADH or Consumer by creating an OmmProvider object with a specified port number. Then, the provider needs to control the Directory response to add the list of supported domains (SymbolList and Custom domain) in the Service Info’s capabilities.

To receive request messages from the TREP components or a Consumer, the application needs to create a new class derived from the OmmProviderClient and define a call-back function, onReqMsg, to process incoming request messages. In the main function, the application will continue running for 600 seconds.

void AppClient::onReqMsg( const ReqMsg& reqMsg, const OmmProviderEvent& event )
{
    ...
}

int main( int argc, char* argv[] )
{
    AppClient appClient;
    OmmProvider provider( OmmIProviderConfig().adminControlDirectory( OmmIProviderConfig::UserControlEnum ).port("14002"), appClient );

    for (Int32 i = 0; i < 600; i++)
    {
       sleep(1000);
    }
}           

Then, the application is implemented to handle Login, Directory, SymbolList and Custom domains. The value of MMT_CUSTOM (129) has been defined in the application for the new custom domain.

#define MMT_CUSTOM 129

 

void AppClient::onReqMsg( const ReqMsg& reqMsg, const OmmProviderEvent& event )
{
       switch ( reqMsg.getDomainType() )
       {
       case MMT_LOGIN:
             processLoginRequest( reqMsg, event );
             break;

       case MMT_DIRECTORY:
             processDirectoryRequest( reqMsg, event );
             break;

       case MMT_SYMBOL_LIST:
             processSymbolListRequest(reqMsg, event);
             break;

       case MMT_CUSTOM:
             processCustomRequest(reqMsg, event);
             break;

       default:
             processInvalidItemRequest( reqMsg, event );
             break;
       }
}

 

For Login request, the application just provides a Login Refresh response with Open Stream State and Ok Data State to accept the Consumer’s connection.

For the Source Directory request, the application sends a Directory response with a service supporting both SymbolList and Custom domain (129) capabilities.

void AppClient::processDirectoryRequest(const ReqMsg& reqMsg, const OmmProviderEvent& event)
{
       event.getProvider().submit(RefreshMsg().domainType(MMT_DIRECTORY).filter(SERVICE_INFO_FILTER | SERVICE_STATE_FILTER).

             payload(Map().
                    addKeyUInt(2, MapEntry::AddEnum, FilterList().
                           add(SERVICE_INFO_ID, FilterEntry::SetEnum, ElementList().
                                 addAscii(ENAME_NAME, "INTERNAL_FEED").
                                 addArray(ENAME_CAPABILITIES, OmmArray().
                                        addUInt(MMT_SYMBOL_LIST).
                                        addUInt(MMT_CUSTOM).
                                        complete()).
                                 addArray(ENAME_DICTIONARYS_USED, OmmArray().
                                        addAscii("RWFFld").
                                        addAscii("RWFEnum").
                                        complete()).
                                 complete()).
                           add(SERVICE_STATE_ID, FilterEntry::SetEnum, ElementList().
                                 addUInt(ENAME_SVC_STATE, SERVICE_UP).
                                 complete()).
                           complete()).
                    complete()).complete().solicited(true), event.getHandle());
}

 

For the SymbolList request, if the item name is “FILE_LIST”, the application will provide a Symbol List response which contains a list of files (including their names and sizes) in the message payload. Otherwise, the application responds with a Closed Status message indicating that only the “FILE_LIST” item is supported.

A code snippet for the symbol list encoding is shown below.

void AppClient::processSymbolListRequest(const ReqMsg& reqMsg, const OmmProviderEvent& event)
{
       ...

       //List files in the "Files" folder.
       std::vector<FileInfo>* fileList = ListFile();

       Map map;
       // Add one map entry per file: the key is the file name and the
       // field list carries the file size in PARCL_SIZE (FID 1351).
       for (const FileInfo& n : *fileList)
       {
             map.addKeyAscii(n.filename.c_str(), MapEntry::AddEnum, FieldList().addRealFromDouble(1351, n.filesize).complete());
       }
       map.complete();

      event.getProvider().submit(RefreshMsg().domainType(MMT_SYMBOL_LIST).name(reqMsg.getName()).serviceName(reqMsg.getServiceName()).solicited(true).
             state(OmmState::OpenEnum, OmmState::OkEnum, OmmState::NoneEnum, "Refresh Completed").
             payload(map).complete(), event.getHandle());
       ...
}

 

To handle the custom domain request, the application opens the file named in the request message and encodes its binary data into the payload of a refresh message. If the file is too large for a single message, the application splits the data into smaller parts and uses multi-part refresh messages to convey it; the last part carries the complete flag. If the file is unavailable, the application sends a status message with a Closed stream state.

It is possible to configure TREP to cache the retrieved data for all domains including the custom domain. However, this can cause errors if TREP is unable to decode the data. To avoid this issue, the DoNotCache flag should be set to true in the Refresh messages of the custom domain.

A code snippet for the binary data encoding is shown below.

void AppClient::processCustomRequest(const ReqMsg& reqMsg, const OmmProviderEvent& event)
{
       ...
             while (!inputFile.eof())
             {
                    RefreshMsg refreshMsg;

                    // Offset of this block in the file; used to size the final part.
                    int currentIndex = inputFile.tellg();
                    // Read the next block of up to "size" bytes.
                    inputFile.read(buffer, size);

                    // eof() is set once a read reaches the end of the file, which marks the last part.
                    if (!inputFile.eof())
                    {
                           // Partial refresh. doNotCache(true) keeps TREP from caching the opaque data.
                           event.getProvider().submit(refreshMsg.doNotCache(true).domainType(MMT_CUSTOM).name(reqMsg.getName()).serviceName(reqMsg.getServiceName()).solicited(true).
                                 state(OmmState::OpenEnum, OmmState::OkEnum, OmmState::NoneEnum, "Partial Refresh").complete(false).partNum(partN).
                                 payload(OmmOpaque().set(EmaBuffer(buffer, size))),
                                 event.getHandle());
                    }
                    else
                    {
                           // Final refresh: carries the remaining bytes and the complete flag.
                           event.getProvider().submit(RefreshMsg().doNotCache(true).domainType(MMT_CUSTOM).name(reqMsg.getName()).serviceName(reqMsg.getServiceName()).solicited(true).
                                 state(OmmState::NonStreamingEnum, OmmState::OkEnum, OmmState::NoneEnum, "Refresh Complete").complete(true).partNum(partN).
                                 payload(OmmOpaque().set(EmaBuffer(buffer, length - currentIndex))),
                                 event.getHandle());
                    }
                    partN++;
             }
       ...
}
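The splitting logic can also be tried out without EMA. The following is a minimal sketch using only the standard library, not the article's implementation: FilePart and splitIntoParts are hypothetical names, and the part size is an assumed parameter. It uses gcount() to size each block instead of tracking file offsets.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for one part of a multi-part refresh.
struct FilePart
{
    unsigned partNum;     // sequence number, starting at 0 in this sketch
    bool complete;        // true only on the final part
    std::string payload;  // raw bytes carried by this part
};

// Reads the stream in blocks of at most partSize bytes and returns one
// FilePart per block. gcount() reports how many bytes each read produced.
std::vector<FilePart> splitIntoParts(std::istream& input, std::size_t partSize)
{
    assert(partSize > 0);
    std::vector<FilePart> parts;
    std::vector<char> buffer(partSize);
    unsigned partNum = 0;

    while (true)
    {
        input.read(buffer.data(), static_cast<std::streamsize>(partSize));
        const std::streamsize got = input.gcount();
        // peek() returns EOF when nothing is left, so the final part is
        // detected even when the size is an exact multiple of partSize.
        const bool last = !input || input.peek() == std::istream::traits_type::eof();
        parts.push_back({partNum++, last,
                         std::string(buffer.data(), static_cast<std::size_t>(got))});
        if (last)
            break;
    }
    return parts;
}

// Convenience overload for in-memory data, handy for trying the function out.
std::vector<FilePart> splitIntoParts(const std::string& data, std::size_t partSize)
{
    std::istringstream input(data);
    return splitIntoParts(input, partSize);
}
```

In this sketch an empty input still yields one empty part with the complete flag set, so the receiver always sees the end of the stream.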

 

 

Consumer

The following steps represent how to implement the consumer application to consume the binary data from the provider application.

First, the Consumer application creates an OmmConsumer with an ADS hostname and DACS username to establish a connection to the TREP component.  Then, it requests a list of available files by using a SymbolList domain request and then waits for a response. The SymbolList request is a snapshot request where the interestAfterRefresh is set to false.

int main( int argc, char* argv[] )
{
       try {
             AppClient client;
             OmmConsumer consumer( OmmConsumerConfig().host( "<ads host>:14002" ).username( "user01") );
             UInt64 handle = consumer.registerClient( ReqMsg().domainType( MMT_SYMBOL_LIST ).interestAfterRefresh(false).serviceName("INTERNAL_FEED").name( "FILE_LIST" ), client );



Next, the application defines MMT_CUSTOM to support the custom domain, using the same value as the provider application.

#define MMT_CUSTOM 129



To receive response messages from the TREP components, the application needs to create a new class derived from the OmmConsumerClient and define call-back functions (onRefreshMsg, onUpdateMsg, and onStatusMsg) to process incoming response messages. After receiving the response messages via the callback functions, the application verifies the response’s domain type and payload’s data type, before decoding data.

void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& evt)
{
...
       if (refreshMsg.getDomainType() == MMT_SYMBOL_LIST && DataType::MapEnum == refreshMsg.getPayload().getDataType())
             decode(refreshMsg.getPayload().getMap());
       else if (refreshMsg.getDomainType() == MMT_CUSTOM && DataType::OpaqueEnum == refreshMsg.getPayload().getDataType())
             decode(refreshMsg.getPayload().getOpaque(), refreshMsg.getComplete(), static_cast<std::ofstream*>(evt.getClosure()));
}

 

For the SymbolList response, the payload is a map of field list. The application decodes the payload to retrieve the file names from the Map Entry keys, and the file sizes from the PARCL_SIZE (1351) fields in Map Entry’s payload. This application also stores the first file name and its size to be requested via the custom domain.
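One detail worth noting when printing the decoded sizes: the sample output later in this article shows a large size as 5.12363e+06 Bytes, which is how a double is rendered by default when streamed. The sketch below is one possible way to print whole byte counts instead. The helper formatFileSize is hypothetical, and it assumes the size has been decoded into a double.

```cpp
#include <cassert>
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical helper: renders a size held in a double as a whole number
// of bytes instead of scientific notation.
std::string formatFileSize(double bytes)
{
    std::ostringstream out;
    out << std::fixed << std::setprecision(0) << bytes << " Bytes";
    return out.str();
}
```

With this helper, an illustrative value such as 5123634.0 is rendered as "5123634 Bytes".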

After that, the application sends a snapshot request message with the custom domain for the first file in the list. It also creates the file in a local directory with the same name as the requested item and passes the file stream object as a closure. The closure can be retrieved from all events which relate to this registerClient() call. This is helpful for the application to refer to the file object when processing the events.

int main( int argc, char* argv[] )
{
       try {
             ...
             //Waiting until the list is received
             while (client.fileName == "")
                    sleep(1000);

             //Open file for writing data
              client.myFile.open(client.fileName.c_str(), ios::out | ios::binary);

             handle = consumer.registerClient(ReqMsg().domainType(MMT_CUSTOM).interestAfterRefresh(false).serviceName("INTERNAL_FEED").name(client.fileName), client, &(client.myFile));

             sleep( 60000 );                         // API calls onRefreshMsg(), onUpdateMsg(), or onStatusMsg()

 

The next step is processing the custom domain responses, which contain the binary blob data. The application retrieves the data from the payload of each refresh message and writes it to the file stream. When a refresh message carries the complete flag, it is the final part, and the application closes the file stream.

void AppClient::decode(const OmmOpaque& op, bool isComplete, std::ofstream* file)
{
       EmaBuffer buffer = op.getBuffer();
       file->write(buffer.c_buf(), buffer.length());

       if (isComplete)
       {
             int size = file->tellp();
             if (size == filesize)
                    cout << "Successfully receive complete data, size: " << filesize << endl;
             else
                    cout << "Fail to receive complete data.";

             file->close();
       }
}
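The same write-then-verify pattern can be exercised without EMA types. The sketch below is an illustration under stated assumptions rather than the article's code: PartAssembler is a hypothetical class, and it counts the bytes it writes instead of querying the stream position.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>

// Hypothetical helper that mirrors the decode() logic above without EMA types.
class PartAssembler
{
public:
    PartAssembler(std::ostream& out, std::size_t expectedSize)
        : out_(out), expectedSize_(expectedSize) {}

    // Appends one part. Returns true once the final part has arrived.
    bool onPart(const std::string& payload, bool isComplete)
    {
        out_.write(payload.data(), static_cast<std::streamsize>(payload.size()));
        received_ += payload.size();
        done_ = done_ || isComplete;
        return done_;
    }

    // True when the stream is complete and the byte count matches the
    // size advertised in the file list.
    bool verified() const { return done_ && received_ == expectedSize_; }

private:
    std::ostream& out_;
    std::size_t expectedSize_;
    std::size_t received_ = 0;
    bool done_ = false;
};
```

Because the assembler only needs a std::ostream, it can be pointed at a std::ofstream in an application or at a std::ostringstream when experimenting.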

 

Step 3: Set up DACS with TREP and Permissioning Configuration

Once the applications work together end to end, this final step introduces the Data Access Control System (DACS), a TREP value-added component that lets client administrators manage their exchange fee liabilities and information charges. Data administrators can control user access to information vendors, products, exchanges, and specialist data services, as well as internally published data.

Thomson Reuters Enterprise Platform (TREP) can enable DACS permissioning to control user access to data distributed via the platform. Once a connection is established with TREP, the application supplies a username in the Login request message. TREP automatically authenticates the supplied user with DACS. Depending on the type of permissioning that has been configured, data access requests will be controlled based on whether the user is permissioned for the data.

In this article, DACS is enabled on the ADS component to control user access to data subscriptions. The following ADS configuration parameter enables DACS.

*ads*dacs*featureEnabled: True

Below is the connection diagram of the TREP and DACS components.

In this article, we use ADH and ADS to distribute data from a source application to a consumer application and use a DACS component to control access to the data.

Configure DACS Service permissioning

The following steps show how to set up the DACS server to control access to a new service.

  • Create Vendor Service

Normally, data can either be provided by information vendors such as Thomson Reuters or be internally generated. Since a vendor may offer more than one type of delivery service, DACS supports the concept of a vendor service (e.g. Reuters). For internally published data, a new Vendor and Vendor Service should be created. The DACS administrator can create the new Vendor and Service in the “Vendor Services” menu. In this article, we create a new vendor called INTERNAL and add an INTERNAL_FILE_SERVICE vendor service to this vendor.

 

  • Add service name and ID used in TREP to the DACS Vendor service.

The DACS administrator needs to add service name and ID to the DACS system via the DACS UI. The service ID can be found using the adsmon tool. Below is the screenshot after adding the INTERNAL_FEED service into the INTERNAL_FILE_SERVICE vendor service. The INTERNAL_FEED service is the service provided by the interactive provider application.

  • Define the type of permissioning

DACS supports content-based entitlements (CBE) and subject-based entitlements (SBE).

  • CBE means that permission checks are performed against the actual contents of the data item. For example, every Elektron real-time data contains a field with an entitlement code (also known as a PE). DACS uses this entitlement code, along with vendor supplied mapping tables, to verify whether a user has permission to access that data item
  • SBE means that the actual subject of the message (as specified in the message name) is used to determine access. The defined subject supports the “*” and “?” wildcard characters. The “*” wildcard can be used to indicate a range of names; e.g. A* would include AA, ABB, ABC, etc. The “?” wildcard can be used to represent a single character.

CBE needs a deep understanding of the content and how the entitlement code is mapped to it, so SBE can be easier and more manageable for internally published data. We therefore use SBE permissioning to determine access to the binary blob. To demonstrate SBE permissioning, we have created different types of files with names in the following format.

<Feed/Source>.<Exchange/Vendor>.<File Name>.<File Extension>

For example,

  1. TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz
  2. TRTH.LSE.MARKETPRICE.20180704.REPORT.txt
  3. TRTH.NYS.MARKETPRICE.20180704.DATA.csv.gz
  4. TRTH.HKG.MARKETBYORDER.20180704.DATA.csv.gz
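In this naming scheme the first two dot-separated components identify the feed and the exchange. As a minimal sketch, the hypothetical helper subServicePrefix below extracts that prefix from a file name using only the standard library; it is an illustration and not part of the example applications.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: returns the "<Feed/Source>.<Exchange/Vendor>" prefix
// of a file name, or an empty string when the name has fewer than three
// dot-separated components.
std::string subServicePrefix(const std::string& fileName)
{
    const std::string::size_type firstDot = fileName.find('.');
    if (firstDot == std::string::npos)
        return "";
    const std::string::size_type secondDot = fileName.find('.', firstDot + 1);
    if (secondDot == std::string::npos)
        return "";
    return fileName.substr(0, secondDot);
}
```

For example, subServicePrefix("TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz") returns "TRTH.LSE", while a name without dots such as "FILE_LIST" yields an empty string.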

To control user access per exchange/vendor, we define the subservices in the DACS system as follows.

Sub Vendor Service    Item name
TRTH.LSE              TRTH.LSE.*.*
TRTH.NYS              TRTH.NYS.*.*

The subservices for SBE permissioning can be defined in the “Database->Create->Subject Based Permissioning” menu in the DACS UI. The following are the subservices used in this article.

The “TRTH.LSE” and “TRTH.NYS” sub vendor services allow access to all files starting with “TRTH.LSE” and “TRTH.NYS” respectively. If a name does not match any sub vendor service, the item is denied. For example, with this permissioning, access to all files starting with “TRTH.HKG” is always denied. This permissioning applies to all item requests within the server, so the “FILE_LIST” SymbolList item needs to be defined as well.
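To see how such item name patterns behave, the sketch below implements a generic wildcard matcher with the “*” and “?” semantics described earlier. It is not the DACS implementation, whose internals are not covered here; wildcardMatch is a hypothetical function for illustration only.

```cpp
#include <cassert>
#include <string>

// Illustrative glob matcher: '*' matches any run of characters (including
// none) and '?' matches exactly one character. This is NOT the DACS
// implementation; it only shows how patterns such as "TRTH.LSE.*.*" behave
// under the wildcard semantics described in the text.
bool wildcardMatch(const std::string& pattern, const std::string& name)
{
    std::string::size_type p = 0, n = 0;
    std::string::size_type starPos = std::string::npos;  // last '*' seen in pattern
    std::string::size_type retryAt = 0;                  // where that star began in name

    while (n < name.size())
    {
        if (p < pattern.size() && pattern[p] == '*')
        {
            starPos = p++;   // remember the star; first try matching zero characters
            retryAt = n;
        }
        else if (p < pattern.size() && (pattern[p] == '?' || pattern[p] == name[n]))
        {
            ++p;             // single-character or literal match
            ++n;
        }
        else if (starPos != std::string::npos)
        {
            p = starPos + 1; // backtrack: let the last star absorb one more character
            n = ++retryAt;
        }
        else
        {
            return false;
        }
    }
    while (p < pattern.size() && pattern[p] == '*')
        ++p;                 // trailing stars match the empty string
    return p == pattern.size();
}
```

Under these semantics, "TRTH.LSE.*.*" matches TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz, while neither "TRTH.LSE.*.*" nor "TRTH.NYS.*.*" matches a name starting with TRTH.HKG, which is consistent with such items being denied.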

Next, we assign the subservices to each user. For example, the screen below shows that user01 is allowed to access the FILE_LIST and TRTH.LSE subservices but not the TRTH.NYS subservice.

Workflow Diagram

The following diagram describes the communication workflow among the Provider application, TREP, the DACS system, and the Consumer application.

How to run example applications

  1. Download the source code from this link
  2. Download the ElektronSDK 1.2 package from this link and then extract the package
  3. Create an “EMAInstall” Windows environment variable for the location of the Elektron SDK, such as C:\API\Elektron-SDK1.2.0.win.rrg
  4. Modify the “INTERNAL_FEED” service in the “Consumer.cpp” and “IProvider.cpp” files to the service configured in the TREP components and DACS. Then, modify the ADS’s host name, port and DACS user ID in “Consumer.cpp”
int main( int argc, char* argv[] )
{
    try {
        AppClient client;
        OmmConsumer consumer( OmmConsumerConfig().host( "<ADS hostname>:14002" ).username( "user1" ) );

 

  5. Compile both the IProvider_CustomDomain and Consumer_CustomDomain examples
  6. Add binary files in the “Files” folder of the IProvider_CustomDomain running directory. The file name format should be <Feed/Source>.<Exchange/Vendor>.<File Name>.<File Extension>
  7. Configure the DACS system to allow access to the files.
  8. Run IProvider_CustomDomain first.
  9. Verify that the service is up on ADS.
  10. Run Consumer_CustomDomain. Below is an output sample.
DomainType: 10
Item Name: FILE_LIST
Service Name: DIRECT_FEED
Item State: Non-streaming / Ok / None / 'Refresh Completed'
Completed: 1

File Name                                    File Size
TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz    103579 Bytes
TRTH.LSE.MARKETPRICE.20180704.REPORT.txt     64765 Bytes
TRTH.NYS.MARKETPRICE.20180704.DATA.csv.gz    5.12363e+06 Bytes

DomainType: 129
Item Name: TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz
Service Name: DIRECT_FEED
Item State: Open / Ok / None / 'Partial Refresh'
Completed: 0

DomainType: 129
Item Name: TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz
Service Name: DIRECT_FEED
Item State: Open / Ok / None / 'Partial Refresh'
Completed: 0

DomainType: 129
Item Name: TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz
Service Name: DIRECT_FEED
Item State: Open / Ok / None / 'Partial Refresh'
Completed: 0

DomainType: 129
Item Name: TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz
Service Name: DIRECT_FEED
Item State: Open / Ok / None / 'Partial Refresh'
Completed: 0

DomainType: 129
Item Name: TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz
Service Name: DIRECT_FEED
Item State: Open / Ok / None / 'Partial Refresh'
Completed: 0

DomainType: 129
Item Name: TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz
Service Name: DIRECT_FEED
Item State: Open / Ok / None / 'Partial Refresh'
Completed: 0

DomainType: 129
Item Name: TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz
Service Name: DIRECT_FEED
Item State: Open / Ok / None / 'Partial Refresh'
Completed: 0

DomainType: 129
Item Name: TRTH.LSE.MARKETPRICE.20180704.DATA.csv.gz
Service Name: DIRECT_FEED
Item State: Open / Ok / None / 'Refresh Complete'
Completed: 1
Successfully receive complete data, size: 103579

 

 

Summary

In this article, we begin with the concept of the Open Message Model (OMM), a flexible, logical representation of message structure. A structure designed for a specific market data purpose is called a Domain Message Model (DMM). Thomson Reuters provides common domain models that are generally used within Thomson Reuters products, the Reuters Domain Models (RDMs). In addition, parties other than Thomson Reuters can define their own domain models, called User Defined Domain Models (or Custom Domains).

Next, we implement and configure each element of the communication: the Provider, the consumer, and TREP with DACS respectively. The provider is a server that hosts files and provides a list of file names or file’s content as requested. The consumer sends request messages to the provider via TREP to retrieve the list of available files and the file’s content.

Then, we demonstrate how to control authentication and authorization via TREP by using DACS. In this article, we use subject-based entitlements (SBE), which rely on the subject name (i.e. the item name) of the request message and the DACS settings. SBE is just one of the permissioning methods that can be integrated into this workflow, so you can adopt any other permissioning method that serves your particular requirement.

Hopefully, this article gives you a useful starting point for creating a new data model to support data transmissions that carry your own business information.