Article

Implementing Refinitiv Real-Time API applications to work with ATS - Part 1

Pimchaya Wongrukun
Developer Advocate Developer Advocate
Wasin Waeosri
Developer Advocate Developer Advocate
Nipat Kunvutipongsak
Senior Software Engineer Senior Software Engineer
Veerapath Rungruengrayubkul
Developer Advocate Developer Advocate

About this article

Update: March 2021

The intended audiences of this article are software developers/engineers interested in or are currently working with ATS (Refinitiv Real-Time Advanced Transformation Server) i.e. contribute records (RICs), fields, data to ATS or delete fields/RICs from ATS. The article will show how to use our strategic APIs, Refinitiv Real-Time SDK (EMA and ETA, formerly known as Elektron SDK)  and WebSocket API to achieve this. For more details of Real-Time SDK family, please refer to Refinitiv Real-Time SDK family.

The article is structured in two parts:

  • Part 1 - ATS and Posting Overview, what they are and how they can work together. The prerequisites and the overview steps to work with ATS. Finally, how to create a new RIC and its fields to ATS.
  • Part 2 - Other operations which are updating field values, deleting fields or a RIC by using Real-Time APIs. The troubleshooting how to solve some common problems when you work with ATS.

Before working with ATS using Real-Time APIs, you should have a basic understanding of how to log in to the server using Real-Time APIs which can be learned from the following

ATS and Posting Overview

What's ATS?

Refinitiv Real-Time Advanced Transformation Server (ATS) is an analytical server that brings real-time data together from multiple sources, including internal contributed data, and transforms, centrally calculates, and publishes the resulting data onto the platform for further distribution and consumption by other applications and users.

Benefits

ATS can also serve as an internal cache for sharing prices across the organization or feeding them into further calculations and logical rules. Prices can be easily sent to ATS with the RtContribute() function from Eikon Excel. ATS also accepts prices in post messages sent from Refinitiv Real-Time APIs to ATS via Refinitiv Real-Time Distribution System. More than 100,000 updates can be shared every second by using ATS as a central cache.
ATS is the component that bridges the space between Eikon and the Enterprise Platform. Business users can quickly create thousands of models that scale and produce real-time results to be shared across all client sites. ATS also provides highlight features e.g. Real-Time Calculations, Fault Tolerance with ATS Resiliency System. For more details, please refer to ATS documents.

Posting to ATS

ATS is the value-added server. It is positioned as a hybrid application that consumes and provides data on the Refinitiv Real-Time Distribution System network. In addition, it collaborates with other components both on the publication and subscription sides.
Through posting, API consumers can easily push content into any cache within the Refinitiv Real-Time infrastructure (i.e., an HTTP POST request). Data contributions/inserts into the ATS or publishing into a cache offer similar capabilities today. When posting, API consumer applications reuse their existing sessions to publish content to any cache(s) residing within the Real-Time infrastructure (i.e., a service provider(s) and/or infrastructure components). When compared to spreadsheets or other applications, posting offers a more efficient form of publishing, because the application does not need to create a separate provider session or manage event streams.

ATS supports OMM MarketPrice data format which increases the overall performance. The two types of posting are on-stream and off-stream:

  • On-Stream Post: Before sending an on-stream post, the client must first open (request) a data stream for an item. After opening the data stream, the client application can then send a post. The route of the post is determined by the route of the data stream.
  • Off-Stream Post: In an off-stream post, the client application can send a post for an item via a Login stream, regardless of whether a data stream first exists. The route of the post is determined by the Core Infrastructure (i.e., Refinitiv Real-Time Advanced Distribution Server, Refinitiv Real-Time Advanced Data Hub, etc.) configuration.

The posting capability offers optional acknowledgments per posted message to indicate receipt of a specific message. The acknowledgment carries success or failure (i.e., a negative acknowledgment or ‘NAK’) information to the consumer. On the consumer side, it can choose whether it wants an acknowledgment back for a particular post message or not.

Visible Publisher Identifier (VIP)

The customer can also specify the originating data contributor which is called Visible Publisher Identifier (VPI) information with the Post message. You can use Visible Publisher Identifier data to identify the user ID and user address for users who post, insert or publish to the ATS server (or Refinitiv Real-Time infrastructure cache).  Visible Publisher Identifier (VPI) consists of:

  • Post User Id (i.e., publisher ID): which should be an ID associated with the user. For example, a DACS user ID or if unavailable, a process id)
  • Post User Address (i.e., publisher address): which normally contains the IP address of the application posting the content. The EMA Java and C++ APIs accept this value as a long value, so the customer must convert the IP address to a long value. 

Optionally, such information can be carried along with republished messages so that receiving consumers can identify the posting user. The application can set VIP information via the following RTSDK APIs methods:

 

EMA Java and C++ APIs:

  • PostMsg.publisherId(long userId, long userAddress)

ETA Java API:

  • PostMsg.postUserInfo().userAddr(long userAddr) or PostMsg.postUserInfo().userAddr(java.lang.String userAddrString)
  • PostMsg.postUserInfo().userId(long userId)

ETA C++ API:

  • RsslPostMsg.postUserInfo.postUserAddr = RsslUint32 address
  • RsslPostMsg.postUserInfo.postUserId RsslUint32 userId

WebSocket API JSON Post Message:

  • {"Type": "Post", "PostUserInfo": { "Address" : <string - address> , "UserId": <int - userId> } }


Prerequisites to Posting messages to ATS via Refinitiv Real-Time Distribution System

  1. ATS and Refinitiv Real-Time Distribution System (Real-Time Advanced Distribution and Real-Time Advanced Data Hub servers) must be ready to use. Refinitiv Real-Time Distribution System can work with ATS properly. RSSL connection on Real-Time Advanced Distribution Server,/Real-Time Advanced Data Hub server must be enabled for consumer applications. To set up Real-Time Advanced Distribution Server, Real-Time Advanced Data Hub, and ATS, please contact the Refinitiv Account team (if the infrastructure is owned by Refinitiv) or your infrastructure administrator team (if the infrastructure is owned by your company).
  2. Refinitiv Real-Time Distribution System must contain the ATS field definition of X_RIC_NAME in the RDMFieldDictionary file as shown below:

    	
            X_RIC_NAME "RIC NAME"              -1  NULL        ALPHANUMERIC       32  RMTES_STRING    32
        
        
    

 

To update Refinitiv Real-Time Distribution System dictionary file, please contact Refinitiv Account team (if the infrastructure is owned by Refinitiv) or your infrastructure administrator team (if the infrastructure is owned by your company).

3. For WebSocket applications, Real-Time Advanced Distribution Server version 3.2.1 or higher supporting WebSocket connection is required. In addition, the NUM_OF_WEBSOCKET_CONNECTIONS license need tneeds installed on Real-Time Advanced Distribution Server to enable WebSocket connection feature. Please contact Refinitiv team who can assist you to obtain the license.

Overview of steps to performing ATS operations

ATS allows you to insert/delete fields or RIC, update data by sending OMM post messages from consumer applications to Refinitiv Real-Time Distribution System which connects to ATS. In this article, we will use off-stream posting which is easier than on-stream posting to perform all operations. The overview steps are shown in the figure below:

Note:

  • Before posting the first message in the step 3, the application needs to verify if it can log in to the server successfully. The article will show how to verify this in the later section.
  • The application can perform the step 3 to send a post message repeatly depending on its requirements.

Each Real-Time API package provides a Posting example application using which you can start working with ATS:

  • Real-Time-SDK Java\EMA package provides ex341_MP_OffStreamPost which demonstrates how to consume data, perform off-stream posting and decode ack/nak messages.
  • Real-Time-SDK Java\ETA package provides Consumer which demonstrates how to consume data, perform off-stream or on-stream posting and decode ack/nak messages.
  • Real-Time-SDK C++\EMA package provides 341__MP__OffStreamPost which demonstrates how to consume data, perform off-stream posting and decode ack/nak messages.
  • Real-Time-SDK C++\ETA package provides rsslConsumer which demonstrates how to consume data, perform off-stream or on-stream posting and decode ack/nak messages.
  • WebSocket API Sample Applications package provides market_price_posting.rb which demonstrates how to consume data, perform on-stream posting and decode ack/nak messages in Ruby language which we will demonstrate in this article to perform each operation.

Sample Posting with Adding RIC and Fields

You can use ATS command, ATS_INSERT_S, to create a RIC/record by sending a post message from a consumer application as shown in step 3 in the figure above. Here's the sample of a post message that will add a RIC named NEW.RIC with field id 22 and 25 with value of 120 and 150 respectively:

    	
            

<POST domainType="MARKET_PRICE" streamId="1" containerType="MSG" flags="0x66 (HAS_POST_ID|HAS_MSG_KEY|POST_COMPLETE|ACK)" postId="1" postUserId="18" postUserAddr="10.42.61.200" dataSize="39">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_INSERT_S"/>

     <dataBody>

          <REFRESH domainType="MARKET_PRICE" streamId="0" containerType="FIELD_LIST" flags="0x00" groupId="0" State: Open/Ok/None - text: "" dataSize="23">

       <dataBody>

            <fieldList flags="0x08 (HAS_STANDARD_DATA)">

         <fieldEntry fieldId="-1" data="4E45 572E 5249 43"/>

<fieldEntry fieldId="22" data="0F0C"/>

<fieldEntry fieldId="25" data="0F0F"/>

    </fieldList>

       </dataBody>

  </REFRESH>

     </dataBody>

</POST>

Notice that:

  • The post message's domain type is Market Price. The streamId is 1 means the post message is sent via the login stream, off-stream posting. The message contains the postId and the flag ACK(to need an ack message) is set. It also contains Visible Publisher Identifier(VPI). VPI consists of postUserId and postUserAddr.
  • The key name of the post message must be ATS_INSERT_S to inform ATS to insert a RIC/Record.
  • The payload of the post message is a Refresh of Market Price message. The payload of the Refresh message is a field list.
  • The field list consists of:
    • The field Id -1 for the RIC/record name to insert to ATS.
    • The fields i.e. field id 22(BID) and field id 25(ASK) with their initial values to be added to this RIC.
    • Data values shown in the fields list are encoded OMM.

An example of success Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x12 (HAS_TEXT|HAS_MSG_KEY)" ackId="1" text="[4]: Creation Accepted" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_INSERT_S"/>

     <dataBody>

     </dataBody>

</ACK>

Notice that:

  • The Ack message's domain type is Market Price. ackId is 1 which corresponds with the postId(1) of the post message. Hence, this is the result of the post message above.
  • There is no NAK(Negative Acknowledge) code so ATS can perform the operation according to the post message successfully. That's mean ATS can insert the RIC/record with the fields and their initial values successfully.

The example of each Real-Time API snipped source code to create the post message above for inserting a RIC are below:

  • EMA Java:
    	
            

 //Consumer.java in ex341_MP_OffStreamPost  folder

   public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)

   {

...

if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN && 

refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&

refreshMsg.state().dataState() == OmmState.DataState.OK )

{

PostMsg postMsg = EmaFactory.createPostMsg();

RefreshMsg nestedRefreshMsg = EmaFactory.createRefreshMsg();

FieldList nestedFieldList = EmaFactory.createFieldList();

 

//FieldList is a collection in java

nestedFieldList.add(EmaFactory.createFieldEntry().ascii(-1, "NEW.RIC"));

 

nestedFieldList.add(EmaFactory.createFieldEntry().real(22, 12, OmmReal.MagnitudeType.EXPONENT_POS_1));

nestedFieldList.add(EmaFactory.createFieldEntry().real(25, 15, OmmReal.MagnitudeType.EXPONENT_POS_1));

 

nestedRefreshMsg.payload(nestedFieldList );

// The Post User address 170540488 (long) is converted from IP address 10.42.61.200

((OmmConsumer)event.closure()).submit( postMsg.postId(1).serviceId(267)

.name( "ATS_INSERT_S" ).solicitAck( true ).complete(true).publisherId(18,170540488)

.payload(nestedRefreshMsg), event.handle() );

}

...

}

  • EMA C++:
    	
            

 /* Consumer.cpp in 341__MarketPrice__OffStreamPost */

   void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommEvent )

   {

...

if ( refreshMsg.getDomainType() == MMT_LOGIN && 

refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&

refreshMsg.getState().getDataState() == OmmState::OkEnum )

{

// The Post User address 170540488 (long) is converted from IP address 10.42.61.200

_pOmmConsumer->submit( PostMsg().postId( 1 ).serviceId( 267 ).name( "ATS_INSERT_S" ).solicitAck( true)

.publisherId(18,170540488).complete().payload(RefreshMsg().payload( FieldList().addAscii(-1,"NEW.RIC").addReal( 22, 12, OmmReal::ExponentPos1Enum).addReal( 25, 15, OmmReal::ExponentPos1Enum ).complete() ).complete() ), ommEvent.getHandle() );

}

...

   }

  • ETA Java:
    	
            

//PostHandler.java in Consumer folder

    //To declare variables used to create a post message for inserting a RIC on ATS

    private RefreshMsg refreshMsg = (RefreshMsg)CodecFactory.createMsg();

    protected FieldList fieldList = CodecFactory.createFieldList();

    protected FieldEntry fieldEntry = CodecFactory.createFieldEntry();

    protected UInt tempUInt = CodecFactory.createUInt();

...

    // To create a post message for inserting a RIC on ATS

    //Next, the method is called by sendOffstreamPostMsg(..) which sends an off-stream post message in Consumer ETA Example application. 

    private int encodePostWithMsg(ChannelSession chnlSession, TransportBuffer msgBuf)

    {

   

        // First encode message for payload

        postMsg.clear();

 

        // set-up message

        postMsg.msgClass(MsgClasses.POST);

        postMsg.streamId(1);

        postMsg.domainType(DomainTypes.MARKET_PRICE);

        postMsg.containerType(DataTypes.MSG);

 

        // Note: post message key not required for on-stream post

        postMsg.applyPostComplete();

        postMsg.applyAck();

       

        postMsg.applyHasPostId();

        postMsg.postId(1);

 

        postMsg.applyHasMsgKey();

        postMsg.msgKey().applyHasServiceId();

        postMsg.msgKey().serviceId(267);

        postMsg.msgKey().applyHasName();

        postMsg.msgKey().name().data("ATS_INSERT_S");

        

        

        // populate post user info        

        postMsg.postUserInfo().userAddr("10.42.61.200");

        postMsg.postUserInfo().userId(18);       

       

 

        // encode post message

        encIter.clear();

        int ret = encIter.setBufferAndRWFVersion(msgBuf, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("Encoder.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret) + ">");

            return ret;

        }

 

        ret = postMsg.encodeInit(encIter, 0);

        if (ret != CodecReturnCodes.ENCODE_CONTAINER)

        {

            System.out.println("EncodeMsgInit() failed:  <" + CodecReturnCodes.toString(ret) + ">");

            return ret;

        }

 

        // get a buffer for nested market price refresh

        postNestedMsgPayLoad = CodecFactory.createBuffer();

        postNestedMsgPayLoad.data(ByteBuffer.allocate(1024));

 

        // Although we are encoding RWF message, this code

        // encodes nested message into a separate buffer.

        // this is because MarketPrice.encode message is shared by all

        // applications, and it expects to encode the message into a stand alone

        // buffer.

        ret = encIter.encodeNonRWFInit(postNestedMsgPayLoad);

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeNonRWFDataTypeInit() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

        postMsgEncIter.clear();

        ret = postMsgEncIter.setBufferAndRWFVersion(postNestedMsgPayLoad, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeIter.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

       ret = encodeCreateRIC(postMsgEncIter);

       if (ret != CodecReturnCodes.SUCCESS) {

      System.out.println("ATS_INSERT_S failed:  <" + CodecReturnCodes.toString(ret));

      return CodecReturnCodes.FAILURE;

      }

shouldOffstreamPost = false; //set flag to send post message once

        ret = encIter.encodeNonRWFComplete(postNestedMsgPayLoad, true);

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeNonRWFDataTypeComplete() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

        // complete encode message

        if ((ret = postMsg.encodeComplete(encIter, true)) < CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeMsgComplete() failed with return code: " + ret);

            return ret;

        }

 

        System.out.println("\n\nSENDING POST WITH MESSAGE:\n" + "  streamId = " + postMsg.streamId() + "\n  postId   = " + postMsg.postId());

 

        return CodecReturnCodes.SUCCESS;

    }

    // To create a payload of the post message 

    private int encodeCreateRIC(EncodeIterator encodeIter) {

    fieldList.clear();

        fieldEntry.clear();

 

        tempUInt.clear();

        tempReal.clear();

 

        // set-up message

        Msg msg = encodeRefreshMsg();

 

        // encode message

        int ret = msg.encodeInit(encodeIter, 0);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

 

        // encode field list

        fieldList.applyHasStandardData();

        ret = fieldList.encodeInit(encodeIter, null, 0);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

        // X_RIC_NAME

        fieldEntry.clear();

        fieldEntry.fieldId(-1);

        fieldEntry.dataType(DataTypes.ASCII_STRING);

        Buffer aRIC = CodecFactory.createBuffer();

        aRIC.data("NEW.RIC");

        ret = fieldEntry.encode(encodeIter, aRIC);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

        

        // BID

        fieldEntry.clear();

        DictionaryEntry dictionaryEntry = dictionary.entry(22);

        if (dictionaryEntry != null)

        {

            fieldEntry.fieldId(22);

            fieldEntry.dataType(dictionaryEntry.rwfType());

            tempReal.clear();

            tempReal.value(12, RealHints.EXPONENT1);

            ret = fieldEntry.encode(encodeIter, tempReal);

            if (ret < CodecReturnCodes.SUCCESS)

            {

                return ret;

            }

        }

        // ASK

        fieldEntry.clear();

        dictionaryEntry = dictionary.entry(25);

        if (dictionaryEntry != null)

        {

            fieldEntry.fieldId(25);

            fieldEntry.dataType(dictionaryEntry.rwfType());

            tempReal.clear();

            tempReal.value(15, RealHints.EXPONENT1);

            ret = fieldEntry.encode(encodeIter, tempReal);

            if (ret < CodecReturnCodes.SUCCESS)

            {

                return ret;

            }

        }

 

        // complete encode field list

        ret = fieldList.encodeComplete(encodeIter, true);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

 

        // complete encode message

        return msg.encodeComplete(encodeIter, true);

    }

   // To create a refresh message which is in the payload of the post message

    public Msg encodeRefreshMsg()

    {

        refreshMsg.clear();

        refreshMsg.msgClass(MsgClasses.REFRESH);

        refreshMsg.streamId(0);

        refreshMsg.domainType(DomainTypes.MARKET_PRICE);

        refreshMsg.containerType(DataTypes.FIELD_LIST);

refreshMsg.state().streamState(StreamStates.OPEN);

refreshMsg.state().dataState(DataStates.OK);

refreshMsg.state().code(StateCodes.NONE);

refreshMsg.state().text().data("");

 

        return refreshMsg;

    }

  • ETA C: 
    	
            

 /* rsslPostHandler.c in Consumer folder */

static RsslRet encodePostWithMsg(RsslChannel* chnl, RsslBuffer* msgBuf)

{

RsslRet ret = 0;

RsslPostMsg postMsg = RSSL_INIT_POST_MSG;

RsslDataDictionary* dictionary = getDictionary();

RsslBool isSolicited = RSSL_FALSE; // for post

RsslUInt16 serviceId = (RsslUInt16)getServiceId();

RsslBuffer payloadMsgBuf = RSSL_INIT_BUFFER;

RsslEncodeIterator encodeIter = RSSL_INIT_ENCODE_ITERATOR;

RsslBuffer hostName = RSSL_INIT_BUFFER;

char hostNameBuf[] = "10.42.61.200";

 

/* set-up message */

postMsg.msgBase.msgClass = RSSL_MC_POST;

postMsg.msgBase.streamId = 1;

postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;

postMsg.msgBase.containerType = RSSL_DT_MSG;

 

// Note: post message key not required for on-stream post

postMsg.flags = RSSL_PSMF_POST_COMPLETE

| RSSL_PSMF_ACK // request ACK

| RSSL_PSMF_HAS_POST_ID

| RSSL_PSMF_HAS_MSG_KEY;

 

postMsg.postId = 1

 

/* populate post user info */

hostName.data = hostNameBuf;

hostName.length = (RsslUInt32)strlen(hostName.data);

if ((ret = rsslHostByName(&hostName, &postMsg.postUserInfo.postUserAddr)) < RSSL_RET_SUCCESS)

{

printf("Populating postUserInfo failed. Error %s (%d) with rsslHostByName: %s\n",

rsslRetCodeToString(ret), ret, rsslRetCodeInfo(ret));

return ret;

}

postMsg.postUserInfo.postUserId = 18;

 

postMsg.msgBase.msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_SERVICE_ID;

 

postMsg.msgBase.msgKey.name.data = "ATS_INSERT_S";

postMsg.msgBase.msgKey.name.length = strlen("ATS_INSERT_S");

 

postMsg.msgBase.msgKey.serviceId = (RsslUInt16)267;

 

// encode message 

if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeIteratorBuffer() failed with return code: %d\n", ret);

return ret;

}

rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

 

if ((ret = rsslEncodeMsgInit(&encodeIter, (RsslMsg*)&postMsg, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);

return ret;

}

 

ret = rsslEncodeNonRWFDataTypeInit(&encodeIter, &payloadMsgBuf);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");

return RSSL_RET_FAILURE;

}

 

ret = encodeCreateRIC(chnl, &payloadMsgBuf, dictionary);

 

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: encodeCreateRIC() failed\n");

return RSSL_RET_FAILURE;

}

 

shouldOffstreamPost = FALSE; //set flag to send a post message once

 

ret = rsslEncodeNonRWFDataTypeComplete(&encodeIter, &payloadMsgBuf, RSSL_TRUE);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");

return RSSL_RET_FAILURE;

}

 

 

/* complete encode message */

if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);

return ret;

}

 

msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

 

printf("\n\nSENDING POST WITH MESSAGE:\n");

printf(" streamId = %d\n", postMsg.msgBase.streamId);

printf(" postId   = %d\n", postMsg.postId);

return RSSL_RET_SUCCESS;

 

}

RsslRet encodeCreateRIC(RsslChannel* chnl, RsslBuffer* msgBuf, RsslDataDictionary* dictionary)

{

RsslRet ret = 0;

RsslRefreshMsg refreshMsg = RSSL_INIT_REFRESH_MSG;

RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;

RsslMsgBase* msgBase;

RsslMsg* msg;

RsslFieldList fList = RSSL_INIT_FIELD_LIST;

RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;

char stateText[MAX_ITEM_INFO_STRLEN];

char errTxt[256];

RsslBuffer errorText = { 255, (char*)errTxt };

RsslBuffer tempBuffer;

RsslReal tempReal = RSSL_INIT_REAL;

RsslDictionaryEntry* dictionaryEntry = NULL;

RsslEncodeIterator encodeIter;

 

double price;

 

/* clear encode iterator */

rsslClearEncodeIterator(&encodeIter);

 

/* set-up message */

/* set message depending on whether refresh or update */

 

msgBase = &refreshMsg.msgBase;

msgBase->msgClass = RSSL_MC_REFRESH;

 

refreshMsg.state.streamState = RSSL_STREAM_OPEN;

refreshMsg.state.dataState = RSSL_DATA_OK;

refreshMsg.state.code = RSSL_SC_NONE;

 

snprintf(stateText, 128, "Item Refresh Completed");

refreshMsg.state.text.data = stateText;

refreshMsg.state.text.length = (RsslUInt32)strlen(stateText);

 

msg = (RsslMsg *)&refreshMsg;

 

msgBase->domainType = RSSL_DMT_MARKET_PRICE;

msgBase->containerType = RSSL_DT_FIELD_LIST;

 

/* encode message */

if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)

{

printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);

return ret;

}

rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

if ((ret = rsslEncodeMsgInit(&encodeIter, msg, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);

return ret;

}

 

/* encode field list */

fList.flags = RSSL_FLF_HAS_STANDARD_DATA;

if ((ret = rsslEncodeFieldListInit(&encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);

return ret;

}

 

/* encode fields */

/* X_RIC_NAME */

rsslClearFieldEntry(&fEntry);

 

fEntry.fieldId = -1;

fEntry.dataType = RSSL_DT_ASCII_STRING;

 

tempBuffer.data = "NEW.RIC";

tempBuffer.length = strlen("NEW.RIC");

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempBuffer)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

 

rsslClearFieldEntry(&fEntry);

dictionaryEntry = dictionary->entriesArray[22];

if (dictionaryEntry)

{

fEntry.fieldId = 22;

fEntry.dataType = dictionaryEntry->rwfType;

rsslClearReal(&tempReal);

price = 12;

rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

}

 

rsslClearFieldEntry(&fEntry);

dictionaryEntry = dictionary->entriesArray[25];

if (dictionaryEntry)

{

fEntry.fieldId = 25;

fEntry.dataType = dictionaryEntry->rwfType;

rsslClearReal(&tempReal);

price = 15;

rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

}

 

/* complete encode field list */

if ((ret = rsslEncodeFieldListComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldListComplete() failed with return code: %d\n", ret);

return ret;

}

 

/* complete encode message */

if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);

return ret;

}

msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

 

return RSSL_RET_SUCCESS;

}

  • WebSocket API in Ruby:
    	
            

# market_price_posting.rb in ruby folder

if message_type == 'Refresh' then

message_domain = message_json['Domain']

if message_domain != nil then

if message_domain == 'Login' then

mp_post_json_hash = {

'ID' => 1,

'Type' => 'Post',

'Domain' => 'MarketPrice',

'Ack' => true,

'PostID' => 1,

'PostUserInfo' =>  {

'Address' => '10.42.61.200', 

'UserID' => 18

},

'Key' => {

'Name' => 'ATS_INSERT_S', 

'Service' => 267 

},

'Message' => {

'ID' => 0,

'Type' => 'Refresh',

'Domain' => 'MarketPrice',

'Fields' => {'X_RIC_NAME' => 'NEW.RIC' ,'BID' => 120, 'ASK' => 150}

}

}

ws.send mp_post_json_hash.to_json.to_s

end

end

end

Adding Fields

You can use ATS command, ATS_ADDFIELD_S, to add fields by sending a post message from a consumer application as shown in step 3 in the figure above. Here's the sample of a post message that will add field id 12 and 13 with the value of 220 and 30 respectively to a RIC named NEW.RIC:

    	
            

<POST domainType="MARKET_PRICE" streamId="1" containerType="MSG" flags="0x66 (HAS_POST_ID|HAS_MSG_KEY|POST_COMPLETE|ACK)" postId="2" postUserId="18" postUserAddr="10.42.61.200" dataSize="34">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_ADDFIELD_S"/>

     <dataBody>

          <UPDATE domainType="MARKET_PRICE" streamId="0" containerType="FIELD_LIST" flags="0x00" updateType="0" dataSize="23">

       <dataBody>

            <fieldList flags="0x08 (HAS_STANDARD_DATA)">

         <fieldEntry fieldId="-1" data="4E45 572E 5249 43"/>

<fieldEntry fieldId="12" data="0F16"/>

<fieldEntry fieldId="13" data="0F03"/>

    </fieldList>

       </dataBody>

  </UPDATE>

     </dataBody>

</POST>

Notice that:

  • The post message's domain type is Market Price. The streamId is 1 means the post message is sent via the login stream, off-stream posting. The message contains the postId and the flag ACK(to need an ack message) is set. It also contains Visible Publisher Identifier(VPI). VPI consists of postUserId and postUserAddr.
  • The key name of the post message must be ATS_ADDFIELD_S to inform ATS to add the fields.
  • The payload of the post message is an Update of Market Price message. The payload of the Update message is a field list.
  • The field list consists of:
    • The Field Id -1 for the RIC/record name which the fields are added.
    • The Fields i.e. field id 12(HIGH_1) and field id 13(LOW_1) with their initial values to be added to this RIC.
    • Data values shown in the fields list are encoded OMM.

An example of success Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x12 (HAS_TEXT|HAS_MSG_KEY)" ackId="2" text="[4]: Creation Accepted" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_ADDFIELD_S"/>

     <dataBody>

     </dataBody>

</ACK>

Notice that:

  • The Ack message's domain type is Market Price. The ackId is 2 which corresponds with the postId(2) of the post message. Hence, this is the result of the post message above.
  • There is no NAK(Negative Acknowledge) code so ATS can perform the operation according to the post message successfully. That's mean ATS can add the fields and their initial values to the RIC successfully.

The example of each Real-Time API snipped source code to create the post message above for adding the fields are below:

  • EMA Java:
    	
            

   //Consumer.java in ex341_MP_OffStreamPost  folder

   public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)

   {

...

if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN && 

refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&

refreshMsg.state().dataState() == OmmState.DataState.OK )

{

PostMsg postMsg = EmaFactory.createPostMsg();

UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg();

FieldList nestedFieldList = EmaFactory.createFieldList();

 

//FieldList is a collection in java

nestedFieldList.add(EmaFactory.createFieldEntry().ascii(-1, "NEW.RIC"));

 

nestedFieldList.add(EmaFactory.createFieldEntry().real(12, 22, OmmReal.MagnitudeType.EXPONENT_POS_1));

nestedFieldList.add(EmaFactory.createFieldEntry().real(13, 3, OmmReal.MagnitudeType.EXPONENT_POS_1));

 

nestedUpdateMsg.payload(nestedFieldList );

// The Post User address 170540488 (long) is converted from IP address 10.42.61.200

((OmmConsumer)event.closure()).submit( postMsg.postId( 2 ).serviceId(267)

.name( "ATS_ADDFIELD_S" ).solicitAck( true ).complete(true).publisherId(18,170540488)

.payload(nestedUpdateMsg), event.handle() );

}

...

}

  • EMA C++:
    	
            

  /* Consumer.cpp in 341__MarketPrice__OffStreamPost */

   void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommEvent )

   {

...

if ( refreshMsg.getDomainType() == MMT_LOGIN && 

refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&

refreshMsg.getState().getDataState() == OmmState::OkEnum )

{

// The Post User address 170540488 (long) is converted from IP address 10.42.61.200

_pOmmConsumer->submit(PostMsg().postId(2).serviceId( 267 ).name("ATS_ADDFIELD_S").solicitAck(true).publisherId(18,170540488)

.complete().payload(UpdateMsg().payload(FieldList().addAscii(-1, "NEW.RIC").addReal(12, 22, OmmReal::ExponentPos1Enum).addReal(13, 3, OmmReal::ExponentPos1Enum).complete())), ommEvent.getHandle());

}

...

   }

  • ETA Java:
    	
            

//PostHandler.java in Consumer folder

    //To declare variables used to create a post message for adding some fields of a RIC on ATS

    private UpdateMsg updateMsg = (UpdateMsg)CodecFactory.createMsg();

    protected FieldList fieldList = CodecFactory.createFieldList();

    protected FieldEntry fieldEntry = CodecFactory.createFieldEntry();

    protected UInt tempUInt = CodecFactory.createUInt();

...

//To create a postMessage for adding fields on ATS

    //Next, the method is called by sendOffstreamPostMsg(..) which sends an off-stream post message in Consumer ETA Example application.

    private int encodePostWithMsg(ChannelSession chnlSession, TransportBuffer msgBuf)

    {

        // First encode message for payload

        postMsg.clear();

 

        // set-up message

        postMsg.msgClass(MsgClasses.POST);

        postMsg.streamId(1);

        postMsg.domainType(DomainTypes.MARKET_PRICE);

        postMsg.containerType(DataTypes.MSG);

 

        // Note: post message key not required for on-stream post

        postMsg.applyPostComplete();

        postMsg.applyAck();

       

        postMsg.applyHasPostId();

        postMsg.postId(2);

 

        postMsg.applyHasMsgKey();

        postMsg.msgKey().applyHasServiceId();

        postMsg.msgKey().serviceId(267);

        postMsg.msgKey().applyHasName();

        postMsg.msgKey().name().data("ATS_ADDFIELD_S");

        

        

        // populate post user info        

        postMsg.postUserInfo().userAddr("10.42.61.200");

        postMsg.postUserInfo().userId(18);       

       

 

        // encode post message

        encIter.clear();

        int ret = encIter.setBufferAndRWFVersion(msgBuf, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("Encoder.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret) + ">");

            return ret;

        }

 

        ret = postMsg.encodeInit(encIter, 0);

        if (ret != CodecReturnCodes.ENCODE_CONTAINER)

        {

            System.out.println("EncodeMsgInit() failed:  <" + CodecReturnCodes.toString(ret) + ">");

            return ret;

        }

 

        // get a buffer for nested market price refresh

        postNestedMsgPayLoad = CodecFactory.createBuffer();

        postNestedMsgPayLoad.data(ByteBuffer.allocate(1024));

 

        // Although we are encoding RWF message, this code

        // encodes nested message into a separate buffer.

        // this is because MarketPrice.encode message is shared by all

        // applications, and it expects to encode the message into a stand alone

        // buffer.

        ret = encIter.encodeNonRWFInit(postNestedMsgPayLoad);

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeNonRWFDataTypeInit() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

        postMsgEncIter.clear();

        ret = postMsgEncIter.setBufferAndRWFVersion(postNestedMsgPayLoad, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeIter.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

    ret = encodeAddFields(postMsgEncIter);

  if (ret != CodecReturnCodes.SUCCESS) {

  System.out.println("ATS_ADDFIELD_S failed:  <" + CodecReturnCodes.toString(ret));

  return CodecReturnCodes.FAILURE;

  }

        shouldOffstreamPost = false; //set flag to send post message once

        ret = encIter.encodeNonRWFComplete(postNestedMsgPayLoad, true);

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeNonRWFDataTypeComplete() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

        // complete encode message

        if ((ret = postMsg.encodeComplete(encIter, true)) < CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeMsgComplete() failed with return code: " + ret);

            return ret;

        }

 

        System.out.println("\n\nSENDING POST WITH MESSAGE:\n" + "  streamId = " + postMsg.streamId() + "\n  postId   = " + postMsg.postId());

 

        return CodecReturnCodes.SUCCESS;

    }

// To create a payload of the post message 

    private int encodeAddFields(EncodeIterator encodeIter) {

    fieldList.clear();

        fieldEntry.clear();

 

        tempUInt.clear();

        tempReal.clear();

 

        // set-up message

        Msg msg = encodeUpdateMsg();

 

        // encode message

        int ret = msg.encodeInit(encodeIter, 0);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

 

        // encode field list

        fieldList.applyHasStandardData();

        ret = fieldList.encodeInit(encodeIter, null, 0);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

        // X_RIC_NAME

        fieldEntry.clear();

        fieldEntry.fieldId(-1);

        fieldEntry.dataType(DataTypes.ASCII_STRING);

        Buffer aRIC = CodecFactory.createBuffer();

        aRIC.data("NEW.RIC");

        ret = fieldEntry.encode(encodeIter, aRIC);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

        

        

        fieldEntry.clear();

        DictionaryEntry dictionaryEntry = dictionary.entry(12);

        if (dictionaryEntry != null)

        {

            fieldEntry.fieldId(12);

            fieldEntry.dataType(dictionaryEntry.rwfType());

            tempReal.clear();

            tempReal.value(22, RealHints.EXPONENT1);

            ret = fieldEntry.encode(encodeIter, tempReal);

            if (ret < CodecReturnCodes.SUCCESS)

            {

                return ret;

            }

        }

        

        fieldEntry.clear();

        dictionaryEntry = dictionary.entry(13);

        if (dictionaryEntry != null)

        {

            fieldEntry.fieldId(13);

            fieldEntry.dataType(dictionaryEntry.rwfType());

            tempReal.clear();

            tempReal.value(3, RealHints.EXPONENT1);

            ret = fieldEntry.encode(encodeIter, tempReal);

            if (ret < CodecReturnCodes.SUCCESS)

            {

                return ret;

            }

        }

 

        // complete encode field list

        ret = fieldList.encodeComplete(encodeIter, true);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

 

        // complete encode message

        return msg.encodeComplete(encodeIter, true);

    }

    

    

// To create an update message which is in the payload of the post message

    public Msg encodeUpdateMsg()

    {

        updateMsg.clear();

        updateMsg.msgClass(MsgClasses.UPDATE);

        updateMsg.streamId(0);

        updateMsg.domainType(DomainTypes.MARKET_PRICE);

        updateMsg.containerType(DataTypes.FIELD_LIST);

        return updateMsg;

    }

  • ETA C:
    	
            

 /* rsslPostHandler.c in Consumer folder */

static RsslRet encodePostWithMsg(RsslChannel* chnl, RsslBuffer* msgBuf)

{

RsslRet ret = 0;

RsslPostMsg postMsg = RSSL_INIT_POST_MSG;

RsslDataDictionary* dictionary = getDictionary();

RsslBool isSolicited = RSSL_FALSE; // ??? for post

RsslUInt16 serviceId = (RsslUInt16)getServiceId();

RsslBuffer payloadMsgBuf = RSSL_INIT_BUFFER;

RsslEncodeIterator encodeIter = RSSL_INIT_ENCODE_ITERATOR;

RsslBuffer hostName = RSSL_INIT_BUFFER;

char hostNameBuf[] = "10.42.61.200";

 

/* set-up message */

postMsg.msgBase.msgClass = RSSL_MC_POST;

postMsg.msgBase.streamId = 1;

postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;

postMsg.msgBase.containerType = RSSL_DT_MSG;

 

// Note: post message key not required for on-stream post

postMsg.flags = RSSL_PSMF_POST_COMPLETE

| RSSL_PSMF_ACK // request ACK

| RSSL_PSMF_HAS_POST_ID

| RSSL_PSMF_HAS_MSG_KEY;

 

postMsg.postId = 2;

/* populate post user info */

hostName.data = hostNameBuf;

hostName.length = (RsslUInt32)strlen(hostName.data);

if ((ret = rsslHostByName(&hostName, &postMsg.postUserInfo.postUserAddr)) < RSSL_RET_SUCCESS)

{

printf("Populating postUserInfo failed. Error %s (%d) with rsslHostByName: %s\n",

rsslRetCodeToString(ret), ret, rsslRetCodeInfo(ret));

return ret;

}

postMsg.postUserInfo.postUserId = 18;

 

postMsg.msgBase.msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_SERVICE_ID;

 

postMsg.msgBase.msgKey.name.data = (char *)"ATS_ADDFIELD_S";

postMsg.msgBase.msgKey.name.length = (RsslUInt32)strlen("ATS_ADDFIELD_S");

 

postMsg.msgBase.msgKey.serviceId = (RsslUInt16)267;

 

// encode message 

if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeIteratorBuffer() failed with return code: %d\n", ret);

return ret;

}

rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

 

if ((ret = rsslEncodeMsgInit(&encodeIter, (RsslMsg*)&postMsg, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);

return ret;

}

ret = rsslEncodeNonRWFDataTypeInit(&encodeIter, &payloadMsgBuf);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");

return RSSL_RET_FAILURE;

}

 

ret = encodeAddFields(chnl, &payloadMsgBuf, dictionary);

 

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: encodeAddFields() failed\n");

return RSSL_RET_FAILURE;

}

 

shouldOffstreamPost = FALSE; //set flag to send a post message once

 

ret = rsslEncodeNonRWFDataTypeComplete(&encodeIter, &payloadMsgBuf, RSSL_TRUE);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");

return RSSL_RET_FAILURE;

}

 

 

/* complete encode message */

if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);

return ret;

}

 

msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

 

 

 

printf("\n\nSENDING POST WITH MESSAGE:\n");

printf(" streamId = %d\n", postMsg.msgBase.streamId);

printf(" postId   = %d\n", postMsg.postId);

 

return RSSL_RET_SUCCESS;

 

}

RsslRet encodeAddFields(RsslChannel* chnl, RsslBuffer* msgBuf, RsslDataDictionary* dictionary)

{

RsslRet ret = 0;

RsslRefreshMsg refreshMsg = RSSL_INIT_REFRESH_MSG;

RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;

RsslMsgBase* msgBase;

RsslMsg* msg;

RsslFieldList fList = RSSL_INIT_FIELD_LIST;

RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;

char errTxt[256];

RsslBuffer errorText = { 255, (char*)errTxt };

RsslBuffer tempBuffer;

RsslReal tempReal = RSSL_INIT_REAL;

RsslDictionaryEntry* dictionaryEntry = NULL;

RsslEncodeIterator encodeIter;

 

double price;

 

/* clear encode iterator */

rsslClearEncodeIterator(&encodeIter);

 

/* set-up message */

/* set message depending on whether refresh or update */

 

msgBase = &updateMsg.msgBase;

msgBase->msgClass = RSSL_MC_UPDATE;

msg = (RsslMsg *)&updateMsg;

 

msgBase->domainType = RSSL_DMT_MARKET_PRICE;

msgBase->containerType = RSSL_DT_FIELD_LIST;

 

/* encode message */

if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)

{

printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);

return ret;

}

rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

if ((ret = rsslEncodeMsgInit(&encodeIter, msg, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);

return ret;

}

 

/* encode field list */

fList.flags = RSSL_FLF_HAS_STANDARD_DATA;

if ((ret = rsslEncodeFieldListInit(&encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);

return ret;

}

 

/* encode fields */

/* X_RIC_NAME */

rsslClearFieldEntry(&fEntry);

 

fEntry.fieldId = -1;

fEntry.dataType = RSSL_DT_ASCII_STRING;

 

tempBuffer.data = (char *)"NEW.RIC";

tempBuffer.length = (RsslUInt32)strlen("NEW.RIC");

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempBuffer)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

 

rsslClearFieldEntry(&fEntry);

dictionaryEntry = dictionary->entriesArray[12];

if (dictionaryEntry)

{

fEntry.fieldId = 12;

fEntry.dataType = dictionaryEntry->rwfType;

rsslClearReal(&tempReal);

price = 22;

rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

}

 

rsslClearFieldEntry(&fEntry);

dictionaryEntry = dictionary->entriesArray[13];

if (dictionaryEntry)

{

fEntry.fieldId = 13;

fEntry.dataType = dictionaryEntry->rwfType;

rsslClearReal(&tempReal);

price = 3;

rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

}

 

/* complete encode field list */

if ((ret = rsslEncodeFieldListComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldListComplete() failed with return code: %d\n", ret);

return ret;

}

 

/* complete encode message */

if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);

return ret;

}

msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

 

return RSSL_RET_SUCCESS;

}

  • WebSocket API in Ruby:
    	
            

# market_price_posting.rb in ruby folder

if message_type == 'Refresh' then

message_domain = message_json['Domain']

if message_domain != nil then

if message_domain == 'Login' then

mp_post_json_hash = {

'ID' => 1,

'Type' => 'Post',

'Domain' => 'MarketPrice',

'Ack' => true,

'PostID' => 2,

'PostUserInfo' =>  {

'Address' => '10.42.61.200',

'UserID' => 18

},

'Key' => {

'Name' => 'ATS_ADDFIELD_S', 

'Service' => 267 

},

'Message' => {

'ID' => 0,

'Type' => 'Update',

'Domain' => 'MarketPrice',

'Fields' => {'X_RIC_NAME' => 'NEW.RIC' ,'HIGH_1' => 220,'LOW_1' => 30 }

}

}

ws.send mp_post_json_hash.to_json.to_s

end

end

end

Please refer to my next article Implementing Refinitiv Real-Time API application to work with ATS - Part 2, for the Real-Time APIs application source code to update data, delete fields, and RICs. You can also find troubleshooting with the solutions for common problems that may occur when working with ATS.