Introduction

This article is intended for anyone who is interested in using the Elektron Message API (EMA) Java from the Scala language to publish real-time level 1 data (Market Price) and level 2 data (Market By Order, Market By Price and Market Maker) to the Advanced Data Hub (ADH) server. ADH is a Thomson Reuters Enterprise Platform (TREP) infrastructure component used to distribute data items to sink components. This article also demonstrates how to use the connection failover feature, how to recover publishing after a failover, and how to close the published item stream.

EMA is one part of the Elektron Software Development Kit (Elektron SDK). It is an easy-to-use, high-performance, open source API that operates at the Message Layer and publishes real-time data in Reuters Wire Format (RWF). Therefore, developers generally only need to deal with publishing data and status messages. For more details of EMA and its resources, e.g. Overview, Quick Start, Documentation, Downloads and Tutorials, please refer to Elektron SDK - Java.

Scala Overview

Scala was designed with the aim of improving on Java. It is a modern multi-paradigm programming language that expresses common programming patterns in a concise, elegant, and type-safe way, and it smoothly integrates features of object-oriented and functional languages. Scala is designed to interoperate well with the popular Java Runtime Environment (JRE). In particular, the interaction with the mainstream object-oriented Java programming language is as smooth as possible. Hence, a Scala application can call any Java interface, e.g. EMA Java. Scala syntax is also close to Java, so the learning curve for Java developers is small. Therefore, you can develop an EMA Java application in Scala in a short time, and the resulting code is typically more compact and easier to write. For more details of Scala, including Documentation, Download and Online Courses, please refer to the Scala page.

Non-Interactive Providers (NIP) Overview

If you would like to expose a specific data service to consumer applications, consider building a Provider application. Provider applications connect to TREP and use its real-time streaming distribution system to publish data. This is an efficient way to make your own set of capabilities (e.g. content, workflow, etc.) available to your consumer applications.

Non-Interactive Providers are not solicited by infrastructure requests. Instead, they publish information (e.g. available services, data items) on their own schedule. In this model, Thomson Reuters Enterprise Platform (TREP) caches the published information and serves it to consumer applications on demand, as shown in the NIP Feature Diagram below:

Therefore, a NIP can publish level 1 and level 2 data into the ADH cache without having to handle or wait for requests from any consumer application. Whenever a consumer application sends a request for data to the Advanced Distribution Server (ADS), which is a component of TREP, ADS forwards the request to ADH. ADH then provides the consumer application with the data held in its cache, which it previously received from the NIP. For more details about Non-Interactive Providers, please refer to the API Concepts Guide.
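The publish-then-serve model described above can be pictured with a small stand-alone sketch. It uses neither EMA nor any TREP component. `CacheSketch`, `publish` and `request` are hypothetical names chosen only for illustration, and a plain string stands in for a published image.

```scala
import scala.collection.mutable

// Hypothetical stand-alone model of the NIP flow; this is not EMA or ADH code.
object CacheSketch {
  // Stands in for the ADH cache: item name -> last published image
  private val cache = mutable.Map.empty[String, String]

  // The provider publishes on its own schedule, without any consumer request
  def publish(item: String, image: String): Unit = cache.update(item, image)

  // A consumer request is answered from the cache, if the item was published
  def request(item: String): Option[String] = cache.get(item)
}
```

For example, after `CacheSketch.publish("DUMMY.N", "BID=18.75")`, a later `CacheSketch.request("DUMMY.N")` is answered from the cache, while a request for an item that was never published returns `None`.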

Connection Failover

To use this feature, specify a list of failover servers via the ChannelSet configuration parameter. This parameter specifies an ordered list of Channels to which the NIP attempts to connect, one at a time, moving to the next one if the previous one fails to connect. When the NIP is disconnected from the current ADH, e.g. ADH goes down or a network problem occurs, EMA automatically tries to connect and log in to the next ADH specified in the ChannelSet.
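The "next one in the list" rule can be modelled in a few lines. This is only an illustration of the ordering, not how EMA is implemented. `ChannelSetSketch`, `firstReachable` and the `connect` function are hypothetical names, and the channel names used with it are placeholders.

```scala
// Hypothetical model of walking an ordered channel list; not EMA code.
object ChannelSetSketch {
  // Returns the first channel, in list order, for which connect succeeds
  def firstReachable(channelSet: List[String], connect: String => Boolean): Option[String] =
    channelSet.find(connect)
}
```

With the list `List("primary", "secondary")` and a `connect` function that only succeeds for `"secondary"`, the sketch skips the first entry and returns `Some("secondary")`.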

Recovering to Publish Data

Unlike connection failover, which EMA performs automatically, recovery must be performed by the NIP itself after EMA has successfully recovered the connection to another ADH: the NIP re-sends a refresh message to the newly connected ADH before it continues with updates. This is also illustrated in example360__MarketPrice__ConnectionRecovery shipped with the Elektron SDK Java package. We will explain this process in detail later in the article.
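The recovery rule can be sketched as a small decision helper, assuming the first refresh has already been published. It is a stand-alone model with hypothetical names (`RecoverySketch`, `plan`, `Action`) and does not call EMA. It only shows which kind of message is due for a given sequence of connection states.

```scala
// Hypothetical decision helper; the names are illustrative and not part of EMA.
object RecoverySketch {
  sealed trait Action
  case object Wait extends Action        // connection is down: publish nothing
  case object SendRefresh extends Action // first message after a reconnect
  case object SendUpdate extends Action  // normal steady-state publishing

  // Walks a sequence of connection states (true = up) and returns the action
  // taken at each step. A refresh is owed whenever the connection was lost.
  def plan(connectionUp: Seq[Boolean]): List[Action] = {
    var refreshOwed = false
    def step(up: Boolean): Action =
      if (!up) { refreshOwed = true; Wait }
      else if (refreshOwed) { refreshOwed = false; SendRefresh }
      else SendUpdate
    connectionUp.toList.map(step)
  }
}
```

The design point is that the provider, not the API, remembers that a refresh is owed, and it clears that flag only once the refresh has been sent on the recovered connection.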

Prerequisites

  1. Download the latest Elektron SDK Java Package
  2. Declare the service published by the NIP in the TREP configuration files of both TREP sites for the connection failover feature. This process must be carried out by your TREP administration team. For more information, please refer to TREP Configuration for NI Providers, which explains the steps involved. If the TREP administration team requires any assistance with configuring TREP, they can submit a query to the TREP support team directly via Get Support in MyRefinitiv. In this article, we will use the service named NI_PUB, which is one of the default services configured in the EmaConfig.xml shipped with the Elektron SDK package.
  3. In this article, we will use the default NIP configuration, which is the Provider_1 node in the EmaConfig.xml file shipped with the Elektron SDK package. The Provider_1 node references the hosts and ports of all ADH servers to which the NIP connects, and the source directory information that EMA publishes to ADH automatically. To make EMA read the file, put the configuration file in the working directory. Then modify it as follows:
    • Under the NiProvider named Provider_1, replace <Channel value="Channel_3"/> with <ChannelSet value="Channel_3, Channel_6"/> so that EMA performs connection failover across the listed servers. For example, when the ADH specified in Channel_3 goes down, EMA tries to connect to the ADH specified in Channel_6.
    • Modify the Host and Port of Channel_3, which is used by the default Provider_1, to point to your primary ADH.
    • Copy Channel_3, rename the copy to Channel_6, and modify its Host and Port to point to your secondary ADH.
    • By default, the NI_PUB service supports the capabilities 6 (Market Price) and MMT_MARKET_BY_ORDER. Hence, add the remaining level 2 capabilities, MMT_MARKET_BY_PRICE and 9 (Market Maker).

The EMA configuration should look like the following:

<NiProviderList>	
	<NiProvider>
      		<Name value="Provider_1"/>
	  	…
     		<ChannelSet value="Channel_3, Channel_6"/>
		<Directory value="Directory_1"/>
		…
	…
<ChannelList>	
	…
	<Channel>
		<Name value="Channel_3"/>
		…
		<Host value="192.168.27.11"/>
		<Port value="14003"/>
	</Channel>     
	 …
	<Channel>
		<Name value="Channel_6"/>
		<ChannelType value="ChannelType::RSSL_SOCKET"/>
		<GuaranteedOutputBuffers value="5000"/>
		<ConnectionPingTimeout value="30000"/>
		<TcpNodelay value="1"/>
		<Host value="192.168.27.12"/>
		<Port value="14003"/>
	</Channel>
	…
	<Directory>
		<Name value="Directory_1"/>
		<Service>
			<Name value="NI_PUB"/>	
			<InfoFilter>
				…	
         			<Capabilities>
					<CapabilitiesEntry value="6"/>
					<CapabilitiesEntry value="MMT_MARKET_BY_ORDER"/>
					<CapabilitiesEntry value="MMT_MARKET_BY_PRICE"/>
					<CapabilitiesEntry value="9"/>
				</Capabilities>		      
	…
…

In the example configuration above, EMA first tries to connect to Channel_3 (ADH on IP 192.168.27.11), the first channel specified in the ChannelSet. Whenever EMA fails to connect to Channel_3, it tries to connect to the next channel, Channel_6 (ADH on IP 192.168.27.12).

The Non-Interactive Provider Sequence Diagram

The figures below are the NIP sequence diagrams, which show the interaction between the NIP and ADH. The diagrams also cover all tasks performed by the NIP that we will develop, as mentioned in the Introduction section.

  1. The normal situation without any failures

For more details of the permission data (DACS lock) mentioned in the optional step 5, please refer to the DACS Lock Developers Guide.

  2. A failure situation that causes the NIP to disconnect, e.g. ADH goes down or a network problem occurs.

Note: Green arrows and orange arrows represent NIP tasks and ADH tasks respectively, while gray arrows represent EMA tasks.

To keep the NIP simple and focus on the features and tasks, we let EMA automatically send the source directory information configured in the EmaConfig.xml file. Alternatively, this can be done programmatically by the application itself, as demonstrated in example300__MarketPrice__Streaming shipped with the Elektron SDK Java package.

Source Code

This section explains how to develop a NIP application using EMA Java in Scala, following The Non-Interactive Provider Sequence Diagram section. Our example application consists of two groups of code:

  • Data classes, which are MarketPrice, MarketByPrice, MarketByOrder, and MarketMaker. Each class holds the information for an item of its domain type and is used as the source of canned data for the NIP. Each class has methods to generate the payload of a refresh message and of an update message for its domain type, i.e. Market Price, Market By Order, Market By Price and Market Maker.
  • The NiProvider object and the AppClient class. NiProvider is the main class of this application. It publishes the data generated by the data classes (MarketPrice, MarketByPrice, MarketByOrder and MarketMaker) to ADH according to the domainType application parameter. The NiProvider object performs all tasks shown in The Non-Interactive Provider Sequence Diagram section (green arrows). The AppClient class receives login and connection events and keeps track of whether the ADH connection is ready for publishing. This status is used by the NiProvider object.
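Selecting a generator by domain type can be done with a map of functions. The following is a minimal stand-alone sketch of that idea, not the application's actual code. `DispatchSketch`, `RefreshGenerator` and `refreshByDomain` are hypothetical names, and a plain string stands in for the EMA payload that the real data classes return.

```scala
// Hypothetical sketch of dispatching on the domain type; no EMA types are used.
object DispatchSketch {
  // A generator takes (refreshPE, update) and returns a payload description
  type RefreshGenerator = (Long, Boolean) => String

  private def describe(domain: String): RefreshGenerator =
    (pe, update) => s"$domain refresh, refreshPE=$pe, update=$update"

  // One generator per domain type supported by the application
  val refreshByDomain: Map[String, RefreshGenerator] =
    Seq("MARKET_PRICE", "MARKET_BY_ORDER", "MARKET_BY_PRICE", "MARKET_MAKER")
      .map(d => d -> describe(d))
      .toMap
}
```

A call such as `DispatchSketch.refreshByDomain("MARKET_PRICE")(0L, false)` looks up the generator first and then applies it, so the publishing code does not need a separate branch per domain type.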

The steps to develop the application are outlined below:

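  1. Create the MarketPrice class in the MarketPrice.scala file to generate Market Price data, as shown in the example below:
    import java.nio.ByteBuffer
    //EMA Java classes (package names as used by the Elektron SDK)
    import com.thomsonreuters.ema.access.{EmaFactory, FieldList}
    import com.thomsonreuters.ema.access.OmmReal.MagnitudeType

    class MarketPrice {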
      //Refresh message contains all fields below
      //field id 1 PROD_PERM(PERMISSION) will be added in the refresh message if refreshPE > 0
      private var dsplyName: String = "DUMMY MP" //field id 3
      private var currency: Int = 764 //field id 15, 764 is Thai baht
      private var bid: Int = 1875 //field id 22, 2 decimals
      private var ask: Int = 1901 //field id 25, 2 decimals
      private var bidSize: Int = 221900 //field id 30
      private var askSize: Int = 125920 //field id 31
      
      //Sets a new value for each field published in a refresh or an update message
      def updateData(): Unit = {
        bid = bid + 111
        ask = ask + 121
        bidSize = bidSize + 120
        askSize = askSize + 110
      }
      
      //The method returns Market Price data contained in the payload of a refresh message
      def generateMPRefresh(refreshPE: Long, update: Boolean) : FieldList =   {
          //updateData() is invoked after fail over is successful, 
          //then creates the payload of a refresh message with new values
          //otherwise, use the initialized values
          if(update)
            updateData()
            
          //Create a field list  
          val fieldList:FieldList = EmaFactory.createFieldList();
          if(refreshPE > 0) {
            fieldList.add( EmaFactory.createFieldEntry().uintValue(1, refreshPE))
          }      
          fieldList.add( EmaFactory.createFieldEntry().rmtes(3, ByteBuffer.wrap(dsplyName.getBytes())))
          fieldList.add( EmaFactory.createFieldEntry().enumValue(15, currency))
          fieldList.add( EmaFactory.createFieldEntry().real(22, bid, MagnitudeType.EXPONENT_NEG_2))
          fieldList.add( EmaFactory.createFieldEntry().real(25, ask, MagnitudeType.EXPONENT_NEG_2))
          fieldList.add( EmaFactory.createFieldEntry().real(30,bidSize,MagnitudeType.EXPONENT_0))
          fieldList.add( EmaFactory.createFieldEntry().real(31, askSize, MagnitudeType.EXPONENT_0))
          return fieldList 
       }
      
      //The method returns Market Price data contained in the payload of an update message
      def generateMPUpate() : FieldList =   {
          //Set new value to each field
          updateData()
          
          //Create a field list
          val fieldList:FieldList = EmaFactory.createFieldList();   
          fieldList.add( EmaFactory.createFieldEntry().real(22, bid, MagnitudeType.EXPONENT_NEG_2))
          fieldList.add( EmaFactory.createFieldEntry().real(25, ask, MagnitudeType.EXPONENT_NEG_2))
          fieldList.add( EmaFactory.createFieldEntry().real(30,bidSize,MagnitudeType.EXPONENT_0))
          fieldList.add( EmaFactory.createFieldEntry().real(31, askSize, MagnitudeType.EXPONENT_0))
          return fieldList 
       }
      
      
    }
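The prices in the class above are held as integers with two implied decimals and published with a negative exponent. The arithmetic behind that representation can be sketched without any EMA types. `RealSketch` and `toDecimal` are hypothetical names used only for this illustration.

```scala
// Stand-alone arithmetic only; no EMA types are used here.
object RealSketch {
  // Computes mantissa * 10^exponent, e.g. 1875 with exponent -2 is 18.75
  def toDecimal(mantissa: Long, exponent: Int): BigDecimal =
    BigDecimal(java.math.BigDecimal.valueOf(mantissa, -exponent))
}
```

With this helper, `RealSketch.toDecimal(1875, -2)` is 18.75, which corresponds to the initial bid, and an exponent of 0 leaves a size such as 221900 unchanged.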

     

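  2. Create the MarketByOrder class in the MarketByOrder.scala file to generate Market By Order data, as shown in the example below:
    import java.nio.ByteBuffer
    import java.util.{Calendar, TimeZone}
    //EMA Java classes (package names as used by the Elektron SDK)
    import com.thomsonreuters.ema.access.{EmaFactory, FieldList, Map, MapEntry}
    import com.thomsonreuters.ema.access.OmmReal.MagnitudeType

    class MarketByOrder {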
      //Summary data also contains field id 1709 and 3423 with its fixed value.
      //field id 1 PROD_PERM(PERMISSION) will be added in summary data if refreshPE > 0
      private val dsplyName: String = "DUMMY MBO" //field id 3
      private val currency: Int = 764 //field id 15, 764 is Thai baht
      private var activeDate: Calendar = null //field id 17
      
      //The key for each map entry
      private var entryKey: Long = 12102
      
      //MapEntry data also contains field id 6522 
      //which is the same value as the activeDate variable
      private var orderId: Long = 1072130 //field id 3426
      private var orderPrc: Int = 7925 //field id 3427, 2 decimals
      private var orderSide: Short = 1 //field id 3428, BID(1) or ASK(2)
      private var orderSize: Int = 500 //field id 3429
      private var prTimMs: Long =  4800050//field id 6520
    
      
      //Sets a new value for each field published in a refresh or an update message
      def updateData(): Unit = {
        orderId = orderId + 10
        orderPrc = orderPrc + 121
        if (orderSide == 1)
          orderSide = 2
        else
          orderSide = 1
        orderSize = orderSize + 100
        prTimMs = prTimMs + 1000
        entryKey = entryKey + 9
      }
      
      //The method returns Market by Order data contained in the payload of a refresh message
      def generateMBORefresh(refreshPE: Long, update: Boolean) : Map =   {
        //updateData() is invoked after fail over is successful, 
        //then creates the payload of a refresh message with new values
        //otherwise, use the initialized values
        if(update)
            updateData()
            
        //Create summary data
        val mapSummaryData: FieldList  = EmaFactory.createFieldList();
        if(refreshPE > 0) {
            mapSummaryData.add( EmaFactory.createFieldEntry().uintValue(1, refreshPE))
         }
        
        mapSummaryData.add(EmaFactory.createFieldEntry().rmtes(3, ByteBuffer.wrap(dsplyName.getBytes())))
        mapSummaryData.add(EmaFactory.createFieldEntry().enumValue(15, 764))
        activeDate = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
        mapSummaryData.add(EmaFactory.createFieldEntry().date(17,
                      //month is zero based so + 1
                      activeDate.get(Calendar.YEAR), activeDate.get(Calendar.MONTH) + 1, activeDate.get(Calendar.DATE)))
        mapSummaryData.add(EmaFactory.createFieldEntry().enumValue(1709, 158))
        mapSummaryData.add(EmaFactory.createFieldEntry().enumValue(3423, 1))
        val map: Map = EmaFactory.createMap()
        map.summaryData(mapSummaryData)

        //Create a map containing 5 map entries
        for (a <- 1 to 5) {
          val entryData: FieldList = EmaFactory.createFieldList()
          entryData.add(EmaFactory.createFieldEntry().rmtes(3426, ByteBuffer.wrap(orderId.toString().getBytes())))
          entryData.add(EmaFactory.createFieldEntry().real(3427, orderPrc, MagnitudeType.EXPONENT_NEG_2))
          entryData.add(EmaFactory.createFieldEntry().enumValue(3428, orderSide))
          entryData.add(EmaFactory.createFieldEntry().real(3429, orderSize, MagnitudeType.EXPONENT_0))
          entryData.add(EmaFactory.createFieldEntry().uintValue(6520, prTimMs))
          entryData.add(EmaFactory.createFieldEntry().date(6522,
                        //month is zero based so + 1
                        activeDate.get(Calendar.YEAR), activeDate.get(Calendar.MONTH) + 1, activeDate.get(Calendar.DATE)))
          map.add(EmaFactory.createMapEntry().keyAscii(entryKey.toString(), MapEntry.MapAction.ADD, entryData))
          //change the values for the next entry, except after the last one
          if (a < 5)
            updateData()
        }
        return map 
       }
      
      //The method returns Market by Order data contained in the payload of an update message
      def generateMBOUpdate() : Map =   {
        //Set new value to each field
        updateData();
        
        //Create a map containing 1 map entry
        val map: Map = EmaFactory.createMap()
        val entryData: FieldList = EmaFactory.createFieldList()
        entryData.add(EmaFactory.createFieldEntry().rmtes(3426, ByteBuffer.wrap(orderId.toString().getBytes())))
        entryData.add(EmaFactory.createFieldEntry().real(3427, orderPrc, MagnitudeType.EXPONENT_NEG_2))
        entryData.add(EmaFactory.createFieldEntry().enumValue(3428, orderSide))
        entryData.add(EmaFactory.createFieldEntry().real(3429, orderSize, MagnitudeType.EXPONENT_0))
        entryData.add(EmaFactory.createFieldEntry().uintValue(6520, prTimMs))
        entryData.add(EmaFactory.createFieldEntry().date(6522,
                      //month is zero based so + 1
                      activeDate.get(Calendar.YEAR), activeDate.get(Calendar.MONTH) + 1, activeDate.get(Calendar.DATE)))
        map.add(EmaFactory.createMapEntry().keyAscii(entryKey.toString(), MapEntry.MapAction.ADD, entryData))
        return map 
      }
    }
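The "+ 1" on the month in the class above exists because java.util.Calendar counts months from zero. A stand-alone sketch of the same date extraction follows. `DateSketch` and `utcDate` are hypothetical names, and the sketch takes a timestamp as input so that its result is reproducible. It also uses GregorianCalendar directly rather than Calendar.getInstance, a choice made here so that the reported year does not depend on the default locale.

```scala
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Stand-alone illustration of the zero-based month; not part of the application.
object DateSketch {
  // Returns (year, month 1-12, day of month) in UTC for an epoch timestamp
  def utcDate(epochMillis: Long): (Int, Int, Int) = {
    val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
    cal.setTimeInMillis(epochMillis)
    // Calendar.MONTH is zero based (January is 0), hence the + 1
    (cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DATE))
  }
}
```

For the epoch itself, `DateSketch.utcDate(0L)` gives year 1970, month 1 and day 1. Without the "+ 1", the month would be reported as 0.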

     

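  3. Create the MarketByPrice class in the MarketByPrice.scala file to generate Market By Price data, as shown in the example below:
    import java.nio.ByteBuffer
    import java.util.{Calendar, TimeZone}
    //EMA Java classes (package names as used by the Elektron SDK)
    import com.thomsonreuters.ema.access.{EmaFactory, FieldList, Map, MapEntry}
    import com.thomsonreuters.ema.access.OmmReal.MagnitudeType

    class MarketByPrice {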
      //Summary data 
      //field id 1 PROD_PERM(PERMISSION) will be added in summary data if refreshPE > 0
      private val dsplyName: String = "DUMMY MBP" //field id 3
      private val currency: Int = 764 //field id 15, 764 is Thai baht
      private var activeDate: Calendar = null //field id 17
      private var seqNum: Int = 450596 //field id 1021
      private var timActMs: Long = 30802966 //field id 4148
      
      //The key for each map entry
      private var entryKey: String = "113.05A" //it is orderPrc + A e.g. 113.05A
      
      //MapEntry data
      private var orderPrc: Int = 11305 //field id 3427, 2 decimals so it is 113.05
      private var orderSide: Short = 1 //field id 3428, BID(1) or ASK(2)
      private var noOrd: Int = 1 //field id 3430
      private var accSize: Int = 1525//field id 4356
      private var lvTimMs: Long = 18009780 //field id 6527
      
      //Sets a new value for each field published in a refresh or an update message
      def updateData(updatedSum: Boolean): Unit = {
        //if updatedSum is true, the summary data fields also get new values
        if (updatedSum) {
          timActMs = timActMs + 1000
          seqNum = seqNum + 25
        }

        orderPrc = orderPrc + 213
        if (orderSide == 1)
          orderSide = 2
        else
          orderSide = 1
        accSize = accSize + 147
        noOrd = noOrd + 1
        lvTimMs = lvTimMs + 1000

        //generate the entry key from orderPrc: the last two digits are the decimals,
        //whatever the number of digits in the integer part
        val prc: String = orderPrc.toString
        entryKey = prc.dropRight(2) + "." + prc.takeRight(2) + "A"
      }

      //The method returns Market by Price data contained in the payload of a refresh message
      def generateMBPRefresh(refreshPE: Long, update: Boolean) : Map =   {
        //updateData() is invoked after fail over is successful, 
        //then creates the payload of a refresh message with new values
        //otherwise, use the initialized values
        if(update)
            updateData(true)
            
        //Create summary data
        val mapSummaryData: FieldList  = EmaFactory.createFieldList();
        if(refreshPE > 0) {
            mapSummaryData.add( EmaFactory.createFieldEntry().uintValue(1, refreshPE))
         }
        
        mapSummaryData.add(EmaFactory.createFieldEntry().rmtes(3, ByteBuffer.wrap(dsplyName.getBytes())))
        mapSummaryData.add(EmaFactory.createFieldEntry().enumValue(15, 764))
        activeDate = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
        mapSummaryData.add(EmaFactory.createFieldEntry().date(17,
                      //month is zero based so + 1
                      activeDate.get(Calendar.YEAR), activeDate.get(Calendar.MONTH) + 1, activeDate.get(Calendar.DATE)))
        mapSummaryData.add(EmaFactory.createFieldEntry().real(1021, seqNum, MagnitudeType.EXPONENT_0))
        mapSummaryData.add(EmaFactory.createFieldEntry().uintValue(4148, timActMs))
        val map: Map = EmaFactory.createMap()
        map.summaryData(mapSummaryData)

        //Create a map containing 5 map entries
        for (a <- 1 to 5) {
          val entryData: FieldList = EmaFactory.createFieldList()
          entryData.add(EmaFactory.createFieldEntry().real(3427, orderPrc, MagnitudeType.EXPONENT_NEG_2))
          entryData.add(EmaFactory.createFieldEntry().enumValue(3428, orderSide))
          entryData.add(EmaFactory.createFieldEntry().uintValue(3430, noOrd))
          entryData.add(EmaFactory.createFieldEntry().real(4356, accSize, MagnitudeType.EXPONENT_0))
          entryData.add(EmaFactory.createFieldEntry().uintValue(6527, lvTimMs))
          map.add(EmaFactory.createMapEntry().keyAscii(entryKey, MapEntry.MapAction.ADD, entryData))
          //change the entry values for the next entry, except after the last one
          if (a < 5)
            updateData(false)
        }
        return map 
       }
      
      //The method returns Market by Price data contained in the payload of an update message
      def generateMBPUpdate() : Map =   {
        //Set new value to each field
        updateData(true)
        
        //Create summary data
        val mapSummaryData: FieldList = EmaFactory.createFieldList()
        mapSummaryData.add(EmaFactory.createFieldEntry().real(1021, seqNum, MagnitudeType.EXPONENT_0))
        mapSummaryData.add(EmaFactory.createFieldEntry().uintValue(4148, timActMs))
        val map: Map = EmaFactory.createMap()
        map.summaryData(mapSummaryData)

        //Create a map containing 1 map entry
        val entryData: FieldList = EmaFactory.createFieldList()
        entryData.add(EmaFactory.createFieldEntry().real(3427, orderPrc, MagnitudeType.EXPONENT_NEG_2))
        entryData.add(EmaFactory.createFieldEntry().enumValue(3428, orderSide))
        entryData.add(EmaFactory.createFieldEntry().uintValue(3430, noOrd))
        entryData.add(EmaFactory.createFieldEntry().real(4356, accSize, MagnitudeType.EXPONENT_0))
        entryData.add(EmaFactory.createFieldEntry().uintValue(6527, lvTimMs))
        map.add(EmaFactory.createMapEntry().keyAscii(entryKey, MapEntry.MapAction.ADD, entryData))
        return map 
      }
    }
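Each map entry key in the class above is the order price with two decimals followed by the letter A. A stand-alone sketch of that derivation, which makes no assumption about how many digits the price has, could look like this. `PriceKeySketch` and its `entryKey` method are hypothetical names used only for this illustration.

```scala
// Stand-alone illustration of the key format; not part of the application.
object PriceKeySketch {
  // Builds a key such as "113.05A" from a price held as an integer with two
  // implied decimals. The last two digits are always the fraction.
  def entryKey(orderPrc: Int): String = {
    require(orderPrc >= 0, "orderPrc must not be negative")
    val whole = orderPrc / 100
    val frac = orderPrc % 100
    val pad = if (frac < 10) "0" else ""
    s"$whole.$pad${frac}A"
  }
}
```

For the initial price, `PriceKeySketch.entryKey(11305)` is "113.05A". The same rule gives "9.05A" for 905 and "1000.18A" for 100018, so the decimal point stays in the right place as the price changes.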

     

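  4. Create the MarketMaker class in the MarketMaker.scala file to generate Market Maker data, as shown in the example below:
    import java.util.{Calendar, TimeZone}
    import scala.util.Random
    //EMA Java classes (package names as used by the Elektron SDK)
    import com.thomsonreuters.ema.access.{EmaFactory, FieldList, Map, MapEntry}
    import com.thomsonreuters.ema.access.OmmReal.MagnitudeType

    class MarketMaker {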
      //SummaryData also contains field id 1709 with fixed value
      //field id 1 PROD_PERM(PERMISSION) will be added in summary data if refreshPE > 0
      private var activeDate: Calendar = null //field id 17
      private var timActMs: Long = 75610613 //field id 4148
      
      //MapEntry data
      private var bid: Int = 4115 //field id 22, 2 decimals
      private var ask: Int = 4282 //field id 25, 2 decimals
      private var bidSize: Int = 100 //field id 30
      private var askSize: Int = 110 //field id 31
      private var askTimMs: Long = 43459027 //field id 4147
      private var bidTimMs: Long = 42911172 //field id 4150
      
      //the prefix for an entryKey
      private val prefix: String = "ARIC_"
      private var entryKey: String = ""
      
      //generate an entryKey for a map entry
      def generateEntryKey(): String = {
        //3 random characters from A to Z (65 is the character code of 'A')
        val suffix: Array[Char] = Array((Random.nextInt(26) + 65).toChar, (Random.nextInt(26) + 65).toChar, (Random.nextInt(26) + 65).toChar)
        //Example entryKey: ARIC_KRR, ARIC_VXQ
        prefix + suffix.mkString
      }
    
      //Sets a new value for each field published in a refresh or an update message
      def updateData(): Unit = {
        bid = bid + 211
        ask = ask + 221
        bidSize = bidSize + 10
        askSize = askSize + 15
        bidTimMs = bidTimMs + 1000
        askTimMs = askTimMs + 1000
      }
      
      //The method returns Market Maker data contained in the payload of a refresh message
      def generateMMRefresh(refreshPE: Long, update: Boolean) : Map =   {
        //updateData() is invoked after fail over is successful, 
        //then creates the payload of a refresh message with new values
        //otherwise, use the initialized values
        if(update)
            updateData()
        
        //Create summary data
        val mapSummaryData: FieldList  = EmaFactory.createFieldList();
        if(refreshPE > 0) {
            mapSummaryData.add( EmaFactory.createFieldEntry().uintValue(1, refreshPE))
         }
        activeDate = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
        mapSummaryData.add( EmaFactory.createFieldEntry().date(17, 
                      //month is zero based so + 1
                      activeDate.get(Calendar.YEAR), activeDate.get(Calendar.MONTH) +1, activeDate.get(Calendar.DATE)))
        mapSummaryData.add(EmaFactory.createFieldEntry().enumValue(1709, 320))
        mapSummaryData.add(EmaFactory.createFieldEntry().uintValue(4148, timActMs))

        //Create a map containing 5 map entries
        val map: Map = EmaFactory.createMap()
        map.summaryData(mapSummaryData)
        for (a <- 1 to 5) {
          val entryData: FieldList = EmaFactory.createFieldList()
          entryData.add(EmaFactory.createFieldEntry().real(22, bid, MagnitudeType.EXPONENT_NEG_2))
          entryData.add(EmaFactory.createFieldEntry().real(25, ask, MagnitudeType.EXPONENT_NEG_2))
          entryData.add(EmaFactory.createFieldEntry().real(30, bidSize, MagnitudeType.EXPONENT_0))
          entryData.add(EmaFactory.createFieldEntry().real(31, askSize, MagnitudeType.EXPONENT_0))
          entryData.add(EmaFactory.createFieldEntry().uintValue(4147, askTimMs))
          entryData.add(EmaFactory.createFieldEntry().uintValue(4150, bidTimMs))
          entryKey = generateEntryKey()
          map.add(EmaFactory.createMapEntry().keyAscii(entryKey, MapEntry.MapAction.ADD, entryData))
          //change the values for the next entry, except after the last one
          if (a < 5)
            updateData()
        }
        return map 
       }
      
      //The method returns Market Maker data contained in the payload of an update message
      def generateMMUpdate() : Map =   {
        //Set new value to each field
        updateData();
        
        //Create a map containing 1 map entry
        val map: Map = EmaFactory.createMap()
        val entryData: FieldList = EmaFactory.createFieldList()
        entryData.add(EmaFactory.createFieldEntry().real(22, bid, MagnitudeType.EXPONENT_NEG_2))
        entryData.add(EmaFactory.createFieldEntry().real(25, ask, MagnitudeType.EXPONENT_NEG_2))
        entryData.add(EmaFactory.createFieldEntry().real(30, bidSize, MagnitudeType.EXPONENT_0))
        entryData.add(EmaFactory.createFieldEntry().real(31, askSize, MagnitudeType.EXPONENT_0))
        entryData.add(EmaFactory.createFieldEntry().uintValue(4147, askTimMs))
        entryData.add(EmaFactory.createFieldEntry().uintValue(4150, bidTimMs))
        entryKey = generateEntryKey()
        map.add(EmaFactory.createMapEntry().keyAscii(entryKey, MapEntry.MapAction.ADD, entryData))
        return map 
      }
    }
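The entry keys in the class above are a fixed prefix plus three random uppercase letters. A stand-alone sketch of that key generation follows. `EntryKeySketch` and `randomKey` are hypothetical names, and the random generator is passed in as a parameter, an assumption made here so that a seeded generator gives reproducible keys.

```scala
import scala.util.Random

// Stand-alone illustration of the key format; not part of the application.
object EntryKeySketch {
  // Builds prefix + three random uppercase letters, e.g. ARIC_KRR
  def randomKey(prefix: String, rnd: Random): String = {
    val suffix = Seq.fill(3)(('A' + rnd.nextInt(26)).toChar)
    prefix + suffix.mkString
  }
}
```

Because the letters are random, two entries in the same map can occasionally receive the same key. For canned test data this is usually acceptable, but a real provider would normally derive keys from the data itself.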

     

  5. Create the NiProvider.scala file, which contains the following:

  A. The NiProvider object contains the main(args: Array[String]) method, which extracts the following program parameters from the command line:

  • -nipNode <NiProviderName>. The name of the NiProvider node in EmaConfig.xml. The default is Provider_1. The node contains the IP and port of the ADH server(s) to which the application connects.
  • -service <service_name>. The Service name. The default is NI_PUB.
  • -user <name>. The Name of the application user. The default is user.
  • -domainType <domain_type>. The Domain Type of the published item. The default is MARKET_PRICE. The valid Domain Types supported by this application are MARKET_PRICE, MARKET_BY_ORDER, MARKET_BY_PRICE, MARKET_MAKER.
  • -itemName <a RIC>. The published RIC name. The default is DUMMY.N.
  • -updateTimes <number_updates>. The number of updates to be published. The default is 10 updates.
  • -updateInterval <update_interval_in_sec>. The Update interval in seconds. The default is 5 seconds.
  • -refreshPE <a PE>. The PE added to refresh messages. This requires -dacsServiceId. The default is no PE.
  • -dacsServiceId <a service id>. The service ID used to generate the DACS lock for the PE. The default is no service ID.
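The parameters above all follow a "-name value" pattern. A minimal stand-alone sketch of reading such pairs over a set of defaults is shown below. It is not the application's own getCommandLineOptions method. `ArgsSketch`, `defaults` and `parse` are hypothetical names, and all values are kept as strings for simplicity.

```scala
// Hypothetical parser for "-name value" pairs; not the application's own code.
object ArgsSketch {
  // Defaults taken from the parameter list above. The -refreshPE and
  // -dacsServiceId parameters have no default and are absent unless given.
  val defaults: Map[String, String] = Map(
    "-nipNode" -> "Provider_1",
    "-service" -> "NI_PUB",
    "-user" -> "user",
    "-domainType" -> "MARKET_PRICE",
    "-itemName" -> "DUMMY.N",
    "-updateTimes" -> "10",
    "-updateInterval" -> "5"
  )

  // Reads the arguments two at a time; a trailing name without a value is ignored
  def parse(args: Array[String]): Map[String, String] =
    args.grouped(2).foldLeft(defaults) {
      case (acc, Array(name, value)) if name.startsWith("-") => acc.updated(name, value)
      case (acc, _) => acc
    }
}
```

For example, `ArgsSketch.parse(Array("-service", "MY_SVC"))` overrides only the service name and keeps every other default.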

Then, the object performs the tasks shown in The Non-Interactive Provider Sequence Diagram section (green arrows). It calls a method of the AppClient class to get the current ADH connection status. The following snippet is taken from the main(args: Array[String]) method:

def main(args: Array[String]): Unit = {
     
     //Extract program parameters from the command line.
     getCommandLineOptions(args);
	  
     //Show all parameters used by the application
     showAllParameters();
     
     //EMA part to listen to the login events/connection events, publish data
     //according to the parameters from the command line
     var config: OmmNiProviderConfig = null
     var provider: OmmProvider = null
     val nipClient: AppClient  = new AppClient()
     var sendRefreshMsg:Boolean = false
     try {
        //Read a Non-Interactive Provider configuration's node named nipNode's value in EmaConfig.xml
        //the node should contain the ChannelSet parameter for fail over process 
        //Hence, when the first ADH is down, EMA tries to connect to the second ADH defined in the ChannelSet parameter 
        config  = EmaFactory.createOmmNiProviderConfig().providerName(nipNode);
        
        //pass the nipClient instance as the second parameter; it contains the callback methods that process events, e.g. login events
        //The events are generated when the login stream changes, including changes of the connection state (connection is up or down)
        provider = EmaFactory.createOmmProvider(config.username(user).operationModel(OmmNiProviderConfig.OperationModel.USER_DISPATCH),nipClient)
        
        //dispatch the login events
        provider.dispatch( 1000000 );
        
        //create a refresh message according to the input domain type
        val aRefreshMsg: RefreshMsg  = EmaFactory.createRefreshMsg().serviceName(service).name(itemName).domainType(domainType)
					.state(OmmState.StreamState.OPEN, OmmState.DataState.OK, OmmState.StatusCode.NONE, "UnSolicited Refresh Completed")
					.complete(true);
        
        //check whether permissionData/DacsLock must be created
        if (addPermissionData) {
          //if yes, create a DacsLock from a service id and a PE
          permData = createDacsLock(dacsServiceId, refreshPE)
        }
        //if the permissionData/DacsLock was created successfully
        if (permData != null) {
          //add the PROD_PERM field (refreshPE) and the other fields for the domain type to the payload of the refresh message
          //all fields carry their initial values (false)
          aRefreshMsg.payload(domainTypeMapRefresh(domainType)(refreshPE, false))
          //add the permissionData/DacsLock to the refresh message
          aRefreshMsg.permissionData(permData.data())
        }
        //if the permissionData/DacsLock was not requested or could not be created
        else {
          //report that the refresh message has neither permData nor the PROD_PERM field
          println(NO_PERMISSSON_PRODPERM)
          //do not add the PROD_PERM field (0),
          //but add the other fields for the domain type to the payload of the refresh message
          //all fields carry their initial values (false)
          aRefreshMsg.payload(domainTypeMapRefresh(domainType)(0, false))
        }
        //publish the first data message, the refresh message
        provider.submit(aRefreshMsg, itemHandle)
        println("a Refersh Msg has been published")
        //wait for ADH to acknowledge that the NIP service is up and to receive the refresh message
        Thread.sleep(5000)
        var anUpdateMsg: UpdateMsg = null
        var num: Int = 0
        println("Start publishing " + updateTimes + " updates every " + updateInterval + " seconds.")
        while (num < updateTimes) { //publish as many update messages as the updateTimes parameter specifies
          //if the next message to send is an update message (not a refresh message)
          if (!sendRefreshMsg) {
            //wait updateInterval seconds before publishing an update message
            Thread.sleep(updateInterval * 1000)
          }
          //let EMA dispatch an event whenever the connection state changes (up to down or down to up)
          provider.dispatch(1000000)
          //if ADH is ready to receive data, i.e. the connection state is up
          if (nipClient.isConnectionUp()) {
            //after a successful failover from the down ADH to an up ADH, re-send a refresh message to the up ADH
            if (sendRefreshMsg) {
              //build the refresh payload with new values (true)
              aRefreshMsg.payload(domainTypeMapRefresh(domainType)(refreshPE, true))
              //publish the refresh message
              provider.submit(aRefreshMsg, itemHandle)
              //clear the flag so that the refresh message is not sent again
              sendRefreshMsg = false
            }
            //otherwise, send an update message of the input domain type
            //and increase the number of published update messages
            else {
              anUpdateMsg = EmaFactory.createUpdateMsg().serviceName(service).name(itemName).domainType(domainType)
              anUpdateMsg.payload(domainTypeMapUpdate(domainType)())
              provider.submit(anUpdateMsg, itemHandle)
              num += 1
            }
          }
          //if ADH is not ready (the connection state is down),
          //wait while EMA tries to connect to the next ADH specified in ChannelSet
          //and set the flag to send a refresh message once the connection to the next ADH succeeds
          else {
            sendRefreshMsg = true
          }
        }
        println("All " + updateTimes + " updates have been published ")
        println("Close Stream item " + itemName)
        //send a close status message to close the item stream
        provider.submit(EmaFactory.createStatusMsg().serviceName(service).name(itemName).domainType(domainType)
            .state(OmmState.StreamState.CLOSED, OmmState.DataState.SUSPECT, OmmState.StatusCode.NONE, itemName + " Stream Closed"), itemHandle)
        //wait so that consumer applications receive the close message for the item stream after the last update message
        Thread.sleep(10000)
      }
      catch {
        case excp @ (_: InterruptedException | _: OmmException) =>
          println("Exception:" + excp.getMessage)
      } finally if (provider != null)
        //log out and disconnect from ADH
        provider.uninitialize()
			
   }
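
The failover behaviour of the publishing loop above can be studied in isolation. The following is a minimal sketch, not the article's code: it uses no EMA classes, and the names PublishLoopSketch, Action, SendRefresh, SendUpdate and Wait are hypothetical. The sequence of Boolean values stands in for what nipClient.isConnectionUp() would return on each pass of the loop.

```scala
// Hypothetical model of the publish loop; no EMA classes are involved.
sealed trait Action
case object SendRefresh extends Action // re-send the refresh message after a failover
case object SendUpdate extends Action  // publish one update message
case object Wait extends Action        // connection is down, nothing is published

object PublishLoopSketch {
  // connectionStates: assumed results of isConnectionUp() for successive loop passes
  // updateTimes: number of update messages to publish
  def run(connectionStates: Iterator[Boolean], updateTimes: Int): List[Action] = {
    var sendRefreshMsg = false
    var num = 0
    val actions = List.newBuilder[Action]
    while (num < updateTimes && connectionStates.hasNext) {
      if (connectionStates.next()) {
        if (sendRefreshMsg) {
          // first pass after the connection came back: refresh first, do not count it
          actions += SendRefresh
          sendRefreshMsg = false
        } else {
          actions += SendUpdate
          num += 1
        }
      } else {
        // connection is down: remember that a refresh is needed once it is up again
        actions += Wait
        sendRefreshMsg = true
      }
    }
    actions.result()
  }
}
```

In this model only update messages count towards updateTimes, so a connection outline such as up, down, down, up, up with updateTimes set to 2 yields one update, two waiting passes, one refresh and then the second update.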

B. The AppClient class implements OmmProviderClient, which contains the callback methods that receive and process messages, e.g. refresh messages via the onRefreshMsg(..) method and status messages via the onStatusMsg(..) method. AppClient checks whether an event indicates that the ADH connection is ready for publishing and updates the connection status accordingly. The class exposes a method that the NiProvider object uses to get this status. The source code snippet of the AppClient class:

class AppClient extends OmmProviderClient {
  //Keeps the connection state between the non-interactive provider (NIP) and ADH,
  //i.e. whether the connection is ready for publishing.
  //true if the connection is up and NIP has logged in successfully; otherwise false.
  var _connectionUp: Boolean = false

  def isConnectionUp(): Boolean = _connectionUp

  def onRefreshMsg(refreshMsg: RefreshMsg, event: OmmProviderEvent): Unit = {
    println(refreshMsg)
    //once NIP has connected to ADH and logged in,
    //it receives a message indicating that the login is successful
    if ((refreshMsg.state().streamState() == OmmState.StreamState.OPEN)
        && (refreshMsg.state().dataState() == OmmState.DataState.OK))
      _connectionUp = true
    //otherwise NIP failed to connect or log in to ADH
    else
      _connectionUp = false
  }

  def onStatusMsg(statusMsg: StatusMsg, event: OmmProviderEvent): Unit = {
    println(statusMsg)
    if (statusMsg.hasState()) {
      //once NIP has connected to ADH and logged in,
      //it receives a message indicating that the login is successful
      if ((statusMsg.state().streamState() == OmmState.StreamState.OPEN)
          && (statusMsg.state().dataState() == OmmState.DataState.OK))
        _connectionUp = true
      //otherwise NIP failed to connect or log in to ADH
      else
        _connectionUp = false
    }
  }

  def onGenericMsg(genericMsg: GenericMsg, event: OmmProviderEvent): Unit = {}
  def onAllMsg(msg: Msg, event: OmmProviderEvent): Unit = {}
  def onPostMsg(postMsg: PostMsg, providerEvent: OmmProviderEvent): Unit = {}
  def onReqMsg(reqMsg: ReqMsg, providerEvent: OmmProviderEvent): Unit = {}
  def onReissue(reqMsg: ReqMsg, providerEvent: OmmProviderEvent): Unit = {}
  def onClose(reqMsg: ReqMsg, providerEvent: OmmProviderEvent): Unit = {}
}
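
The rule that AppClient applies to the login stream can be written as a small pure function. The sketch below is an illustration only: ConnectionStateSketch, its state objects and nextConnectionUp are hypothetical stand-ins and not part of EMA. A state of None models a status message for which hasState() is false.

```scala
object ConnectionStateSketch {
  // Hypothetical stand-ins for the OmmState.StreamState and OmmState.DataState values
  sealed trait StreamState
  case object Open extends StreamState
  case object Closed extends StreamState

  sealed trait DataState
  case object Ok extends DataState
  case object Suspect extends DataState

  // Returns the new value of the connection flag after a login refresh or status message.
  // current: the flag before the message arrived
  // state:   the state carried by the message, or None if the message has no state
  def nextConnectionUp(current: Boolean, state: Option[(StreamState, DataState)]): Boolean =
    state match {
      case Some((Open, Ok)) => true    // login accepted, ready to publish
      case Some(_)          => false   // any other state: not ready
      case None             => current // no state in the message: keep the flag
    }
}
```

Keeping the rule in one function like this would also let onRefreshMsg and onStatusMsg share the same check, which is a design choice of this sketch rather than something the original class does.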

You can download the complete source code of the NIP application explained above from TR-API-Samples/Article.EMA.Scala.NonInteractiveProvider

Build and Run the application

The EMA, ETA, and EMA dependency library files are required to build and run the NIP application. Fortunately, all of these files are shipped with the Elektron-SDK Java package. Hence, you only need to add them to the CLASSPATH before building and running the application.

The following example command lines show how to set the CLASSPATH, build and run the application on Windows: 

  1. Set the classpath to EMA, ETA, and EMA's dependency library files. For example:
    set CLASSPATH=C:\Elektron-SDK\Java\Ema\Libs\ema-3.2.2.0.jar;C:\Elektron-SDK\Java\Eta\Libs\upa-3.2.2.0.jar;C:\Elektron-SDK\Java\Eta\Libs\upaValueAdd-3.2.2.0.jar;C:\Elektron-SDK\Elektron-SDK-BinaryPack\Java\Eta\Libs\jdacsUpalib.jar;C:\Elektron-SDK\Elektron-SDK-BinaryPack\Java\Ema\Libs\apache\commons-collections-3.2.2.jar;C:\Elektron-SDK\Elektron-SDK-BinaryPack\Java\Ema\Libs\apache\commons-configuration-1.10.jar;C:\Elektron-SDK\Elektron-SDK-BinaryPack\Java\Ema\Libs\apache\commons-lang-2.6.jar;C:\Elektron-SDK\Elektron-SDK-BinaryPack\Java\Ema\Libs\apache\commons-logging-1.2.jar;C:\Elektron-SDK\Elektron-SDK-BinaryPack\Java\Ema\Libs\SLF4J\slf4j-1.7.12\slf4j-api-1.7.12.jar;C:\Elektron-SDK\Elektron-SDK-BinaryPack\Java\Ema\Libs\SLF4J\slf4j-1.7.12\slf4j-jdk14-1.7.12.jar;C:\Elektron-SDK\Elektron-SDK-BinaryPack\Java\Ema\Libs\xpp3-1.1.4c.jar

     

  2. Compile the application using scalac command: 
    scalac com\thomsonreuters\ema\example\scala\*.scala

     

  3. Run the application:
  • Add the path containing the class files generated in step 2 to the classpath:
    set CLASSPATH=.;%CLASSPATH%

     

  • Run the application using scala command. For example:
    scala com.thomsonreuters.ema.example.scala.NiProvider -user pimchayaProv -itemName NIP.MP -refreshpe 351 -dacsServiceId 4811

     

The command above logs in with the user named pimchayaProv and publishes the default domain type (Market Price) with the item named NIP.MP to the default service named NI_PUB. It also specifies that the refresh message contains permission data consisting of PE 351 and service id 4811. Hence, the field named PROD_PERM (the permission field, whose field id is 1) with the value 351 (the PE) is added. Moreover, the application publishes at the default rate, which is 10 updates in total, one every 5 seconds.
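
To illustrate how the options on the command line combine with the defaults, here is a minimal sketch of an argument parser. It is not the parser used by the sample application: NipSettings and NipArgsSketch are hypothetical names, and only the options that appear in this article (-user, -itemName, -service, -refreshpe, -dacsServiceId) are handled. The default values are the ones shown in the article.

```scala
import scala.annotation.tailrec

// Hypothetical settings holder; the defaults mirror the values described in the article
final case class NipSettings(
  service: String = "NI_PUB",
  user: String = "",
  itemName: String = "",
  updateTimes: Int = 10,
  updateInterval: Int = 5,
  refreshPE: Option[Long] = None,   // None means no PROD_PERM value was requested
  dacsServiceId: Option[Int] = None // None means no DACS service id was given
)

object NipArgsSketch {
  // Consumes "-option value" pairs from the front of the list until it is empty
  @tailrec
  def parse(args: List[String], acc: NipSettings = NipSettings()): NipSettings = args match {
    case "-user" :: v :: rest          => parse(rest, acc.copy(user = v))
    case "-itemName" :: v :: rest      => parse(rest, acc.copy(itemName = v))
    case "-service" :: v :: rest       => parse(rest, acc.copy(service = v))
    case "-refreshpe" :: v :: rest     => parse(rest, acc.copy(refreshPE = Some(v.toLong)))
    case "-dacsServiceId" :: v :: rest => parse(rest, acc.copy(dacsServiceId = Some(v.toInt)))
    case Nil                           => acc
    case unknown :: _ =>
      throw new IllegalArgumentException("Unknown or incomplete option: " + unknown)
  }
}
```

Parsing the options of the example command with this sketch leaves service, updateTimes and updateInterval at their defaults and fills in the user, the item name, the PE and the DACS service id.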

The Example Result

When you run the NIP application to publish Market Price, you should see messages like the following:

The application is using the following parameters:
nipNode=Provider_1
service=NI_PUB
user=pimchayaProv
domainType=MARKET_PRICE
itemName=NIP.MP
updateTimes=10
updateInterval=5
refreshPE=351
dacsServiceId=4811

Jan 08, 2019 4:12:53 PM com.thomsonreuters.ema.access.ChannelCallbackClient reactorChannelEventCallback
INFO: loggerMsg
    ClientName: ChannelCallbackClient
    Severity: Info
    Text:    Received ChannelUp event on channel Channel_3
        Instance Name Provider_1_1
        Component Version adh3.0.6.L1.solaris.tis.rrg 64-bit
loggerMsgEnd


RefreshMsg
    streamId="1"
    domain="Login Domain"
    solicited
    RefreshComplete
    state="Open / Ok / None / 'Login accepted by host shogun.'"
    itemGroup="00 00"
    name="pimchayaProv"
    nameType="1"
    Attrib dataType="ElementList"
        ElementList
            ElementEntry name="ApplicationId" dataType="Ascii" value="256"
            ElementEntry name="SupportProviderDictionaryDownload" dataType="UInt" value="1"
        ElementListEnd
    AttribEnd
RefreshMsgEnd

a Refersh Msg has been published
Start publishing 10 updates every 5 seconds.
All 10 updates have been published
Close Stream item NIP.MP

While the NIP application is running, you can run the EMA Java Consumer named example100__MarketPrice__Streaming to verify the NIP's service and its Market Price data. The Consumer application must connect to an ADS (another TREP component) that is connected to the ADH to which the NIP connects. To match the example command line, you need to modify Consumer.java to request the item named NIP.MP from the service named NI_PUB. The example output of example100__MarketPrice__Streaming:

Jan 08, 2019 4:13:04 PM com.thomsonreuters.ema.access.ChannelCallbackClient reactorChannelEventCallback
INFO: loggerMsg
    ClientName: ChannelCallbackClient
    Severity: Info
    Text:    Received ChannelUp event on channel Channel_1
	Instance Name Consumer_1_1
	Component Version ads3.0.6.L1.solaris.tis.rrg 64-bit
loggerMsgEnd


RefreshMsg
    streamId="5"
    domain="MarketPrice Domain"
    solicited
    RefreshComplete
    state="Open / Ok / None / 'UnSolicited Refresh Completed'"
    itemGroup="00 01"
    permissionData="03 12 cb 35 1c"
    name="NIP.MP"
    nameType="1"
    serviceId="4811"
    serviceName="NI_PUB"
    Payload dataType="FieldList"
        FieldList FieldListNum="0" DictionaryId="0"
            FieldEntry fid="1" name="PROD_PERM" dataType="UInt" value="351"
            FieldEntry fid="3" name="DSPLY_NAME" dataType="Rmtes" value="DUMMY MP"
            FieldEntry fid="15" name="CURRENCY" dataType="Enum" value="764"
            FieldEntry fid="22" name="BID" dataType="Real" value="18.75"
            FieldEntry fid="25" name="ASK" dataType="Real" value="19.01"
            FieldEntry fid="30" name="BIDSIZE" dataType="Real" value="221900.0"
            FieldEntry fid="31" name="ASKSIZE" dataType="Real" value="125920.0"
        FieldListEnd
    PayloadEnd
RefreshMsgEnd

UpdateMsg
    streamId="5"
    domain="MarketPrice Domain"
    updateTypeNum="0"
    name="NIP.MP"
    serviceId="4811"
    serviceName="NI_PUB"
    Payload dataType="FieldList"
        FieldList
            FieldEntry fid="22" name="BID" dataType="Real" value="19.86"
            FieldEntry fid="25" name="ASK" dataType="Real" value="20.22"
            FieldEntry fid="30" name="BIDSIZE" dataType="Real" value="222020.0"
            FieldEntry fid="31" name="ASKSIZE" dataType="Real" value="126030.0"
        FieldListEnd
    PayloadEnd
UpdateMsgEnd

UpdateMsg
    streamId="5"
    domain="MarketPrice Domain"
    updateTypeNum="0"
    name="NIP.MP"
    serviceId="4811"
    serviceName="NI_PUB"
    Payload dataType="FieldList"
        FieldList
            FieldEntry fid="22" name="BID" dataType="Real" value="20.97"
            FieldEntry fid="25" name="ASK" dataType="Real" value="21.43"
            FieldEntry fid="30" name="BIDSIZE" dataType="Real" value="222140.0"
            FieldEntry fid="31" name="ASKSIZE" dataType="Real" value="126140.0"
        FieldListEnd
    PayloadEnd
UpdateMsgEnd

...

Troubleshooting

  • The NIP application shows "<user>, unknown to system."
StatusMsg
    streamId="1"
    domain="Login Domain"
    state="Closed / Suspect / Not entitled / 'pimchayaProvx, unknown to system.'"
StatusMsgEnd

Resolution: The application logs in with a user who does not have permission to access ADH. Please contact your infrastructure administrator or the Account team to ask for a valid username.

  • The NIP application shows "Attempt to submit initial RefreshMsg with service name of NI_PUA that was not included in the SourceDirectory. Dropping this RefreshMsg."
Exception:Attempt to submit initial RefreshMsg with service name of NI_PUA that was not included in the SourceDirectory. Dropping this RefreshMsg.

Resolution: The service name specified on the command line that runs NIP does not match the service name in the <Directory> node that the application publishes to ADH.

For example, in EMAConfig.xml:

<Service>
	<Name value="NI_PUB"/>	
	….
</Service>

Hence, you should use -service NI_PUB on the command line when running the application.

  • The Consumer application shows "Capability not supported" 
StatusMsg
    streamId="5"
    domain="MarketMaker Domain"
    state="Open / Suspect / None / 'Capability not supported'"
    name="NIP.MM"
    serviceName="NI_PUB"
StatusMsgEnd

Resolution: The consumer application requests an item from a NIP service that does not support the domain type of the item. In the example above, the service named NI_PUB does not support the Market Maker domain type of the item named NIP.MM. Please make sure that <CapabilitiesEntry value="9"/>, which is Market Maker, has been added to the NI_PUB service defined in the EMAConfig.xml read by NIP.

  • The Consumer application shows "Service name of <service> is not found."
StatusMsg
    streamId="0"
    domain="MarketPrice Domain"
    state="Closed / Suspect / None / 'Service name of 'NI_PUBX' is not found.'"
    name="NIPX.MP"
    serviceName="NI_PUBX"
StatusMsgEnd

Resolution: The consumer application requests an item from the wrong service (NI_PUBX); NIP does not provide this service name. Please check that the service name is typed correctly, or verify it with your infrastructure administrator or the Account team.

  • The NIP application shows "OmmNiProviderConfigImpl::providerName parameter [Provider_1] is an non-existent niprovider name"
Exception:OmmNiProviderConfigImpl::providerName parameter [Provider_1] is an non-existent niprovider name

Resolution: EMA cannot find the Provider_1 configuration node because the EMAConfig.xml file that contains this node is not in the application run directory. Please put EMAConfig.xml in the application run directory.

Summary

After finishing this article, you will understand more about EMA, NIP applications, and how to develop a NIP application in Scala that uses EMA to publish data. You can also modify this application to meet your own requirements more easily and quickly. Moreover, the article covers common problems and how to solve them. If you do not have TREP (ADH and ADS), please contact the Account team for the process and details. If you or your TREP administrator require any TREP (ADH and ADS) assistance, you can contact the TREP support team directly by submitting your query via Get Support in MyRefinitiv.

References