Elektron SDK - C/C++

API Family: Elektron

ETA Consumer - Posting data to TR Contribution Channel

Download tutorial source code

Click here to download

Last update: May 2020
Environment: Windows, Linux
Compilers: Refer to the ETA Compiler Guides for a complete list
Prerequisite: Downloaded, installed, compiled and ran an ETA consumer example

Posting Directly to the Refinitiv Contribution Channel (RCC)

Knowledge Prerequisite: You must understand the Elektron Transport API and be familiar with consuming OMM Market Price data. You should also have a basic understanding of the Reactor concept and the associated topic of Tunnel Streams.

Contributing Data to Refinitiv Elektron Feed

Currently, clients wishing to contribute data to Refinitiv typically do so using on-site contribution systems such as MarketLinkIP.

To meet the evolving needs of the market, Refinitiv has developed a new contribution service, the Refinitiv Contribution Channel (RCC). The Contribution Channel is provided through a cluster of virtual servers that offer data optimization and conflation capabilities, as well as a much higher rate of content contribution onto Elektron.

Whilst it will be possible in the near future to contribute to RCC via the Refinitiv Enterprise Platform (TREP) system, some of our clients use 3rd party systems and will, therefore, need to contribute directly to RCC.

To contribute directly to RCC you will need to develop an OMM Consumer application using one of our Elektron APIs - i.e. Elektron Transport API (ETA) or Elektron Message API (EMA).

The C/C++ editions of ETA and EMA offer this functionality on both Windows and Linux (Windows has been supported since Elektron-SDK version 1.2.0).

This article focuses on the process you will need to implement using ETA and is summarized as follows:

  1. Establish Encrypted Connection
  2. Perform a Device Login
  3. Open a Tunnel Stream
  4. Perform a Client Login
  5. Contribute data using Post Messages
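
The ordering of the five steps above can be pictured as a small state machine. The following is only an illustrative sketch in plain C: the ContributionStep enum, its values, and the advanceStep and canPost functions are hypothetical names introduced here and are not part of ETA. In the tutorial application the equivalent transitions are driven by the Reactor and Tunnel Stream callbacks described in the sections that follow.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical states, one per completed step; these are not ETA types. */
typedef enum {
    STEP_NONE = 0,
    STEP_CONNECTED,          /* 1. encrypted connection established */
    STEP_DEVICE_LOGGED_IN,   /* 2. device login accepted */
    STEP_TUNNEL_OPEN,        /* 3. tunnel stream open */
    STEP_CLIENT_LOGGED_IN    /* 4. client login accepted */
} ContributionStep;

/* Move to the next step only if it directly follows the current one. */
bool advanceStep(ContributionStep *current, ContributionStep next)
{
    assert(current != NULL);
    if ((int)next != (int)*current + 1 || next > STEP_CLIENT_LOGGED_IN)
        return false;
    *current = next;
    return true;
}

/* 5. Post messages may only be sent once the client login has succeeded. */
bool canPost(ContributionStep current)
{
    return current == STEP_CLIENT_LOGGED_IN;
}
```

Under these assumptions, an attempt to jump from STEP_NONE straight to STEP_TUNNEL_OPEN is refused, and canPost only returns true after all four preceding steps have completed in order.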

For this tutorial, a new ETA example application has been created, using the existing VAConsumer example from the ETA package as the starting point for code that contributes data to RCC.

The VAConsumer example delivered with the ETA package is feature-rich and relies on auxiliary source files to cover many areas, such as Market By Price, Yield Curve, Queue Management, Cache Management and Tunnel Streams.

The VAConsumerContribution example illustrated here has been simplified to show only the basic workflow required between ETA and RCC.

It is recommended that you download the source files, as the tutorial is easier to follow with them at hand.

Establish Encrypted Connection

In this example, the hostname and port are read from the command-line arguments and stored in the ChannelCommand structure, which is defined in the rsslChannelCommand.h header file.

typedef struct
{
	RsslReactorChannel *reactorChannel;
	RsslReactorConnectOptions cOpts;
	RsslReactorConnectInfo cInfo;
	RsslReactorChannelRole *pRole;
	...

	char hostName[MAX_BUFFER_LENGTH];
	char port[MAX_BUFFER_LENGTH];
} ChannelCommand;
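
As an illustration of how a "hostname:port" argument could be split into the two buffers, here is a minimal sketch in plain C. It is not taken from the tutorial source: HostPortSketch, parseHostPort and SKETCH_BUFFER_LENGTH are hypothetical names, and the buffer size of 128 is an assumption because the value of MAX_BUFFER_LENGTH is not shown here.

```c
#include <assert.h>
#include <string.h>

/* Assumed size; the real MAX_BUFFER_LENGTH value is not shown in this tutorial. */
#define SKETCH_BUFFER_LENGTH 128

/* Hypothetical stand-in for the hostName and port members of ChannelCommand. */
typedef struct {
    char hostName[SKETCH_BUFFER_LENGTH];
    char port[SKETCH_BUFFER_LENGTH];
} HostPortSketch;

/* Split "hostname:port" at the last ':'.
 * Returns 0 on success, -1 if the input is malformed or too long. */
int parseHostPort(const char *arg, HostPortSketch *out)
{
    const char *sep;
    size_t hostLen, portLen;

    assert(arg != NULL && out != NULL);

    sep = strrchr(arg, ':');
    if (sep == NULL)
        return -1;

    hostLen = (size_t)(sep - arg);
    portLen = strlen(sep + 1);
    if (hostLen == 0 || portLen == 0 ||
        hostLen >= SKETCH_BUFFER_LENGTH || portLen >= SKETCH_BUFFER_LENGTH)
        return -1;

    memcpy(out->hostName, arg, hostLen);
    out->hostName[hostLen] = '\0';
    memcpy(out->port, sep + 1, portLen + 1);  /* copies the terminating NUL too */
    return 0;
}
```

Keeping the port as a string mirrors the ChannelCommand layout above, where port is a character buffer.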

An RsslReactorOMMConsumerRole structure will have been populated with the addresses of callback functions to handle various channel events. For example, the application assigns a function named channelEventCallback to handle RsslReactorChannelEvents, and a loginMsgCallback function to process RsslRDMLoginMsgs received.

static RsslReactor *pReactor = NULL;
...

/*** MAIN ***/
int main(int argc, char **argv)
{
	RsslRet ret;
	RsslCreateReactorOptions reactorOpts;
	RsslErrorInfo rsslErrorInfo;
	int i;

	RsslReactorOMMConsumerRole consumerRole;
	RsslRDMLoginRequest loginRequest;
	RsslReactorDispatchOptions dispatchOpts;
	...

	/* Setup callback functions and use them on all connections*/
	rsslClearOMMConsumerRole(&consumerRole);
	consumerRole.loginMsgCallback = loginMsgCallback;
	consumerRole.base.channelEventCallback = channelEventCallback;
	consumerRole.base.defaultMsgCallback = defaultMsgCallback;

As the snippet above shows, the application creates the RsslReactorOMMConsumerRole and registers the callback functions. The role is then stored in the ChannelCommand structure mentioned earlier.

Set the encryption protocol flags to RSSL_ENC_TLSV1_2 and the connection type to RSSL_CONN_TYPE_ENCRYPTED in RsslReactorConnectOptions; this establishes a secure TLS (Transport Layer Security) encrypted connection on port 443.

ChannelCommand *pCommand = &chanCommands[i];
RsslReactorConnectOptions *pOpts = &pCommand->cOpts;
RsslReactorConnectInfo *pInfo = &pCommand->cInfo;

pCommand->pRole = (RsslReactorChannelRole*)&consumerRole;
pInfo->rsslConnectOptions.guaranteedOutputBuffers = 500;
pInfo->rsslConnectOptions.majorVersion = RSSL_RWF_MAJOR_VERSION;
pInfo->rsslConnectOptions.minorVersion = RSSL_RWF_MINOR_VERSION;
pInfo->rsslConnectOptions.userSpecPtr = &chanCommands[i];
pInfo->initializationTimeout = 5;
pOpts->reactorConnectionList = pInfo;
pOpts->reconnectAttemptLimit = -1;
pOpts->reconnectMaxDelay = 5000;
pOpts->reconnectMinDelay = 1000;

pOpts->rsslConnectOptions.userSpecPtr = pCommand;
pOpts->rsslConnectOptions.connectionInfo.unified.address = pCommand->hostName;
pOpts->rsslConnectOptions.connectionInfo.unified.serviceName = pCommand->port;
pOpts->rsslConnectOptions.compressionType = RSSL_COMP_NONE;

// Specify a secure connection using TLS version 1.2
pOpts->rsslConnectOptions.encryptionOpts.encryptionProtocolFlags = RSSL_ENC_TLSV1_2;
pOpts->rsslConnectOptions.connectionType = RSSL_CONN_TYPE_ENCRYPTED;
...

Then, the application calls rsslReactorConnect to create a new outbound connection. rsslReactorConnect will create an OMM consumer type connection using the provided configuration and role information.

/* Create an RsslReactor which will manage our channels. */
rsslClearCreateReactorOptions(&reactorOpts);
if (!(pReactor = rsslCreateReactor(&reactorOpts, &rsslErrorInfo)))
{
	printf("Error: %s", rsslErrorInfo.rsslError.text);
	exit(-1);
}

...

/* Establish connection with Reactor and attributes
   from ChannelCommand structure */
if (rsslReactorConnect(pReactor, &pCommand->cOpts, pCommand->pRole, &rsslErrorInfo) != RSSL_RET_SUCCESS)
{
	printf("Error rsslReactorConnect(): %s\n", rsslErrorInfo.rsslError.text);
}

After establishing the underlying connection, a channel event is returned to the application’s RsslReactorChannelEventCallback; this provides the RsslReactorChannel and the state of the current connection.

The RSSL_RC_CET_CHANNEL_READY event is delivered to the channelEventCallback function, because that function was registered earlier through the RsslReactorOMMConsumerRole.base.channelEventCallback assignment.

 

Perform a Device Login

The RsslReactorOMMConsumerRole structure will have been populated with the addresses of callback functions to handle various channel events.

When an RSSL_RC_CET_CHANNEL_READY event is received by the channelEventCallback function, a communication stream can be opened. The application then authenticates to the connecting server by sending a Login request message.

RsslReactorCallbackRet channelEventCallback(RsslReactor *pReactor, RsslReactorChannel *pReactorChannel, RsslReactorChannelEvent *pConnEvent)
{
		...
	
		case RSSL_RC_CET_CHANNEL_READY:
		{
			pCommand->reactorChannelReady = RSSL_TRUE;
			return RSSL_RC_CRET_SUCCESS;
		}
		...
	}

	return RSSL_RC_CRET_SUCCESS;
}

In the example code, the callback sets a flag to indicate that the connection between ETA and the server has been established. The main loop checks this flag to decide whether it should go on to send a login request.

/* Main loop. The loop
 * calls select() to wait for notification, then calls rsslReactorDispatch(). */
	do
	{
		...

		if (chanCommands[i].reactorChannelReady != RSSL_TRUE)
		{
			continue;
		}
	
		...

		if (chanCommands[i].canSendLoginReissue == RSSL_TRUE &&
			currentTime >= (RsslInt)(chanCommands[i].loginReissueTime))
		{
			RsslReactorSubmitMsgOptions submitMsgOpts;
			RsslErrorInfo rsslErrorInfo;

			rsslClearReactorSubmitMsgOptions(&submitMsgOpts);
			submitMsgOpts.pRDMMsg = (RsslRDMMsg*)chanCommands[i].pRole->ommConsumerRole.pLoginRequest;
			if ((ret = rsslReactorSubmitMsg(pReactor, chanCommands[i].reactorChannel, &submitMsgOpts, &rsslErrorInfo)) != RSSL_RET_SUCCESS)
			{
				printf("Login reissue failed:  %d(%s)\n", ret, rsslErrorInfo.rsslError.text);
			}
			else
			{
				printf("Login reissue sent\n");
			}
			chanCommands[i].canSendLoginReissue = RSSL_FALSE;
		}
	}
}

If the flag is not set, the application skips the part that sends the login, that is, the rsslReactorSubmitMsg call in the snippet above. The login request message itself is created at the beginning of the main program, as shown in the snippet below:

RsslRDMLoginRequest loginRequest;
...

/* Initialize the default login request(Use 1 as the Login Stream ID). */
if (rsslInitDefaultRDMLoginRequest(&loginRequest, 1) != RSSL_RET_SUCCESS)
{
	printf("rsslInitDefaultRDMLoginRequest() failed\n");
	cleanUpAndExit(-1);
}
	
/* If a username was specified, change username on login request. */
if (userName.length)
	loginRequest.userName = userName;

/* If an authentication Token was specified, set it on the login request and set the user name type to RDM_LOGIN_USER_AUTHN_TOKEN */
if (authnToken.length)
{
	loginRequest.flags |= RDM_LG_RQF_HAS_USERNAME_TYPE;
	loginRequest.userName = authnToken;
	loginRequest.userNameType = RDM_LOGIN_USER_AUTHN_TOKEN;
		
	if (authnExtended.length)
	{
		loginRequest.flags |= RDM_LG_RQF_HAS_AUTHN_EXTENDED;
		loginRequest.authenticationExtended = authnExtended;
	}
}
		
if (appId.length)
{
	loginRequest.flags |= RDM_LG_RQF_HAS_APPLICATION_ID;
	loginRequest.applicationId = appId;
}

...
/* Set the messages to send when the channel is up */
consumerRole.pLoginRequest = &loginRequest;

 

Open a Tunnel Stream

Initialize an RsslTunnelStreamOpenOptions structure (refer to tunnelStreamhandler.c), making sure that the stream ID is set to 1, and then call rsslReactorOpenTunnelStream.

The RsslTunnelStreamOpenOptions structure will have been populated with the addresses of callback functions to handle various events.

When an RSSL_STREAM_OPEN stream state is received by the status event callback function tunnelStreamStatusEventCallback, a client login request should be sent on the tunnel stream to register with the server.

RsslRet ret;
RsslTunnelStreamOpenOptions tunnelStreamOpenOptions;
RsslErrorInfo errorInfo;

...

rsslClearTunnelStreamOpenOptions(&tunnelStreamOpenOptions);
tunnelStreamOpenOptions.name = pTunnelHandler->consumerName;
tunnelStreamOpenOptions.streamId = TUNNEL_STREAM_STREAM_ID;
tunnelStreamOpenOptions.domainType = pTunnelHandler->domainType;
tunnelStreamOpenOptions.serviceId = pTunnelHandler->serviceId;
tunnelStreamOpenOptions.statusEventCallback = tunnelStreamStatusEventCallback;
tunnelStreamOpenOptions.defaultMsgCallback = pTunnelHandler->defaultMsgCallback;
tunnelStreamOpenOptions.classOfService.dataIntegrity.type = RDM_COS_DI_RELIABLE;
tunnelStreamOpenOptions.classOfService.flowControl.type = RDM_COS_FC_BIDIRECTIONAL;
tunnelStreamOpenOptions.userSpecPtr = pTunnelHandler;
...
if ((ret = rsslReactorOpenTunnelStream(pReactorChannel, &tunnelStreamOpenOptions, &errorInfo)) != RSSL_RET_SUCCESS)
{
	printf("rsslReactorOpenTunnelStream failed: %s(%s)\n", rsslRetCodeToString(ret), errorInfo.rsslError.text);
	return;
}



Perform a Client Login

When the Tunnel Stream is established, the registered status event callback is invoked. This is the place to send the Client Login request, carrying the credentials, using the rsslTunnelStreamSubmit API.

/* Callback for tunnel stream status events. */
/* Process a tunnel stream status event for the TunnelStreamHandler. */
RsslReactorCallbackRet tunnelStreamStatusEventCallback(RsslTunnelStream *pTunnelStream, RsslTunnelStreamStatusEvent *pEvent)
{
	RsslState *pState = pEvent->pState;
	...

	switch (pState->streamState)
	{
	case RSSL_STREAM_OPEN:
	{
		if (pState->dataState == RSSL_DATA_OK
			&& pTunnelHandler->pTunnelStream == NULL)
		{
			if (pEvent->pAuthInfo)
			{
				printf(" Client was authenticated.\n");
			}

			pTunnelHandler->pTunnelStream = pTunnelStream;
			pTunnelHandler->processTunnelStreamOpened(pTunnelStream);

			sendTunnelStreamAuthenticataion(pTunnelHandler);
		}

		break;
	}

	case RSSL_STREAM_CLOSED:
	...

The Login request message is created and sent in the sendTunnelStreamAuthenticataion function. This function encodes the RCC Client credentials, which consist of an RCC username and a password, both supplied by your Market Data team or Refinitiv Account team. The password can be stored locally in encrypted form, in the configuration or the Windows registry, in which case it must be decrypted before it is used in the login message. The credentials are protected in transit by the TLS encrypted connection.

/* After a tunnel stream channel is established, send a request message with trceUsername and trcePassword to the server */
void sendTunnelStreamAuthenticataion(TunnelStreamHandler *pTunnelHandler)
{

	...

		msg.msgBase.msgKey.flags = msgKeyFlags;
		msg.msgBase.msgKey.name = pTunnelHandler->trceUsername;
		msg.msgBase.msgKey.attribContainerType = RSSL_DT_ELEMENT_LIST;

	...

			RsslElementEntry ee = RSSL_INIT_ELEMENT_ENTRY;
			ee.dataType = RSSL_DT_ASCII_STRING;
			ee.name.length = sizeof("Password") - 1;
			ee.name.data = "Password";

			RsslRet rsslRet = rsslEncodeElementEntry(&encodeIter, &ee, &(pTunnelHandler->trcePassword));
			rsslRet = rsslEncodeElementListComplete(&encodeIter, RSSL_TRUE);
	...
}



Contribute data using Post Messages

A post message can be created and sent when an RSSL_MC_REFRESH message is received by the TunnelStreamDefaultMsgCallback function. The address of this function will have been provided to the RsslTunnelStreamOpenOptions structure as above.

Before that, the application checks the response to the client login to see whether the login succeeded. The refresh response may also include a maximum message rate, which the application should obey. The message looks like this:

<!-- End Message (Channel IPC descriptor = 8) -->
<!-- Incoming Message (Channel IPC descriptor = 8) -->
<!-- Time: 18:04:41:526 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<genericMsg domainType="RSSL_DMT_SYSTEM" streamId="1" containerType="RSSL_DT_MSG" flags="0x19 (RSSL_GNMF_HAS_EXTENDED_HEADER|RSSL_GNMF_HAS_SEQ_NUM|RSSL_GNMF_MESSAGE_COMPLETE)" seqNum="1" dataSize="177">
    <extendedHeader data="01"/>
    <dataBody>
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
        <refreshMsg domainType="RSSL_DMT_LOGIN" streamId="1" containerType="RSSL_DT_NO_DATA" flags="0x668 (RSSL_RFMF_HAS_MSG_KEY|RSSL_RFMF_SOLICITED|RSSL_RFMF_REFRESH_COMPLETE|RSSL_RFMF_DO_NOT_CACHE|RSSL_RFMF_PRIVATE_STREAM)" groupId="0" dataState="RSSL_DATA_OK" streamState="RSSL_STREAM_OPEN" code="RSSL_SC_NONE" text="Login accepted by host c454cybdbcat5"  dataSize="0">
            <key  flags="0x22 (RSSL_MKF_HAS_NAME|RSSL_MKF_HAS_ATTRIB)"  name="<RCC Username>" attribContainerType="RSSL_DT_ELEMENT_LIST">
                <attrib>
                    <elementList flags="0x8 (RSSL_ELF_HAS_STANDARD_DATA)">
                        <elementEntry name="TRCE:MaxMessagesPerSecond" dataType="RSSL_DT_UINT" data="300"/>
                    </elementList>
                </attrib>
            </key>
            <dataBody>
            </dataBody>
        </refreshMsg>
    </dataBody>
</genericMsg>

This feature is available in the cloud version, so the element will not be seen when logging in to the current on-premises version. If the TRCE:MaxMessagesPerSecond element is present, the rate at which the application posts messages should not exceed this number.
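
One simple way to stay within an advertised rate is a counter that resets every second. The sketch below is a plain C illustration under stated assumptions, not code from the tutorial: PostRateLimiter and its functions are hypothetical names, the limit is assumed to have already been decoded from the login refresh, and a limit of 0 is used here to mean that no limit was advertised.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical per-second counter; not an ETA type. */
typedef struct {
    unsigned int maxPerSecond;  /* 0 means no limit was advertised */
    time_t windowStart;         /* the second currently being counted */
    unsigned int sentInWindow;  /* posts already sent in that second */
} PostRateLimiter;

void postRateLimiterInit(PostRateLimiter *rl, unsigned int maxPerSecond)
{
    assert(rl != NULL);
    rl->maxPerSecond = maxPerSecond;
    rl->windowStart = 0;
    rl->sentInWindow = 0;
}

/* Returns true, and counts the post, if one more post may be sent
 * during the second given by now (for example a value from time()). */
bool postRateLimiterTryAcquire(PostRateLimiter *rl, time_t now)
{
    assert(rl != NULL);
    if (rl->maxPerSecond == 0)
        return true;
    if (now != rl->windowStart) {
        /* A new second has started: reset the counter. */
        rl->windowStart = now;
        rl->sentInWindow = 0;
    }
    if (rl->sentInWindow >= rl->maxPerSecond)
        return false;
    rl->sentInWindow++;
    return true;
}
```

An application following this sketch would call postRateLimiterTryAcquire before each submit and hold the post back when it returns false. A fixed one-second window can still let a burst straddle a second boundary, so a sliding window or token bucket may be preferable where the limit is strictly enforced.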

The response above also shows that the Client Login was accepted and the stream is open. The application detects this in the callback registered as tunnelStreamOpenOptions.defaultMsgCallback. This application uses simpleTunnelMsgHandlerConsumerMsgCallback (refer to simpleTunnelMsgHandler.c), which checks whether the incoming message is a refresh message.

RsslReactorCallbackRet simpleTunnelMsgHandlerConsumerMsgCallback(RsslTunnelStream *pTunnelStream, RsslTunnelStreamMsgEvent *pEvent)
{
	switch (pEvent->containerType)
	{
	case RSSL_DT_MSG:
	{
		RsslMsg *msg = pEvent->pRsslMsg;

		switch (msg->msgBase.msgClass)
		{
		case RSSL_MC_REFRESH:
		{
			printf("simpleTunnelMsgHandlerConsumerMsgCallback: received refresh message %s\n\n", msg->refreshMsg.state.text.data);
			if (msg->refreshMsg.state.streamState == RSSL_STREAM_OPEN
				&& msg->refreshMsg.state.dataState == RSSL_DATA_OK)
			{
				SimpleTunnelMsgHandler *pSimpleTunnelMsgHandler = (SimpleTunnelMsgHandler*)pTunnelStream->userSpecPtr;
				pSimpleTunnelMsgHandler->tunnelStreamAuthSucceeded = RSSL_TRUE;
				initItemData(pSimpleTunnelMsgHandler);
			}
		...

The logic above also sets the tunnelStreamAuthSucceeded flag to true, which tells the application that it can now send post messages through the Tunnel Stream.

void handleSimpleTunnelMsgHandler(RsslReactor *pReactor, RsslReactorChannel *pReactorChannel, SimpleTunnelMsgHandler *pSimpleTunnelMsgHandler)
{
	time_t currentTime = 0;
	RsslTunnelStream *pTunnelStream = pSimpleTunnelMsgHandler->tunnelStreamHandler.pTunnelStream;

	handleTunnelStreamHandler(pReactor, pReactorChannel, &pSimpleTunnelMsgHandler->tunnelStreamHandler);

	if (pTunnelStream == NULL)
		return;

	/* Don't try to send messages if tunnel is not established. */
	if (pTunnelStream->state.streamState != RSSL_STREAM_OPEN
		|| pTunnelStream->state.dataState != RSSL_DATA_OK)
		return;

	/* If tunnel is open and some time has passed, send a message. */
	time(&currentTime);
	if (currentTime >= pSimpleTunnelMsgHandler->nextMsgTime && pSimpleTunnelMsgHandler->tunnelStreamAuthSucceeded)
	{
		simpleTunnelMsgHandlerSendMessage(pSimpleTunnelMsgHandler);
		pSimpleTunnelMsgHandler->nextMsgTime = currentTime + (time_t)TUNNEL_MSG_FREQUENCY;
	}
}



The code above calls the simpleTunnelMsgHandlerSendMessage() function to construct a Post message. The Post message is populated using the RsslPostMsg structure and encoded using rsslEncodeMsgInit.

static void simpleTunnelMsgHandlerSendMessage(SimpleTunnelMsgHandler *pSimpleTunnelMsgHandler)
{
	...

	RsslTunnelStream *pTunnelStream = pSimpleTunnelMsgHandler->tunnelStreamHandler.pTunnelStream;

	...

	/* Write text as the data body. */
	RsslPostMsg postMsg = RSSL_INIT_POST_MSG;

	/* set-up message */
	postMsg.msgBase.msgClass = RSSL_MC_POST;
	postMsg.msgBase.streamId = 1;
	postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;
	postMsg.msgBase.containerType = RSSL_DT_MSG;

	postMsg.flags = RSSL_PSMF_POST_COMPLETE
		| RSSL_PSMF_ACK // request ACK
		| RSSL_PSMF_HAS_POST_ID
		| RSSL_PSMF_HAS_SEQ_NUM
		| RSSL_PSMF_HAS_POST_USER_RIGHTS
		| RSSL_PSMF_HAS_MSG_KEY;

	postMsg.postId = pSimpleTunnelMsgHandler->nextPostId++;
	postMsg.postUserRights = RSSL_PSUR_CREATE | RSSL_PSUR_DELETE;

	...
	// encode message 
	if ((ret = rsslSetEncodeIteratorBuffer(&eIter, pBuffer)) < RSSL_RET_SUCCESS)
	{
		printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);
		return;
	}
	rsslSetEncodeIteratorRWFVersion(&eIter, pTunnelStream->classOfService.common.protocolMajorVersion,
		pTunnelStream->classOfService.common.protocolMinorVersion);

	// Convey that there will be a nested MARKET_PRICE UPDATE message in the outer POST message
	ret = rsslEncodeMsgInit(&eIter, (RsslMsg *)&postMsg, 0);
	if (ret < RSSL_RET_SUCCESS)
	{
		printf("encodePostWithMsg: rsslEncodeMsgInit() failed with return code: %d\n", ret);
		return;
	}

	// Encode MARKET_PRICE UPDATE message and its fields.
	ret = encodePostMarketPriceResponse(&eIter, &(pSimpleTunnelMsgHandler->itemData));
	...



The OMM Market Price data has to be sent inside the Post message. To do this, an RsslUpdateMsg structure is populated and encoded using rsslEncodeMsgInit.

RsslRet encodePostMarketPriceResponse(RsslEncodeIterator *encodeIter, ItemData *itemData)
{
	RsslRet ret = 0;
	RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;
	RsslMsgBase* msgBase;
	RsslMsg* msg;
	char stateText[128];
	char errTxt[256];
	RsslBuffer errorText = { 255, (char*)errTxt };

	/* set-up message */
	msgBase = &updateMsg.msgBase;
	msgBase->msgClass = RSSL_MC_UPDATE;
	/* include msg key in updates for non-interactive provider streams */
	updateMsg.flags = RSSL_UPMF_HAS_MSG_KEY;
	msgBase->msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_NAME_TYPE;

	msgBase->domainType = RSSL_DMT_MARKET_PRICE;
	msgBase->containerType = RSSL_DT_FIELD_LIST;

	/* StreamId */
	msgBase->streamId = 0;

	/* Itemname */
	msgBase->msgKey.name = itemData->itemName;
	msgBase->msgKey.nameType = RDM_INSTRUMENT_NAME_TYPE_RIC;

	msg = (RsslMsg *)&updateMsg;

	/* encode message */
	if ((ret = rsslEncodeMsgInit(encodeIter, msg, 0)) < RSSL_RET_SUCCESS)
	{
		printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);
		return ret;
	}

	RsslFieldList fList = RSSL_INIT_FIELD_LIST;
	RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;

	/* encode field list */
	fList.flags = RSSL_FLF_HAS_STANDARD_DATA;
	if ((ret = rsslEncodeFieldListInit(encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)
	{
		printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);
		return ret;
	}

	/* encode fields */
	/* BID */
	rsslClearFieldEntry(&fEntry);
	fEntry.fieldId = 22;
	fEntry.dataType = RSSL_DT_REAL;

	if ((ret = rsslEncodeFieldEntry(encodeIter, &fEntry, (void*)&(itemData->bid))) < RSSL_RET_SUCCESS)
	{
		printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
		return ret;
	}

	/* ASK */
	rsslClearFieldEntry(&fEntry);
	fEntry.fieldId = 25;
	fEntry.dataType = RSSL_DT_REAL;

	if ((ret = rsslEncodeFieldEntry(encodeIter, &fEntry, (void*)&(itemData->ask))) < RSSL_RET_SUCCESS)
	{
		printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);
		return ret;
	}

	...

The snippet above shows the application encoding an update message whose payload is a FieldList containing the BID and ASK fields; this update is the message nested inside the Post.

Finally, the encoded Post message has to be sent out using rsslTunnelStreamSubmit.

static void simpleTunnelMsgHandlerSendMessage(SimpleTunnelMsgHandler *pSimpleTunnelMsgHandler)
{
	...

	// Encode MARKET_PRICE UPDATE message and its fields.
	ret = encodePostMarketPriceResponse(&eIter, &(pSimpleTunnelMsgHandler->itemData));
	...

	/* Message encoding complete; submit it. */
	rsslClearTunnelStreamSubmitOptions(&submitOpts);
	submitOpts.containerType = RSSL_DT_MSG;
	if ((ret = rsslTunnelStreamSubmit(pTunnelStream, pBuffer, &submitOpts, &errorInfo)) != RSSL_RET_SUCCESS)
	{
		printf("rsslTunnelStreamSubmit(): Failed <%s>\n", errorInfo.rsslError.text);
		if ((ret2 = rsslTunnelStreamReleaseBuffer(pBuffer, &errorInfo)) != RSSL_RET_SUCCESS)
			printf("rsslTunnelStreamReleaseBuffer(): Failed <%d:%s>\n", ret2, errorInfo.rsslError.text);
		return;
	}

	++pSimpleTunnelMsgHandler->msgCount;
	updateItemData(&(pSimpleTunnelMsgHandler->itemData));
}

Remark: Every encode Init function has to be completed by invoking the corresponding encode Complete function. For example, encoding started with rsslEncodeMsgInit has to be completed by calling rsslEncodeMsgComplete.
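
Because this tutorial nests a field list inside an update message inside a Post message, the Complete calls have to happen in the reverse order of the Init calls. The sketch below models that pairing with a small stack in plain C. It does not call ETA; EncodeNesting, EncodeLevel and the helper functions are hypothetical names used only to illustrate the rule.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_ENCODE_DEPTH 8

/* Hypothetical labels for the containers opened in this tutorial. */
typedef enum { ENC_POST_MSG, ENC_UPDATE_MSG, ENC_FIELD_LIST } EncodeLevel;

typedef struct {
    EncodeLevel open[MAX_ENCODE_DEPTH];  /* levels whose Init has no Complete yet */
    int depth;
} EncodeNesting;

void encodeNestingClear(EncodeNesting *n)
{
    assert(n != NULL);
    n->depth = 0;
}

/* Record an Init call; fails if the nesting is deeper than this sketch supports. */
bool encodeNestingInit(EncodeNesting *n, EncodeLevel level)
{
    assert(n != NULL);
    if (n->depth >= MAX_ENCODE_DEPTH)
        return false;
    n->open[n->depth++] = level;
    return true;
}

/* Record a Complete call; only the most recently opened level may be completed. */
bool encodeNestingComplete(EncodeNesting *n, EncodeLevel level)
{
    assert(n != NULL);
    if (n->depth == 0 || n->open[n->depth - 1] != level)
        return false;
    n->depth--;
    return true;
}

/* The buffer is ready to submit only when every Init has been completed. */
bool encodeNestingIsBalanced(const EncodeNesting *n)
{
    assert(n != NULL);
    return n->depth == 0;
}
```

In this model, completing the Post message while the update message or the field list is still open is rejected, which corresponds to the order field list, then update message, then Post message.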

A few things to note here:

  1. The unique Post ID is returned in the Ack response, so you can tie each Ack to its Post and confirm that every Post has been acknowledged by the server.
  2. RCC currently only allows existing records on the server to be updated via an API. New records can be defined manually by an RCC administrator.
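
Tying Acks back to Posts can be sketched with a small table of outstanding Post IDs. The following plain C example is an illustration only: PendingPosts and its functions are hypothetical names, the fixed capacity of 64 is an arbitrary assumption, and the tutorial application itself does not perform this check.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_PENDING_POSTS 64  /* arbitrary capacity for this sketch */

/* Hypothetical table of Post IDs that have not been acknowledged yet. */
typedef struct {
    uint32_t postIds[MAX_PENDING_POSTS];
    size_t count;
} PendingPosts;

void pendingPostsClear(PendingPosts *p)
{
    assert(p != NULL);
    p->count = 0;
}

/* Remember a Post ID at submit time; fails if the table is full. */
bool pendingPostsAdd(PendingPosts *p, uint32_t postId)
{
    assert(p != NULL);
    if (p->count >= MAX_PENDING_POSTS)
        return false;
    p->postIds[p->count++] = postId;
    return true;
}

/* Match an incoming Ack ID against the table.
 * Returns true and removes the entry if it was pending, false otherwise. */
bool pendingPostsAcknowledge(PendingPosts *p, uint32_t ackId)
{
    size_t i;

    assert(p != NULL);
    for (i = 0; i < p->count; i++) {
        if (p->postIds[i] == ackId) {
            /* Order does not matter, so fill the gap with the last entry. */
            p->count--;
            p->postIds[i] = p->postIds[p->count];
            return true;
        }
    }
    return false;
}
```

With this approach, pendingPostsAdd would be called with postMsg.postId when a Post is submitted and pendingPostsAcknowledge with the ackId when an Ack arrives; any IDs still in the table after a suitable wait have not been acknowledged.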

Once a PostMsg has been submitted, the application should get back an Acknowledgement from the server via the same registered callback method, which is simpleTunnelMsgHandlerConsumerMsgCallback in this case.

RsslReactorCallbackRet simpleTunnelMsgHandlerConsumerMsgCallback(RsslTunnelStream *pTunnelStream, RsslTunnelStreamMsgEvent *pEvent)
{
	switch (pEvent->containerType)
	{
	case RSSL_DT_MSG:
	{
		RsslMsg *msg = pEvent->pRsslMsg;

		switch (msg->msgBase.msgClass)
		{
		case RSSL_MC_REFRESH:
		{
			...			
		}
		break;
		case RSSL_MC_STATUS:
		{
			...

		}
		break;
		case RSSL_MC_ACK:
		{
			printf("simpleTunnelMsgHandlerConsumerMsgCallback: received ack message\n");
			printf("\tget an ack message with ackId:%u\n", msg->ackMsg.ackId);
			if (msg->ackMsg.flags & RSSL_AKMF_HAS_NAK_CODE)
				printf("\t\t- NAK code\t= %d\n", msg->ackMsg.nakCode);
			if (msg->ackMsg.flags & RSSL_AKMF_HAS_TEXT)
			{
				printf("\t\t- text\t\t= %.*s\n", (int)msg->ackMsg.text.length, msg->ackMsg.text.data);
			}
			printf("\n");

		}
		break;

		...

Note: It is possible to receive a Nack response if the Post is not accepted by the RCC server, for example if the application uses an invalid RIC or tries to update fields that are not defined in the target RIC.

The following traces show examples of Nack responses with different reasons:

<!-- End Message (Channel IPC descriptor = 8) -->
<!-- Incoming Message (Channel IPC descriptor = 8) -->
<!-- Time: 18:33:38:770 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<genericMsg domainType="RSSL_DMT_SYSTEM" streamId="1" containerType="RSSL_DT_MSG" flags="0x19 (RSSL_GNMF_HAS_EXTENDED_HEADER|RSSL_GNMF_HAS_SEQ_NUM|RSSL_GNMF_MESSAGE_COMPLETE)" seqNum="2" dataSize="34">
    <extendedHeader data="01"/>
    <dataBody>
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
        <ackMsg domainType="RSSL_DMT_MARKET_PRICE" streamId="1" containerType="RSSL_DT_NO_DATA" flags="0x2A (RSSL_AKMF_HAS_TEXT|RSSL_AKMF_HAS_SEQ_NUM|RSSL_AKMF_HAS_NAK_CODE)" ackId="0" nakCode="RSSL_NAKC_SYMBOL_UNKNOWN" text="Symbol unknown" seqNum="0" dataSize="0">
            <dataBody>
            </dataBody>
        </ackMsg>
    </dataBody>
</genericMsg>

<!-- End Message (Channel IPC descriptor = 8) -->
simpleTunnelMsgHandlerConsumerMsgCallback: received ack message
        get an ack message with ackId:0
                - NAK code      = 10
                - text          = Symbol unknown

<!-- End Message (Channel IPC descriptor = 8) -->
<!-- Incoming Message (Channel IPC descriptor = 8) -->
<!-- Time: 18:35:43:281 -->
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<genericMsg domainType="RSSL_DMT_SYSTEM" streamId="1" containerType="RSSL_DT_MSG" flags="0x19 (RSSL_GNMF_HAS_EXTENDED_HEADER|RSSL_GNMF_HAS_SEQ_NUM|RSSL_GNMF_MESSAGE_COMPLETE)" seqNum="4" dataSize="33">
    <extendedHeader data="01"/>
    <dataBody>
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
        <ackMsg domainType="RSSL_DMT_MARKET_PRICE" streamId="1" containerType="RSSL_DT_NO_DATA" flags="0x2A (RSSL_AKMF_HAS_TEXT|RSSL_AKMF_HAS_SEQ_NUM|RSSL_AKMF_HAS_NAK_CODE)" ackId="2" nakCode="RSSL_NAKC_DENIED_BY_SRC" text="No such field" seqNum="0" dataSize="0">
            <dataBody>
            </dataBody>
        </ackMsg>
    </dataBody>
</genericMsg>

<!-- End Message (Channel IPC descriptor = 8) -->
simpleTunnelMsgHandlerConsumerMsgCallback: received ack message
        get an ack message with ackId:2
                - NAK code      = 2
                - text          = No such field
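
The application output above prints the NAK code as a bare number (10 and 2), while the XML trace shows the symbolic name. A small helper can make log lines easier to read. The sketch below is plain C and maps only the two values observed in the traces above; nakCodeName and formatNak are hypothetical names, and the ETA headers remain the authoritative source for the full set of codes.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Maps only the two NAK codes that appear in the traces above. */
const char *nakCodeName(unsigned int nakCode)
{
    switch (nakCode) {
    case 2:  return "RSSL_NAKC_DENIED_BY_SRC";
    case 10: return "RSSL_NAKC_SYMBOL_UNKNOWN";
    default: return "unmapped NAK code";
    }
}

/* Builds a one-line description of a Nack in out (at most outLen - 1 characters).
 * The text is optional, as it is in the Ack message. */
void formatNak(unsigned int nakCode, const char *text, char *out, size_t outLen)
{
    assert(out != NULL && outLen > 0);
    if (text == NULL || strlen(text) == 0)
        text = "(no text)";
    snprintf(out, outLen, "%s (%u): %s", nakCodeName(nakCode), nakCode, text);
}
```

For the first trace above, formatNak(10, "Symbol unknown", ...) produces "RSSL_NAKC_SYMBOL_UNKNOWN (10): Symbol unknown". Note that formatNak expects a NUL-terminated string, whereas the Ack text in ETA is a length-delimited buffer, so the text would need to be copied and terminated first.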



Closing summary

A few points worth noting here:

  • The application provides updates for a single RIC, sending the first Post once it gets a valid Client Login response and then posting an update periodically while monitoring the AckMsg returned by the server. In reality, an application will most likely contribute data to several RICs, so it can submit multiple Posts at any time once the connection is established and a successful Client Login response has been received. Further updates are then sent as and when new data needs to be contributed.
  • As this is a simple tutorial, the application does not verify that each Post message was acknowledged. A production application may well want to verify that it receives an AckMsg for each PostMsg submitted. As mentioned, this can be done by comparing the AckId in the response with the PostId set in each outgoing PostMsg.
  • This tutorial updates the same two fields, BID and ASK, in every Post. However, you should only send those fields that have changed since your previous submission.
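
The last point, sending only the fields that changed, can be sketched as a comparison between the previous and current values. The example below is plain C under stated assumptions: QuoteSketch, changedFields and the two mask constants are hypothetical names, and prices are held as scaled integers (for example hundredths) so that they can be compared exactly.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define FIELD_BID_CHANGED 0x1u
#define FIELD_ASK_CHANGED 0x2u

/* Hypothetical snapshot of the two fields posted in this tutorial. */
typedef struct {
    int64_t bid;  /* price as a scaled integer, e.g. hundredths */
    int64_t ask;
} QuoteSketch;

/* Returns a bit mask of the fields that differ between two snapshots. */
unsigned int changedFields(const QuoteSketch *previous, const QuoteSketch *current)
{
    unsigned int mask = 0;

    assert(previous != NULL && current != NULL);
    if (previous->bid != current->bid)
        mask |= FIELD_BID_CHANGED;
    if (previous->ask != current->ask)
        mask |= FIELD_ASK_CHANGED;
    return mask;
}
```

An encoder built on this idea would add the BID field entry (field ID 22) only when FIELD_BID_CHANGED is set, add the ASK field entry (field ID 25) only when FIELD_ASK_CHANGED is set, and skip the Post entirely when the mask is zero.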

 

Additional Resources

If you have any further questions, kindly post them on our Developer Forum or contact our Data Helpdesk

Existing Example mentioned above:

  • \Eta\Applications\Examples\VAConsumer

Building an application from the tutorial source code (For Elektron-SDK 1.2.x)

For Windows
  • Extract the zip file and place the source folder: VAConsumerContribution in the same path as ETA's Examples directory.

  • Set an environment variable named EtaInstallPath that points to the <ELEKTRON_SDK_ROOT_FOLDER>\Cpp-C\Eta folder.

  • Open the Visual Studio 2013 project (VAConsumerContribution_VS120.vcxproj), then build and run the project.

For Linux
  • Extract the output folder to <ELEKTRON_SDK_ROOT_FOLDER>\Cpp-C\Eta\Applications\Examples. It contains a makefile used for application build.
  • In the VAConsumerContribution folder, build the application with the following command.
gmake

 

  • The executable file is generated in the following directory:
    <tutorial install dir>/<executable dir>/vaconsumercontribution
  • For example: ./OL6_64_GCC444/Optimized/vaconsumercontribution (runs the statically built version using GCC version 4.4.4 for Oracle Linux 6.0)

 

The tutorial application's arguments

The tutorial application accepts the following arguments:

$vaconsumercontribution ?
Unknown option: ?
Usage: OL7_64_GCC482/Optimized/vaconsumercontribution or
OL7_64_GCC482/Optimized/vaconsumercontribution  [-tcp [<hostname>:<port>]] [-uname <LoginUsername>] [-tunnel] [-trceUser <username>] [-trcePass <password>] [-trcePostItem <itemName>] [-x] [-runtime <seconds>]

 -tcp specifies a connection to open and a list of items to request:
     hostname:        Hostname of provider to connect to
     port:            Port of provider to connect to
         A comma-separated list of these may be specified.
         Example Usage: -tcp localhost:14002
 -uname changes the username used when logging into the provider.
 -tunnel causes the consumer to open a tunnel stream that exchanges basic messages.
 -trceUser is a username to be used to for AAA authentication.
 -trcePass is a obfuscated to be used to for AAA authentication.
 -trcePostItem is an itemName for Post messages sent to RCC server.
 -x provides an XML trace of messages.
 -runtime adjusts the running time of the application.

 
