Article

Implementing Refinitiv Real-Time API applications to work with ATS - Part 2

Pimchaya Wongrukun
Developer Advocate Developer Advocate
Wasin Waeosri
Developer Advocate Developer Advocate
Nipat Kunvutipongsak
Senior Software Engineer Senior Software Engineer
Veerapath Rungruengrayubkul
Developer Advocate Developer Advocate

About this article

Update: March 2021

This is the next part of Implementing Refinitiv Real-Time API application to work with ATS - Part 1 article. In this part I explain how to update data, delete fields and RIC with Real-Time APIs application source code. I also present troubleshooting which helps you to solve common problems which can occur when working with ATS. I strongly recommend you read Implementing Refinitiv Real-Time API application to work with ATS - Part 1 first. It gives you the basic knowledge which you need to know before working with ATS e.g. Posting and ATS concepts, Prerequisites, the overview steps . It also explains how to add RIC and fields before your application can update data, delete fields and RIC which are explained in this part 2.

Sample Posting with Updating data, Deleting Fields and RIC

Updating field values

You can update field values on ATS by sending a post message from a consumer application as shown in step 3 in the figure above. Here's the sample of a post message that will update the value of field id 22 and 25 of a RIC named NEW.RIC with value 430 and 460 respectively:

    	
            

<POST domainType="MARKET_PRICE" streamId="1" containerType="MSG" flags="0x66 (HAS_POST_ID|HAS_MSG_KEY|POST_COMPLETE|ACK)" postId="3" postUserId="18" postUserAddr="10.42.61.200" dataSize="24">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="NEW.RIC"/>

     <dataBody>

          <UPDATE domainType="MARKET_PRICE" streamId="0" containerType="FIELD_LIST" flags="0x00" updateType="0" dataSize="13">

               <dataBody>

            <fieldList flags="0x08 (HAS_STANDARD_DATA)">

         <fieldEntry fieldId="22" data="0F2B"/>

<fieldEntry fieldId="25" data="0F2E"/>

    </fieldList>

       </dataBody>

  </UPDATE>

     </dataBody>

</POST>

Notice that:

  • The post message's domain type is Market Price. The streamId is 1 means the post message is sent via the login stream, off-stream posting. The message contains the postId and the flag ACK(to need an ack message) is set. It also contains Visible Publisher Identifier(VPI). VPI consists of postUserId and postUserAddr.
  • The key name of the post message must be the RIC name whose data is going to be updated.
  • The payload of the post message is an Update of Market Price message. The payload of the Update message is a field list.  
  • The field list are the updated fields i.e. field id 12(HIGH_1) and field id 13(LOW_1) with their updated values on this RIC.
  • Data values in the fields list are encoded OMM.

An example of success Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x12 (HAS_TEXT|HAS_MSG_KEY)" ackId="3" text="[1]: Contribution Accepted" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="NEW.RIC"/>

     <dataBody>

     </dataBody>

</ACK>

Notice that:

  • The Ack message's domain type is Market Price. The ackId is 3 which corresponds with the postId(3) of the post message. Hence, this is the result of the post message above.
  • There is no NAK(Negative Acknowledge) code so ATS can perform the operation according to the post message successfully. That's mean ATS can update the fields' data of the RIC successfully.

The example of each Real-Time APIs snipped source code to create the post message above for updating data are below:

  • EMA Java:
    	
            

 //Consumer.java in ex341_MP_OffStreamPost  folder

   public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)

   {

...

if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN && 

refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&

refreshMsg.state().dataState() == OmmState.DataState.OK )

{

PostMsg postMsg = EmaFactory.createPostMsg();

UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg();

FieldList nestedFieldList = EmaFactory.createFieldList();

 

//FieldList is a collection in java

nestedFieldList.add(EmaFactory.createFieldEntry().real(22, 43, OmmReal.MagnitudeType.EXPONENT_POS_1));

nestedFieldList.add(EmaFactory.createFieldEntry().real(25, 46, OmmReal.MagnitudeType.EXPONENT_POS_1));

 

nestedUpdateMsg.payload(nestedFieldList );

 

// The Post User address 170540488 (long) is converted from IP address 10.42.61.200

((OmmConsumer)event.closure()).submit( postMsg.postId( 3 ).serviceId(267)

.name( "NEW.RIC" ).solicitAck( true ).complete(true).publisherId(18,170540488)

.payload(nestedUpdateMsg), event.handle() );

}

...

}

  • EMA C++:
    	
            

 /* Consumer.cpp in 341__MarketPrice__OffStreamPost */

   void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommEvent )

   {

...

if ( refreshMsg.getDomainType() == MMT_LOGIN && 

refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&

refreshMsg.getState().getDataState() == OmmState::OkEnum )

{

// The Post User address 170540488 (long) is converted from IP address 10.42.61.200

_pOmmConsumer->submit( PostMsg().postId(3).serviceId( 267 ).name( "NEW.RIC").solicitAck(true).publisherId(18,170540488).complete()

.payload(UpdateMsg().payload( FieldList().addReal( 22, 43, OmmReal::ExponentPos1Enum ).addReal(25, 46, OmmReal::ExponentPos1Enum).complete() ) ), ommEvent.getHandle() );

}

...

   }

  • ETA Java:
    	
            

 //PostHandler.java in Consumer folder

   //To declare variables used to create a post message for updating data on ATS

    private UpdateMsg updateMsg = (UpdateMsg)CodecFactory.createMsg();

    protected FieldList fieldList = CodecFactory.createFieldList();

    protected FieldEntry fieldEntry = CodecFactory.createFieldEntry();

    protected UInt tempUInt = CodecFactory.createUInt();

...

//To create a postMessage for updating data on ATS

    //Next, the method is called by sendOffstreamPostMsg(..) which sends an off-stream post message in Consumer ETA Example application.

    private int encodePostWithMsg(ChannelSession chnlSession, TransportBuffer msgBuf)

    {

        // First encode message for payload

        postMsg.clear();

 

        // set-up message

        postMsg.msgClass(MsgClasses.POST);

        postMsg.streamId(1);

        postMsg.domainType(DomainTypes.MARKET_PRICE);

        postMsg.containerType(DataTypes.MSG);

 

        // Note: post message key not required for on-stream post

        postMsg.applyPostComplete();

        postMsg.applyAck();

       

        postMsg.applyHasPostId();

        postMsg.postId(3);

 

        postMsg.applyHasMsgKey();

        postMsg.msgKey().applyHasServiceId();

        postMsg.msgKey().serviceId(267);

        postMsg.msgKey().applyHasName();

        postMsg.msgKey().name().data("NEW.RIC");

        

        

        // populate post user info        

        postMsg.postUserInfo().userAddr("10.42.61.200");

        postMsg.postUserInfo().userId(18);       

       

 

        // encode post message

        encIter.clear();

        int ret = encIter.setBufferAndRWFVersion(msgBuf, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("Encoder.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret) + ">");

            return ret;

        }

 

        ret = postMsg.encodeInit(encIter, 0);

        if (ret != CodecReturnCodes.ENCODE_CONTAINER)

        {

            System.out.println("EncodeMsgInit() failed:  <" + CodecReturnCodes.toString(ret) + ">");

            return ret;

        }

 

        // get a buffer for nested market price refresh

        postNestedMsgPayLoad = CodecFactory.createBuffer();

        postNestedMsgPayLoad.data(ByteBuffer.allocate(1024));

 

        // Although we are encoding RWF message, this code

        // encodes nested message into a separate buffer.

        // this is because MarketPrice.encode message is shared by all

        // applications, and it expects to encode the message into a stand alone

        // buffer.

        ret = encIter.encodeNonRWFInit(postNestedMsgPayLoad);

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeNonRWFDataTypeInit() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

        postMsgEncIter.clear();

        ret = postMsgEncIter.setBufferAndRWFVersion(postNestedMsgPayLoad, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeIter.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

    ret = encodeUpdateData(postMsgEncIter);

if (ret != CodecReturnCodes.SUCCESS) {

System.out.println("POST DATA TO ATS failed:  <" + CodecReturnCodes.toString(ret));

return CodecReturnCodes.FAILURE;

}

        shouldOffstreamPost = false; //set flag to send post message once

        ret = encIter.encodeNonRWFComplete(postNestedMsgPayLoad, true);

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeNonRWFDataTypeComplete() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

        // complete encode message

        if ((ret = postMsg.encodeComplete(encIter, true)) < CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeMsgComplete() failed with return code: " + ret);

            return ret;

        }

 

        System.out.println("\n\nSENDING POST WITH MESSAGE:\n" + "  streamId = " + postMsg.streamId() + "\n  postId   = " + postMsg.postId() );

 

        return CodecReturnCodes.SUCCESS;

    }

// To create a payload of the post message 

    private int encodeUpdateData(EncodeIterator encodeIter) {

    fieldList.clear();

        fieldEntry.clear();

 

        tempUInt.clear();

        tempReal.clear();

 

        // set-up message

        Msg msg = encodeUpdateMsg();

 

        // encode message

        int ret = msg.encodeInit(encodeIter, 0);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

 

        // encode field list

        fieldList.applyHasStandardData();

        ret = fieldList.encodeInit(encodeIter, null, 0);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

 

        // encode fields common to refresh and updates

        DictionaryEntry dictionaryEntry = null;

        // BID

        fieldEntry.clear();

        dictionaryEntry = dictionary.entry(22);

        if (dictionaryEntry != null)

        {

        fieldEntry.fieldId(22);

            fieldEntry.dataType(dictionaryEntry.rwfType());

            tempReal.clear();

            tempReal.value(43, RealHints.EXPONENT1);

            ret = fieldEntry.encode(encodeIter, tempReal);

            if (ret < CodecReturnCodes.SUCCESS)

            {

                    return ret;

            }

         }

         // ASK

        fieldEntry.clear();

        dictionaryEntry = dictionary.entry(25);

        if (dictionaryEntry != null)

        {

        fieldEntry.fieldId(25);

            fieldEntry.dataType(dictionaryEntry.rwfType());

            tempReal.clear();

            tempReal.value(46, RealHints.EXPONENT1);

            ret = fieldEntry.encode(encodeIter, tempReal);

            if (ret < CodecReturnCodes.SUCCESS)

            {

                    return ret;

            }

         } 

        // complete encode field list

        ret = fieldList.encodeComplete(encodeIter, true);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

 

        // complete encode message

        return msg.encodeComplete(encodeIter, true);

    }

// To create an update message which is in the payload of the post message

    public Msg encodeUpdateMsg()

    {

        updateMsg.clear();

        updateMsg.msgClass(MsgClasses.UPDATE);

        updateMsg.streamId(0);

        updateMsg.domainType(DomainTypes.MARKET_PRICE);

        updateMsg.containerType(DataTypes.FIELD_LIST);

        return updateMsg;

    }

  • ETA C:
    	
            

 /* rsslPostHandler.c in Consumer folder */

static RsslRet encodePostWithMsg(RsslChannel* chnl, RsslBuffer* msgBuf)

{

RsslRet ret = 0;

RsslPostMsg postMsg = RSSL_INIT_POST_MSG;

RsslDataDictionary* dictionary = getDictionary();

RsslBool isSolicited = RSSL_FALSE; // ??? for post

RsslUInt16 serviceId = (RsslUInt16)getServiceId();

RsslBuffer payloadMsgBuf = RSSL_INIT_BUFFER;

RsslEncodeIterator encodeIter = RSSL_INIT_ENCODE_ITERATOR;

RsslBuffer hostName = RSSL_INIT_BUFFER;

char hostNameBuf[] = "10.42.61.200";

 

/* set-up message */

postMsg.msgBase.msgClass = RSSL_MC_POST;

postMsg.msgBase.streamId = 1;

postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;

postMsg.msgBase.containerType = RSSL_DT_MSG;

 

// Note: post message key not required for on-stream post

postMsg.flags = RSSL_PSMF_POST_COMPLETE

| RSSL_PSMF_ACK // request ACK

| RSSL_PSMF_HAS_POST_ID

| RSSL_PSMF_HAS_MSG_KEY;

 

postMsg.postId = 3;

/* populate post user info */

hostName.data = hostNameBuf;

hostName.length = (RsslUInt32)strlen(hostName.data);

if ((ret = rsslHostByName(&hostName, &postMsg.postUserInfo.postUserAddr)) < RSSL_RET_SUCCESS)

{

printf("Populating postUserInfo failed. Error %s (%d) with rsslHostByName: %s\n",

rsslRetCodeToString(ret), ret, rsslRetCodeInfo(ret));

return ret;

}

postMsg.postUserInfo.postUserId = 18;

 

postMsg.msgBase.msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_SERVICE_ID;

 

postMsg.msgBase.msgKey.name.data = (char *)"NEW.RIC";

postMsg.msgBase.msgKey.name.length = (RsslUInt32)strlen("NEW.RIC");

 

postMsg.msgBase.msgKey.serviceId = (RsslUInt16)267;

 

// encode message 

if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeIteratorBuffer() failed with return code: %d\n", ret);

return ret;

}

rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

 

if ((ret = rsslEncodeMsgInit(&encodeIter, (RsslMsg*)&postMsg, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);

return ret;

}

 

ret = rsslEncodeNonRWFDataTypeInit(&encodeIter, &payloadMsgBuf);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");

return RSSL_RET_FAILURE;

}

 

ret = encodeUpdateData(chnl, &payloadMsgBuf, dictionary);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: encodeUpdateData() failed\n");

return RSSL_RET_FAILURE;

}

 

shouldOffstreamPost = FALSE; //set flag to send a post message once

 

ret = rsslEncodeNonRWFDataTypeComplete(&encodeIter, &payloadMsgBuf, RSSL_TRUE);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");

return RSSL_RET_FAILURE;

}

 

 

/* complete encode message */

if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);

return ret;

}

 

msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

 

 

 

printf("\n\nSENDING POST WITH MESSAGE:\n");

printf(" streamId = %d\n", postMsg.msgBase.streamId);

printf(" postId   = %d\n", postMsg.postId);

return RSSL_RET_SUCCESS;

 

}

RsslRet encodeUpdateData(RsslChannel* chnl, RsslBuffer* msgBuf, RsslDataDictionary* dictionary)

{

RsslRet ret = 0;

RsslRefreshMsg refreshMsg = RSSL_INIT_REFRESH_MSG;

RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;

RsslMsgBase* msgBase;

RsslMsg* msg;

RsslFieldList fList = RSSL_INIT_FIELD_LIST;

RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;

char errTxt[256];

RsslBuffer errorText = { 255, (char*)errTxt };

RsslBuffer tempBuffer;

RsslReal tempReal = RSSL_INIT_REAL;

RsslDictionaryEntry* dictionaryEntry = NULL;

RsslEncodeIterator encodeIter;

 

double price;

 

/* clear encode iterator */

rsslClearEncodeIterator(&encodeIter);

 

/* set-up message */

/* set message depending on whether refresh or update */

 

msgBase = &updateMsg.msgBase;

msgBase->msgClass = RSSL_MC_UPDATE;

 

msg = (RsslMsg *)&updateMsg;

 

msgBase->domainType = RSSL_DMT_MARKET_PRICE;

msgBase->containerType = RSSL_DT_FIELD_LIST;

 

/* encode message */

if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)

{

printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);

return ret;

}

rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

if ((ret = rsslEncodeMsgInit(&encodeIter, msg, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);

return ret;

}

 

/* encode field list */

fList.flags = RSSL_FLF_HAS_STANDARD_DATA;

if ((ret = rsslEncodeFieldListInit(&encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);

return ret;

}

 

/* encode fields */

 

 

rsslClearFieldEntry(&fEntry);

dictionaryEntry = dictionary->entriesArray[22];

if (dictionaryEntry)

{

fEntry.fieldId = 22;

fEntry.dataType = dictionaryEntry->rwfType;

rsslClearReal(&tempReal);

price = 43;

rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

}

 

rsslClearFieldEntry(&fEntry);

dictionaryEntry = dictionary->entriesArray[25];

if (dictionaryEntry)

{

fEntry.fieldId = 25;

fEntry.dataType = dictionaryEntry->rwfType;

rsslClearReal(&tempReal);

price = 46;

rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

}

 

/* complete encode field list */

if ((ret = rsslEncodeFieldListComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldListComplete() failed with return code: %d\n", ret);

return ret;

}

 

/* complete encode message */

if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);

return ret;

}

msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

 

return RSSL_RET_SUCCESS;

}

  • WebSocket API in Ruby:
    	
            

# market_price_posting.rb in ruby folder

if message_type == 'Refresh' then

message_domain = message_json['Domain']

if message_domain != nil then

if message_domain == 'Login' then

mp_post_json_hash = {

'ID' => 1,

'Type' => 'Post',

'Domain' => 'MarketPrice',

'Ack' => true,

'PostID' => 3,

'PostUserInfo' =>  {

'Address' => '10.42.61.200', # Use the IP address as the Post User Address.

'UserID' => 18

},

'Key' => {

'Name' => 'NEW.RIC', # ATS server contribution RIC name

'Service' => 267 # ADS Service ID that connects to ATS server

},

'Message' => {

'ID' => 0,

'Type' => 'Update',

'Domain' => 'MarketPrice',

'Fields' => {'BID' => 430,'ASK' => 460 }

}

}

ws.send mp_post_json_hash.to_json.to_s

end

end

end

Deleting Fields

You can use ATS command, ATS_DELETE, to delete fields by sending a post message from a consumer application as shown in step 3 in the figure above. Here’s the sample of a post message that will delete field id 13 and 25 from a RIC named NEW.RIC:

    	
            

<POST domainType="MARKET_PRICE" streamId="1" containerType="MSG" flags="0x66 (HAS_POST_ID|HAS_MSG_KEY|POST_COMPLETE|ACK)" postId="4" postUserId="18" postUserAddr="10.42.61.200" dataSize="34">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE"/>

     <dataBody>

          <UPDATE domainType="MARKET_PRICE" streamId="0" containerType="FIELD_LIST" flags="0x00" updateType="0" dataSize="23">

       <dataBody>

            <fieldList flags="0x08 (HAS_STANDARD_DATA)">

         <fieldEntry fieldId="-1" data="4E45 572E 5249 43"/>

<fieldEntry fieldId="13" data="0F01"/>

<fieldEntry fieldId="25" data="0F02"/>

    </fieldList>

       </dataBody>

  </UPDATE>

     </dataBody>

</POST>

Notice that:

  • The post message's domain type is Market Price. The streamId is 1 means the post message is sent via the login stream, off-stream posting. The message contains the postId and the flag ACK(to need an ack message) is set. It also contains Visible Publisher Identifier(VPI). VPI consists of postUserId and postUserAddr.
  • The key name of the post message must be ATS_DELETE to inform ATS to add the fields.
  • The payload of the post message is an Update of Market Price message. The payload of the Update message is a field list.  
  • The field list consists of:
    • Field Id -1 for the RIC/record name whose the fields are deleted.
    • The fields i.e. field id 13(LOW_1) and field id 25(ASK) to be deleted from this RIC.
    • Data values shown in the fields list are encoded OMM.

An example of success Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x12 (HAS_TEXT|HAS_MSG_KEY)" ackId="4" text="[3]: Delete Accepted" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE"/>

     <dataBody>

     </dataBody>

</ACK>

Notice that:

  • The Ack message's domain type is Market Price. The ackId is 4 which corresponds with the postId(4) of the post message. Hence, this is the result of the post message above.
  • There is no NAK(Negative Acknowledge) code so ATS can perform the operation according to the post message successfully. That's mean ATS can delete the fields from the RIC successfully.

The example of each Real-Time API snipped source code to create the post message above for deleting the fields are below:

  • EMA Java:
    	
            

//Consumer.java in ex341_MP_OffStreamPost  folder

   public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)

   {

...

if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN && 

refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&

refreshMsg.state().dataState() == OmmState.DataState.OK )

{

PostMsg postMsg = EmaFactory.createPostMsg();

UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg();

FieldList nestedFieldList = EmaFactory.createFieldList();

 

//FieldList is a collection in java

nestedFieldList.add(EmaFactory.createFieldEntry().ascii(-1, "NEW.RIC"));

 

nestedFieldList.add(EmaFactory.createFieldEntry().real(13, 1, OmmReal.MagnitudeType.EXPONENT_POS_1));

nestedFieldList.add(EmaFactory.createFieldEntry().real(25, 2, OmmReal.MagnitudeType.EXPONENT_POS_1));

 

nestedUpdateMsg.payload(nestedFieldList );

 

// The Post User address 170540488 (long) is converted from IP address 10.42.61.200

((OmmConsumer)event.closure()).submit( postMsg.postId( 4 ).serviceId(267)

.name( "ATS_DELETE" ).solicitAck( true ).complete(true).publisherId(18,170540488)

.payload(nestedUpdateMsg), event.handle() );

 

}

...

}

  • EMA C++:
    	
            

/* Consumer.cpp in 341__MarketPrice__OffStreamPost */

   void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommEvent )

   {

...

if ( refreshMsg.getDomainType() == MMT_LOGIN && 

refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&

refreshMsg.getState().getDataState() == OmmState::OkEnum )

{

// The Post User address 170540488 (long) is converted from IP address 10.42.61.200

_pOmmConsumer->submit( PostMsg().postId(4).serviceId( 267 ).name( "ATS_DELETE" ).solicitAck( true ).publisherId(18,170540488).complete()

.payload(UpdateMsg().payload( FieldList().addAscii(-1, "NEW.RIC").addReal( 13, 1, OmmReal::ExponentPos1Enum ).addReal(25, 2, OmmReal::ExponentPos1Enum).complete() )), ommEvent.getHandle() );

}

...

   }

  • ETA Java:
    	
            

//PostHandler.java in Consumer folder

    //To declare variables used to create a post message for deleting some fields of a RIC on ATS

    private UpdateMsg updateMsg = (UpdateMsg)CodecFactory.createMsg();

    protected FieldList fieldList = CodecFactory.createFieldList();

    protected FieldEntry fieldEntry = CodecFactory.createFieldEntry();

    protected UInt tempUInt = CodecFactory.createUInt();

   //To create a postMessage for deleting fields on ATS

    //Next, the method is called by sendOffstreamPostMsg(..) which sends an off-stream post message in Consumer ETA Example application.

    private int encodePostWithMsg(ChannelSession chnlSession, TransportBuffer msgBuf)

    {

   

        // First encode message for payload

        postMsg.clear();

 

        // set-up message

        postMsg.msgClass(MsgClasses.POST);

        postMsg.streamId(1);

        postMsg.domainType(DomainTypes.MARKET_PRICE);

        postMsg.containerType(DataTypes.MSG);

 

        // Note: post message key not required for on-stream post

        postMsg.applyPostComplete();

        postMsg.applyAck();

       

        postMsg.applyHasPostId();

        postMsg.postId(4);

 

        postMsg.applyHasMsgKey();

        postMsg.msgKey().applyHasServiceId();

        postMsg.msgKey().serviceId(267);

        postMsg.msgKey().applyHasName();

        postMsg.msgKey().name().data("ATS_DELETE");

        

        

        // populate post user info        

        postMsg.postUserInfo().userAddr("10.42.61.200");

        postMsg.postUserInfo().userId(18);       

       

 

        // encode post message

        encIter.clear();

        int ret = encIter.setBufferAndRWFVersion(msgBuf, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("Encoder.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret) + ">");

            return ret;

        }

 

        ret = postMsg.encodeInit(encIter, 0);

        if (ret != CodecReturnCodes.ENCODE_CONTAINER)

        {

            System.out.println("EncodeMsgInit() failed:  <" + CodecReturnCodes.toString(ret) + ">");

            return ret;

        }

 

        // get a buffer for nested market price refresh

        postNestedMsgPayLoad = CodecFactory.createBuffer();

        postNestedMsgPayLoad.data(ByteBuffer.allocate(1024));

 

        // Although we are encoding RWF message, this code

        // encodes nested message into a separate buffer.

        // this is because MarketPrice.encode message is shared by all

        // applications, and it expects to encode the message into a stand alone

        // buffer.

        ret = encIter.encodeNonRWFInit(postNestedMsgPayLoad);

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeNonRWFDataTypeInit() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

        postMsgEncIter.clear();

        ret = postMsgEncIter.setBufferAndRWFVersion(postNestedMsgPayLoad, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeIter.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

    ret = encodeDeleteFields(postMsgEncIter);

  if (ret != CodecReturnCodes.SUCCESS) {

  System.out.println("ATS_ADDFIELD_S failed:  <" + CodecReturnCodes.toString(ret));

  return CodecReturnCodes.FAILURE;

  }

  shouldOffstreamPost = false; //set flag to send post message once

        ret = encIter.encodeNonRWFComplete(postNestedMsgPayLoad, true);

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeNonRWFDataTypeComplete() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

        // complete encode message

        if ((ret = postMsg.encodeComplete(encIter, true)) < CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeMsgComplete() failed with return code: " + ret);

            return ret;

        }

 

        System.out.println("\n\nSENDING POST WITH MESSAGE:\n" + "  streamId = " + postMsg.streamId() + "\n  postId   = " + postMsg.postId() );

 

        return CodecReturnCodes.SUCCESS;

    }

// To create a payload of the post message 

    private int encodeDeleteFields(EncodeIterator encodeIter) {

    fieldList.clear();

        fieldEntry.clear();

 

        tempUInt.clear();

        tempReal.clear();

 

        // set-up message

        Msg msg = encodeUpdateMsg();

 

        // encode message

        int ret = msg.encodeInit(encodeIter, 0);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

 

        // encode field list

        fieldList.applyHasStandardData();

        ret = fieldList.encodeInit(encodeIter, null, 0);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

        // X_RIC_NAME

        fieldEntry.clear();

        fieldEntry.fieldId(-1);

        fieldEntry.dataType(DataTypes.ASCII_STRING);

        Buffer aRIC = CodecFactory.createBuffer();

        aRIC.data("NEW.RIC");

        ret = fieldEntry.encode(encodeIter, aRIC);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

        

       

        fieldEntry.clear();

        DictionaryEntry dictionaryEntry = dictionary.entry(13);

        if (dictionaryEntry != null)

        {

            fieldEntry.fieldId(13);

            fieldEntry.dataType(dictionaryEntry.rwfType());

            tempReal.clear();

            tempReal.value(1, RealHints.EXPONENT1);

            ret = fieldEntry.encode(encodeIter, tempReal);

            if (ret < CodecReturnCodes.SUCCESS)

            {

                return ret;

            }

        }

 

        fieldEntry.clear();

        dictionaryEntry = dictionary.entry(25);

        if (dictionaryEntry != null)

        {

            fieldEntry.fieldId(25);

            fieldEntry.dataType(dictionaryEntry.rwfType());

            tempReal.clear();

            tempReal.value(2, RealHints.EXPONENT1);

            ret = fieldEntry.encode(encodeIter, tempReal);

            if (ret < CodecReturnCodes.SUCCESS)

            {

                return ret;

            }

        }

        // complete encode field list

        ret = fieldList.encodeComplete(encodeIter, true);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

 

        // complete encode message

        return msg.encodeComplete(encodeIter, true);

    }

    

// To create an update message which is in the payload of the post message

    public Msg encodeUpdateMsg()

    {

        updateMsg.clear();

        updateMsg.msgClass(MsgClasses.UPDATE);

        updateMsg.streamId(0);

        updateMsg.domainType(DomainTypes.MARKET_PRICE);

        updateMsg.containerType(DataTypes.FIELD_LIST);

        return updateMsg;

    }

    

  • ETA C:
    	
            

/* rsslPostHandler.c in Consumer folder */

static RsslRet encodePostWithMsg(RsslChannel* chnl, RsslBuffer* msgBuf)

{

RsslRet ret = 0;

RsslPostMsg postMsg = RSSL_INIT_POST_MSG;

RsslDataDictionary* dictionary = getDictionary();

RsslBool isSolicited = RSSL_FALSE; // ??? for post

RsslUInt16 serviceId = (RsslUInt16)getServiceId();

RsslBuffer payloadMsgBuf = RSSL_INIT_BUFFER;

RsslEncodeIterator encodeIter = RSSL_INIT_ENCODE_ITERATOR;

RsslBuffer hostName = RSSL_INIT_BUFFER;

char hostNameBuf[] = "10.42.61.200";

 

/* set-up message */

postMsg.msgBase.msgClass = RSSL_MC_POST;

postMsg.msgBase.streamId = 1;

postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;

postMsg.msgBase.containerType = RSSL_DT_MSG;

 

// Note: post message key not required for on-stream post

postMsg.flags = RSSL_PSMF_POST_COMPLETE

| RSSL_PSMF_ACK // request ACK

| RSSL_PSMF_HAS_POST_ID

| RSSL_PSMF_HAS_MSG_KEY;

 

postMsg.postId = 4;

 

/* populate post user info */

hostName.data = hostNameBuf;

hostName.length = (RsslUInt32)strlen(hostName.data);

if ((ret = rsslHostByName(&hostName, &postMsg.postUserInfo.postUserAddr)) < RSSL_RET_SUCCESS)

{

printf("Populating postUserInfo failed. Error %s (%d) with rsslHostByName: %s\n",

rsslRetCodeToString(ret), ret, rsslRetCodeInfo(ret));

return ret;

}

postMsg.postUserInfo.postUserId = 18;

 

postMsg.msgBase.msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_SERVICE_ID;

 

postMsg.msgBase.msgKey.name.data = (char *)"ATS_DELETE";

postMsg.msgBase.msgKey.name.length = (RsslUInt32)strlen("ATS_DELETE");

 

postMsg.msgBase.msgKey.serviceId = (RsslUInt16)267;

 

// encode message 

if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeIteratorBuffer() failed with return code: %d\n", ret);

return ret;

}

rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

 

if ((ret = rsslEncodeMsgInit(&encodeIter, (RsslMsg*)&postMsg, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);

return ret;

}

 

ret = rsslEncodeNonRWFDataTypeInit(&encodeIter, &payloadMsgBuf);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");

return RSSL_RET_FAILURE;

}

 

ret = encodeDeleteFields(chnl, &payloadMsgBuf, dictionary);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: encodeDeleteFields() failed\n");

return RSSL_RET_FAILURE;

}

 

shouldOffstreamPost = FALSE; //set flag to send a post message once

 

ret = rsslEncodeNonRWFDataTypeComplete(&encodeIter, &payloadMsgBuf, RSSL_TRUE);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");

return RSSL_RET_FAILURE;

}

 

 

/* complete encode message */

if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);

return ret;

}

 

msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

 

 

 

printf("\n\nSENDING POST WITH MESSAGE:\n");

printf(" streamId = %d\n", postMsg.msgBase.streamId);

printf(" postId   = %d\n", postMsg.postId);

 

return RSSL_RET_SUCCESS;

 

}

RsslRet encodeDeleteFields(RsslChannel* chnl, RsslBuffer* msgBuf, RsslDataDictionary* dictionary)

{

RsslRet ret = 0;

RsslRefreshMsg refreshMsg = RSSL_INIT_REFRESH_MSG;

RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;

RsslMsgBase* msgBase;

RsslMsg* msg;

RsslFieldList fList = RSSL_INIT_FIELD_LIST;

RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;

char errTxt[256];

RsslBuffer errorText = { 255, (char*)errTxt };

RsslBuffer tempBuffer;

RsslReal tempReal = RSSL_INIT_REAL;

RsslDictionaryEntry* dictionaryEntry = NULL;

RsslEncodeIterator encodeIter;

 

double price;

 

/* clear encode iterator */

rsslClearEncodeIterator(&encodeIter);

 

/* set-up message */

/* set message depending on whether refresh or update */

 

msgBase = &updateMsg.msgBase;

msgBase->msgClass = RSSL_MC_UPDATE;

 

msg = (RsslMsg *)&updateMsg;

 

msgBase->domainType = RSSL_DMT_MARKET_PRICE;

msgBase->containerType = RSSL_DT_FIELD_LIST;

 

/* encode message */

if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)

{

printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);

return ret;

}

rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

if ((ret = rsslEncodeMsgInit(&encodeIter, msg, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);

return ret;

}

 

/* encode field list */

fList.flags = RSSL_FLF_HAS_STANDARD_DATA;

if ((ret = rsslEncodeFieldListInit(&encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);

return ret;

}

 

/* encode fields */

/* X_RIC_NAME */

rsslClearFieldEntry(&fEntry);

 

fEntry.fieldId = -1;

fEntry.dataType = RSSL_DT_ASCII_STRING;

 

tempBuffer.data = (char *)"NEW.RIC";

tempBuffer.length = (RsslUInt32)strlen("NEW.RIC");

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempBuffer)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

 

rsslClearFieldEntry(&fEntry);

dictionaryEntry = dictionary->entriesArray[13];

if (dictionaryEntry)

{

fEntry.fieldId = 13;

fEntry.dataType = dictionaryEntry->rwfType;

rsslClearReal(&tempReal);

price = 1;

rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

}

 

rsslClearFieldEntry(&fEntry);

dictionaryEntry = dictionary->entriesArray[25];

if (dictionaryEntry)

{

fEntry.fieldId = 25;

fEntry.dataType = dictionaryEntry->rwfType;

rsslClearReal(&tempReal);

price = 2;

rsslDoubleToReal(&tempReal, &price, RSSL_RH_EXPONENT_1);

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempReal)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

}

 

/* complete encode field list */

if ((ret = rsslEncodeFieldListComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldListComplete() failed with return code: %d\n", ret);

return ret;

}

 

/* complete encode message */

if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);

return ret;

}

msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

 

return RSSL_RET_SUCCESS;

}

  • WebSocket API in Ruby:
    	
            

# market_price_posting.rb in ruby folder

if message_type == 'Refresh' then

message_domain = message_json['Domain']

if message_domain != nil then

if message_domain == 'Login' then

mp_post_json_hash = {

'ID' => 1,

'Type' => 'Post',

'Domain' => 'MarketPrice',

'Ack' => true,

'PostID' => 4,

'PostUserInfo' =>  {

'Address' => '10.42.61.200',

'UserID' => 18

},

'Key' => {

'Name' => 'ATS_DELETE', 

'Service' => 267

},

'Message' => {

'ID' => 0,

'Type' => 'Update',

'Domain' => 'MarketPrice',

'Fields' => {'X_RIC_NAME' => 'NEW.RIC' ,'LOW_1' => 10 ,'ASK' => 20 }

}

}

ws.send mp_post_json_hash.to_json.to_s

end

end

end

Deleting a RIC

You can use ATS command, ATS_DELETE_ALL, to delete a RIC/record by sending a post message from a consumer application as shown in step 3 in the figure above. Here's the sample of a post message that will delete a RIC named NEW.RIC:

    	
            

<POST domainType="MARKET_PRICE" streamId="1" containerType="MSG" flags="0x66 (HAS_POST_ID|HAS_MSG_KEY|POST_COMPLETE|ACK)" postId="5" postUserId="18" postUserAddr="10.42.61.200" dataSize="24">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE_ALL"/>

     <dataBody>

          <UPDATE domainType="MARKET_PRICE" streamId="0" containerType="FIELD_LIST" flags="0x00" updateType="0" dataSize="13">

       <dataBody>

            <fieldList flags="0x08 (HAS_STANDARD_DATA)">

         <fieldEntry fieldId="-1" data="4E45 572E 5249 43"/>

    </fieldList>

       </dataBody>

  </UPDATE>

     </dataBody>

</POST>

Notice that:

  • The post message's domain type is Market Price. The streamId is 1 means the post message is sent via the login stream, off-stream posting. The message contains the postId and the flag ACK(to need an ack message) is set. It also contains Visible Publisher Identifier(VPI). VPI consists of postUserId and postUserAddr.
  • The key name of the post message must be ATS_DELETE_ALL to inform ATS to delete a RIC/Record.
  • The payload of the post message is an Update of Market Price message. The payload of the Update message is a field list.
  • The field list consists of the field Id -1 for the RIC/record name to delete from ATS.
  • Data values shown in the fields list are encoded OMM.

An example of success Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x12 (HAS_TEXT|HAS_MSG_KEY)" ackId="5" text="[3]: Delete Accepted" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE_ALL"/>

     <dataBody>

     </dataBody>

</ACK>

Notice that:

  • The Ack message's domain type is Market Price. The ackId is 5 which corresponds with the postId(5) of the post message. Hence, this is the result of the post message above.
  • There is no NAK(Negative Acknowledge) code so ATS can perform the operation according to the post message successfully. That's mean ATS can delete the RIC/record from ATS successfully.

The example of each Real-Time API snipped source code to create the post message above for deleting a RIC are below:

  • EMA Java:
    	
            

//Consumer.java in ex341_MP_OffStreamPost  folder

   public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)

   {

...

if ( refreshMsg.domainType() == EmaRdm.MMT_LOGIN && 

refreshMsg.state().streamState() == OmmState.StreamState.OPEN &&

refreshMsg.state().dataState() == OmmState.DataState.OK )

{

PostMsg postMsg = EmaFactory.createPostMsg();

UpdateMsg nestedUpdateMsg = EmaFactory.createUpdateMsg();

FieldList nestedFieldList = EmaFactory.createFieldList();

 

//FieldList is a collection in java

nestedFieldList.add(EmaFactory.createFieldEntry().ascii(-1, "NEW.RIC"));

 

nestedUpdateMsg.payload(nestedFieldList );

 

// The Post User address 170540488 (long) is converted from IP address 10.42.61.200

((OmmConsumer)event.closure()).submit( postMsg.postId( 5 ).serviceId(267)

.name( "ATS_DELETE_ALL" ).solicitAck( true ).complete(true).publisherId(18,170540488)

.payload(nestedUpdateMsg), event.handle() );

}

...

}

  • EMA C++:
    	
            

 /* Consumer.cpp in 341__MarketPrice__OffStreamPost */

   void AppClient::onRefreshMsg( const RefreshMsg& refreshMsg, const OmmConsumerEvent& ommEvent )

   {

...

if ( refreshMsg.getDomainType() == MMT_LOGIN && 

refreshMsg.getState().getStreamState() == OmmState::OpenEnum &&

refreshMsg.getState().getDataState() == OmmState::OkEnum )

{

 

// The Post User address 170540488 (long) is converted from IP address 10.42.61.200

_pOmmConsumer->submit(PostMsg().postId(5).serviceId( 267 ).name("ATS_DELETE_ALL").solicitAck(true).publisherId(18,170540488).complete()

.payload(UpdateMsg().payload(FieldList().addAscii(-1, "NEW.RIC").complete())), ommEvent.getHandle());

}

...

   }

  • ETA Java:
    	
            

//PostHandler.java in Consumer folder

    //To declare variables used to create a post message for deleting a RIC on ATS

    private UpdateMsg updateMsg = (UpdateMsg)CodecFactory.createMsg();

    protected FieldList fieldList = CodecFactory.createFieldList();

    protected FieldEntry fieldEntry = CodecFactory.createFieldEntry();

    protected UInt tempUInt = CodecFactory.createUInt();

    

//To create a postMessage for deleting a RIC on ATS

    //Next, the method is called by sendOffstreamPostMsg(..) which sends an off-stream post message in Consumer ETA Example application.

    private int encodePostWithMsg(ChannelSession chnlSession, TransportBuffer msgBuf)

    {

   

        // First encode message for payload

        postMsg.clear();

 

        // set-up message

        postMsg.msgClass(MsgClasses.POST);

        postMsg.streamId(1);

        postMsg.domainType(DomainTypes.MARKET_PRICE);

        postMsg.containerType(DataTypes.MSG);

 

        // Note: post message key not required for on-stream post

        postMsg.applyPostComplete();

        postMsg.applyAck();

       

        postMsg.applyHasPostId();

        postMsg.postId(5);

 

        postMsg.applyHasMsgKey();

        postMsg.msgKey().applyHasServiceId();

        postMsg.msgKey().serviceId(267);

        postMsg.msgKey().applyHasName();

        postMsg.msgKey().name().data("ATS_DELETE_ALL");

        

        

        // populate post user info        

        postMsg.postUserInfo().userAddr("10.42.61.200");

        postMsg.postUserInfo().userId(18);       

       

 

        // encode post message

        encIter.clear();

        int ret = encIter.setBufferAndRWFVersion(msgBuf, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("Encoder.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret) + ">");

            return ret;

        }

 

        ret = postMsg.encodeInit(encIter, 0);

        if (ret != CodecReturnCodes.ENCODE_CONTAINER)

        {

            System.out.println("EncodeMsgInit() failed:  <" + CodecReturnCodes.toString(ret) + ">");

            return ret;

        }

 

        // get a buffer for nested market price refresh

        postNestedMsgPayLoad = CodecFactory.createBuffer();

        postNestedMsgPayLoad.data(ByteBuffer.allocate(1024));

 

        // Although we are encoding RWF message, this code

        // encodes nested message into a separate buffer.

        // this is because MarketPrice.encode message is shared by all

        // applications, and it expects to encode the message into a stand alone

        // buffer.

        ret = encIter.encodeNonRWFInit(postNestedMsgPayLoad);

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeNonRWFDataTypeInit() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

        postMsgEncIter.clear();

        ret = postMsgEncIter.setBufferAndRWFVersion(postNestedMsgPayLoad, chnlSession.channel().majorVersion(), chnlSession.channel().minorVersion());

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeIter.setBufferAndRWFVersion() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

    ret = encodeDeleteRIC(postMsgEncIter);

  if (ret != CodecReturnCodes.SUCCESS) {

  System.out.println("ATS_ADDFIELD_S failed:  <" + CodecReturnCodes.toString(ret));

  return CodecReturnCodes.FAILURE;

  }

  shouldOffstreamPost = false; //set flag to send post message once

        ret = encIter.encodeNonRWFComplete(postNestedMsgPayLoad, true);

        if (ret != CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeNonRWFDataTypeComplete() failed:  <" + CodecReturnCodes.toString(ret));

            return CodecReturnCodes.FAILURE;

        }

 

        // complete encode message

        if ((ret = postMsg.encodeComplete(encIter, true)) < CodecReturnCodes.SUCCESS)

        {

            System.out.println("EncodeMsgComplete() failed with return code: " + ret);

            return ret;

        }

 

        System.out.println("\n\nSENDING POST WITH MESSAGE:\n" + "  streamId = " + postMsg.streamId() + "\n  postId   = " + postMsg.postId());

 

        return CodecReturnCodes.SUCCESS;

    }

// To create a payload of the post message 

    private int encodeDeleteRIC(EncodeIterator encodeIter) 

    {

    fieldList.clear();

        fieldEntry.clear();

 

        tempUInt.clear();

        tempReal.clear();

 

        // set-up message

        Msg msg = encodeUpdateMsg();

 

        // encode message

        int ret = msg.encodeInit(encodeIter, 0);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

 

        // encode field list

        fieldList.applyHasStandardData();

        ret = fieldList.encodeInit(encodeIter, null, 0);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

        // X_RIC_NAME

        fieldEntry.clear();

        fieldEntry.fieldId(-1);

        fieldEntry.dataType(DataTypes.ASCII_STRING);

        Buffer aRIC = CodecFactory.createBuffer();

        aRIC.data("NEW.RIC");

        ret = fieldEntry.encode(encodeIter, aRIC);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

        

        // complete encode field list

        ret = fieldList.encodeComplete(encodeIter, true);

        if (ret < CodecReturnCodes.SUCCESS)

        {

            return ret;

        }

 

        // complete encode message

        return msg.encodeComplete(encodeIter, true);

    }

// To create an update message which is in the payload of the post message

    public Msg encodeUpdateMsg()

    {

        updateMsg.clear();

        updateMsg.msgClass(MsgClasses.UPDATE);

        updateMsg.streamId(0);

        updateMsg.domainType(DomainTypes.MARKET_PRICE);

        updateMsg.containerType(DataTypes.FIELD_LIST);

        return updateMsg;

    }

  • ETA C:
    	
            

/* rsslPostHandler.c in Consumer folder */

static RsslRet encodePostWithMsg(RsslChannel* chnl, RsslBuffer* msgBuf)

{

RsslRet ret = 0;

RsslPostMsg postMsg = RSSL_INIT_POST_MSG;

RsslDataDictionary* dictionary = getDictionary();

RsslBool isSolicited = RSSL_FALSE; // ??? for post

RsslUInt16 serviceId = (RsslUInt16)getServiceId();

RsslBuffer payloadMsgBuf = RSSL_INIT_BUFFER;

RsslEncodeIterator encodeIter = RSSL_INIT_ENCODE_ITERATOR;

RsslBuffer hostName = RSSL_INIT_BUFFER;

char hostNameBuf[] = "10.42.61.200";

 

/* set-up message */

postMsg.msgBase.msgClass = RSSL_MC_POST;

postMsg.msgBase.streamId = 1;

postMsg.msgBase.domainType = RSSL_DMT_MARKET_PRICE;

postMsg.msgBase.containerType = RSSL_DT_MSG;

 

// Note: post message key not required for on-stream post

postMsg.flags = RSSL_PSMF_POST_COMPLETE

| RSSL_PSMF_ACK // request ACK

| RSSL_PSMF_HAS_POST_ID

| RSSL_PSMF_HAS_MSG_KEY;

 

postMsg.postId = 5;

/* populate post user info */

hostName.data = hostNameBuf;

hostName.length = (RsslUInt32)strlen(hostName.data);

if ((ret = rsslHostByName(&hostName, &postMsg.postUserInfo.postUserAddr)) < RSSL_RET_SUCCESS)

{

printf("Populating postUserInfo failed. Error %s (%d) with rsslHostByName: %s\n",

rsslRetCodeToString(ret), ret, rsslRetCodeInfo(ret));

return ret;

}

postMsg.postUserInfo.postUserId = 18;

 

postMsg.msgBase.msgKey.flags = RSSL_MKF_HAS_NAME | RSSL_MKF_HAS_SERVICE_ID;

 

postMsg.msgBase.msgKey.name.data = (char *)"ATS_DELETE_ALL";

postMsg.msgBase.msgKey.name.length = (RsslUInt32)strlen("ATS_DELETE_ALL");

postMsg.msgBase.msgKey.serviceId = (RsslUInt16)267;

 

// encode message 

if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeIteratorBuffer() failed with return code: %d\n", ret);

return ret;

}

rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

 

if ((ret = rsslEncodeMsgInit(&encodeIter, (RsslMsg*)&postMsg, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);

return ret;

}

 

ret = rsslEncodeNonRWFDataTypeInit(&encodeIter, &payloadMsgBuf);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");

return RSSL_RET_FAILURE;

}

 

ret = encodeDeleteRIC(chnl, &payloadMsgBuf, dictionary);

 

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: encodeDeleteRIC() failed\n");

return RSSL_RET_FAILURE;

}

 

shouldOffstreamPost = FALSE; //set flag to send a post message once

 

ret = rsslEncodeNonRWFDataTypeComplete(&encodeIter, &payloadMsgBuf, RSSL_TRUE);

if (ret != RSSL_RET_SUCCESS)

{

printf("encodePostWithMsg: rsslEncodeNonRWFDataTypeInit() failed\n");

return RSSL_RET_FAILURE;

}

 

 

/* complete encode message */

if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);

return ret;

}

 

msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

 

 

 

printf("\n\nSENDING POST WITH MESSAGE:\n");

printf(" streamId = %d\n", postMsg.msgBase.streamId);

printf(" postId   = %d\n", postMsg.postId);

 

return RSSL_RET_SUCCESS;

 

}

RsslRet encodeDeleteRIC(RsslChannel* chnl, RsslBuffer* msgBuf, RsslDataDictionary* dictionary)

{

RsslRet ret = 0;

RsslRefreshMsg refreshMsg = RSSL_INIT_REFRESH_MSG;

RsslUpdateMsg updateMsg = RSSL_INIT_UPDATE_MSG;

RsslMsgBase* msgBase;

RsslMsg* msg;

RsslFieldList fList = RSSL_INIT_FIELD_LIST;

RsslFieldEntry fEntry = RSSL_INIT_FIELD_ENTRY;

char errTxt[256];

RsslBuffer errorText = { 255, (char*)errTxt };

RsslBuffer tempBuffer;

RsslReal tempReal = RSSL_INIT_REAL;

RsslDictionaryEntry* dictionaryEntry = NULL;

RsslEncodeIterator encodeIter;

 

/* clear encode iterator */

rsslClearEncodeIterator(&encodeIter);

 

/* set-up message */

/* set message depending on whether refresh or update */

 

msgBase = &updateMsg.msgBase;

msgBase->msgClass = RSSL_MC_UPDATE;

 

msg = (RsslMsg *)&updateMsg;

 

msgBase->domainType = RSSL_DMT_MARKET_PRICE;

msgBase->containerType = RSSL_DT_FIELD_LIST;

 

/* encode message */

if ((ret = rsslSetEncodeIteratorBuffer(&encodeIter, msgBuf)) < RSSL_RET_SUCCESS)

{

printf("rsslSetEncodeIteratorBuffer() failed with return code: %d\n", ret);

return ret;

}

rsslSetEncodeIteratorRWFVersion(&encodeIter, chnl->majorVersion, chnl->minorVersion);

if ((ret = rsslEncodeMsgInit(&encodeIter, msg, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgInit() failed with return code: %d\n", ret);

return ret;

}

 

/* encode field list */

fList.flags = RSSL_FLF_HAS_STANDARD_DATA;

if ((ret = rsslEncodeFieldListInit(&encodeIter, &fList, 0, 0)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldListInit() failed with return code: %d\n", ret);

return ret;

}

 

/* encode fields */

/* X_RIC_NAME */

rsslClearFieldEntry(&fEntry);

 

fEntry.fieldId = -1;

fEntry.dataType = RSSL_DT_ASCII_STRING;

 

tempBuffer.data = (char *)"NEW.RIC";

tempBuffer.length = (RsslUInt32)strlen("NEW.RIC");

if ((ret = rsslEncodeFieldEntry(&encodeIter, &fEntry, (void*)&tempBuffer)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldEntry() failed with return code: %d\n", ret);

return ret;

}

 

/* complete encode field list */

if ((ret = rsslEncodeFieldListComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeFieldListComplete() failed with return code: %d\n", ret);

return ret;

}

 

/* complete encode message */

if ((ret = rsslEncodeMsgComplete(&encodeIter, RSSL_TRUE)) < RSSL_RET_SUCCESS)

{

printf("rsslEncodeMsgComplete() failed with return code: %d\n", ret);

return ret;

}

msgBuf->length = rsslGetEncodedBufferLength(&encodeIter);

 

return RSSL_RET_SUCCESS;

}

  • WebSocket API in Ruby:
    	
            

   # market_price_posting.rb in ruby folder

if message_type == 'Refresh' then

message_domain = message_json['Domain']

if message_domain != nil then

if message_domain == 'Login' then

mp_post_json_hash = {

'ID' => 1,

'Type' => 'Post',

'Domain' => 'MarketPrice',

'Ack' => true,

'PostID' => 5,

'PostUserInfo' =>  {

'Address' => '10.42.61.200', 

'UserID' => 18

},

'Key' => {

'Name' => 'ATS_DELETE_ALL', 

'Service' => 267

},

'Message' => {

'ID' => 0,

'Type' => 'Update',

'Domain' => 'MarketPrice',

'Fields' => {'X_RIC_NAME' => 'NEW.RIC'}

}

}

ws.send mp_post_json_hash.to_json.to_s

end

end

end

Troubleshooting

When an application fails to post a message to ATS; an Ack message received from the server contains a NAK code. Hence, you need to review the NAK Code and the text message which indicates what the error is and help you to find out the root cause of the problem.

This section lists the common errors which you can face when working with ATS including their solutions as shown below:

1) text="Unable to find service for post message.", NackCode: 2 or "DeniedBySrc" in WebSocketAPI.

An example of Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="1" nakCode="DENIED_BY_SRC" text="Unable to find service for post message." dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="2670" name="ATS_INSERT_S"/>

     <dataBody>

     </dataBody>

</ACK>

  • Root Cause: The specified service in the post message is incorrect; the server does not provide this service.
  • Solution: Please double check the service name to make sure the correct service name is used. You may want to contact your local MarketData support or administrator for ATS service name. If the infrastructure is hosted by Refinitiv, please contact the account team for the service name. Then, modify your source code to use the correct service in the post message.

2) text="A9: Service is unavailable.", NackCode: 3 or "SourceDown" in WebSocketAPI.

An example of Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="1" nakCode="SOURCE_DOWN" text="A9: Service is unavailable." dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_INSERT_S"/>

     <dataBody>

     </dataBody>

</ACK>

  • Root Cause: The specified service in the post message is down.
  • Solution: Please contact your local MarketData support or administrator who can help you to recover the service. If the infrastructure is hosted by Refinitiv, please contact the account team.

3) text="[610]: Item Already Exists", NackCode: 2 or "DeniedBySrc" in WebSocketAPI.

An example of Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="1" nakCode="DENIED_BY_SRC" text="[610]: Item Already Exists" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_INSERT_S"/>

     <dataBody>

     </dataBody>

</ACK>

  • Root Cause: An application tries to insert a RIC/record which exists on ATS already.
  • Solution: Make sure that RIC/record in the post message is the one you would like to add to ATS.
    • If yes, you should do nothing because the RIC exists on ATS already.
    • If no, modify the source code to insert the correct RIC.

4) text="[500]: Unknown Item", NackCode: 2 or "DeniedBySrc" in WebSocketAPI

 

- Other ATS commands apart from ATS_INSERT_S or ATS_ADDFIELD_S or ATS_DELETE or ATS_DELETE_ALL.

An example of Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="1" nakCode="DENIED_BY_SRC" text="[500]: Unknown Item" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_INSERT_s"/>

     <dataBody>

     </dataBody>

</ACK>

  • Root Cause: An application sets an incorrect ATS command in the post message. In the example message above, the ATS command specified in name is ATS_INSERT_s which is incorrect; it should be ATS_INSERT_S. The ATS commands are the case sensitive.
  • Solution: Correct the ATS command in the post message to be the correct one e.g. ATS_INSERT_S or ATS_ADDFIELD_S or ATS_DELETE and ATS_DELETE_ALL according to your requirement.

- Updating data.

An example of Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="3" nakCode="DENIED_BY_SRC" text="[500]: Unknown Item" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="NEWs.RIC"/>

     <dataBody>

     </dataBody>

</ACK>

  • Root Cause: An application tries to update data of a RIC which does not exist on ATS. In the example message above, the RIC name is NEWs.RIC.
  • Solution: Make sure that RIC/record in the post message is the one you would like to update data.
    • If yes, add the RIC to ATS first. Please refer to Inserting a RIC/record section.
    • If no, modify the source code to update data of the correct RIC.

- Deleting field(s).

An example of Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="4" nakCode="DENIED_BY_SRC" text="[500]: Unknown Item" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE"/>

     <dataBody>

     </dataBody>

</ACK>

  • Root Cause: An application tries to delete field(s) of a RIC which never contains these field(s) or have already been deleted.
  • Solution: Make sure that the field(s) and the RIC in the post message are correct according to your requirements.
    • If yes, and you would like the RIC exists on ATS. Please refer to Inserting a RIC/record section to add the RIC with your desired fields.
    • If no, modify the source code to delete the field(s) of the correct RIC according to your requirements.

- Deleting a RIC.

An example of Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="5" nakCode="DENIED_BY_SRC" text="[500]: Unknown Item" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE_ALL"/>

     <dataBody>

     </dataBody>

</ACK>

  • Root Cause: An application tries to delete a RIC which never exists on ATS or have already been deleted.
  • Solution: Make sure that the RIC in the post message are correct according to your requirements.
    • If yes, you should do nothing because the RIC does not exists on ATS any more.
    • If no, modify the source code to delete the correct RIC according to your requirements.

5) text="[502]: Unknown Fid", NackCode: 2 or "DeniedBySrc" in WebSocketAPI

- Adding field(s).

An example of Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="2" nakCode="DENIED_BY_SRC" text="[502]: Unknown Fid" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_ADDFIELD_S"/>

     <dataBody>

     </dataBody>

</ACK>

  • Root Causes:
    • An application tries to add field(s) to a RIC which contains these field(s) already.
    • An application tries to add field(s) which ATS does not support.
  • Solution: Make sure that field(s) and the RIC in the post message are correct according to your requirements.
    • If yes, You may want to contact your local MarketData support or administrator for the list of fields which you can add to a RIC on ATS. If ATS is hosted by Refinitiv, please contact the account team.
    • If no, modify the source code to add the correct fields to the correct RIC according to your requirements.

- Updating data.

An example of Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="3" nakCode="DENIED_BY_SRC" text="[502]: Unknown Fid" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="NEW.RIC"/>

     <dataBody>

     </dataBody>

</ACK>

  • Root Causes: an application tries to update data of a RIC specified in name(NEW.RIC) which does not contains these field(s).
  • Solution: Make sure that field(s) and the RIC in the post message are correct according to your requirements.
    • If yes, add the field(s) to the RIC first. Please refer to Adding Fields section.
    • If no, modify the source code to update data of the correct field(s) to the correct RIC according to your requirements.

- Deleting field(s).

An example of Ack message:

    	
            

<ACK domainType="MARKET_PRICE" streamId="1" containerType="NO_DATA" flags="0x32 (HAS_TEXT|HAS_MSG_KEY|HAS_NAK_CODE)" ackId="4" nakCode="DENIED_BY_SRC" text="[502]: Unknown Fid" dataSize="0">

     <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="267" name="ATS_DELETE"/>

     <dataBody>

     </dataBody>

</ACK>

  • Root Cause: An application tries to delete field(s) of a RIC which never contains these field(s) or have been deleted from the RIC already.
  • Solution: Make sure that field(s) and the RIC in the post message are correct according to your requirements.
    • If yes, you should do nothing because the RIC does not have these field(s) any more.
    • If no, modify the source code to delete the correct field(s) of the correct RIC according to your requirements.

Summary


After finish both articles in this series, you will understand more about ATS and Posting. Also, by using the Refinitiv Real-Time APIs you can fulfill general day-to-day operations with ATS more easily and quickly, including solving common problems. The technical knowledge explained in the article can help you to perform additional advanced use case scenarios to better utilize ATS’s usage further as well. If you would like to acquire ATS, please contact Refinitiv Account team for the process and details. You can contact the support team directly by submitting your query to Get Support in MyRefinitiv if you require any ATS assistance.

References

For further details, please check out the following resources: