Article

EMA Java Non-Interactive Provider in Scala language

Pimchaya Wongrukun
Developer Advocate Developer Advocate

 Introduction

This article is intended for anyone who is interested in using Enterprise Message API (EMA - formerly known as Elektron Message API) - Java in Scala language to publish real-time level1 data i.e. Market Price, level 2 data i.e. Market By OrderMarket By Price and Market Maker to the server, Advanced Data Hub (ADH), which is a Refinitiv Real-Time Distribution System infrastructure component used to distribute data items to sink components. This article also demonstrates how to use connection failover feature, recovering to publish data and closed the published item stream.

EMA is one part of the Refinitiv Real-Time Software Development Kit or RTSDK (formerly known as ESDK). EMA is an easy-to-use API providing publish real-time data in Refinitiv Wire Format (RWF), high-performance open source API that operates at the Message Layer. Therefore, the developers generally only need to deal with publishing data and status messages. For more details of EMA and the resource e.g. Overview, Quick Start, Documentation, Downloads and Tutorials, please refer to Real-Time SDK - Java.

Scala Overview

Scala was developed specifically with the aim of being a better language than Java. Scala is a modern multi-paradigm programming language designed to express common programming patterns in a concise, elegant, and type-safe way. It smoothly integrates features of object-oriented and functional languages. Scala is designed to interoperate well with the popular Java Runtime Environment (JRE). In particular, the interaction with the mainstream object-oriented Java programming language is as smooth as possible. Hence, you can write a Scala application to call any Java interfaces e.g. EMA Java. Scala syntax is also closed to the Java language so the learning curve for the Java developers is minimized. Therefore, you can develop EMA Java application in Scala within minimum time to create a better Java application e.g. extremely compact and easier to write. For more details of Scala including Documentation, Download, Online Courses, please refer to Scala page.

Non-Interactive Providers (NIP) Overview

If you would like to expose a specific data service to consumer applications, building a Provider application should be considered. Provider applications are able to connect to a ADH server to leverage its real-time and streaming distribution system to publish data. This is an efficient solution to make your own set of capabilities (e.g. content, workflow, etc.) available to your consumer applications.

Non-Interactive Providers are not solicited by infrastructure requests but instead, they publish information (e.g. Available services, data items…) following their own timing. In this model Refinitiv Real-Time Distribution System caches the published information and serves it to the demanding consumer applications as shown in the NIP Feature Diagram below:

Therefore, NIP can publish level1 and level 2 data into the ADH cache without needing to handle/wait for requests from any consumer applications. Whenever a consumer application sends a request for data to Advanced Distribution Server (ADS) which is a component of Refinitiv Real-Time Distribution System,  ADS forwards the request to ADH. Finally, ADH provides data kept in the cache which is received from NIP previously to the consumer application. For more details about Non-Interactive Provider, please refer to API Concepts Guide

Connection Failover

To use this feature, you just specify a list of failover servers via the ChannelSet configuration parameter. This parameter specifies an ordered list of Channel to which NIP will attempt to connect, one at a time, if the previous one fails to connect. When the NIP is disconnected from the current ADH e.g. ADH goes down or the network problem occurs, EMA will automatically retry to connect and log in to the next ADH specified in the ChannelSet.

Recovering to Publish Data

Unlike Connection Failover which is performed by EMA automatically, the NIP needs to perform this process by itself after EMA recovers the connection to another ADH successfully. This is illustrated in ex360_MP_ConnectionRecovery shipped with Real-Time SDK Java Package as well.  We will explain this process in details later in the article.

Prerequisites

  1. Download the latest Real-Time SDK Java Package
  2. Declare the service sent by the NIP in  ADH configuration files of both Real-Time Distribution sites for connection failover feature. This process must be executed by your Market Data administrator team. For information, please refer to ADH Configuration for NI Providers which explains the steps involved in this process. If Market Dat administration team requires any assistance regarding configuring ADH/ADS server, they can submit the query to Refinitiv Real-Time support team directly via Get Support in MyRefinitiv. In this article, we will use the service named NI_PUB which is one of the default service configured in EmaConfig.xml shipped with Real-Time SDK Package. 
  3. In this article, I will use the default configuration for NIP which is Provider_1 node in EmaConfig.xml file shipped with Real-Time SDK package. Provider_1 node consists of all ADH server hosts and ports where NIP will connect to and source directory info which EMA will publish to ADH automatically. To make EMA reads the file, put the configuration file in the working directory. Then, modify it to:
    • Under NiProvider named Provider_1, replace <Channel value="Channel_3"/> with <ChannelSet value="Channel_3, Channel_6"/> to set EMA performs connection failover according to a list of failover servers. For example, when ADH specified in Channel_3 goes down, EMA tries to connect to ADH specified in Channel_6.
    • Modify Host and Port of Channel_3 used by the default Provider_1 to be your primary ADH
    • Copy Channel_3 and rename it to be Channel_6, modify Host and Port to be your secondary ADH
    • By default, Service NI_PUB supports the capabilities: 6(Market Price) and MMT_MARKET_BY_ORDER. Hence, add the remaining level 2 data which are MMT_MARKET_BY_PRICE and 9(Market Maker)

The EMA configuration should be like the below:

    	
            

<NiProviderList>

<NiProvider>

      <Name value="Provider_1"/>

  …

      <ChannelSet value="Channel_3, Channel_6"/>

<Directory value="Directory_1"/>

<ChannelList>

<Channel>

<Name value="Channel_3"/>

<Host value="192.168.27.11"/>

<Port value="14003"/>

</Channel>     

<Channel>

<Name value="Channel_6"/>

<ChannelType value="ChannelType::RSSL_SOCKET"/>

<GuaranteedOutputBuffers value="5000"/>

<ConnectionPingTimeout value="30000"/>

<TcpNodelay value="1"/>

<Host value="192.168.27.12"/>

<Port value="14003"/>

</Channel>

<Directory>

<Name value="Directory_1"/>

<Service>

<Name value="NI_PUB"/>

<InfoFilter>

          <Capabilities>

<CapabilitiesEntry value="6"/>

<CapabilitiesEntry value="MMT_MARKET_BY_ORDER"/>

<CapabilitiesEntry value="MMT_MARKET_BY_PRICE"/>

<CapabilitiesEntry value="9"/>

</Capabilities>       

In the example configuration above, EMA tries to connect to the first channel, Channel_3 (ADH on IP 192.168.27.11), specified in the ChannelSet. Whenever EMA fails to connect to Channel_3, it will try to connect to the next Channel, Chanel_6 (ADH on IP 192.168.27.12).

 The Non-Interactive Provider Sequence Diagram

The figures below are the NIP sequence diagram which shows the interaction between NIP and ADH. The diagram also covers all tasks performed by NIP which we will develop as mentioned in the Introduction section. 

A. The normal situation without any failures

For more details of permission data (Dacs Lock) mentioned in the optional step 5th, please refer to DACS Lock Developers Guide

B. Failure situation that makes NIP disconnects e.g. ADH goes down, the network problem occurs.

Note: A green arrow and orange arrow represents NIP task and ADH task respectively while the gray arrow represents the EMA task. 

To make a simple NIP and focus on the features and the tasks, we let EMA sends automatically a source directory info configured in the EmaConfig.xml file. Alternatively, this can be performed programmatically by the application itself as demonstrated in ex300_MP_Streaming shipped with Real-Time SDK Java Packages  

Source Code

This section is to explain how to develop a NIP application using EMA Java in Scala language according to The Non-Interactive Provider Sequence Diagram section. Our example application consists of 2 categories:

  • Data classes which are MarketPrice, MarketByPrice, MarketByOrder, and MarketMaker. Each class contains all the relevant information regarding an item of each domain type. They are used as the source of canned data for the NIP. Each class has the methods to generate the content of the payload contained in a refresh and an update message according to its domain type i.e. Market Price, Market By OrderMarket By Price and Market Maker
  • NiProvider object and AppClient class. NiProvider is the main class of this application. It publishes data generated by the methods in the data classes, MarketPrice, MarketByPrice, MarketByOrder and MarketMaker to ADH according to the input domainType application’s parameter. NiProvider object performs all tasks shown in The Non-Interactive Provider Sequence Diagram section(green arrows). AppClient class receives and updates the login/connection events to update the ADH connection status if it is ready to be published.  The status is used by NiProvider object.

The overview steps to develop the application are listed below:

  1. Create MarketPrice class in MarketPrice.scala file to generate Market Price data as an example shown below: 
    	
            

class MarketPrice {

  //Refresh message contains all fields below

  //field id 1 PROD_PERM(PERMISSION) will be added in the refresh message if refreshPE > 0

  private var dsplyName: String = "DUMMY MP" //field id 3

  private var currency: Int = 764 //field id 15, 764 is Thai baht

  private var bid: Int = 1875 //field id 22, 2 decimals

  private var ask: Int = 1901 //field id 25, 2 decimals

  private var bidSize: Int = 221900 //field id 30

  private var askSize: Int = 125920 //field id 31

  

  //The method sets new value of each field in a refresh and an update message

  def updateData() {

    bid = bid + 111;

    ask = ask + 121;

    bidSize = bidSize + 120

    askSize= askSize + 110

  }

  

  //The method returns Market Price data contained in the payload of a refresh message

  def generateMPRefresh(refreshPE: Long, update: Boolean) : FieldList =   {

      //updateData() is invoked after fail over is successful, 

      //then creates the payload of a refresh message with new values

      //otherwise, use the initialized values

      if(update)

        updateData()

        

      //Create a field list  

      val fieldList:FieldList = EmaFactory.createFieldList();

      if(refreshPE > 0) {

        fieldList.add( EmaFactory.createFieldEntry().uintValue(1, refreshPE))

      }      

      fieldList.add( EmaFactory.createFieldEntry().rmtes(3, ByteBuffer.wrap(dsplyName.getBytes())))

      fieldList.add( EmaFactory.createFieldEntry().enumValue(15,  764))

      fieldList.add( EmaFactory.createFieldEntry().real(22, bid, MagnitudeType.EXPONENT_NEG_2))

      fieldList.add( EmaFactory.createFieldEntry().real(25, ask, MagnitudeType.EXPONENT_NEG_2))

      fieldList.add( EmaFactory.createFieldEntry().real(30,bidSize,MagnitudeType.EXPONENT_0))

      fieldList.add( EmaFactory.createFieldEntry().real(31, askSize, MagnitudeType.EXPONENT_0))

      return fieldList 

   }

  

  //The method returns Market Price data contained in the payload of an update message

  def generateMPUpate() : FieldList =   {

      //Set new value to each field

      updateData()

      

      //Create a field list

      val fieldList:FieldList = EmaFactory.createFieldList();   

      fieldList.add( EmaFactory.createFieldEntry().real(22, bid, MagnitudeType.EXPONENT_NEG_2))

      fieldList.add( EmaFactory.createFieldEntry().real(25, ask, MagnitudeType.EXPONENT_NEG_2))

      fieldList.add( EmaFactory.createFieldEntry().real(30,bidSize,MagnitudeType.EXPONENT_0))

      fieldList.add( EmaFactory.createFieldEntry().real(31, askSize, MagnitudeType.EXPONENT_0))

      return fieldList 

   }

  

  

}

2. Create MarketByOrder class in MarketByOrder.scala file to generate Market By Order data as an example shown below: 

    	
            

class MarketByOrder {

  //Summary data also contains field id 1709 and 3423 with its fixed value.

  //field id 1 PROD_PERM(PERMISSION) will be added in summary data if refreshPE > 0

  private val dsplyName: String = "DUMMY MBO" //field id 3

  private val currency: Int = 0 //field id 15

  private var activeDate: Calendar = null //field id 17

  

  //The key for each map entry

  private var entryKey: Long = 12102

  

  //MapEntry data also contains field id 6522 

  //which is the same value as the activeDate variable

  private var orderId: Long = 1072130 //field id 3426

  private var orderPrc: Int = 7925 //field id 3427, 2 decimals

  private var orderSide: Short = 1 //field id 3428, BID(1) or ASK(2)

  private var orderSize: Int = 500 //field id 3429

  private var prTimMs: Long =  4800050//field id 6520

 

  

  //The method sets new value of each field in a refresh and an update message

  def updateData() {

    orderId = orderId +10

    orderPrc = orderPrc + 121

    if(orderSide == 1)

      orderSide = 2

    else 

      orderSide = 1

    orderSize= orderSize + 100

    prTimMs = prTimMs + 1000

    entryKey = entryKey + 9

  }

  

  //The method returns Market by Order data contained in the payload of a refresh message

  def generateMBORefresh(refreshPE: Long, update: Boolean) : Map =   {

    //updateData() is invoked after fail over is successful, 

    //then creates the payload of a refresh message with new values

    //otherwise, use the initialized values

    if(update)

        updateData()

        

    //Create summary data

    val mapSummaryData: FieldList  = EmaFactory.createFieldList();

    if(refreshPE > 0) {

        mapSummaryData.add( EmaFactory.createFieldEntry().uintValue(1, refreshPE))

     }

    

    mapSummaryData.add( EmaFactory.createFieldEntry().rmtes(3, ByteBuffer.wrap(dsplyName.getBytes())))

mapSummaryData.add(EmaFactory.createFieldEntry().enumValue(15,  764));

    activeDate = Calendar.getInstance(TimeZone.getTimeZone("UTC"))

    mapSummaryData.add( EmaFactory.createFieldEntry().date(17, 

                  //month is zero based so + 1

                  activeDate.get(Calendar.YEAR), activeDate.get(Calendar.MONTH) +1, activeDate.get(Calendar.DATE)))

mapSummaryData.add(EmaFactory.createFieldEntry().enumValue(1709,  158));

mapSummaryData.add(EmaFactory.createFieldEntry().enumValue(3423,  1));

val map: Map = EmaFactory.createMap();

map.summaryData(mapSummaryData);

 

//Create a map containing 5 map entries

for( a <- 1 to 5) {

  val entryData: FieldList = EmaFactory.createFieldList()

  entryData.add(EmaFactory.createFieldEntry().rmtes(3426, ByteBuffer.wrap(orderId.toString().getBytes())));

  entryData.add(EmaFactory.createFieldEntry().real(3427, orderPrc, MagnitudeType.EXPONENT_NEG_2))

  entryData.add(EmaFactory.createFieldEntry().enumValue(3428, orderSide));

    entryData.add(EmaFactory.createFieldEntry().real(3429, orderSize, MagnitudeType.EXPONENT_0))

    entryData.add(EmaFactory.createFieldEntry().uintValue(6520,prTimMs))

  entryData.add( EmaFactory.createFieldEntry().date(6522, 

                //month is zero based so + 1

                    activeDate.get(Calendar.YEAR), activeDate.get(Calendar.MONTH) +1 , activeDate.get(Calendar.DATE)))

  map.add(EmaFactory.createMapEntry().keyAscii(entryKey.toString(), MapEntry.MapAction.ADD, entryData));

    if (a < 5)

  updateData();

}

    return map 

   }

  

  //The method returns Market by Order data contained in the payload of an update message

  def generateMBOUpdate() : Map =   {

    //Set new value to each field

    updateData();

    

    //Create a map containing 1 map entry

val map: Map = EmaFactory.createMap();

val entryData: FieldList = EmaFactory.createFieldList()

entryData.add(EmaFactory.createFieldEntry().rmtes(3426, ByteBuffer.wrap(orderId.toString().getBytes())));

  entryData.add(EmaFactory.createFieldEntry().real(3427, orderPrc, MagnitudeType.EXPONENT_NEG_2))

  entryData.add(EmaFactory.createFieldEntry().enumValue(3428, orderSide));

  entryData.add(EmaFactory.createFieldEntry().real(3429, orderSize, MagnitudeType.EXPONENT_0))

  entryData.add(EmaFactory.createFieldEntry().uintValue(6520,prTimMs))

  entryData.add( EmaFactory.createFieldEntry().date(6522, 

                //month is zero based so + 1

                    activeDate.get(Calendar.YEAR), activeDate.get(Calendar.MONTH) +1 , activeDate.get(Calendar.DATE)))

  map.add(EmaFactory.createMapEntry().keyAscii(entryKey.toString(), MapEntry.MapAction.ADD, entryData));

    return map 

  }

}

3. Create MarketByPrice class in MarketByPrice.scala file to generate Market By Price data as an example shown below: 

    	
            

class MarketByPrice {

  //Summary data 

  //field id 1 PROD_PERM(PERMISSION) will be added in summary data if refreshPE > 0

  private val dsplyName: String = "DUMMY MBP" //field id 3

  private val currency: Int = 764 //field id 15, 764 is Thai baht

  private var activeDate: Calendar = null //field id 17

  private var seqNum: Int = 450596 //field id 1021

  private var timActMs: Long = 30802966 //field id 4148

  

  //The key for each map entry

  private var entryKey: String = "113.05A" //it is orderPrc + A e.g. 113.05A

  

  //MapEntry data

  private var orderPrc: Int = 11305 //field id 3427, 2 decimals so it is 113.05

  private var orderSide: Short = 1 //field id 3428, BID(1) or ASK(2)

  private var noOrd: Int = 1 //field id 3430

  private var accSize: Int = 1525//field id 4356

  private var lvTimMs: Long = 18009780 //field id 6527

  

 //The method sets new value of each field in a refresh and an update message

  def updateData(updatedSum: Boolean) {

    //if updateSum is true, summary data's fields in a message are new values 

    if(updatedSum) {

      timActMs = timActMs +1000

      seqNum = seqNum + 25

    }

    

    orderPrc = orderPrc + 213

    if(orderSide == 1)

      orderSide = 2

    else 

      orderSide = 1

    accSize= accSize + 147

    noOrd = noOrd + 1

    lvTimMs = lvTimMs + 1000

    

    //generate entry key based on orderPrc

    entryKey = orderPrc.toString().substring(0,3) + "." +orderPrc.toString().substring(3) + "A"

  }

   //The method returns Market by Price data contained in the payload of a refresh message

  def generateMBPRefresh(refreshPE: Long, update: Boolean) : Map =   {

    //updateData() is invoked after fail over is successful, 

    //then creates the payload of a refresh message with new values

    //otherwise, use the initialized values

    if(update)

        updateData(true)

        

    //Create summary data

    val mapSummaryData: FieldList  = EmaFactory.createFieldList();

    if(refreshPE > 0) {

        mapSummaryData.add( EmaFactory.createFieldEntry().uintValue(1, refreshPE))

     }

    

    mapSummaryData.add( EmaFactory.createFieldEntry().rmtes(3, ByteBuffer.wrap(dsplyName.getBytes())))

mapSummaryData.add(EmaFactory.createFieldEntry().enumValue(15,  764));

    activeDate = Calendar.getInstance(TimeZone.getTimeZone("UTC"))

    mapSummaryData.add( EmaFactory.createFieldEntry().date(17, 

                  //month is zero based so + 1

                  activeDate.get(Calendar.YEAR), activeDate.get(Calendar.MONTH) +1, activeDate.get(Calendar.DATE)))

mapSummaryData.add(EmaFactory.createFieldEntry().real(1021,seqNum,MagnitudeType.EXPONENT_0))

mapSummaryData.add(EmaFactory.createFieldEntry().uintValue(4148, timActMs))

val map: Map = EmaFactory.createMap()

map.summaryData(mapSummaryData);

    //Create a map containing 5 map entries

for( a <- 1 to 5) {

  val entryData: FieldList = EmaFactory.createFieldList()

  entryData.add(EmaFactory.createFieldEntry().real(3427, orderPrc, MagnitudeType.EXPONENT_NEG_2))

  entryData.add(EmaFactory.createFieldEntry().enumValue(3428, orderSide));

    entryData.add(EmaFactory.createFieldEntry().uintValue(3430, noOrd))

    entryData.add(EmaFactory.createFieldEntry().real(4356,accSize,MagnitudeType.EXPONENT_0))

  entryData.add( EmaFactory.createFieldEntry().uintValue(6527, lvTimMs))

  map.add(EmaFactory.createMapEntry().keyAscii(entryKey, MapEntry.MapAction.ADD, entryData));

    if (a < 5)

  updateData(false)

}

    return map 

   }

  

  //The method returns Market by Price data contained in the payload of an update message

  def generateMBPUpdate() : Map =   {

    //Set new value to each field

    updateData(true)

    

    //Create summary data

    val mapSummaryData: FieldList  = EmaFactory.createFieldList();

    mapSummaryData.add(EmaFactory.createFieldEntry().real(1021,seqNum,MagnitudeType.EXPONENT_0))

mapSummaryData.add(EmaFactory.createFieldEntry().uintValue(4148, timActMs))

val map: Map = EmaFactory.createMap();

    map.summaryData(mapSummaryData);

    

     //Create a map one map entry

val entryData: FieldList = EmaFactory.createFieldList()

entryData.add(EmaFactory.createFieldEntry().real(3427, orderPrc, MagnitudeType.EXPONENT_NEG_2))

    entryData.add(EmaFactory.createFieldEntry().enumValue(3428, orderSide));

  entryData.add(EmaFactory.createFieldEntry().uintValue(3430, noOrd))

  entryData.add(EmaFactory.createFieldEntry().real(4356,accSize,MagnitudeType.EXPONENT_0))

  entryData.add( EmaFactory.createFieldEntry().uintValue(6527, lvTimMs))

  map.add(EmaFactory.createMapEntry().keyAscii(entryKey, MapEntry.MapAction.ADD, entryData));

    return map 

  }

}

4. Create MarketMaker class in MarketMaker.scala  file to generate Market Maker data as an example shown below:

    	
            

class MarketMaker {

  //SummaryData also contains field id 1709 with fixed value

  //field id 1 PROD_PERM(PERMISSION) will be added in summary data if refreshPE > 0

  private var activeDate: Calendar = null //field id 17

  private var timActMs: Long = 75610613 //field id 4148

  

  //MapEntry data

  private var bid: Int = 4115 //field id 22, 2 decimals

  private var ask: Int = 4282 //field id 25, 2 decimals

  private var bidSize: Int = 100 //field id 30

  private var askSize: Int = 110 //field id 31

  private var askTimMs: Long = 43459027 //field id 4147

  private var bidTimMs: Long = 42911172 //field id 4150

  

  //the prefix for an entryKey

  private val prefix: String = "ARIC_"

  private var entryKey: String = ""

  

  //generate an entryKey for a map entry

  def generateEntryKey(): String = {

    //random 3 characters from A to Z 

    val subfix: Array[Char] = Array( (Random.nextInt(26) + 65).toChar , (Random.nextInt(26) + 65).toChar,  (Random.nextInt(26) + 65).toChar )

    //Example entryKey:ARIC_KRR, ARIC_VXQ

    return(prefix + subfix.mkString)

 }

 

  //The method sets new value of each field in a refresh and an update message

  def updateData() {

    bid = bid + 211;

    ask = ask + 221;

    bidSize = bidSize + 10

    askSize= askSize + 15

    bidTimMs = bidTimMs + 1000

    askTimMs = askTimMs + 1000

  }

  

  //The method returns Market Maker data contained in the payload of a refresh message

  def generateMMRefresh(refreshPE: Long, update: Boolean) : Map =   {

    //updateData() is invoked after fail over is successful, 

    //then creates the payload of a refresh message with new values

    //otherwise, use the initialized values

    if(update)

        updateData()

    

    //Create a sumary data

    val mapSummaryData: FieldList  = EmaFactory.createFieldList();

    if(refreshPE > 0) {

        mapSummaryData.add( EmaFactory.createFieldEntry().uintValue(1, refreshPE))

     }

    activeDate = Calendar.getInstance(TimeZone.getTimeZone("UTC"))

    mapSummaryData.add( EmaFactory.createFieldEntry().date(17, 

                  //month is zero based so + 1

                  activeDate.get(Calendar.YEAR), activeDate.get(Calendar.MONTH) +1, activeDate.get(Calendar.DATE)))

mapSummaryData.add(EmaFactory.createFieldEntry().enumValue(1709,  320));

mapSummaryData.add(EmaFactory.createFieldEntry().uintValue(4148,  timActMs));

//Create a map containing 5 map entries

val map: Map = EmaFactory.createMap()

map.summaryData(mapSummaryData)

  for( a <- 1 to 5) { 

  val entryData: FieldList = EmaFactory.createFieldList()

  entryData.add( EmaFactory.createFieldEntry().real(22, bid, MagnitudeType.EXPONENT_NEG_2))

      entryData.add( EmaFactory.createFieldEntry().real(25, ask, MagnitudeType.EXPONENT_NEG_2))

      entryData.add( EmaFactory.createFieldEntry().real(30, bidSize,MagnitudeType.EXPONENT_0))

      entryData.add( EmaFactory.createFieldEntry().real(31, askSize, MagnitudeType.EXPONENT_0))

      entryData.add(EmaFactory.createFieldEntry().uintValue(4147,askTimMs))

      entryData.add(EmaFactory.createFieldEntry().uintValue(4150,bidTimMs))

      entryKey = generateEntryKey()

  map.add(EmaFactory.createMapEntry().keyAscii(entryKey, MapEntry.MapAction.ADD, entryData));

    if (a < 5)

  updateData();

}

    return map 

   }

  

  //The method returns Market Maker data contained in the payload of an update message

  def generateMMUpdate() : Map =   {

    //Set new value to each field

    updateData();

    

    //Create a map containing 1 map entry

val map: Map = EmaFactory.createMap();

val entryData: FieldList = EmaFactory.createFieldList()

entryData.add( EmaFactory.createFieldEntry().real(22, bid, MagnitudeType.EXPONENT_NEG_2))

    entryData.add( EmaFactory.createFieldEntry().real(25, ask, MagnitudeType.EXPONENT_NEG_2))

    entryData.add( EmaFactory.createFieldEntry().real(30, bidSize,MagnitudeType.EXPONENT_0))

    entryData.add( EmaFactory.createFieldEntry().real(31, askSize, MagnitudeType.EXPONENT_0))

    entryData.add(EmaFactory.createFieldEntry().uintValue(4147,askTimMs))

    entryData.add(EmaFactory.createFieldEntry().uintValue(4150,bidTimMs))

    entryKey = generateEntryKey()

  map.add(EmaFactory.createMapEntry().keyAscii(entryKey, MapEntry.MapAction.ADD, entryData));

    return map 

  }

}

5. Create NiProvider.scala  file which contains following:

               A.NiProvider object contains the main (args:Array[String]) which extracts the following program parameters from the command line:

  • -nipNode <NiProviderName>. The Name of NiProvider node in EmaConfig.xml. The default is Provider_1. The node contains the IP and port of ADH server(s) where the application connects to.
  • -service <service_name>. The Service name. The default is NI_PUB.
  • -user <name>. The Name of the application user. The default is user.
  • -domainType <domain_type>. The Domain Type of the published item. The default is MARKET_PRICE. The valid Domain Types supported by this application are MARKET_PRICE, MARKET_BY_ORDER, MARKET_BY_PRICE, MARKET_MAKER.
  • -itemName <a RIC>. The published RIC name. The default is DUMMY.N.
  • -updateTimes <number_updates>. The number of updates to be published. The default is 10 updates.
  • -updateInterval <update_interval_in_sec>. The Update interval in seconds. The default is 5 seconds.
  • -refreshPE <a PE>. a PE for refresh messages. This requires dacsServiceId. The default is no PE.
  • -dacsServiceId <a service id>. The serviceID to generate DACS lock for the PE. The default is no serviceID.

Then, the object performs the tasks according to  The Non-Interactive Provider Sequence Diagram section (green arrows). It calls the method in AppClient class to get the current ADH connection status. The snipped source code in the main (args:Array[String]) method:

    	
            

def main (args:Array[String]) {

     

     //Extract program parameters from the command line.

     getCommandLineOptions(args);

  

     //Show all parameters used by the application

     showAllParameters();

     

     //EMA part to listen to the login events/connection events, publish data

     //according to the parameters from the command line

     var config: OmmNiProviderConfig = null

     var provider: OmmProvider = null

     val nipClient: AppClient  = new AppClient()

     var sendRefreshMsg:Boolean = false

     try {

        //Read a Non-Interactive Provider configuration's node named nipNode's value in EmaConfig.xml

        //the node should contain the ChannelSet parameter for fail over process 

        //Hence, when the first ADH is down, EMA tries to connect to the second ADH defined in the ChannelSet parameter 

        config  = EmaFactory.createOmmNiProviderConfig().providerName(nipNode);

        

        //add the second parameters, nipClient instance, containing the callback methods to process events e.g. login events

        //The events are generated when the login stream changes including the change of the connection state(connection is up or down)  

        provider = EmaFactory.createOmmProvider(config.username(user).operationModel(OmmNiProviderConfig.OperationModel.USER_DISPATCH),nipClient)

        

        //dispatch the login events

        provider.dispatch( 1000000 );

        

        //create a refresh message according to the input domain type

        val aRefreshMsg: RefreshMsg  = EmaFactory.createRefreshMsg().serviceName(service).name(itemName).domainType(domainType)

.state(OmmState.StreamState.OPEN, OmmState.DataState.OK, OmmState.StatusCode.NONE, "UnSolicited Refresh Completed")

.complete(true);

        

       //check if permissionData/DacsLock must be created

       if(addPermissionData) {

         //if yes, create a DacsLock according to a service id and a PE

     permData = createDacsLock(dacsServiceId,refreshPE)

       }

       //if permissionData/DacsLock is created successfully

   if(permData != null) {

      //add PROD_PERM field(refreshPE) and other fields according to the domain type in the payload of refresh message

     //the values of all fields are the initialized values(false)

          aRefreshMsg.payload(domainTypeMapRefresh(domainType)(refreshPE,false))

          //add permissionData/DacsLock in the refresh message

      aRefreshMsg.permissionData(permData.data());

   }

   //if permissionData/DacsLock is not created or failed to create it.

   else {

     //inform the refresh message does not have permData and PROD_PERM field

     println(NO_PERMISSSON_PRODPERM)

     //not add PROD_PERM field(0) 

     //but add other fields according to the domain type in the payload of refresh message

     //the values of all fields are the initialized values(false)

     aRefreshMsg.payload(domainTypeMapRefresh(domainType)(0,false))

   }

   //publish the first data message, refresh message

provider.submit( aRefreshMsg, itemHandle);

println("a Refersh Msg has been published");

//waiting for ADH acknowledges that the nip service is up and get the refresh message

Thread.sleep(5000);

var anUpdateMsg: UpdateMsg = null

var num: Int = 0

println("Start publishing " + updateTimes + " updates every " + updateInterval + " seconds.");

while(num < updateTimes) { //publish the number of update messages according to the updateTimes parameter

    //if the next sent message is an update message(not refresh message)

    if(sendRefreshMsg == false) {

      //wait updateInterval seconds before publishing an update message 

      Thread.sleep(updateInterval*1000) 

    }

    //Let EMA dispatches an event when there is any connection state changes(up to down or down to up). 

    provider.dispatch( 1000000 ); 

    //if ADH is ready to be published; the connection state is up

    if ( nipClient.isConnectionUp())

  { 

    if ( sendRefreshMsg ) 

    // when fail over from down ADH to up ADH is successful, re-send a refresh message to up ADH

  {

        //create a refresh message with new values(true)

        aRefreshMsg.payload(domainTypeMapRefresh(domainType)(refreshPE,true))

        //publish a refresh message

        provider.submit( aRefreshMsg, itemHandle);

        //mark flag not to send a refresh message again

          sendRefreshMsg = false

  }//Otherwise, send update message of the the input domain type.

      //and increase the number of published update messages 

    else {

        anUpdateMsg =  EmaFactory.createUpdateMsg().serviceName(service).name(itemName).domainType(domainType)

        anUpdateMsg.payload(domainTypeMapUpdate(domainType)())

        provider.submit( anUpdateMsg, itemHandle)  

        num += 1

    }

  }

    //if ADH is not ready to be published(connection state is down),

    //wait EMA tries to connect to the next ADH specified in ChannelSet

    //and mark flag to send a refresh message after connect to the next ADH successfully

    else {

      sendRefreshMsg = true;

    }

}

println("All " + updateTimes + " updates have been published ");

println("Close Stream item " + itemName);

//Send close status message to close the item stream.

provider.submit( EmaFactory.createStatusMsg().serviceName(service).name(itemName).domainType(domainType).state(OmmState.StreamState.CLOSED, OmmState.DataState.SUSPECT,

OmmState.StatusCode.NONE, itemName + " Stream Closed"), itemHandle);

//Wait for that consumer applications will receive close stream item message after the last update message

Thread.sleep(10000);

}

      catch {

        case excp @ (_: InterruptedException | _: OmmException) =>

          println("Exception:" +excp.getMessage)

 

      } finally if (provider != null) 

        //log out and disconnect from ADH 

        provider.uninitialize()

   }

B.AppClient class implementing OmmProviderClient which contains the callback methods to receive and process any messages e.g. refresh via onRefreshMsg(..) method, status via onStatusMsg(..) method. AppClient checks if an event informs the ADH connection is ready to be published or not and updates the connection status. The class exposes a method to get the status used by NiProvider object. The snipped source code in AppClient class:

    	
            

class AppClient extends OmmProviderClient {

  //The variable keeping the connection state between non-interactive provider(NIP) and ADH

  //if the connection is ready to publish. 

  //true is connection up and NIP logs in successfully otherwise the variable is false.

  var _connectionUp: Boolean = false

def isConnectionUp(): Boolean = {

return _connectionUp;

}

def onRefreshMsg(refreshMsg: RefreshMsg, event: OmmProviderEvent)

{

    println(refreshMsg);

//After NIP can connect to ADH and logs in, 

    //it receives a message indicating that the login is successful.

if ((refreshMsg.state().streamState() == OmmState.StreamState.OPEN) 

    && (refreshMsg.state().dataState() == OmmState.DataState.OK))

  _connectionUp = true;

//otherwise fail to connect or logs in to ADH

else 

  _connectionUp = false;

}

def onStatusMsg(statusMsg: StatusMsg , event: OmmProviderEvent) 

{

 

  println(statusMsg);

if (statusMsg.hasState())

{

   //After NIP can connect to ADH and logs in, 

      //it receives a message indicating that the login is successful.

if (( statusMsg.state().streamState() == OmmState.StreamState.OPEN) 

    && (statusMsg.state().dataState() == OmmState.DataState.OK))

  _connectionUp = true

//otherwise fail to connect or logs in to ADH

   else 

     _connectionUp = false

}

}

def onGenericMsg(genericMsg: GenericMsg, event: OmmProviderEvent){}

def onAllMsg(msg:Msg ,event:  OmmProviderEvent){}

def onPostMsg(postMsg: PostMsg,providerEvent:  OmmProviderEvent) {}

def onReqMsg(reqMsg: ReqMsg, providerEvent: OmmProviderEvent) {}

def onReissue(reqMsg: ReqMsg, providerEvent: OmmProviderEvent) {}

def onClose(reqMsg: ReqMsg ,providerEvent:  OmmProviderEvent) {}

}

You can download the complete NIP Java application source code explained above at Refinitiv-API-Samples/Article.EMA.Scala.NonInteractiveProvider GitHub page.

Build and Run the application

EMA, ETA, and EMA's dependency library files are required to build and run the EMA Java NIP application. Fortunately, all files are shipped with the Real-Time SDK Java package. Hence, you just set the CLASSPATH to all of them before building and running the application.

The following example command lines show how to set the CLASSPATH, build and run the application on Windows: 

  1. Set the classpath to EMA, ETA, and EMA's dependency library files. For example:
    	
            
set CLASSPATH=C:\RTSDK-2.0.1.L1.java.rrg\Java\Ema\Libs\ema-3.6.1.0.jar;C:\RTSDK-2.0.1.L1.java.rrg\Java\Eta\Libs\eta-3.6.1.0.jar;C:\RTSDK-2.0.1.L1.java.rrg\Java\Eta\Libs\etaValueAdd-3.6.1.0.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Eta\Libs\jdacsetalib.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Ema\Libs\apache\commons-collections-3.2.2.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Ema\Libs\apache\commons-configuration-1.10.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Ema\Libs\apache\commons-lang-2.6.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Ema\Libs\apache\commons-logging-1.2.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Eta\Libs\SLF4J\slf4j-1.7.12\slf4j-api-1.7.12.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Eta\Libs\SLF4J\slf4j-1.7.12\slf4j-jdk14-1.7.12.jar;C:\RTSDK-2.0.1.L1.java.rrg\RTSDK-BinaryPack\Java\Eta\Libs\xpp3-1.1.4c.jar

2. Compile the application using scalac command: 

    	
            
scalac com\refinitiv\ema\example\scala\*.scala

3. Run the application:

  • Include the path containing the Consumer class files generated from step 2 to the classpath:
    	
            
set CLASSPATH=.;%CLASSPATH%
  • Run the application using scala command. For example:
    	
            
scala com.refinitiv.ema.example.scala.NiProvider -user pimchayaProv -itemName NIP.MP -refreshpe 351 -dacsServiceId 4811

The command above is to log in with the user named pimchayaProv to publish the default domain type (Market Price) with the item named NIP.MP to the default service named NI_PUB. Specify a refresh message contains the permission data consisting of PE 351 and the service Id 4811, Hence, the field named PROD_PERM(PERMISSION field which field id is 1) with value 351(PE) is added. Moreover, the application publishes in the default rate which is 10 updates every 5 seconds.

The Example Result

When you run the NIP application to publish Market Price, you should see the messages like below:

    	
            

The application is using the following parameters:

nipNode=Provider_1

service=NI_PUB

user=pimchayaProv

domainType=MARKET_PRICE

itemName=NIP.MP

updateTimes=10

updateInterval=5

refreshPE=351

dacsServiceId=4811

 

Mar 10, 2021 4:18:14 PM com.refinitiv.ema.access.ChannelCallbackClient reactorChannelEventCallback

INFO: loggerMsg

    ClientName: ChannelCallbackClient

    Severity: Info

    Text:    Received ChannelUp event on channel Channel_7

        Instance Name Provider_1_1

        Component Version adh3.3.2.L1.linux.tis.rrg 64-bit

loggerMsgEnd

 

 

RefreshMsg

    streamId="1"

    domain="Login Domain"

    solicited

    RefreshComplete

    state="Open / Ok / None / 'Login accepted by host apis30.'"

    itemGroup="00 00"

    name="pimchayaProv"

    nameType="1"

    Attrib dataType="ElementList"

        ElementList

            ElementEntry name="ApplicationId" dataType="Ascii" value="256"

            ElementEntry name="ApplicationName" dataType="Ascii" value="ADH"

            ElementEntry name="Position" dataType="Ascii" value="192.168.68.115/WIN-V793K3HCLOL"

            ElementEntry name="SupportProviderDictionaryDownload" dataType="UInt" value="1"

        ElementListEnd

    AttribEnd

RefreshMsgEnd

 

a Refersh Msg has been published

Start publishing 10 updates every 5 seconds.

All 10 updates have been published

Close Stream item NIP.MP

While the NIP application is running, you can run the EMA Java Consumer named ex100_MP_Streaming to verify the NIP's service and its Market Price data. The Consumer application must connect to ADS (one component of Real-Time Distribution System) connecting to ADH which the NIP connect to. According to the example command line, you need to modify Consumer.java to request to the service named NI_PUB and the item named NIP.MP. The example result shown in ex100_MP_Streaming:

    	
            

Mar 10, 2021 4:18:23 PM com.refinitiv.ema.access.ChannelCallbackClient reactorChannelEventCallback

INFO: loggerMsg

    ClientName: ChannelCallbackClient

    Severity: Info

    Text:    Received ChannelUp event on channel Channel_1

Instance Name Consumer_1_1

Component Version ads3.3.2.L1.linux.tis.rrg 64-bit

loggerMsgEnd

 

 

RefreshMsg

    streamId="5"

    domain="MarketPrice Domain"

    solicited

    RefreshComplete

    state="Open / Ok / None / 'UnSolicited Refresh Completed'"

    itemGroup="00 01"

    permissionData="03 12 cb 35 1c"

    name="NIP.MP"

    nameType="1"

    serviceId="4811"

    serviceName="NI_PUB"

    Payload dataType="FieldList"

        FieldList FieldListNum="0" DictionaryId="0"

            FieldEntry fid="1" name="PROD_PERM" dataType="UInt" value="351"

            FieldEntry fid="3" name="DSPLY_NAME" dataType="Rmtes" value="DUMMY MP"

            FieldEntry fid="15" name="CURRENCY" dataType="Enum" value="764"

            FieldEntry fid="22" name="BID" dataType="Real" value="18.75"

            FieldEntry fid="25" name="ASK" dataType="Real" value="19.01"

            FieldEntry fid="30" name="BIDSIZE" dataType="Real" value="221900.0"

            FieldEntry fid="31" name="ASKSIZE" dataType="Real" value="125920.0"

        FieldListEnd

    PayloadEnd

RefreshMsgEnd

 

UpdateMsg

    streamId="5"

    domain="MarketPrice Domain"

    updateTypeNum="0"

    name="NIP.MP"

    serviceId="4811"

    serviceName="NI_PUB"

    Payload dataType="FieldList"

        FieldList

            FieldEntry fid="22" name="BID" dataType="Real" value="19.86"

            FieldEntry fid="25" name="ASK" dataType="Real" value="20.22"

            FieldEntry fid="30" name="BIDSIZE" dataType="Real" value="222020.0"

            FieldEntry fid="31" name="ASKSIZE" dataType="Real" value="126030.0"

        FieldListEnd

    PayloadEnd

UpdateMsgEnd

 

UpdateMsg

    streamId="5"

    domain="MarketPrice Domain"

    updateTypeNum="0"

    name="NIP.MP"

    serviceId="4811"

    serviceName="NI_PUB"

    Payload dataType="FieldList"

        FieldList

            FieldEntry fid="22" name="BID" dataType="Real" value="20.97"

            FieldEntry fid="25" name="ASK" dataType="Real" value="21.43"

            FieldEntry fid="30" name="BIDSIZE" dataType="Real" value="222140.0"

            FieldEntry fid="31" name="ASKSIZE" dataType="Real" value="126140.0"

        FieldListEnd

    PayloadEnd

UpdateMsgEnd

 

...

Troubleshooting

  • The NIP application shows "<user>, unknown to system."
    	
            

StatusMsg

    streamId="1"

    domain="Login Domain"

    state="Closed / Suspect / Not entitled / 'pimchayaProvx, unknown to system.'"

StatusMsgEnd

Resolution: The application logs in with the user who does not have the permission to access ADH. Please contact your infrastructure administrator or the Account team to ask for a valid username.

  • The NIP application shows“Attempt to submit initial RefreshMsg with a service name of NI_PUA that was not included in the SourceDirectory. Dropping this RefreshMsg.”
    	
            
Exception:Attempt to submit initial RefreshMsg with service name of NI_PUA that was not included in the SourceDirectory. Dropping this RefreshMsg.

Resolution:  The service specified in the command line running NIP does not match with the service’s name of <Directory> node which the application publishes to ADH.

For example, In EMAConfig.xml

    	
            

<Service>

<Name value="NI_PUB"/>

….

</Service>

Hence, you should use -service NI_PUB in the command line running the application.

  • The Consumer application shows "Capability not supported" 
    	
            

StatusMsg

    streamId="5"

    domain="MarketMaker Domain"

    state="Open / Suspect / None / 'Capability not supported'"

    name="NIP.MM"

    serviceName="NI_PUB"

StatusMsgEnd

Resolution: The consumer application requests an item from the NIP service which does not support the domain type of the item. In the example above, the service named NI_PUB does not support Market Maker domain type of the item named NIP.MM. Please make sure that <CapabilitiesEntry value="9"/> which is Market Maker has been added in NI_PUB service defined in EMAConfig.xml read by NIP.

  • The Consumer application shows "Service name of <service> is not found."
    	
            

StatusMsg

    streamId="0"

    domain="MarketPrice Domain"

    state="Closed / Suspect / None / 'Service name of 'NI_PUBX' is not found.'"

    name="NIPX.MP"

    serviceName="NI_PUBX"

StatusMsgEnd

Resolution: The consumer application requests an item from the wrong service (NI_PUBX); NIP does not provide this service name. Please recheck the service name is correctly typed in with your infrastructure administrator or the Account team.

  • The NI application shows“OmmNiProviderConfigImpl::providerName parameter [Provider_1] is an non-existent niprovider name
    	
            

Exception:OmmNiProviderConfigImpl::providerName parameter [Provider_1] is an non-existent niprovider

 Name

Resolution: EMA cannot find Provider_1 configuration node because EMAConfig.xml containing find Provider_1 configuration node is not in the application run directory. Please put EMAConfig.xml in the application run directory.

Summary

After finishing this article, you will understand more about EMA, NIP application and how to develop a NIP application using EMA to publish data in Scala language. Also, you can modify this application to serve your requirements using EMA easier and faster. Moreover, the article includes the common problems and how to solve them. If you do not have Refinitiv Real-Time Distribution System (ADH and ADS), please contact the Account team for the process and details. If you/your Market Data administrator requires any ADH and ADS assistance, you can contact the Refinitiv Real-Time support team directly by submitting your query to Get Support in MyRefinitiv