Elektron SDK - C/C++

API Family: Elektron

EMA Consumer - Posting data to TR Contribution Channel

Download tutorial source code

Click here to download

Last update September 2018
Environment Windows, Linux
Compilers Refer to the EMA Compiler Guides for a complete list
Prerequisite Downloaded, installed, compiled and ran an EMA consumer example

Posting Directly to the Thomson Reuters Contribution Channel

Knowledge Prerequisite – Must have an understanding of an Elektron Message API and be familiar with consuming OMM Market Price data. You should also have a basic understanding of EMA Configuration  (see the end of this article for links to relevant tutorials)

Contributing Data to Thomson Reuters Elektron Feed

Currently, clients wishing to Contribute data to Thomson Reuters, typically do so using on-site contribution systems such as MarketLinkIP.

In order to meet the evolving needs of the market, Thomson Reuters has developed a new Contribution service - the Thomson Reuters Contribution Channel (TRCC). The Contributions Channel is provided through a cluster of virtual servers that can provide data optimization & conflation capabilities as well as a much higher rate of content onto Elektron.

Whilst it will be possible in the near future to contribute to TRCC via a Thomson Reuter Enterprise Platform (TREP) system, some of our clients use 3rd party systems and will, therefore, need to contribute directly to TRCC.

To contribute directly to TRCC you will need to develop an OMM Consumer application using one of our Elektron APIs - i.e. Elektron Transport API (ETA) or Elektron Message API (EMA).

Now, EMA supports both of Windows and Linux platform. However, it must connect to an associated server OS host only. Otherwise, the application will encounter an "Initialization timed out" error message.

This article focuses on the process you will need to implement using EMA and is summarized as follows:

  1. Establish an Encrypted Connection to the server
  2. Open a Tunnel Stream
  3. Perform a Client Login
  4. Contribute data using Post Messages

For the purpose of this tutorial, I will use the EMA supplied example 341__MarketPrice__OffStreamPost as the starting point for tutorial code as I will also be using off stream Post Messages to contribute data to TRCC.

Establish Encrypted Connection to TRCC server

As with any OMM Consumer application, one of the first things you would do is connect to and login to the server.

int main( int argc, char* argv[] )
{
	try {
		AppClient client;
		OmmConsumer consumer( OmmConsumerConfig().username( "rmds" ));
		client.setOmmConsumer( consumer );
		consumer.registerClient( ReqMsg().domainType( MMT_LOGIN ), client);

		sleep( 60000 );			// API calls onRefreshMsg(), onUpdateMsg(), or onStatusMsg()
	}
	catch ( const OmmException& excp ) {
		cout << excp << endl;
	}

	return 0;
}

In the snippet above I have removed some of the code from the 341 example which is not relevant for the purposes of this tutorial. The rest of the code remains much the same - except for specifying my username of 'test'.   

The above code results in the API loading the default configuration, attempting to establish a connection and performing a Device Login with the provided username.    

It also creates an instance of callback client class and registers it to receive any Device Login related events/messages.     

I mentioned earlier that I need to establish an encrypted connection - but there is no indication of this in the code above. This is because the connection type is specified in the configuration - and for this example, I will be using the EmaConfig.xml file (as opposed to programmatic config).    

<ConsumerGroup>
    <DefaultConsumer value="Consumer_1"/>
    <ConsumerList>
        <Consumer>
            <Name value="Consumer_1"/>
            <Channel value="Channel_1"/>
...
...
        </Consumer>
    </ConsumerList>
    </ConsumerGroup>
<ChannelGroup>
...
...
<ChannelGroup>
    <ChannelList>
        <Channel>
            <Name value="Channel_1"/>
            <ChannelType value="ChannelType::RSSL_ENCRYPTED"/>
            <Host value="<TRCC_SERVER_HOST_NAME>"/>
            <Port value="443"/>
...
...
        </Channel>
	</ChannelList>
</ChannelGroup>

As you can see I have changed the Channel Type to RSSL_ENCRYPTED (from the default RSSL_SOCKET) and the Port to 443 (from the default 14002). I have also specified the hostname for my TRCC server.

These are the minimum changes required for an encrypted connection to a TRCC server. You will need to confirm the Host and Port value with your internal Market Data team or your Thomson Reuters Account manager.

Open a Tunnel Stream

I could also go ahead and attempt to open a Tunnel Stream in the main() method. However, I think it is better practice to wait for a successful Device Login response - no point attempting a Tunnel Stream if the Device Login fails.   

Earlier I created an instance of the callback client AppClient and registered it to receive the Device Login related events. Below is an example of Device Login response received by the onRefreshMsg() callback handler of this client:

AppClient::OnRefreshMsg :: RefreshMsg
    streamId="1"
    domain="Login Domain"
    Solicited
    RefreshComplete
    ClearCache
    state="Open / Ok / None / 'Refresh Completed'"
    itemGroup=""
    name="test"
    nameType="1"
    Attrib dataType="ElementList"
        ElementList
            ElementEntry name="SingleOpen" dataType="UInt" value="1"
            ElementEntry name="AllowSuspectData" dataType="UInt" value="1"
...
...
            ElementEntry name="ApplicationId" dataType="Ascii" value="256"
            ElementEntry name="ApplicationName" dataType="Ascii" value="ema"
            ElementEntry name="Position" dataType="Ascii" value="127.0.0.1/net"
        ElementListEnd

    AttribEnd
RefreshMsgEnd

The response shows that my Device Login request was accepted and the Stream is Open etc. 

A Tunnel Stream is a private stream that supports additional functionalities such as authentication, flow control, guaranteed delivery etc - which can be specified on a per stream basis.

So, I am going to modify my onRefreshMsg() handler method to create the Tunnel Stream once it receives a valid Device Login response:

void AppClient::onRefreshMsg(const RefreshMsg& refreshMsg, const OmmConsumerEvent& event)
{
    try{
	
	cout << "AppClient::OnRefreshMsg :: " 
	    << refreshMsg << endl; // defaults to refreshMsg.toString()
			
	// If Device Login accepted & stream Opened, then open Tunnel Stream 
	if (refreshMsg.getDomainType() == MMT_LOGIN &&
		refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&
		refreshMsg.getState().getDataState() == OmmState::OkEnum)
	{
		CosAuthentication cosAuthentication;
		cosAuthentication.type(CosAuthentication::NotRequiredEnum);

		CosDataIntegrity cosDataIntegrity;
		cosDataIntegrity.type(CosDataIntegrity::ReliableEnum);

		CosFlowControl cosFlowControl;
		cosFlowControl.type(CosFlowControl::BidirectionalEnum)
		    .recvWindowSize(12288)
		    .sendWindowSize(1200);

		CosGuarantee cosGuarantee;
		cosGuarantee.type(CosGuarantee::NoneEnum);

		ClassOfService cos;
		cos.authentication(cosAuthentication)
		    .dataIntegrity(cosDataIntegrity)
		    .flowControl(cosFlowControl)
		    .guarantee(cosGuarantee);

		TunnelStreamRequest tsr;
		tsr.classOfService(cos)
		    .domainType(MMT_SYSTEM)
		    .name("Seikan")
		    .serviceName("DDS_TRCC");

		_tunnelStreamHandle = _pOmmConsumer->registerClient(tsr,*this);
	}
...
}

In the above code snippet, the first thing I check for is that I have received a successful MMT_LOGIN response. If this is the case then I define the Class of Service I require from my Tunnel Stream - in terms of Data Integrity, Authentication, Flow Control, and Guarantee. Exploration of these various properties is outside the scope of this document - suffice to say the above values are appropriate for this tutorial.

I then create a Tunnel Stream request, providing an arbitrary name of 'Seikan' for the Tunnel Stream and the service name of 'DDS_TRCC'. The service name will need to be obtained from your Market Data team or TR Account Manager. Note also, that I use a Domain type of MMT_SYSTEM - which as the name suggests is a System domain for content-neutral requests (i.e. non-data domains). 

Finally, I register this same AppClient instance as the callback client to receive events related to the Tunnel Stream and store the Tunnel Stream handle for later reference.

Perform a Client Login

When the Tunnel Stream has been established, I should get back a Status Message with a domain Type of MMT_SYSTEM and Stream State of Open - an example of which is shown below:   

AppClient::OnStatusMsg :: StatusMsg
    streamId="5"
    domain="System Domain"
    PrivateStream
    ClearCache
    state="Open / Ok / None / ''"
    name="Seikan"
    serviceId="10"
    serviceName="DDS_TRCE"
StatusMsgEnd

The above response is handled and output by the onStatusMsg method of my AppClient class - since I registered this same instance to receive any Tunnel Stream related events. 

Once I get the above response, I can then perform a Client Login to the TRCC server with my TRCC Client credentials - which consist of a TRCC username and an obfuscated password - both of which will be supplied by your Market Data team or TR Account team.

void AppClient::onStatusMsg(const StatusMsg& statusMsg, const OmmConsumerEvent& event)
{
	try {
		cout << "AppClient::OnStatusMsg :: " << statusMsg << endl;		
		// We should get a StatusMsg with Open Stream
		// once Tunnel Stream has been established
		if (!_bSubStreamOpen && 
		    event.getHandle() == _tunnelStreamHandle &&
	            statusMsg.hasState() &&
		    statusMsg.getState().getStreamState() == OmmState::OpenEnum)
		{
			_bSubStreamOpen = true;
			cout << "Client Login to Private Stream : " 
			    << statusMsg.getStreamId() << endl;
			ElementList elmList;
			elmList.addAscii("Password",            
			    "<TRCC_PASSWORD>")
			    .complete();

			ReqMsg reqMsg;

			reqMsg.domainType(MMT_SYSTEM)
				.name("<TRCC_USERNAME>")
				.privateStream(true)
				.attrib(elmList)
				.streamId(statusMsg.getStreamId());
			cout << endl << "Submit GenericMsg from onStatus" << endl;

			_subStreamHandle = _pOmmConsumer->registerClient(reqMsg,
			                *this, (void*)1, _tunnelStreamHandle);
		}
	}
...
}

In the code above, the first thing I do is check that the StatusMsg I have received is related to the Tunnel Stream request  (and not something else like the earlier Device Login) by comparing the events stream handle with the previously store Tunnel Stream Handle.

If the handles match, I then populate an ElementList with the obfuscated Password which is then specified as an attribute of the Client Login request message which I create.   

Note the following for the Client Login request:

  • The domain type is MMT_SYSTEM (content-neutral)
  • <TRCC_USERNAME> must be changed according to your authentication information
  • Private Stream flag is set
  • Password is set as an attribute of the request
  • The StreamId is set to be the same as the received StatusMsg



Using the StreamId of the StatusMsg will ensure the Client Login request is sent using the Tunnel Stream -  I confirmed earlier that this StatusMsg was for the Tunnel Stream.

Finally, I register this AppClient instance as the callback client to receive events related to the Client Login and I store the new stream handle for later reference.

Contribute data using Post Messages

Once I received a successful Client Login response from TRCC I can then go ahead and start contributing data using Post Messages.

Below is an example of a valid Client Login response which takes the form a RefreshMsg:

AppClient::OnRefreshMsg :: RefreshMsg
    streamId="5"
    domain="System Domain"
    Solicited
    RefreshComplete
    PrivateStream
    DoNotCache
    state="Open / Ok / None / 'Login accepted by host c905derdabc1'"
    itemGroup="00 00"
    name="<TRCC_USERNAME>"
    serviceId="10"
    Attrib dataType="ElementList"
        ElementList
            ElementEntry name="Password" 
            dataType="Ascii"   
            value="<TRCC_PASSWORD>"
        ElementListEnd
    AttribEnd
RefreshMsgEnd

The response shows that the Client Login was accepted and the stream is Open etc. As the response is handled by the onRefreshMsg handler I need to add another if statement to the onRefreshMsg() method to test for the valid Client Login response before I start Posting data to the server:

void AppClient::onRefreshMsg(const RefreshMsg& refreshMsg, const OmmConsumerEvent& event)
{
    try{
...
    //Start Posting messages once Client Login accepted
    if (refreshMsg.getDomainType() == MMT_SYSTEM &&
		event.getHandle() == _subStreamHandle &&
	    refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&
	    refreshMsg.getState().getDataState() == OmmState::OkEnum)
	    {
		    _postStreamID=refreshMsg.getStreamId();
	            cout<<"Send First PostMsg()....."<< endl;
		   _pOmmConsumer->submit( PostMsg()
			   .streamId(_postStreamID)
			   .postId( _postID ).domainType(MMT_MARKET_PRICE)
			   .solicitAck( true ).complete()
			   .payload(UpdateMsg().streamId(_postStreamID)
	                    .name("<CONTRIBUTION_ITEM>")
		            .payload( FieldList()
			        .addReal(22, _BID-1, OmmReal::ExponentNeg2Enum)
			        .addReal(25, _ASK-1, OmmReal::ExponentNeg2Enum)
                                .complete() ))
                            .complete(), 
			   _subStreamHandle );
			   
		    _postID++;
		    _BID++;
		    _ASK++;
		}
...
}

A breakdown of the above code snippet:

  • Check if the RefreshMsg is for the Client Login - by checking the domain and comparing the handles
  • Ensure the stream is Open before proceeding to post
  • Extract the StreamId of the Client Login so I can Submit the Post on the same private stream
  • Create a PostMsg with the following properties and values
    • StreamId the same as the Client Login private stream
    • Domain Type of MarketPrice
    • Solicit an Ack (acknowledgment) from server
    • Unique ID for this Post (initialized to 1 in AppClient constructor)
    • PostMsg payload of an UpdateMsg - to update an existing record
    • <CONTRIBUTION_ITEM> must be changed according to an actual item in the TRCC server.
    • UpdateMsg payload consisting of two fields - BID (22) and ASK (25)
  • Submit the PostMsg using the Stream Handle of the Client Login private stream
  • Increment the unique Post ID and my dummy BID and ASK values

A few things to note here:

  1. The unique Post ID can be retrieved from the Ack response you get back - so you can tie up the Ack to the Post and confirm that each Post has been acknowledged by the server
  2. TRCC currently only allows the Update of existing records on the server via an API. New records can be defined manually by a TRCC administrator.

Since I have solicited an Ack response from the server, the application should receive an AckMsg from the server shortly after the Post is submitted. This will result in the onAckMsg() method is invoked.   

Below is some example output from the onAckMsg() method when it receives an Acknowledgement to my first Post. Note the Ack Id=1, which matches the Post ID used in my first PostMsg above:

AppClient::onAckMsg :: AckMsg
    streamId="5"
    domain="MarketPrice Domain"
    ackId="1"
AckMsgEnd

In a production application, you would submit further posts as and when your records field values change.

However, as this is a simple tutorial application, I am going to use the AckMsg response as a trigger to send further Updates.

void AppClient::onAckMsg(const AckMsg& ackMsg, const OmmConsumerEvent& event)
{
	cout << endl << "AppClient::onAckMsg :: " << ackMsg << endl;
	cout << "Posting Update....." << endl;
	_pOmmConsumer->submit(PostMsg()
		.streamId(_postStreamID)
		.postId(_postID).domainType(MMT_MARKET_PRICE)
		.solicitAck(true).complete()
		.payload(UpdateMsg().streamId(_postStreamID)
			.name("<CONTRIBUTION_ITEM>")
			.payload(FieldList()
				.addReal(22, _BID, OmmReal::ExponentNeg2Enum)
				.addReal(25, _ASK, OmmReal::ExponentNeg2Enum)
				.complete()))
		.complete(), 
		_subStreamHandle);

	_postID++;
	_BID++;
	_ASK++;

}

In the above code snippet, when I receive an Ack response, I am pretty much just repeating the earlier code snippet to create a PostMsg with incrementing field values & Post ID and submitting it to the server.

Once I submit the above PostMsg, I should get back an Acknowledgement from the server, which will result onAckMsg() being invoked again, which will send out another PostMsg and so on.

Posting Update.....
AppClient::onAckMsg :: AckMsg
    streamId="5"
    domain="MarketPrice Domain"
    ackId="2"
AckMsgEnd
Posting Update.....
AppClient::onAckMsg :: AckMsg
    streamId="5"
    domain="MarketPrice Domain"
    ackId="3"
AckMsgEnd
Posting Update.....
AppClient::onAckMsg :: AckMsg
    streamId="5"
    domain="MarketPrice Domain"
    ackId="4"
AckMsgEnd

This will continue until the sleep(60000) call in the main() method expires, the main thread resumes execution and the application terminates.

Note that it is possible to receive a Nack response if the Post is not accepted by the TRCC server e.g. if you use an invalid RIC or try to update Fields that are not defined in the target RIC.   

Below is an example of a Nack response I received when I tried to post to an invalid RIC:

AppClient::onAckMsg :: AckMsg
    streamId="5"
    domain="MarketPrice Domain"
    ackId="1"
    nackCode="SymbolUnknown"
    text="Symbol unknown"
AckMsgEnd

You can test for a Nack with the AckMsg::hasNackCode() method, for example, the following outputs the Nack Code in the string and numerical format:

    if ( ackMsg.hasNackCode() )
        cout    << "Nack Code: " << ackMsg.getNackCodeAsString() 
                << " : "         << ackMsg.getNackCode() << endl;

 

Closing summary

A few points worth noting here:   

  • I am only updating a single RIC, sending out my first Post once I get a valid Client Login Response and then Posting an update each time I receive an AckMsg back from the server. In reality, you will be most likely contributing data to several RICs and therefore, you can submit your multiple Posts at any time once you are connected and have received a successful Client Login response. The trigger for sending further updates out etc will be as and when your data needs updating.
  • As this is a simple tutorial I am not verifying that each of my Posts was acknowledged. In a Production application, you may well want to verify that you receive an AckMsg back for each PostMsg you submit. As mentioned, this can be done by comparing the AckId in the response to the PostId you set in your outgoing PostMsg.
  • I am updating the same two BID and ASK fields in all my Posts, however, you should only send out those Fields that have changed since your previous submission.

 

Additional Resources

If you have any further questions I recommend you post them on our Developer

Forum or contact our Data Helpdesk

Existing Tutorials mentioned above:

Using The Tutorial Source Code

  • Windows

Elektron SDK build path and directory structure has changed, beginning with version 1.2.

For users with ESDK version 1.1.x, project files are provided for Visual Studio 2010, 2012 and 2013 in the ESDK 1.1 Build files subdirectory.

The users using ESDK 1.2, a project file is provided for Visual Studio 2013. Users also have an option to generate the project file for any compiler using the CMake process, described in the Quick Start guide.

The example has been built and tested with the EMA 3.2.1 development kit and can also work with older versions of SDK (that support the RSSL_ENCRYPTED channelType).

  • Linux

Extract the zip file and place the source folder: EMA_CPP_TRCC_Tutorial in the EMA examples directory Ema/Examples/Training/Consumer/300_Series_Examples so that it is alongside the EMA example used as the starting point (i.e. 341__MarketPrice__OffStreamPost).

In the EMA_CPP_TRCC_Tutorial directory, compile the code using ESDK Build System convention (from the Quick Start guide).

 

The resulting Consumer executable does not use any arguments.

Tutorial Group: 
EMA Consumer