Elektron SDK - C/C++

API Family: Elektron

ETA Consumer - Request and Decode Machine Readable News

Download tutorial source code

Click here to download

Last updated September 2018
Compilers Visual Studio 2013
Prerequisites Complete the first 5 ETA Consumer tutorials in this series

Introduction

This tutorial assumes you are familiar with consuming MarketPrice data using ETA C and as a minimum, you have worked through the Transport API Consumer Tutorial on the Refinitiv Developer portal and read the "MRN data model and Elektron implementation guide". If you have not worked through the Transport API Consumer Tutorial or are not familiar with the concepts covered, it is strongly recommended that you complete the tutorial first. 

For this tutorial, we will use the source code from the tutorial 5 of the Transport API Consumer Tutorial as the basis and add additional required code to illustrate how to Request and Decode News Text Analytics (MRN) domain from a Machine Readable News (MRN) service.

Machine Readable News

Machine Readable News (MRN) data is published over Elektron using an Open Message Model (OMM) envelope in News Text Analytics domain RSSL messages. The News Text Analytics domain is designed for publishing large complex nested data structures using a FieldList-based Envelope. 

Whilst Elektron can deliver large payloads, to ensure optimized transport of MRN data items (which can extend to several kilobytes), MRN items may be fragmented and delivered as multiple envelope messages. Each envelope message has several fields to hold metadata and one field to hold the actual data (fragment) itself. To further optimize the message transportation, the MRN item is converted to a JSON UTF-8 string, compressed using Zlib prior to splitting into fragments (if required) as part of the message encoding process. The consumer needs to re-assemble the fragments and unzip the completed buffer before it can be used. 

In view of the above, processing a response to MRN request is more involved than a typical MarketPrice response.

Description

The News Text Analytics Domain Model describes how the MRN data items are modeled using OMM. It makes use of the FieldList OMM container for the metadata and the Zlib library to decompress the compressed MRN data fragments.

Initial Refresh Message

The Initial Refresh response does not contain any MRN data – but the relevant Fields are populated with appropriate values. e.g., an MRN_STORY Refresh could contain: 

RIC: MRN_STORY
FIELDLIST (StandardDataCount=16):
  FieldEntry PROD_PERM (1): 10001
  FieldEntry ACTIV_DATE (17): 21/10/2015
  FieldEntry RECORDTYPE (259): 30
  FieldEntry RDN_EXCHD2 (1709): 1370->"MRN"
  FieldEntry TIMACT_MS (4148): 60413133
  FieldEntry GUID (4271): <noData>
  FieldEntry CONTEXT_ID (5357): 3752
  FieldEntry DDS_DSO_ID (6401): 12424
  FieldEntry SPS_SP_RIC (6480): ".[SPSML1L1"
  FieldEntry MRN_V_MAJ (8506): "2"
  FieldEntry MRN_TYPE (8593): "STORY"
  FieldEntry MRN_V_MIN (11787): "10"
  FieldEntry MRN_SRC (12215): "DTC_QA_A"
  FieldEntry FRAG_NUM (32479): 0
  FieldEntry TOT_SIZE (32480): 0
  FieldEntry FRAGMENT (32641): <noData>

        ...

As you can see, the FRAGMENT field which would contain the MRN data fragments is empty – as are the GUID, FRAG_NUM and TOT_SIZE fields.

Update Messages

However, in the subsequent Update messages, these fields will be populated - but none of the feed related / static fields (contained in the Refresh response) will be, for example: 

RIC: MRN_STORY
FieldEntry TIMACT_MS (4148): 37698507
  FieldEntry ACTIV_DATE (17): 22/10/2015
  FieldEntry MRN_TYPE (8593): "STORY"
  FieldEntry MRN_V_MAJ (8506): "2"
  FieldEntry MRN_V_MIN (11787): "10"
  FieldEntry TOT_SIZE (32480): 1006
  FieldEntry FRAG_NUM (32479): 1
  FieldEntry GUID (4271): "HKS6BMNj5_1510222UAxU7Co7ts6yZmhm642yYMvKxt3AXKDp1zPwS"
  FieldEntry MRN_SRC (12215): "DTC_QA_A"
  FieldEntry FRAGMENT (32641):  ZIPPED        
...

Some of the key Fields to note: 

  • MRN_TYPE: Type of MRN item - Headline, Story, News Analytics, News Sentiment Indices
  • TOT_SIZE: Total size in bytes of fragmented data
  • FRAG_NUM: Sequential fragment number
  • GUID: Globally Unique Identifier for the MRN Item
  • MRN_SRC: Source component that published this MRN Item
  • FRAGMENT: Zlib compressed data fragment

Multi Fragment Items

The other point to note is that for a Multi fragment item, Update messages with FRAG_NUM >1 will have fewer FIDs as the metadata is included in the first Update message (FRAG_NUM=1) for that item as shown below:

FIELDLIST (StandardDataCount=10):
  FieldEntry TIMACT_MS (4148): 37740249
  FieldEntry ACTIV_DATE (17): 22/10/2015
  FieldEntry MRN_TYPE (8593): "STORY"
  FieldEntry MRN_V_MAJ (8506): "2"
  FieldEntry MRN_V_MIN (11787): "10"
  FieldEntry TOT_SIZE (32480): 2939
  FieldEntry FRAG_NUM (32479): 1
  FieldEntry GUID (4271): "Bw8SQFvGa_1510222GwUDOcedmj/Aoy2thyoBkTblGmxkWUZZ8be8K"
  FieldEntry MRN_SRC (12215): "DTC_QA_A"
  FieldEntry FRAGMENT (32641):  ZIPPED        
...
...
FIELDLIST (StandardDataCount=4):
  FieldEntry GUID (4271): "Bw8SQFvGa_1510222GwUDOcedmj/Aoy2thyoBkTblGmxkWUZZ8be8K"
  FieldEntry MRN_SRC (12215): "DTC_QA_A"
  FieldEntry FRAG_NUM (32479): 2
  FieldEntry FRAGMENT (32641):  ZIPPED

In the above example, you can see that only the essential fields are repeated i.e. the unique identifier, source name, fragment number and of course the fragment payload. We will use these fields later when re-assembling fragments of a multi-fragment MRN item to ensure that we are appending fragments to the correct MRN item.

Decoding & Re-assembly Overview

As well as using standard ETA/TREP Status & State indicators to identify connectivity or data issues, the FRAG_NUM and TOT_SIZE fields should be used to detect missing fragments.

The FRAG_NUM FID is set to 1 for the first update of each item and is incremented in each subsequent update for that item. This allows you to detect a missing fragment (and ensure correct order of the fragments for re-assembly). 

Additionally, the TOT_SIZE contains the total size of the complete set of fragmented data in bytes. By comparing TOT_SIZE with the sum of the fragment sizes received, it should allow you to confirm when all the fragments for an item have been received and the MRN item is complete.

Using the FRAG_NUM and TOT_SIZE to detect outages requires the consumer to implement timeout functionality because we need to allow time for the missing fragments to reach the consumer before assuming they have been missed. This functionality is outside the scope of this tutorial.

Finally, as the FRAGMENT field contains compressed data, we will need to use the Zlib library to decompress the payload to access the true data content. Note that for a multi-fragment MRN item, you will have to ensure you have received all the fragments before unzipping the complete multi-fragment buffer.

To decode & re-assemble the MRN item we will create a new file, "basicMRNHandler". The file will contain a new struct to represent an MRN item, and some helper functions to decode and assemble multi-fragment items.

Machine Readable News Item Struct

The key functions of the struct simplify the decoding process and the re-assembly of multi-fragment messages.

/* Machine Readable News Item */
typedef struct machineReadableNewsItem{
	RsslBuffer fragmentBuffer;
	char guid[GUID_SIZE];
	char source[MRN_SRC_SIZE];
	char type[MRN_TYPE_SIZE];
	RsslUInt64 fragmentNumber;
	RsslUInt64 expectedSize;
}MachineReadableNewsItem;

 

We also need a method to copy the struct and free the allocated fragment buffer memory

Please remember that we already know the final size of the buffer. So when we allocate the memory, we should use the expected total size.

/*
* Free allocated memory and set buffer length to 0.
* _ptr - The pointer to MachineReadableNewsItem to be freed.
*/
void mrnFree(MachineReadableNewsItem* _ptr)
{
	/* Do not free() if the length is already zero */
	if (_ptr->fragmentBuffer.length > 0)
		free(_ptr->fragmentBuffer.data);
	rsslClearBuffer(&_ptr->fragmentBuffer);
}

/*
* Copy MachineReadableNewsItem. The allocated size of Destination fragmentBuffer is equal to expectedSize of Source
* The method will call mrnFree(_Dest) to free previously allocated fragmentBuffer before reallocate
* _Dest - Pointer to the destination where the content is to be copied.
* _Source - Pointer to the source of data to be copied.
*/
void mrnCopy(MachineReadableNewsItem* _Dest, MachineReadableNewsItem* _Source)
{
	mrnFree(_Dest);
	_Dest->expectedSize = _Source->expectedSize;
	_Dest->fragmentBuffer.data = (char*)calloc(_Source->expectedSize, sizeof(char));
	memcpy(_Dest->fragmentBuffer.data, _Source->fragmentBuffer.data, _Source->fragmentBuffer.length);
	_Dest->fragmentBuffer.length = _Source->fragmentBuffer.length;
	_Dest->fragmentNumber = _Source->fragmentNumber;
	strcpy(_Dest->guid, _Source->guid);
	strcpy(_Dest->source, _Source->source);
	strcpy(_Dest->type, _Source->type);
}

 

For multi-fragment messages, we need a method to check the validity of the received fragment and to build up the fragment buffer.

/*
* Append the fragmentBuffer of _Fragment to _Item. If GUID or Source of _Fragment is not equal to _Item,
* or _Fragment size is larger than expectedSize, it will return RSSL_RET_FAILURE.
* _Item - Pointer to the MachineReadableNewsItem, which should be large enough to contain the concatenated resulting buffer
* _Fragment - Pointer to MachineReadableNewsItem to be appended
*/
RsslRet addFragment(MachineReadableNewsItem* _Item, MachineReadableNewsItem* _Fragment)
{
	if (strncmp(_Item->guid, _Fragment->guid, GUID_SIZE) != 0)
	{
		printf("Cannot add fragment to item: mismatching GUID\n");
		return RSSL_RET_FAILURE;
	}
	if (strncmp(_Item->source, _Fragment->source, MRN_SRC_SIZE) != 0)
	{
		printf("Cannot add fragment to item: mismatching data source\n");
		return RSSL_RET_FAILURE;
	}
	if (_Item->fragmentNumber + 1 != _Fragment->fragmentNumber)
	{
		printf("Cannot add fragment to item: fragment number is not in sequence\n");
		return RSSL_RET_FAILURE;
	}
	if (_Item->fragmentBuffer.length + _Fragment->fragmentBuffer.length > _Item->expectedSize)
	{
		printf("Cannot add fragment to item: fragment size is larger than expected total size\n");
		return RSSL_RET_FAILURE;
	}

	_Item->fragmentNumber = _Fragment->fragmentNumber;
	memcpy((_Item->fragmentBuffer.data + _Item->fragmentBuffer.length), _Fragment->fragmentBuffer.data, _Fragment->fragmentBuffer.length);
	_Item->fragmentBuffer.length = (_Item->fragmentBuffer.length + _Fragment->fragmentBuffer.length);

	printf("Add Fragment:: Expected total buffer: %i current %i\n", _Item->expectedSize, _Item->fragmentBuffer.length);
	return RSSL_RET_SUCCESS;
}

 

And finally, we declare a static global variable for MRN Item. We will use this variable to combine multi fragment items.

/*
* MRN Item for multi-fragment
*/
static MachineReadableNewsItem multiFragItem;

News Text Analytics Send Request

Before sending a request, we have to make sure that the service can handle News Text Analytics domain. We can do so by checking the capabilities of the service.

We will add another if statement to the processDirectoryResponse in basicDirectoryHandler.c which compare the capability number with the News Text Analytic domain enum.

/* check if name matches service name entered by user. 
 * if it does, store the service ID. Then check if 
 * the service support News Text Analytic domain.
 * This will be used to make item requests later. */
tmpServiceNameBuffer.data = service->serviceName;
tmpServiceNameBuffer.length = (RsslUInt32)strlen(service->serviceName);
if (rsslBufferIsEqual(&pService->info.serviceName, &tmpServiceNameBuffer))
{
	RsslUInt32 j;
	service->serviceId = (RsslUInt16)pService->serviceId;
	printf("\tFound your service %s using serviceId: %d\n", service->serviceName, pService->serviceId);
	for (j = 0; j < pService->info.capabilitiesCount; ++j)
	{
		if (pService->info.capabilitiesList[j] == RSSL_DMT_NEWS_TEXT_ANALYTICS)
		{
			service->supportNTA = RSSL_TRUE;
			printf("\tRSSL_DMT_NEWS_TEXT_ANALYTICS domain type is supported.\n");
		}
	}
}

 

Then, we only make item request if the service supports News Text Analytics domain. We add another check to channelEventCallback in "basicConsumer.C".

/*
* Processes events about the state of an RsslReactorChannel.
*/
RsslReactorCallbackRet channelEventCallback(RsslReactor *pReactor, RsslReactorChannel *pReactorChannel, RsslReactorChannelEvent *pConnEvent)
{
	switch (pConnEvent->channelEventType)
	{
...
	case RSSL_RC_CET_CHANNEL_READY:
		if (service.supportNTA == RSSL_TRUE)
		{
			printf("Channel is ready.  Subscribing to %s\n", itemName);
			sendItemRequest(pReactor, pReactorChannel, itemName, service.serviceId, domainType);
		}
		else
		{
			printf("Channel is ready.  But the service does not support News Text Analytic domain.\n");
			cleanUpAndExit(-1);
		}
		break;

 

Sending Machine Readable News request is the same as sending a market price request. In this example, we will simply modify the itemName and domainType that defined within the file "basicConsumer.c".

char itemName[] = "MRN_STORY";
RsslUInt8 domainType = RSSL_DMT_NEWS_TEXT_ANALYTICS;

 

Next, we add the decodeMRNPayload method to processRDMResponse to decode the response.

RsslRet processRDMResponse(RsslReactorChannel *pReactorChannel, RsslMsg *msg, RsslDecodeIterator* dIter)
{
...
	switch (msg->msgBase.msgClass)
	{
...
	case RSSL_MC_UPDATE:
		printf("Domain: %s\n", rsslDomainTypeToString(msg->msgBase.domainType));

		if (msg->msgBase.domainType == RSSL_DMT_NEWS_TEXT_ANALYTICS)
		{
			ret = decodeMRNPayload(getDictionary(), dIter);
			break;
		}

News Text Analytics Message Decoding

For decoding, we can reuse the decodeFieldList and decodeFieldEntry methods. The methods shall dump the metadata fields to the console like before. But we will add the decoded data to the local MRN Item.

/* decode and print out fid value */
dataType = dictionaryEntry->rwfType;
switch (dataType)
{
case RSSL_DT_UINT:
	if ((ret = rsslDecodeUInt(dIter, &fidUIntValue)) == RSSL_RET_SUCCESS)
	{
		printf("" RTR_LLU "\n", fidUIntValue);
		switch (fEntry->fieldId)
		{
		case FRAG_NUM: // Fragment Number
			mrnItem->fragmentNumber = fidUIntValue;
			break;
		case TOT_SIZE: // Expected total size of all fragments
			mrnItem->expectedSize = fidUIntValue;
			break;
		default:
			break;
		}
	}
	else if (ret != RSSL_RET_BLANK_DATA)
	{
		printf("rsslDecodeUInt() failed with return code: %d\n", ret);
		return ret;
	}
	break;
...
case RSSL_DT_ASCII_STRING:
case RSSL_DT_UTF8_STRING:
case RSSL_DT_RMTES_STRING:
	if ((ret = rsslDecodeBuffer(dIter, &fidBufferValue)) == RSSL_RET_SUCCESS)
	{
		printf("%.*s\n", fidBufferValue.length, fidBufferValue.data);
		switch (fEntry->fieldId)
		{
		case GUID_FID: // The Unique (source specific) ID for the News Item
			strncpy(mrnItem->guid, fidBufferValue.data, fidBufferValue.length);
			break;
		case MRN_SRC: // The component which published the data
			strncpy(mrnItem->source, fidBufferValue.data, fidBufferValue.length);
			break;
		case MRN_TYPE: // The type of News Item e.g Story, Headline etc
			strncpy(mrnItem->type, fidBufferValue.data, fidBufferValue.length);
			break;
		default:
			break;
		}
	}
	else if (ret != RSSL_RET_BLANK_DATA)
	{
		printf("rsslDecodeBuffer() failed with return code: %d\n", ret);
		return ret;
	}
	break;

In its standard form the existing decodeFieldEntry method will dump any compressed FRAGMENT field contents as garbage to the console. To avoid this, we have to separate the switch case of RSSL_DT_BUFFER from other buffer type data:

case RSSL_DT_BUFFER:
	if ((ret = rsslDecodeBuffer(dIter, &fidBufferValue)) == RSSL_RET_SUCCESS)
	{
		if (fEntry->fieldId == FRAGMENT)	// The zipped Fragment buffer
		{
			printf("<<COMPRESSED>>\n");
			mrnItem->fragmentBuffer = fidBufferValue;
		}
	}
	else if (ret != RSSL_RET_BLANK_DATA)
	{
		printf("rsslDecodeBuffer() failed with return code: %d\n", ret);
		return ret;
	}
	break;

Build Up and Decompress NTA Item

As mentioned earlier, the Refresh message does not contain any MRN Data fragments, therefore, we only use a custom decompress method for Update messages. However, the refresh message will have fragment number fields as zero. We can use this to filter out the refresh message.

/*
* Start combine and uncompress fragment
*/

/* Is this 1st fragment of a News Item? */
if (mrnItem.fragmentNumber == 1)
{
	/* Is it the complete News Item ? */
	if (isMRNComplete(&mrnItem))	// check length of this single fragment buffer == expected total size of News Item
	{
		decompressedBuf.data = (char*)calloc(mrnItem.fragmentBuffer.length * 4 , sizeof(char));
		printf("\n<<Single Fragment>>\n");
		if (decompress(&mrnItem.fragmentBuffer, &decompressedBuf) == RSSL_RET_SUCCESS)	// decompress the buffer
		{
			printf("%.*s\n\n", decompressedBuf.length, decompressedBuf.data);	// Display the decompressed News Item
		}
		free(decompressedBuf.data);
	}
	else if (mrnItem.expectedSize != 0)
	{ /* Start of a multi-fragment item. so copy this first fragment into our pending item instance */
		mrnCopy(&multiFragItem, &mrnItem);
	}

}
else if (mrnItem.fragmentNumber > 1)
{	// FRAG_NUM > 1 so continue to build up News Item
	if (addFragment(&multiFragItem, &mrnItem) == RSSL_RET_SUCCESS)	// Add the newly received fragment to the pending instance
	{
		if (isMRNComplete(&multiFragItem))	// Does length of the buffer now == expected total size of News Item ?
		{
			decompressedBuf.data = (char*)calloc(mrnItem.fragmentBuffer.length * 4, sizeof(char));
			printf("<<Multi Fragment complete>>\n");
			if (decompress(&multiFragItem.fragmentBuffer, &decompressedBuf) == RSSL_RET_SUCCESS)	// decompress the buffer
			{
				printf("%.*s\n\n", decompressedBuf.length, decompressedBuf.data);	// Display the decompressed News Item
			}
			free(decompressedBuf.data);
		}
	}
}

 

As you can see, if the MRN item is completed in a single fragment, we can immediately decompress and output to the screen. However, if this is the first of a multi-fragment message, we initialize our global multi-fragment storage item with this initial item. 

When we receive further fragments we add them to the global multi-fragment item until all fragments have been received and once all the fragments have been received we can decompress the full buffer and output to the screen.

if (isMRNComplete(&multiFragItem))	// Does length of the buffer now == expected total size of News Item ?
{
	decompressedBuf.data = (char*)calloc(mrnItem.fragmentBuffer.length * 4, sizeof(char));
	printf("<<Multi Fragment complete>>\n");
	if (decompress(&multiFragItem.fragmentBuffer, &decompressedBuf) == RSSL_RET_SUCCESS)	// decompress the buffer
	{
		printf("%.*s\n\n", decompressedBuf.length, decompressedBuf.data);	// Display the decompressed News Item
	}
	free(decompressedBuf.data);
}

 

We will not include the decompress code here as it is not API specific code - the full method can be found in the accompanying source files. For ease of compiling this tutorial, we have used the Zlib 1.2.3 from the ETA package itself. However, this means the tutorial code must be compiled as a static library only. You will have to add a reference to a precompiled Zlib if you wish to build your project as a shared library.

Build and Run

For these instructions, refer to the Build and Run section within the first tutorial of this series. Again, be sure to either define the service name 'elektron' within your environment to point to a valid market data server or simply modify the server configuration parameters in basicConsumer.c file.

Note that you may see what looks like garbage output in the completed MRN console output – this is not unzipped data – it is foreign language characters which cannot be represented correctly on the console. Displaying this using the appropriate character set is outside the scope of this tutorial.

 

Example output from an MRN_STORY response

MRN_STORY
DOMAIN: RSSL_DMT_NEWS_TEXT_ANALYTICS
        TIMACT_MS           33040952
        ACTIV_DATE          26 JAN 2016
        MRN_TYPE            STORY
        MRN_V_MAJ           2
        MRN_V_MIN           10
        TOT_SIZE            462
        FRAG_NUM            1
        GUID                DJAZ0055D_16012612GYMAkuLYlQdJAjNZFNWkEjVbzcApCUOb9pKT
        MRN_SRC             DTC_QA_B
        FRAGMENT            <<COMPRESSED>>

<<Single Fragment>>
{"altId":"nDJAZ0055D","audiences":["NP:MFDJ"],"body":"","firstCreated":"2016-01-26T09:10:40.000Z","headline":"DJ MUTUIONLINE: TI
TOLO ACCELERA AL RIALZO, +5,03%<MOL.MI>","id":"DJAZ0055D_16012612GYMAkuLYlQdJAjNZFNWkEjVbzcApCUOb9pKT","instancesOf":[],"languag
e":"it","mimeType":"text/plain","provider":"NS:DJN","pubStatus":"stat:usable","subjects":["A:1","B:125","B:126","B:127","B:129",
"B:134","BL:62","G:3","G:5J","G:A","G:AL","G:BR","G:BS","G:BV","M:DU","M:J","M:K","M:Z","R:MOL.MI","N2:BISV","N2:BSVC","N2:CEEUE
","N2:CMPNY","N2:COFS","N2:DFIN","N2:EMEAE","N2:EU","N2:EUROP","N2:EZC","N2:FIN","N2:FINS","N2:INTAG","N2:IT","N2:NWEE","N2:STX"
,"N2:WEU"],"takeSequence":1,"urgency":1,"versionCreated":"2016-01-26T09:10:40.000Z"}

Tutorial Summary

So now that we've come to the end of the tutorial let's summarise what this example has demonstrated and what we've learned. The key goal was to show how to request and parse Machine Readable News data. We learned that the Initial Refresh message contains only metadata. Subsequent update messages deliver the MRN item in one or more fragments. If delivered as a single fragment we extract the buffer and decompress to obtain the MRN Item. If delivered as multiple fragments, they need to be concatenated in sequence to obtain the complete buffer, before decompressing.

Multiple NTA items

This tutorial deals with requesting a single MRN Item which makes the handling of multi-fragment items relatively straightforward. When requesting multiple MRN Items you need to allow for the Update messages for the different items arriving interspersed between each other. In view of this, the GUID and MRN_SRC should be used to confirm that a fragment is part of the same MRN Item. As Update messages for multi-fragment messages arrive you would need to ensure the additional fragments are added to the correct incomplete item.

One final thing to note that the decompress method - as with the rest of the source code - is provided for illustration purposes only and is not warranted. 

Thank you for reading.

References

For more information on the NTA domain and MRN Data model, please take a look at the following notes:

MRN Data Model and Elektron Implementation Guide - summary of NTA domain, MRN data model, sample output, and implementation commentary.

Tutorial Group: 
ETA Consumer