
How to discover the health of Elektron Real Time Infrastructure using the EMA Java API
This article describes how downstream applications can discover the health of the Elektron Real Time Infrastructure by consuming SPS (Service Provider Status) messages published by the Elektron Real Time CHE (Collection Head End). It is intended for Java developers who use EMA to build an application that monitors the health of the Elektron Real Time Infrastructure. However, the general techniques outlined below can be applied to any of our real-time streaming APIs, i.e. RFA C++/Java/.NET, ETA C++/Java and EMA C++. Readers should have a basic understanding of how to consume data using EMA Java, which can be learned from the EMA Consumer Tutorials.
Overview
Elektron Pulse is a real-time monitoring capability built into Elektron products to provide full transparency into the health of Refinitiv infrastructure, helping customers to improve the accuracy of data and manage business risk. It is an innovative approach to service management, with full end-to-end service monitoring and operational status for Elektron, intended to improve support levels and service manageability in today’s high-frequency, low-latency trading environment.
Elektron Pulse provides real-time service information on infrastructural and distribution failures. The health is advertised through a feed that customers can consume programmatically into their own monitoring systems or view on an interactive GUI provided by Refinitiv. Elektron Pulse displays Service Provider Status (SPS) messages provided on the Elektron Network.
In Elektron, the Collection Head End (CHE) is used to deliver market data from the venue. The CHE is composed of multiple sub-systems known as “line handlers”. Each line handler is responsible for part of the venue data coverage and advertises the health of its data by publishing SPS messages. In Elektron Pulse, this health information is aggregated. A top-level SPS record acts as the starting point for a client to discover all CHE subProvider SPS records. The high-level overview is as follows:
- The top-level SPS RIC points to provider SPS records, which show the individual venues within the same region. Each provider SPS record represents one venue.
- A provider SPS RIC, representing a venue, points to one or more subProvider SPS RICs, each of which represents a CHE line handler. A line handler publishes venue data, and multiple line handlers may contribute to one venue.
- A subProvider SPS RIC shows the health of one CHE line handler of a venue.
- By aggregating the subProvider SPS information, we can get the health of a venue.
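As an illustration of the last point, the sketch below combines per-line-handler status into a single venue status. The aggregation rule used here (healthy only if every line handler is up, down only if none is) is an assumption made for this example and is not a rule defined by Elektron Pulse. The class and enum names are hypothetical, and .[SPSEDXL2 is a made-up RIC.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: the names and the aggregation rule are assumptions for illustration.
class VenueHealthAggregator {

    enum VenueHealth { HEALTHY, DEGRADED, DOWN, UNKNOWN }

    // lineHandlerUp maps a subProvider SPS RIC to whether that line handler is currently up.
    static VenueHealth aggregate(Map<String, Boolean> lineHandlerUp) {
        if (lineHandlerUp.isEmpty()) {
            return VenueHealth.UNKNOWN; // no subProvider SPS seen yet
        }
        long up = lineHandlerUp.values().stream().filter(Boolean::booleanValue).count();
        if (up == lineHandlerUp.size()) {
            return VenueHealth.HEALTHY;
        }
        return up == 0 ? VenueHealth.DOWN : VenueHealth.DEGRADED;
    }

    public static void main(String[] args) {
        Map<String, Boolean> edx = new LinkedHashMap<>();
        edx.put(".[SPSEDXL1", true);
        edx.put(".[SPSEDXL2", false); // hypothetical second line handler
        System.out.println(aggregate(edx)); // one of two handlers is up
    }
}
```

A real monitor might weight line handlers differently, for example by treating a venue as down when a handler covering critical instruments fails.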
Solution
The application can consume the Elektron Real Time Infrastructure health status with the steps below:
- Subscribe to a snapshot of a top-level SPS RIC and get the list of Provider SPS RICs in the refresh message.
- Subscribe to a snapshot of a Provider SPS RIC and get the list of SubProvider SPS RICs in the refresh message.
- Make a streaming subscription to a SubProvider SPS RIC and get its health status in real time.
The top-level SPS and Provider SPS records only carry links to other records. These links do not change, so the application does not need to monitor them in real time. The application should therefore request them as snapshots, for which only a refresh message is returned and no update messages follow.
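The three steps can be sketched independently of EMA as a walk over a two-level hierarchy. In the sketch below, the snapshot function is a stand-in for "send a snapshot request and return the map-entry keys of the refresh message". It is not an EMA call, and the SpsDiscovery class is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: SpsDiscovery and the snapshot function are stand-ins, not EMA classes.
class SpsDiscovery {

    // snapshot.apply(ric) stands in for a snapshot request that returns the
    // map-entry keys (child RICs) carried in the refresh message for that RIC.
    static Map<String, List<String>> discover(String topLevelRic,
                                              Function<String, List<String>> snapshot) {
        Map<String, List<String>> subProvidersByProvider = new LinkedHashMap<>();
        // Step 1: the top-level SPS snapshot lists the Provider SPS RICs.
        for (String provider : snapshot.apply(topLevelRic)) {
            // Step 2: each Provider SPS snapshot lists its SubProvider SPS RICs.
            subProvidersByProvider.put(provider, new ArrayList<>(snapshot.apply(provider)));
        }
        // Step 3 (not shown): open a streaming request for each SubProvider SPS RIC.
        return subProvidersByProvider;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the feed, using RICs that appear in this article.
        Map<String, List<String>> fake = new LinkedHashMap<>();
        fake.put(".[SPSEMEA", Arrays.asList(".[SPSCHE-EDX"));
        fake.put(".[SPSCHE-EDX", Arrays.asList(".[SPSEDXL1"));
        System.out.println(discover(".[SPSEMEA",
                ric -> fake.getOrDefault(ric, Collections.emptyList())));
    }
}
```

The application built in this article walks the same hierarchy interactively, one user-selected RIC per level, rather than expanding every provider.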
An EMA Java application to consume Elektron Real Time Infrastructure Health Status is designed as per the figure below:
We will implement the application based on example210__MarketByOrder__Streaming, because that example already includes the source code to decode Map and field list data that the application needs. The application subscribes to a top-level SPS RIC, shows the Provider SPS list and waits for the user to enter a Provider SPS RIC. It then subscribes to that Provider SPS RIC, shows the SubProvider SPS list and waits for the user to enter a SubProvider SPS RIC. Finally, it subscribes to that SubProvider SPS RIC to get its health in real time.
Solution Code
The EMA Java package can be downloaded at Refinitiv Real-time SDK- Java Downloads. The source code of example210__MarketByOrder__Streaming is in \Ema\Src\examples\java\com\thomsonreuters\ema\examples\training\consumer\series200\example210__MarketByOrder__Streaming. We will modify the file named Consumer.java. The steps to implement the above behaviour are the following:
1. Declare the following variables, used in processing SPS data, in the AppClient class:
- Vector SPSList - keeps the list of Provider SPS/SubProvider SPS RICs
- int level - keeps the current level that the application is processing so the application can act accordingly. For example, if level=1 or level=2, print the list of Provider SPS/SubProvider SPS RICs and wait for the user to enter a Provider SPS/SubProvider SPS RIC.
- boolean snapshot - a flag to request a snapshot (true) or a stream (false)
- String spsRIC - holds the Provider SPS/SubProvider SPS RIC that the user inputs
- BufferedReader br - reads the Provider SPS/SubProvider SPS RIC entered by the user
- OmmConsumer consumer - subscribes to a Provider SPS/SubProvider SPS RIC
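// Additional imports needed by the changes in this article
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.Vector;

class AppClient implements OmmConsumerClient
{
    private Vector<String> SPSList; // RICs found in the last decoded map
    private int level;              // 1 = Provider SPS list, 2 = SubProvider SPS list
    private boolean snapshot;       // true = snapshot request, false = streaming request
    String spsRIC;                  // RIC entered by the user
    BufferedReader br;              // reads user input from System.in
    OmmConsumer consumer;           // used to send the next request
    ...
}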
2. Modify AppClient's constructor to accept an OmmConsumer object and initialize the variables declared in step 1. OmmConsumer provides the method for subscribing to Provider/SubProvider SPS RICs.
class AppClient implements OmmConsumerClient
{
    ...
    public AppClient(OmmConsumer cons) {
        consumer = cons;
        SPSList = new Vector<String>();
        level = 0;
        snapshot = true;
        spsRIC = null;
        br = new BufferedReader(new InputStreamReader(System.in));
    }
    ...
}
3. Modify main(String[] args) to subscribe to a snapshot of a top-level SPS RIC on the Service Provider Status domain, which is domain type 11 or EmaRdm.MMT_SERVICE_PROVIDER_STATUS:
try
{
consumer = EmaFactory.createOmmConsumer(EmaFactory.createOmmConsumerConfig().host("adsmachine").username("user"));
AppClient appClient = new AppClient(consumer);
//the RIC can be .[SPSEMEA, .[SPSASIA, .[SPSAMER
consumer.registerClient(EmaFactory.createReqMsg().domainType(11).serviceName("ELEKTRON_SERVICE").name(".[SPSEMEA").interestAfterRefresh(false), appClient);
Thread.sleep(600000); // API calls onRefreshMsg(), onUpdateMsg() and onStatusMsg()
}
4. After subscribing to a top-level SPS RIC, the application receives a refresh message whose payload is a map container. Each map entry contains the information of one Provider SPS, and its key is the Provider SPS RIC, e.g. .[SPSCHE-EDX as shown below:
<mapEntry flags="0x00" action="ADD" key=".[SPSCHE-EDX" >
<fieldList flags="0x08 (HAS_STANDARD_DATA)">
<fieldEntry fieldId="6456" data="0B"/>
<fieldEntry fieldId="6457" data="01"/>
<fieldEntry fieldId="6458" data="0A"/>
<fieldEntry fieldId="6459" data="00"/>
</fieldList>
</mapEntry>
To get the list of Provider SPS RIC, we have to get the key from each map entry. Likewise, to get the list of SubProvider SPS RIC, we have to get the key in each map entry of a refresh message returned after subscribing to an input Provider SPS RIC e.g. .[SPSCHE-EDX as shown below:
<mapEntry flags="0x00" action="ADD" key=".[SPSEDXL1" >
<fieldList flags="0x08 (HAS_STANDARD_DATA)">
<fieldEntry fieldId="6456" data="0B"/>
<fieldEntry fieldId="6457" data="01"/>
<fieldEntry fieldId="6458" data="0A"/>
<fieldEntry fieldId="6459" data="00"/>
</fieldList>
</mapEntry>
The map entry above contains the information of one SubProvider SPS, and its key is the SubProvider SPS RIC, i.e. .[SPSEDXL1. Therefore, we add a method named getKeyString(..) to the AppClient class. The method returns the key of a map entry as a string, for both Provider SPS and SubProvider SPS maps as explained above, so that we can build the list of Provider SPS and SubProvider SPS RICs.
String getKeyString(OmmBuffer ommBuf) {
ByteBuffer buffer = ommBuf.asHex();
StringBuilder asString = new StringBuilder();
int length =buffer.limit();
for (int i = buffer.position(); i < length; i++)
{
byte b = buffer.get(i);
asString.append((char)b);
}
return asString.toString();
}
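The byte-by-byte loop above treats each byte as one character, which gives the expected result when the key contains only ASCII characters. Under that same assumption, the conversion can also be sketched with the standard java.nio charset classes. The KeyDecoder class below is hypothetical and works on a plain ByteBuffer, so it does not depend on EMA types.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; assumes the map-entry key holds ASCII characters only.
class KeyDecoder {

    // Decodes the bytes between position and limit. Working on a duplicate
    // leaves the caller's buffer position untouched.
    static String keyToString(ByteBuffer buffer) {
        return StandardCharsets.US_ASCII.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer key = ByteBuffer.wrap(".[SPSCHE-EDX".getBytes(StandardCharsets.US_ASCII));
        System.out.println(keyToString(key));
    }
}
```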
5. In the decode(..) method which decodes Map, call the method getKeyString(..) to get the keys from the map of a refresh message. Then, extract and add the keys which are the Provider/SubProvider SPS RICs/names to the SPS list.
void decode(Map map)
{
...
SPSList.clear();
for (MapEntry mapEntry : map)
{
if (DataTypes.BUFFER == mapEntry.key().dataType())
{
System.out.println("Action: " + mapEntry.mapActionAsString() + " key value: " + EmaUtility.asHexString(mapEntry.key().buffer().buffer()));
String keyStr=getKeyString(mapEntry.key().buffer());
System.out.println("key="+keyStr);
SPSList.add(keyStr);
}
...
6. Add a method named requestNextlevel() to the AppClient class. After a refresh message is received, the method decides whether to print the list of Provider/SubProvider SPS RICs and wait for user input, as detailed below:
- level 1: print the list of Provider SPS RICs and set the flag so that the input RIC is requested as a snapshot. Then wait for the user to enter a Provider SPS RIC and return true to tell the application to subscribe to that RIC.
- level 2: print the list of SubProvider SPS RICs and set the flag so that the input RIC is requested as a stream. Then wait for the user to enter a SubProvider SPS RIC and return true to tell the application to subscribe to that RIC.
- level 3 or higher: return false to tell the application not to subscribe to any further RIC.
public boolean requestNextlevel() {
String key="";
if(level==1) {
key = "Provider SPS";
snapshot=true;
} else if(level==2) {
key = "SubProvider SPS";
snapshot=false;
}
if(level<3) {
System.out.println("List of " + key + ": " );
for(String SPS : SPSList) {
System.out.print(SPS + ",");
}
System.out.println("\nEnter a " + key+ ": " );
try {
spsRIC = br.readLine();
}catch(IOException ie) {
ie.printStackTrace();
System.exit(1);
}
return true;
}
else
return false;
}
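The decision that requestNextlevel() makes can also be separated from the printing and console input, which makes it easier to test. The sketch below is one possible refactoring, not part of the article's source code. LevelPlan and forLevel are hypothetical names, and the mapping of levels to snapshot or streaming follows the list in step 6.

```java
import java.util.Optional;

// Hypothetical refactoring sketch; LevelPlan is not part of the article's source code.
class LevelPlan {
    final String label;                 // what the user is asked to choose
    final boolean interestAfterRefresh; // false = snapshot, true = streaming

    private LevelPlan(String label, boolean interestAfterRefresh) {
        this.label = label;
        this.interestAfterRefresh = interestAfterRefresh;
    }

    // Returns the request plan for a level, or empty when nothing more should be requested.
    static Optional<LevelPlan> forLevel(int level) {
        switch (level) {
            case 1:
                return Optional.of(new LevelPlan("Provider SPS", false));
            case 2:
                return Optional.of(new LevelPlan("SubProvider SPS", true));
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        for (int level = 1; level <= 3; level++) {
            System.out.println(level + " -> "
                    + forLevel(level)
                        .map(p -> p.label + ", streaming=" + p.interestAfterRefresh)
                        .orElse("no further request"));
        }
    }
}
```

With such a split, the value of interestAfterRefresh could be passed directly to the request instead of negating a snapshot flag.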
7. In the onRefreshMsg(..) method, call requestNextlevel() after decoding a refresh message. The requestNextlevel() method prints the list of Provider/SubProvider SPS RICs and waits for input. Finally, print the FieldList data, which is the health status of the venue line handler (SubProvider SPS).
public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent event)
{
...
if (DataType.DataTypes.MAP == refreshMsg.payload().dataType()) {
decode(refreshMsg.payload().map());
}
System.out.println();
++level;
if(requestNextlevel()){
// Use the same service name as the top-level request in main()
consumer.registerClient(EmaFactory.createReqMsg().domainType(11).serviceName("ELEKTRON_SERVICE").name(spsRIC).interestAfterRefresh(!snapshot), this);
} else {
System.out.println("=======Health Status of " + spsRIC + "==============");
}
if (DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType())
decode(refreshMsg.payload().fieldList());
...
}
The application source code is available at Github. You can run the application using the following command line:
java -cp D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\ema.jar;D:\Elektron-SDK1.1.0.java.rrg\Eta\Libs\upa.jar;D:\Elektron-SDK1.1.0.java.rrg\Eta\Libs\upaValueAdd.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\apache\org.apache.commons.collections.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\apache\commons-configuration-1.10.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\apache\commons-lang-2.6.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\apache\commons-logging-1.2.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\SLF4J\slf4j-1.7.12\slf4j-api-1.7.12.jar;D:\Elektron-SDK1.1.0.java.rrg\Ema\Libs\SLF4J\slf4j-jdk14-1.7.12.jar;. com.thomsonreuters.ema.examples.Consumer
The example output when you run the application:
The application first shows a refresh message containing the map of Provider SPS RICs under the EMEA top-level SPS. Enter a Provider SPS RIC from the list, e.g. .[SPSCHE-EDX
The application then shows the list of SubProvider SPS RICs under .[SPSCHE-EDX. Enter a SubProvider SPS RIC from the list, e.g. .[SPSEDXL1
The application then shows the health status of the SubProvider. In this example, for SubProvider .[SPSEDXL1 the SPS_PV_STS field (field id 6479) is 1, which means the Line Handler/Provider is UP, and the SPS_FD_STS field (field id 6474) is 5, which means the input feed connection status is UP. Please refer to the next section for further details.
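As a minimal sketch of how an application might interpret these two fields, the helper below maps the enumerated values listed in the next section to an up/down result. The SpsStatus class and its method names are hypothetical. How the integer values are read from the decoded FieldList is not shown here.

```java
import java.util.Optional;

// Hypothetical helper; the enum values come from the SubProvider field table in this article.
class SpsStatus {

    // SPS_PV_STS (FID 6479): only 1 ("UP") means the line handler is up.
    static boolean isProviderUp(int spsPvSts) {
        return spsPvSts == 1;
    }

    // SPS_FD_STS (FID 6474): returns empty for values the table does not define,
    // which applications are told to ignore.
    static Optional<Boolean> isFeedUp(int spsFdSts) {
        switch (spsFdSts) {
            case 5: // UDPU: all UDP channel pairs up
            case 6: // UDPN: all UDP channels up, some not redundant
            case 9: // UNMO: unmonitored
                return Optional.of(true);
            case 3: // UNDF
            case 4: // TCPO
            case 7: // UDPS
            case 8: // UDPD
                return Optional.of(false);
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Values from the example output: SPS_PV_STS = 1, SPS_FD_STS = 5
        System.out.println("provider up: " + isProviderUp(1));
        System.out.println("feed up: " + isFeedUp(5).orElse(false));
    }
}
```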
The data content in a SubProvider
Unlike the top-level and Provider-level SPS items, which carry links to other records, a SubProvider-level SPS item returns FieldLists that reflect the health status of the venue line handlers. In Elektron, the data of a venue is published by one or more line handlers, and the data in each line handler should be discrete. Below is an extract of the FIDs in the subProvider-level item that clients should be aware of.
CATEGORY | ACRONYM | FIELD ID | DATA TYPE | COMMENTS
Provider Information | SPS_DESCR | 6471 | RMTES_STRING | Description of the venue line handler
Timestamps | SPS_TME_MS | 6472 | UINT64 | GMT timestamp in milliseconds within a day, from the provider at the point of SPS transmission
SPS Connectivity Detection Criteria | SPS_FREQ | 6473 | UINT64 | Frequency of this SPS publication in milliseconds
SPS Fail Threshold | SPS_FAIL_T | 6478 | UINT64 | Used to inform SPS consumers how many consecutive missed heartbeats constitute a failure
Venue Information | L_H_STAT | 8920 | Enum | Indication of market activity for a line handler, used for SPS. Enum values are listed below.
Line Handler Status | SPS_PV_STS | 6479 | Enum | Provider status. Enum values are listed below.
Input Feed Connectivity Status | SPS_FD_STS | 6474 | Enum | Represents the input feed connection status. Enum values are listed below.
Input Gap Counters | ARB_GAP_PD | 6475 | UINT64 | Sampling period in seconds of the gap count (x)
Input Gap Counters | ARB_GAPOUT | 5198 | UINT64 | Current number of outstanding arbitrator gaps
Input Gap Counters | TTL_GAPOUT | 5524 | UINT64 | Total number of daily post-arbitration sequence gaps, not individual messages
Overall Pulse Status | VENUE_STAT | 6599 | Enum | Represents the health group status of this publisher path. Enum values are listed below.

L_H_STAT (8920) enum values:
Value | Display | Meaning
0 | “ “ | Closed. No market activity expected
1 | “OPEN” | Open. Market activity taking place, including pre/post market activity

SPS_PV_STS (6479) enum values:
Value | Display | Meaning
0 | “ ” | Undefined
1 | “UP” | Up
2 | “DOWN” | Down
3 | “UNAV” | Unavailable

SPS_FD_STS (6474) enum values:
Value | Display | Meaning
3 | “UNDF” | Undefined. The status is Down.
4 | “TCPO” | TCP session unexpectedly offline. The status is Down.
5 | “UDPU” | All UDP channel pairs up. The status is Up.
6 | “UDPN” | All UDP channels up, some not redundant. The status is Up.
7 | “UDPS” | Some UDP channel pairs down. The status is Down.
8 | “UDPD” | All UDP channel pairs down. The status is Down.
9 | “UNMO” | Unmonitored. The status is Up.
*All other enum values are undefined and should be ignored by applications.

VENUE_STAT (6599) enum values:
Value | Display | Meaning
0 | “ ” | Undefined
1 | “HEALTH” | Confirmed healthy. Service status is healthy.
2 | “STALE” | Confirmed stale. Service status is stale.
3 | “SUSPCT” | Suspected. There is a problem, but it may not affect service (non-service affecting), e.g. loss of resiliency.
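The SPS_FREQ and SPS_FAIL_T fields let a consumer detect a silent line handler on its own side. The sketch below shows one way this could work: it counts how many publication intervals have passed since the last SPS message and reports a failure once that count reaches the threshold. The exact timing rule, the use of the local receive time rather than SPS_TME_MS, and the HeartbeatMonitor class itself are assumptions for illustration.

```java
// Hypothetical sketch; the timing rule is an assumption, not a documented algorithm.
class HeartbeatMonitor {
    private final long freqMillis;    // SPS_FREQ (FID 6473)
    private final long failThreshold; // SPS_FAIL_T (FID 6478)
    private long lastSeenMillis;      // local time of the last SPS message

    HeartbeatMonitor(long freqMillis, long failThreshold, long nowMillis) {
        if (freqMillis <= 0 || failThreshold <= 0) {
            throw new IllegalArgumentException("frequency and threshold must be positive");
        }
        this.freqMillis = freqMillis;
        this.failThreshold = failThreshold;
        this.lastSeenMillis = nowMillis;
    }

    // Call whenever a refresh or update arrives for the SubProvider SPS item.
    void onSpsMessage(long nowMillis) {
        lastSeenMillis = nowMillis;
    }

    // Number of whole publication intervals that have passed without a message.
    long missedHeartbeats(long nowMillis) {
        return Math.max(0, (nowMillis - lastSeenMillis) / freqMillis);
    }

    boolean hasFailed(long nowMillis) {
        return missedHeartbeats(nowMillis) >= failThreshold;
    }

    public static void main(String[] args) {
        // Illustrative values: publication every 1000 ms, failure after 3 misses.
        HeartbeatMonitor monitor = new HeartbeatMonitor(1000, 3, 0);
        System.out.println(monitor.hasFailed(2999));
        System.out.println(monitor.hasFailed(3000));
    }
}
```

In an application, onSpsMessage(..) would be driven by the streaming SubProvider subscription and hasFailed(..) by a periodic timer.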
Conclusion
If you are planning to develop an application to monitor or discover the health of the Elektron Real Time Infrastructure, you can use this article as a guide. EMA Java is one of the APIs (alongside ETA and RFA) that can be used to retrieve SPS messages. As shown in this article, using EMA Java to monitor the health of Elektron Real Time is relatively quick and easy.
References
For further details, please check out the following resources:
- Elektron Pulse – Programmers Guide:
pulseprogrammersguidev2.5.pdf
- Refinitiv Real-time SDK - Java page.