Elektron SDK - Java

API Family: Elektron

ETA Tutorial 7 - Posting data to Contributions Channel

Download tutorial source code

Click here to download

Last update May 2020
Environment Windows, Linux
Compilers JDK 1.7 or greater
Prerequisite Complete the first 4 ETA tutorials in the series, and ETA Java Value Add Consumer example

Introduction

In this tutorial, we will explain and use the ETA Java SDK, to contribute data to Refinitiv Contributions Channel (RCC) directly, without using TREP infrastructure. The reader must have an understanding of an Elektron Transport API and be familiar with the Value Add Consumer example. To understand the contribution concepts, please refer to this article.

Prerequisite

This tutorial requires knowledge of ETA Java Value Add Consumer example. The example is in [Elektron-SDK-Package]\Java\Eta\Applications\Examples\src\main\java\com\thomsonreuters\upa\valueadd\examples\consumer which can be downloaded from Downloads of Elektron SDK - Java Page. For more details of building Value Add Consumer, please refer to Chapter 3 Building an OMM Consumer section in Value Add Developer Guide. It is assumed that the reader is familiar with running ETA application to connect to a provider application (or Elektron/RDF-D/ADS) before, and has experience developing products using the Java programming language in a networked environment.

Contributing Data to Refinitiv Elektron Feed

Currently, clients wishing to contribute data to Refinitiv, typically do so using on-site contribution systems such as MarketLinkIP. In order to meet the evolving needs of the market, Refinitiv has developed a new Contribution service - Refinitiv Contributions Channel (RCC). Refinitiv Contributions Channel is provided through a cluster of virtual servers that can provide data optimization & conflation capabilities as well as a much higher rate of content onto Elektron.

There are two key mechanisms through which an API developer can contribute data to Refinitiv head-end. Once contributed, this data can be delivered to other customers who have been permissioned to view it. This service is commonly used by banks and funds to make their data available to trading communities.

The first option is to use and configure the Refinitiv Enterprise Platform (TREP), to take care of contribution specific connection and login details.

The second option, which is discussed in this tutorial, is for clients who do not have an onsite TREP installation and will connect to RCC through the Internet. To contribute directly to RCC, an ETA application undertakes following five steps:

  1. Establish an Encrypted Connection to the server
  2. Perform a Device Login
  3. Open a Tunnel Stream
  4. Perform a Client Login
  5. Contribute data using Post Messages

For the purpose of this tutorial, a new ETA example application: ValueAdd ConsumerContribution has been created based on the existing ValueAdd Consumer (from a com.thomsonreuters.upa.valueadd.examples.consumer package) example provided by ETA package as the starting point for the tutorial code to contribute data to RCC (based on Elektron-SDK1.2.1.java).

Basically, ValueAdd Consumer delivered by ETA package composed of auxiliary classes to covers many working areas such as Market by Price, Yield Curve, Queue Management, Cache Management, Tunnel Stream, and etc.

The ValueAdd ConsumerContribution example illustrated here has been simplified to depict only the basic process/workflow required between ETA and Refinitiv Contributions Channel.

The source files are recommended to be downloaded, which will be easier to follow this tutorial along with them.

1. Establish an Encrypted Connection to the server

To connect to the server, this example uses helper classes to help to parse command-line arguments (ConsumerContributionCmdParser) and store destination server information (ChannelInfo) into a ReactorConnectOptions object. Also, ChannelInfo.ConsumerRole is used to assign designated callback handler classes to process specific incoming event types accordingly (e.g. Channel – Connection, Login, or Directory).

// ConsumerContribution.java
	private void initChannelInfo(ChannelInfo chnlInfo)
	{
        // set up consumer role
		chnlInfo.consumerRole.defaultMsgCallback(this);
		chnlInfo.consumerRole.channelEventCallback(this);
		chnlInfo.consumerRole.loginMsgCallback(this);
		chnlInfo.consumerRole.directoryMsgCallback(this);
        ...
        
        // set up reactor connect options
        chnlInfo.connectOptions.reconnectAttemptLimit(-1); // attempt to recover forever
        chnlInfo.connectOptions.reconnectMinDelay(1000); // 1 second minimum
        chnlInfo.connectOptions.reconnectMaxDelay(60000); // 60 second maximum
        chnlInfo.connectOptions.connectionList().get(0).connectOptions().majorVersion(Codec.majorVersion());
        chnlInfo.connectOptions.connectionList().get(0).connectOptions().minorVersion(Codec.minorVersion());
        chnlInfo.connectOptions.connectionList().get(0).connectOptions().connectionType(chnlInfo.connectionArg.connectionType());
        chnlInfo.connectOptions.connectionList().get(0).connectOptions().unifiedNetworkInfo().serviceName(chnlInfo.connectionArg.port());
        chnlInfo.connectOptions.connectionList().get(0).connectOptions().unifiedNetworkInfo().address(chnlInfo.connectionArg.hostname());
        chnlInfo.connectOptions.connectionList().get(0).connectOptions().userSpecObject(chnlInfo);
        chnlInfo.connectOptions.connectionList().get(0).connectOptions().guaranteedOutputBuffers(1000);
        
        ...
        
        // handler encrypted connection, which the shouldEnableEncrypted must be true in this tutorial 
        chnlInfo.shouldEnableEncrypted = consumerContributionCmdLineParser.enableEncrypted();
         
        if (chnlInfo.shouldEnableEncrypted)
        {
        	ConnectOptions cOpt = chnlInfo.connectOptions.connectionList().get(0).connectOptions();
        	cOpt.connectionType(ConnectionTypes.ENCRYPTED);
            cOpt.tunnelingInfo().tunnelingType("encrypted"); 
            setEncryptedConfiguration(cOpt);        	           	        	
        }
        ...

A keystore file is an important entity that contains a server certification which is used during the HTTPS handshake state (it is a part of the connection establishment). You can find this article regarding how to create the new keystore file. Then, the application passes the keystore file information into the ReactorConnectionOption as well as the server information mentioned earlier.

// ConsumerContribution.java
    private void setEncryptedConfiguration(ConnectOptions options)
    {
    	String keyFile = consumerContributionCmdLineParser.keyStoreFile();
    	String keyPasswd = consumerContributionCmdLineParser.keystorePassword();
        if (keyFile == null)
        {
        	System.err.println("Error: Keystore file not provided.");  
        	System.exit(CodecReturnCodes.FAILURE);        		        		        		
        }                   
        if (keyPasswd == null)
        {
        	System.err.println("Error: Keystore password not provided.");  
        	System.exit(CodecReturnCodes.FAILURE);        		        		        		
        }          
    	    	
    	options.tunnelingInfo().KeystoreFile(keyFile);
       options.tunnelingInfo().KeystorePasswd(keyPasswd); 
       options.tunnelingInfo().SecurityProtocol("TLSv1.2");
    }

Once the application sets all necessary information to the ReactorConnectOption object already, the application then calls Reactor.connect() method to establish the encryption connection to the connecting server. A channel event is created and passed to the application’s reactorChannelEventCallback() method to verify the connection initiation status and let the application knows that the application can read data from the associating channel from now on.

// ConsumerContribution.java
	public int reactorChannelEventCallback(ReactorChannelEvent event)
	{
		ChannelInfo chnlInfo = (ChannelInfo)event.reactorChannel().userSpecObj();
		
		switch(event.eventType())
		{
    		case ReactorChannelEventTypes.CHANNEL_UP:
    		{
    			if (event.reactorChannel().selectableChannel() != null)
                    System.out.println("Channel Up Event: " + event.reactorChannel().selectableChannel());
                else
                    System.out.println("Channel Up Event");
    	        // register selector with channel event's reactorChannel
    	        try
    	        {
    				event.reactorChannel().selectableChannel().register(selector, SelectionKey.OP_READ, event.reactorChannel());
    			}
    	        catch (ClosedChannelException e)
    	        {
    	        	System.out.println("selector register failed: " + e.getLocalizedMessage());
    	        	return ReactorCallbackReturnCodes.SUCCESS;
    			}
    	        break;
    		}
    		...

2. Perform a Device Login

After the Channel state is UP, ETA will create a WorkerEvent that contains a WorkerEventType.CHANNEL_UP that triggers the ETA’s Reactor class to send a Login request message to the connecting server automatically.

// Reactor.java
    private void processChannelUp(WorkerEvent event, ReactorErrorInfo errorInfo)
    {
        ...

            LoginRequest loginRequest = ((ConsumerRole)reactorRole).rdmLoginRequest();
            if (loginRequest != null)
            {
                if (reactorChannel.watchlist() == null) // watchlist not enabled
                {
                    // a rdmLoginRequest was specified, send it out.
                    encodeAndWriteLoginRequest(loginRequest, reactorChannel, errorInfo);
                }
                ...

The Reactor class populates the authentication information from a LoginRequest object to create a message. So, the application needs to prepare username (and/or applicationId) into the LoginRequest object at the initialization step first.

// ConsumerContribution.java
	private void initChannelInfo(ChannelInfo chnlInfo)
	{        
        ...
        // initialize consumer role to default
        chnlInfo.consumerRole.initDefaultRDMLoginRequest();
        chnlInfo.consumerRole.initDefaultRDMDirectoryRequest();

	  // use command line login user name if specified
        if (consumerContributionCmdLineParser.userName() != null && !consumerContributionCmdLineParser.userName().equals(""))
        {
            LoginRequest loginRequest = chnlInfo.consumerRole.rdmLoginRequest();
            loginRequest.userName().data(consumerContributionCmdLineParser.userName());
        }
        
        
        // use command line application id if specified
        if (consumerContributionCmdLineParser.applicationId() != null && !consumerContributionCmdLineParser.applicationId().equals(""))
        {
            LoginRequest loginRequest = chnlInfo.consumerRole.rdmLoginRequest();
            loginRequest.attrib().applicationId().data(consumerContributionCmdLineParser.applicationId());
        }
        ...

3. Open a TunnelStream

Later, the Reactor class will also send a Directory request message to retrieve service information from the connecting server. If the application receives a Directory response successfully, the Reactor will update a channel status to ReactorChannelEventTypes.CHANNEL_READY. The ConsumerContribution class will start initiating a tunnel stream channel then.

 // ConsumerContribution.java
	public int reactorChannelEventCallback(ReactorChannelEvent event)
	{
		ChannelInfo chnlInfo = (ChannelInfo)event.reactorChannel().userSpecObj();
		
		switch(event.eventType())
		{
    		...
    		case ReactorChannelEventTypes.CHANNEL_READY:
    		{
    			// set ReactorChannel on ChannelInfo
    			chnlInfo.reactorChannel = event.reactorChannel();
    			if (event.reactorChannel().selectableChannel() != null)
                    System.out.println("Channel Ready Event: " + event.reactorChannel().selectableChannel());
                else
                    System.out.println("Channel Ready Event");
    			
    			if (tunnelStreamHandler != null)
                {
                    if (tunnelStreamHandler.openStream(chnlInfo, errorInfo) != ReactorReturnCodes.SUCCESS)
                    {
                        if (chnlInfo.reactorChannel.state() != ReactorChannel.State.CLOSED &&
                            chnlInfo.reactorChannel.state() != ReactorChannel.State.DOWN_RECONNECTING)
                        {
                            uninitialize();
                            System.exit(ReactorReturnCodes.FAILURE);
                        }
                    }
                }
                break;
    		}
    		...

The application uses a TunnelStreamHandler class to prepare a TunnelStreamOpenOptions object that will be an argument for a ReactorChannel.openTunnelStream() method. The TunnelStreamHandler in this tutorial implements two interfaces; TunnelStreamStatusEventCallback and TunnelStreamDefaultMsgCallback to receive a tunnel stream channel status and incoming messages from TunnelStream channel. Then, the application registers the TunnelStreamHandler object to receives associating events that relate to the tunnel stream channel to the TunnelStreamOpenOptions object via TunnelStreamOpenOptions.defaultMsgback() and TunnelStreamOpenOptions.statusEventCallback() methods.

After the preparation step is done, the application invokes the ReactorChannel.openTunnelStream() method to open a tunnel stream.

// TunnelStreamHandler.java
    public int openStream(ChannelInfo chnlInfo, ReactorErrorInfo errorInfo)
    {
        int ret;

        _tunnelStreamOpenOptions.clear();
        _tunnelStreamOpenOptions.name("BasicTunnelStream");
        _tunnelStreamOpenOptions.classOfService().flowControl().type(ClassesOfService.FlowControlTypes.BIDIRECTIONAL);
        
        _tunnelStreamOpenOptions.classOfService().flowControl().recvWindowSize(12288);
        _tunnelStreamOpenOptions.classOfService().dataIntegrity().type(ClassesOfService.DataIntegrityTypes.RELIABLE);
        _tunnelStreamOpenOptions.classOfService().authentication().type(ClassesOfService.AuthenticationTypes.NOT_REQUIRED);
        _tunnelStreamOpenOptions.classOfService().guarantee().type(ClassesOfService.GuaranteeTypes.NONE);
        _tunnelStreamOpenOptions.streamId(TUNNEL_STREAM_STREAM_ID);
        _tunnelStreamOpenOptions.domainType(_tunnelDomain);
        _tunnelStreamOpenOptions.serviceId(_serviceId);
        _tunnelStreamOpenOptions.defaultMsgCallback(this);
        _tunnelStreamOpenOptions.statusEventCallback(this);

        if ((ret = chnlInfo.reactorChannel.openTunnelStream(_tunnelStreamOpenOptions, errorInfo)) != ReactorReturnCodes.SUCCESS)
        {
            System.out.println("ReactorChannel.openTunnelStream() failed: " + CodecReturnCodes.toString(ret)
                    + "(" + errorInfo.error().text() + ")");
        }
        
        chnlInfo.tunnelStreamOpenSent = true;
        _chnlInfo = chnlInfo;
    
        return ReactorReturnCodes.SUCCESS;
    }

Important Note:

As of Elektron SDK Java 1.2.1, the application has to specify a ClassOfService().flowControl().recvWindowSize() attribute (to be a positive value). It should not skip this setting because the application may not receive messages from the tunnel stream properly.

4. Perform a client login

If the result of tunnel stream creation is successful, the application will receive a TunnelStreamStatusEvent that contains a State.streamState() = StreamStates.OPEN via the statusEventCallback method registered earlier.

// TunnelStreamHandler.java
    public int statusEventCallback(TunnelStreamStatusEvent event)
    {
        State state = event.state();
        int ret;
        
        System.out.println("Received TunnelStreamStatusEvent for Stream ID " + event.tunnelStream().streamId() + " with " + state + "\n");
        
        switch(state.streamState())
        {
            case StreamStates.OPEN:
                if (state.dataState() == DataStates.OK && _chnlInfo.tunnelStream == null)
                {
                    // Stream is open and ready for use.

                    // Add it to our ChannelInfo
                    _chnlInfo.tunnelStream = event.tunnelStream();
                    
                    _chnlInfo.isTunnelStreamUp = true;
                    
                    if (sendTunnelStreamAuthenticataion(event) < CodecReturnCodes.SUCCESS) {
                    	System.out.println("Error: sendTunnelStreamAuthenticataion");
                    } else {
                    	System.out.println("Success: sendTunnelStreamAuthenticataion");
                    }
                }
                break;
        ...

Then, the application calls a local TunnelStreamHandler.sendTunnelStreamauthentication() method to create a client login message, which contains a credential and it will be sent to AAA server by executing a TunnelStream.submit() method with an encoded message buffer. The password can be encrypted and locally stored in the configuration or Windows registry. Then, the password must be decrypted before using it in the login message.  The credentials are protected by the TLS encrypted connection. 

// TunnelStreamHandler.java
	private int sendTunnelStreamAuthentication(TunnelStreamStatusEvent event) {
		// get buffer to encode message into
		TransportBuffer buffer = _chnlInfo.tunnelStream.getBuffer(1024, _errorInfo);
		if (buffer == null) {
			System.out.println("TunnelStream.getBuffer() failed: "
					+ CodecReturnCodes.toString(_errorInfo.error().errorId()) + "(" + _errorInfo.error().text() + ")");
			return CodecReturnCodes.FAILURE;
		}

        RequestMsg requestMsg = (RequestMsg)CodecFactory.createMsg();
        requestMsg.msgClass(MsgClasses.REQUEST);  /* message is a request */
        requestMsg.domainType(DomainTypes.LOGIN);
        requestMsg.containerType(DataTypes.NO_DATA);
        requestMsg.applyStreaming();       
        requestMsg.streamId(1);
        
        requestMsg.msgKey().applyHasName();
        Buffer temp = CodecFactory.createBuffer();
        temp.data(_consumerContributionCmdLineParser.trccUser());
        requestMsg.msgKey().name(temp);
        
        requestMsg.msgKey().applyHasServiceId();
        requestMsg.msgKey().serviceId(10);
        
        requestMsg.msgKey().applyHasAttrib();
        requestMsg.msgKey().attribContainerType(DataTypes.ELEMENT_LIST);
        
        _encIter.clear();
        _encIter.setBufferAndRWFVersion(buffer, event.reactorChannel().majorVersion(), event.reactorChannel().minorVersion());
        
        int ret = requestMsg.encodeInit(_encIter, 0);
        if (ret != CodecReturnCodes.ENCODE_MSG_KEY_ATTRIB)
            return ret;
        
        elementEntry.clear();
        elementList.clear();
        elementList.applyHasStandardData();
        if ((ret = elementList.encodeInit(_encIter, null, 0)) != CodecReturnCodes.SUCCESS)
            return ret;

        elementEntry.dataType(DataTypes.ASCII_STRING);
        
        temp.clear();
        temp.data("Password");
        elementEntry.name(temp);
        
        temp.clear();
        temp.data(_consumerContributionCmdLineParser.trccPass());
        if ((ret = elementEntry.encode(_encIter, temp)) != CodecReturnCodes.SUCCESS)
            return ret;
        
        if ((ret = elementList.encodeComplete(_encIter, true)) != CodecReturnCodes.SUCCESS)
            return ret;
        
        if (ret != CodecReturnCodes.SUCCESS)
            return ret;
        if ((ret = requestMsg.encodeKeyAttribComplete(_encIter, true)) < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }
        
        if ((ret = requestMsg.encodeComplete(_encIter, true)) < CodecReturnCodes.SUCCESS)
            return ret;

        // submit the encoded data buffer to the tunnel stream
        _tunnelStreamSubmitOptions.clear();
        _tunnelStreamSubmitOptions.containerType(DataTypes.MSG);
        if ((ret = _chnlInfo.tunnelStream.submit(buffer, _tunnelStreamSubmitOptions, _errorInfo)) < ReactorReturnCodes.SUCCESS)
        {
            System.out.println("TunnelStream.submit() failed: " + CodecReturnCodes.toString(ret)
                    + "(" + _errorInfo.error().text() + ")");
            _chnlInfo.tunnelStream.releaseBuffer(buffer, _errorInfo);
            return CodecReturnCodes.FAILURE;
        } else {
        	return CodecReturnCodes.SUCCESS;
        }

5. Contribute data using post messages

A client login’s response will be passed to the TunnelStreamHandler.defaultMsgCallback() method. If the authentication succeeds in the client login process, the application will receive an inner message class; MsgClasses.REFRESH which contains a StreamStates.OPEN and DataStates.OK. The TunnelStreamHandler class then set a ChannelInfo.isTunelStreamAuthSucceeded flag to allow the application to able to encode and send post messages thereafter.

// TunnelStreamHandler.java
    public int defaultMsgCallback(TunnelStreamMsgEvent event)
    {
        if (event.containerType() == DataTypes.MSG)
        {
        	Msg msg = event.msg();
        	if (msg.domainType() == DomainTypes.LOGIN) 
        	{
        		// A client receives login refresh means the AAA server accepts a login request.
            	if (msg.msgClass() == MsgClasses.REFRESH)
            	{
            		RefreshMsg refreshMsg = (RefreshMsg)msg; 
            		System.out.println("TunnelStreamHandler: received a system domain refresh message - " + refreshMsg.state());
            		
            		// When State = Open/OK -> contribute data.
            		if (refreshMsg.state().streamState() == StreamStates.OPEN && refreshMsg.state().dataState() == DataStates.OK)
            		{
            			_chnlInfo.isTunnelStreamAuthSucceeded = true;
            			return ReactorCallbackReturnCodes.SUCCESS;
            		}
            	}
            	// A client receives login status means the AAA server rejects a login request.
            	else if (msg.msgClass() == MsgClasses.STATUS)
            	{
            		StatusMsg statusMsg = (StatusMsg)msg;
            		System.out.println("TunnelStreamHandler: received a system domain status message class - " + statusMsg.state());
            	}
            	else
            	{
            		System.out.println("TunnelStreamHandler: received an unexpected system domain message class: " + MsgClasses.toString(msg.msgClass()));
            	}
        	}
        	...

The client login’s response may include a maximum message rate in the refresh response and this message rate should be obeyed.  This message looks like:

<genericMsg domainType="RSSL_DMT_SYSTEM" streamId="3" containerType="RSSL_DT_MSG" flags="0x19 (RSSL_GNMF_HAS_EXTENDED_HEADER|RSSL_GNMF_HAS_SEQ_NUM|RSSL_GNMF_MESSAGE_COMPLETE)" seqNum="1" dataSize="127">
    <extendedHeader data="0100"/>
    <dataBody>
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
        <refreshMsg domainType="RSSL_DMT_LOGIN" streamId="1" containerType="RSSL_DT_NO_DATA" flags="0x648 (RSSL_RFMF_HAS_MSG_KEY|RSSL_RFMF_REFRESH_COMPLETE|RSSL_RFMF_DO_NOT_CACHE|RSSL_RFMF_PRIVATE_STREAM)" groupId="0" dataState="RSSL_DATA_OK" streamState="RSSL_STREAM_OPEN" code="RSSL_SC_NONE" text="Login accepted by host HOSTNAME via HANDLERNAME"  dataSize="0">
            <key  flags="0x23 (RSSL_MKF_HAS_SERVICE_ID|RSSL_MKF_HAS_NAME|RSSL_MKF_HAS_ATTRIB)"  serviceId="10" name="USERNAME" attribContainerType="RSSL_DT_ELEMENT_LIST">
                <attrib>
                    <elementList flags="0x8 (RSSL_ELF_HAS_STANDARD_DATA)">
                        <elementEntry name="TRCE:MaxMessagesPerSecond" dataType="RSSL_DT_UINT" data="300"/>
                    </elementList>
                </attrib>
            </key>
            <dataBody>
            </dataBody>
        </refreshMsg>
    </dataBody>
</genericMsg>

This feature is available in the cloud version so it will not be seen when logging in to the current On-Premises version. Therefore, if the TRCE:MaxMessagesPerSecond element presents, the message rate posted by the application should not exceed this number.  

The code below demonstrates how to encode a post message with a MARKET_PRICE update message. An encodePostWithMsg() takes a buffer, modifies and adds the buffer input with encoded post data and sends to the tunnel stream channel periodically using the TunnelStream.submit() method (as same as the “4. Perform a client login” step).

// TunnelStreamHandler.java
    public void sendMsg(ReactorChannel reactorChannel)
    {
        long currentTime = System.currentTimeMillis();
        int ret; 

        if (currentTime < _nextSubmitMsgTime)
        {
            return;
        }
        
        _nextSubmitMsgTime = currentTime + TUNNEL_SEND_FREQUENCY * 1000;
        
        if (_chnlInfo != null && _chnlInfo.tunnelStream != null && _chnlInfo.isTunnelStreamUp)
        {
			// get buffer to encode message into
			TransportBuffer buffer = _chnlInfo.tunnelStream.getBuffer(1024, _errorInfo);
			if (buffer == null) {
				System.out.println("TunnelStream.getBuffer() failed: " + 
						CodecReturnCodes.toString(_errorInfo.error().errorId()) + "(" + _errorInfo.error().text() + ")");
				return;
			}
        	
            // submit the encoded data buffer to the tunnel stream
            _tunnelStreamSubmitOptions.clear();
            _tunnelStreamSubmitOptions.containerType(DataTypes.MSG);
            
            encodePostWithMsg(reactorChannel, buffer);
            if ((ret = _chnlInfo.tunnelStream.submit(buffer, _tunnelStreamSubmitOptions, _errorInfo)) < ReactorReturnCodes.SUCCESS)
            {
                System.out.println("TunnelStream.submit() failed: " + CodecReturnCodes.toString(ret)
                        + "(" + _errorInfo.error().text() + ")");
                _chnlInfo.tunnelStream.releaseBuffer(buffer, _errorInfo);
                return;
            }
        }
    }

The TunnelStreamHandler.encodePostWithMsg() method below demonstrates how to encode a post message into the buffer:

// TunnelStreamHandler.java
    private int encodePostWithMsg(ReactorChannel chnl, TransportBuffer msgBuf)
    {
        // First encode message for payload
        postMsg.clear();

        // set-up message
        postMsg.msgClass(MsgClasses.POST);
        postMsg.streamId(1);
        postMsg.domainType(DomainTypes.MARKET_PRICE);
        postMsg.containerType(DataTypes.MSG);

        // Note: post message key not required for on-stream post
        postMsg.applyPostComplete();
        postMsg.applyAck();
        postMsg.applyHasPostId();

        postMsg.postId(nextPostId++);      

        try
    	{
    		postMsg.postUserInfo().userAddr(InetAddress.getLocalHost().getHostAddress());
    	}
    	catch (Exception e)
    	{
    		System.out.println("Populating postUserInfo failed. InetAddress.getLocalHost().getHostAddress exception: " + e.getLocalizedMessage());
    		return CodecReturnCodes.FAILURE;
    	}
        postMsg.postUserInfo().userId(Integer.parseInt(System.getProperty("pid", "1")));
 
       
        // encode post message
        encIter.clear();
        int ret = encIter.setBufferAndRWFVersion(msgBuf, chnl.majorVersion(), chnl.minorVersion());
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("Encoder.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret) + ">");
            return ret;
        }

        ret = postMsg.encodeInit(encIter, 0);
        if (ret != CodecReturnCodes.ENCODE_CONTAINER)
        {
            System.out.println("EncodeMsgInit() failed:  <" + CodecReturnCodes.toString(ret) + ">");
            return ret;
        }

        // get a buffer for nested market price refresh
        postNestedMsgPayLoad = CodecFactory.createBuffer();
        postNestedMsgPayLoad.data(ByteBuffer.allocate(1024));

        // Although we are encoding RWF message, this code
        // encodes nested message into a separate buffer.
        // this is because MarketPrice.encode message is shared by all
        // applications, and it expects to encode the message into a stand alone
        // buffer.
        ret = encIter.encodeNonRWFInit(postNestedMsgPayLoad);
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeNonRWFDataTypeInit() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }

        postMsgEncIter.clear();
        ret = postMsgEncIter.setBufferAndRWFVersion(postNestedMsgPayLoad, chnl.majorVersion(), chnl.minorVersion());
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeIter.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }
        
        updateMsg.clear();
        updateMsg.msgClass(MsgClasses.UPDATE);
        updateMsg.streamId(1);
        updateMsg.domainType(DomainTypes.MARKET_PRICE);
        updateMsg.containerType(DataTypes.FIELD_LIST);
        
        /* attrib info */
        updateMsg.applyHasMsgKey();
        updateMsg.msgKey().name().data(_consumerContributionCmdLineParser.trccPostItem());
        updateMsg.msgKey().applyHasName();
        /* name type */
        updateMsg.msgKey().nameType(InstrumentNameTypes.RIC);
        updateMsg.msgKey().applyHasNameType();
        
        // encode message
        ret = updateMsg.encodeInit(postMsgEncIter, 0);
        if (ret < CodecReturnCodes.SUCCESS)
        {
            return ret;
        }

        fList.clear();
        // encode field list
        fList.applyHasStandardData();
        if ((ret = fList.encodeInit(postMsgEncIter, null, 0)) < CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeFieldListInit() failed: <" + CodecReturnCodes.toString(ret) + ">");
            return ret;
        }

        // BID
        fEntry.clear();
        dictionaryEntry = dictionary.entry(MarketPriceItem.BID_FID);
        if (dictionaryEntry != null)
        {
            fEntry.fieldId(MarketPriceItem.BID_FID);
            fEntry.dataType(dictionaryEntry.rwfType());
            tempReal.clear();
            tempReal.value(bidValue++, RealHints.EXPONENT_2);
            if ((ret = fEntry.encode(postMsgEncIter, tempReal)) < CodecReturnCodes.SUCCESS)
            {
                System.out.println("EncodeFieldEntry() failed: <" + CodecReturnCodes.toString(ret) + ">");

                return ret;
            }
        }
        // ASK
        fEntry.clear();
        dictionaryEntry = dictionary.entry(MarketPriceItem.ASK_FID);
        if (dictionaryEntry != null)
        {
            fEntry.fieldId(MarketPriceItem.ASK_FID);
            fEntry.dataType(dictionaryEntry.rwfType());
            tempReal.clear();
            tempReal.value(askValue++, RealHints.EXPONENT_2);
            if ((ret = fEntry.encode(postMsgEncIter, tempReal)) < CodecReturnCodes.SUCCESS)
            {
                System.out.println("EncodeFieldEntry() failed: <" + CodecReturnCodes.toString(ret) + ">");
                return ret;
            }
        }

        // complete encode field list
        if ((ret = fList.encodeComplete(postMsgEncIter, true)) < CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeFieldListComplete() failed: <" + CodecReturnCodes.toString(ret) + ">");

            return ret;
        }
        
        // complete encode post message
        if ((ret = postMsg.encodeComplete(postMsgEncIter, true)) < CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeMsgComplete() failed: <" + CodecReturnCodes.toString(ret) + ">");
            return ret;
        }

        ret = encIter.encodeNonRWFComplete(postNestedMsgPayLoad, true);
        if (ret != CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeNonRWFDataTypeComplete() failed:  <" + CodecReturnCodes.toString(ret));
            return CodecReturnCodes.FAILURE;
        }

        // complete encode message
        if ((ret = postMsg.encodeComplete(encIter, true)) < CodecReturnCodes.SUCCESS)
        {
            System.out.println("EncodeMsgComplete() failed with return code: " + ret);
            return ret;
        }

        System.out.println("\n\nSENDING POST WITH MESSAGE:\n" + "  streamId = " + postMsg.streamId() + "\n  postId   = " + postMsg.postId() + "\n  seqNum   = " + postMsg.seqNum());

        return CodecReturnCodes.SUCCESS;

Remark: Every encodes initialize function has to be completed by invoking the corresponding rssl encode complete API. For example, encoding function of EncodeIterator.encodeNonRWFInit() has to be completed by calling EncodeIterator.encodeNonRWFComplete().

A few things to note here:

  • The unique Post ID can be retrieved from the Ack response you get back - so you can tie up the Ack to the Post and confirm that each Post has been acknowledged by the server
  • RCC currently only allows the Update of existing records on the server via an API. New records can be defined manually by a RCC administrator.

Similar to the “Perform a client login” step, a post response will return to the TunnelStream.defaultMsgCallback() method registered previously. The application can decode the ack message that conveys the post result to users.

    public int defaultMsgCallback(TunnelStreamMsgEvent event)
    {
        if (event.containerType() == DataTypes.MSG)
        {
        	Msg msg = event.msg();
        	...
        	else if (msg.domainType() == DomainTypes.MARKET_PRICE)
        	{
        	// A client receives a market_price ack to inform a contribution status from TRCC server.
            	if (msg.msgClass() == MsgClasses.ACK)
            	{
            		AckMsg ackMsg = (AckMsg) msg;
            		if (ackMsg.checkHasNakCode()) {
            			System.out.println("TunnelStreamHandler: received an ack response (post failed), reason=" + ackMsg.text());
            		} else {
            			System.out.println("TunnelStreamHandler: received an ack response (post successfully)");
            		}
            	}
            	return ReactorCallbackReturnCodes.SUCCESS;
        	}        	
        	...

Below is a successful post result (trace and output messages):

<!-- Incoming Reactor message -->
<!-- java.nio.channels.SocketChannel[connected local=/10.42.68.69:7160 remote=chp02-eap-emea1.thomsonreuters.com/159.220.44.37:443] -->
<!-- Wed Oct 31 16:45:51 ICT 2018 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<GENERIC domainType="SYSTEM" streamId="1" containerType="MSG" flags="0x19 (HAS_EXTENDED_HEADER|HAS_SEQ_NUM|MESSAGE_COMPLETE)" seqNum="3" dataSize="18">
    <extendedHeader data="0100"/>
    <dataBody>
        <ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x08 (HAS_SEQ_NUM)" ackId="1" seqNum="1" dataSize="0">
            <dataBody>
            </dataBody>
        </ACK>
    </dataBody>
</GENERIC>

TunnelStreamHandler: received an ack response (post successfully) for Id=1, seqNum=1

Below is a failed post result (trace and output messages which contain a HAS_NAK_CODE flag and detail information in text attribute):

<!-- Incoming Reactor message -->
<!-- java.nio.channels.SocketChannel[connected local=/10.42.68.69:7160 remote=chp02-eap-emea1.thomsonreuters.com/159.220.44.37:443] -->
<!-- Wed Oct 31 16:45:56 ICT 2018 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<GENERIC domainType="SYSTEM" streamId="1" containerType="MSG" flags="0x19 (HAS_EXTENDED_HEADER|HAS_SEQ_NUM|MESSAGE_COMPLETE)" seqNum="4" dataSize="53">
    <extendedHeader data="0100"/>
    <dataBody>
        <ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x2A (HAS_TEXT|HAS_SEQ_NUM|HAS_NAK_CODE)" ackId="2" nakCode="DENIED_BY_SRC" text="PostMsg has no RSSL_PSMF_ACK flag" seqNum="2" dataSize="0">
            <dataBody>
            </dataBody>
        </ACK>
    </dataBody>
</GENERIC>

TunnelStreamHandler: received an ack response (post failed) for Id=2, seqNum=2, reason=PostMsg has no RSSL_PSMF_ACK flag

Closing summary

A few points worth noting here:

  • The application provides updates for a single RIC, sending out the first Post once it gets a valid Client Login Response and then Posting an update periodically while monitoring an ack message back from the server. In reality, the application will be most likely contributing data to several RICs and therefore, it can submit multiple Posts at any time once the connection is established and receives a successful Client Login response. The trigger for sending further updates out etc will be as and when data need to be performed
  • As this is a simple tutorial, the application does not verify that each of post message was acknowledged. In a Production application, the application may well want to verify that it receives an ack message back for each post message submitted. As mentioned, this can be done by comparing the ackId in the response to the postId set in each outgoing post message.
  • This tutorial updates the same two BID and ASK fields in all posts, however, you should only send out those Fields that have changed since your previous submission.

Additional Resources

If you have any further questions, kindly post them on our Developer Forum (for ETA technical-related queries)

ConsumerContribution based on the existing example from:

  • <ELEKTRON_SDK_PACKAGE>\Java\Eta\Applications\Examples\src\main\java\com\thomsonreuters\upa\valueadd\examples\consumer

Building, and Running an application from the tutorial source code

Extract the zip file and place the source folder to the same directory of the project workspace as the Value Add Consumer example mentioned above. The expected output can be depicted as the following picture:

\Eta\Applications\Examples\src\main\java\com\thomsonreuters\upa\valueadd\examples\consumercontribution

This tutorial code requires the same libraries as Value Add Consumer application without any additional jar files. Once the codes are copied to the project, the IDE should be able to compile this tutorial as well.

Here, there is a list of arguments for this tutorial application.

Usage:
java -cp <classpath> com.thomsonreuters.upa.valueadd.examples.consumercontribution.ConsumerContribution
[-c <hostname>:<port> <service name>] [-bc <hostname>:<port>] -tunnel [-uname <LoginUsername>] [-runtime <seconds>]
 -c specifies a connection to open and a list of items to request or use for queue messaging:
     hostname:        Hostname of provider to connect to
     port:            Port of provider to connect to
     service:         Name of service to request items from on this connection
     domain:itemName  Domain and name of an item to request
         Example Usage: -c localhost:14002 DIRECT_FEED mp:TRI.N
         Note: service and domain:itemName are required by ConnectionArgsParser
         , but they are not used in this example
 -bc specifies a backup connection that is attempted if the primary connection fails
 -tunnel enables consumer to open tunnel stream and send basic text messages (it is required before a -uname argument)
 -uname changes the username used when logging into the provider
 -publisherInfo specifies that the application should add user provided publisher Id and publisher ipaddress when posting
 -connectionType specifies the connection type that the connection should use (possible values is: 'encrypted')
 -keyfile specifies keystore file location and name
 -keypasswd specifies keystore password
 -x provides an XML trace of messages
 -runtime adjusts the running time of the application
 -aid Specifies the Application ID
 -trccUser Specifies a username to be used to for AAA authentication
 -trccPass Specifies a text (password) to be used to for AAA authentication
 -trccPostItem an itemName for Post messages sent to RCC server

 

Tutorial Group: 
ETA Consumer