Elektron SDK - Java

API Family: Elektron

EMA NI Provider - Publishing item statuses

Last update January 2017

Compilers

JDK 1.7.x, JDK 1.8.x

Prerequisites

EMA NI Provider - Publishing updates

Declare the NI_PUB and TEST_NI_PUB services in your TREP (see Before you start).

Tutorial purpose

In this tutorial you will learn how to send status messages to indicate a change in the state of a data item published by your provider.

To this aim we will go through the following sections:

Introduction

For each data item published by your provider, you can send status messages as part of the data item streams. Unlike refresh and update messages, status messages do not publish a payload of data but rather information about the state of the item. This can be useful to inform your consumer applications that an item stream is closed or that the published data is temporarily not reliable (for example, because of a failover procedure).

In the next sections, you will learn how to build status messages and how to publish them for your data items.

Status messages

A Status message conveys information about the state of an item stream and the related item data. This information is contained in an OmmState object transported by the message. The OmmState object contains 3 information items:

  • The state of the item stream, via the static members of the StreamState class (e.g. OPEN, CLOSED, etc.).
  • The state of the data item, via the static members of the DataState class (OK, SUSPECT, etc.) and via the members of the StatusCode class (e.g. FAILOVER_STARTED, TIMEOUT, etc.).
  • An explanatory text of the status.

Note: Please refer to the EMA Java Reference Guide (see References) for more details about the OmmState class and its related enumerations.

Status messages are built using the createStatusMsg() method of the EmaFactory. The OmmState details are set via the state() method of the StatusMsg class. Here is an example:

    StatusMsg statusMsg = EmaFactory.createStatusMsg();

    statusMsg.state(
        OmmState.StreamState.OPEN,  // Stream state
        OmmState.DataState.OK,      // Data state 
        OmmState.StatusCode.NONE,   // Data status
        "All is good");             // Explanatory text

Use case examples

Below are two examples of disaster recovery scenarios and the status messages a provider could send to notify its consumer applications.

The context

A provider application publishes data items on a TREP for a single service. The application is also connected to several upstream systems that provide the source information for the published data items. Each upstream system provides data for a subset of items.

At the beginning of the scenarios below, all the item streams are open. The initial refresh message and some updates have been published for all of them.

Scenario 1

  1. The connection with one of the upstream systems breaks. Because of this failure, the data items that were fed by this system may not be up-to-date.
  2. In order to notify its consumer applications, the provider builds the following status message and publishes it on the stream of the concerned items.
        StatusMsg statusMsg = EmaFactory.createStatusMsg();
    
        statusMsg.state(
            OmmState.StreamState.OPEN,              // Stream state
            OmmState.DataState.SUSPECT,             // Data state 
            OmmState.StatusCode.FAILOVER_STARTED,   // Data status
            "Recovering the upstream connection - The current data may not be reliable."); // Explanatory text
    
        provider.submit(statusMsg, itemHandle);

    This message contains:

  • The item stream state that is still open (OPEN)
  • The data state that is suspect (SUSPECT) and the reason why (FAILOVER_STARTED)
  • The explanatory text.
  • Note: The consumer application does not need to manually unsubscribe from and re-subscribe to this item when it receives this Stream: Open, Data: Suspect status message from the API. The API and the TREP infrastructure will recover the item stream for the application.
  3. The provider successfully recovers the connection and starts receiving up-to-date data again.
  4. It builds and sends the following status message to notify the concerned consumers that everything is fine now.
        StatusMsg statusMsg = EmaFactory.createStatusMsg();
    
        statusMsg.state(
            OmmState.StreamState.OPEN,  // Stream state
            OmmState.DataState.OK,      // Data state 
            OmmState.StatusCode.NONE,   // Data status
            "All is good");             // Explanatory text
    
        provider.submit(statusMsg, itemHandle);

    This message contains:

  • The item stream state that is still open (OPEN)
  • The data state that is ok (OK). As it is ok, no status code is required (NONE)
  • An explanatory text.
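
To make the consumer side of this scenario concrete, here is a minimal sketch of how a consuming application might react to the states sent above. The enums and the Reaction values are hypothetical stand-ins written for this illustration; they are not the EMA classes, and a real consumer would read the state from the status message it receives.

```java
// Hypothetical stand-ins for illustration only; these are NOT the EMA classes.
final class ConsumerReactionSketch {
    enum StreamState { OPEN, CLOSED }
    enum DataState { OK, SUSPECT }
    enum Reaction { SHOW_DATA, FLAG_DATA_AS_UNRELIABLE, TREAT_STREAM_AS_ENDED }

    // Maps a received state to what a display application might do with it.
    static Reaction reactTo(StreamState stream, DataState data) {
        if (stream == StreamState.CLOSED) {
            // The item stream is closed, whatever the data state says.
            return Reaction.TREAT_STREAM_AS_ENDED;
        }
        if (data == DataState.SUSPECT) {
            // The stream is still open: keep the subscription and wait for recovery.
            return Reaction.FLAG_DATA_AS_UNRELIABLE;
        }
        return Reaction.SHOW_DATA;
    }

    public static void main(String[] args) {
        // The suspect status sent when the upstream connection breaks.
        System.out.println(reactTo(StreamState.OPEN, DataState.SUSPECT));
        // The Ok status sent once the connection is recovered.
        System.out.println(reactTo(StreamState.OPEN, DataState.OK));
    }
}
```

The point of the sketch is that an Open / Suspect state changes how the data is presented, not whether the subscription is kept.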

 

Scenario 2 (Best practice)

The previous scenario works fine. However, in the context of a disaster recovery, the best practice is to publish an unsolicited refresh message with an Ok state in place of the Ok status message sent in scenario 1. This refreshes the ADH cache with the latest values, ensuring it is in sync with your provider application. With this best practice in mind, we changed the scenario so that the provider publishes a refresh message with an associated Ok state to indicate that the system is fully recovered:

  1. The connection with one of the upstream systems breaks. Because of this failure, the data items that were fed by this system may not be up-to-date.
  2. In order to notify its consumer applications, the provider builds the following status message and publishes it on the stream of the concerned items.
        StatusMsg statusMsg = EmaFactory.createStatusMsg();
    
        statusMsg.state(
            OmmState.StreamState.OPEN,              // Stream state
            OmmState.DataState.SUSPECT,             // Data state 
            OmmState.StatusCode.FAILOVER_STARTED,   // Data status
            "Recovering the upstream connection - The current data may not be reliable."); // Explanatory text
    
        provider.submit(statusMsg, itemHandle);

    This message contains:

  • The item stream state that is still open (OPEN)
  • The data state that is suspect (SUSPECT) and the reason why (FAILOVER_STARTED)
  • The explanatory text.
  • Note: The consumer application does not need to manually unsubscribe from and re-subscribe to this item when it receives this Stream: Open, Data: Suspect status message from the API. The API and the TREP infrastructure will recover the item stream for the application.
  3. The provider successfully recovers the connection and starts receiving up-to-date data again.
  4. It builds and sends an unsolicited refresh message that contains an Ok state and the complete data item. This message will refresh the ADH cache, send the latest data to the concerned consumers and notify them that everything is fine now.
        provider.submit(
                    EmaFactory.createRefreshMsg()
                        .serviceName(serviceName)
                        .name(name)
                        .domainType(EmaRdm.MMT_MARKET_PRICE)
                        .state(
                            OmmState.StreamState.OPEN,        // Stream state
                            OmmState.DataState.OK,            // Data state
                            OmmState.StatusCode.NONE,         // Data status
                            "UnSolicited Refresh Completed")  // Explanatory text
                        .payload(fieldList)
                        .complete(true), 
                    itemHandle);
    

    Note: By default EMA builds unsolicited refresh messages. You can build solicited messages by calling the solicited() method on the RefreshMsg object. This is, however, of no use to Non Interactive Providers, as they must never publish this kind of message.
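
The difference between the two scenarios can be illustrated with a deliberately simplified cache model. The CacheSketch class below is hypothetical and says nothing about how the ADH is actually implemented; it only assumes what the text states, namely that a status message carries a state but no payload, while a refresh message carries the complete data item.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of a cache sitting between provider and consumers.
// It is NOT the ADH implementation; it only contrasts status and refresh messages.
final class CacheSketch {
    private final Map<String, String> fields = new HashMap<String, String>();
    private String dataState = "OK";

    // A refresh carries the complete image: replace the cached fields and the state.
    void onRefresh(Map<String, String> image, String newDataState) {
        fields.clear();
        fields.putAll(image);
        dataState = newDataState;
    }

    // A status carries no payload: only the state changes.
    void onStatus(String newDataState) {
        dataState = newDataState;
    }

    String get(String field) { return fields.get(field); }
    String dataState() { return dataState; }

    // Helper that builds a two-field image for the demo.
    static Map<String, String> image(String bid, String ask) {
        Map<String, String> image = new HashMap<String, String>();
        image.put("BID", bid);
        image.put("ASK", ask);
        return image;
    }

    public static void main(String[] args) {
        CacheSketch cache = new CacheSketch();
        cache.onRefresh(image("10.0", "10.5"), "OK");
        cache.onStatus("SUSPECT");   // upstream connection breaks

        // Suppose the real BID moved to 11.0 during the outage, unseen by the provider.
        cache.onStatus("OK");        // scenario 1 recovery: state is Ok, values unchanged
        System.out.println(cache.dataState() + " BID=" + cache.get("BID"));

        cache.onRefresh(image("11.0", "11.5"), "OK"); // scenario 2 recovery
        System.out.println(cache.dataState() + " BID=" + cache.get("BID"));
    }
}
```

In this model, the Ok status alone leaves the pre-outage values in place until later updates happen to overwrite them, whereas the refresh replaces the whole image in one message.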

This best practice scenario is implemented by the main workflow below.

The NiProvider class

In order to be able to simulate such a use case (with data items that go Suspect and then Ok again) we added two methods to the NiProvider.

The sendSuspectStatusFor() method sends a status message that notifies consumer applications of suspect data. This method is built on the same model as the refresh() and update() methods. First, it checks that the provider is connected and that the item name is valid. Then, it builds the message and sends it on the concerned stream using the appropriate item handle.

void sendSuspectStatusFor(String itemName)
{
    if (!isConnected())
    {
        System.out.println("  ERROR: Can't send a status for " + itemName + ". The provider is not connected.");
        return;
    }

    // Compare names by content: != on Strings only compares object references.
    if (!theOnlyOneItem.getName().equals(itemName))
    {
        System.out.println("  ERROR: Can't send a status for " + itemName + ". Unknown item name.");
        return;
    }

    Item item = theOnlyOneItem;

    System.out.println("  Sending a Stale status for " + item.getName());

    provider.submit(
                EmaFactory.createStatusMsg()
                    .state(
                        OmmState.StreamState.OPEN,      // Stream state
                        OmmState.DataState.SUSPECT,     // Data state 
                        OmmState.StatusCode.ERROR,      // Data status
                        "Houston, we have a problem!"), // Explanatory text
                item.getHandle());
}
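
A word on the item name check: in Java, item names should be compared by content with equals() rather than by reference with == or !=. The small sketch below shows the difference. The class and method names are ours, chosen for illustration, and are not part of the tutorial source.

```java
// Illustrative sketch; ItemNameCheckSketch and isKnownItem are hypothetical names.
final class ItemNameCheckSketch {
    // Content comparison. Calling equals() on knownName also copes with a null request.
    static boolean isKnownItem(String requested, String knownName) {
        return knownName.equals(requested);
    }

    public static void main(String[] args) {
        String known = "SHARE-0";
        // Built at run time, as a name read from a file or typed by a user would be.
        String requested = new StringBuilder("SHARE").append("-0").toString();

        System.out.println(requested == known);            // false: two distinct objects
        System.out.println(isKnownItem(requested, known)); // true: same characters
    }
}
```

String literals with the same content share one instance, which is why a reference comparison can appear to work when names are hard-coded and then fail as soon as a name is built or read at run time.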

 

The sendOkStatusFor() method is built in a very similar way, except that it sends an ok status message instead of a suspect status message.

Note: This method is not used by the main workflow below but is provided in case you would like to implement scenario 1.

void sendOkStatusFor(String itemName)
{
    if (!isConnected())
    {
        System.out.println("  ERROR: Can't send a status for " + itemName + ". The provider is not connected.");
        return;
    }

    // Compare names by content: != on Strings only compares object references.
    if (!theOnlyOneItem.getName().equals(itemName))
    {
        System.out.println("  ERROR: Can't send a status for " + itemName + ". Unknown item name.");
        return;
    }

    Item item = theOnlyOneItem;

    System.out.println("  Sending an Ok status for " + item.getName());

    provider.submit(
                EmaFactory.createStatusMsg()
                    .state(
                        OmmState.StreamState.OPEN,  // Stream state
                        OmmState.DataState.OK,      // Data state 
                        OmmState.StatusCode.NONE,   // Data status
                        "All is good"),             // Explanatory text
                item.getHandle());
}

The main workflow

We changed the main workflow to simulate item data that becomes suspect and then recovers, as in scenario 2 described above. To this aim, we use the NiProvider methods to send refresh messages, update messages and statuses in a loop that simulates several disaster recoveries. For each iteration of the loop we do the following:

  1. We send an unsolicited refresh with its associated Ok state and wait for 1 second.
  2. Then we send an update and wait for 1 second again.
  3. Then we send a suspect status to notify consumers of the disaster.
  4. We wait for 3 seconds to simulate the recovery.
  5. After 3 seconds we go back to the beginning of the loop and send a refresh message with its Ok state again. This message refreshes the ADH cache, sends the latest data to consumers and notifies them that everything has been recovered.

Here is the corresponding source code:

public static void main(String[] args)
{
    .
    .
    .
        NiProvider provider = new NiProvider(); 
        provider.setServiceName("TEST_NI_PUB");
        provider.connectAs("YOUR_PROVIDER_USER_NAME");
        waitFor(5);

        for (int i = 0; i < 30; ++i)
        {
            provider.refresh("SHARE-0");
            waitFor(1);

            provider.update("SHARE-0");
            waitFor(1);

            provider.sendSuspectStatusFor("SHARE-0");
            waitFor(3);
        }

        provider.disconnect();
    .
    .
    .
}
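
The waitFor() helper used above is part of the tutorial source code and is not listed here. As a rough idea of what such a helper does, here is a minimal sketch that we wrote for illustration; the actual implementation may differ. The message format is taken from the console output of this tutorial.

```java
// Illustrative sketch of a waitFor() helper; not the tutorial's actual source.
final class WaitSketch {
    // Prints a progress line, then blocks the calling thread for the given time.
    static void waitFor(int seconds) {
        System.out.println("  Waiting for " + seconds + " seconds...");
        try {
            Thread.sleep(seconds * 1000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        waitFor(1);
    }
}
```

With the loop above, each iteration waits 1 + 1 + 3 = 5 seconds, so the 30 iterations take at least 150 seconds after the initial 5-second wait.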

Build and run the application

Build the application and start it. Please refer to the Build and Run section within the first tutorial of this series (A barebones EMA NIP application shell) for detailed instructions.

This is what you should get:

  1. The application should display something like:
-------------------------------------------------------------------------------
|                    Non Interactive Provider EMA Tutorial                    |
|                                                                             |
|                      Tutorial 8 - Publishing statuses                       |
-------------------------------------------------------------------------------
  Provider created
  Connecting Provider to ADH 10.2.43.49:14003 as nip-user
  Waiting for 5 seconds...
  Provider is connected. OmmState:Open / Ok / None / 'Refresh Completed'
  Refreshing SHARE-0
  Waiting for 1 seconds...
  Updating SHARE-0
  Waiting for 1 seconds...
  Sending a Stale status for SHARE-0
  Waiting for 3 seconds...
    .
    .
    .
  Refreshing SHARE-0
  Waiting for 1 seconds...
  Updating SHARE-0
  Waiting for 1 seconds...
  Sending a Stale status for SHARE-0
  Waiting for 3 seconds...
  Disconnecting...
  Exiting the application
  2. Open a TREP consuming application and subscribe to the SHARE-0 market price item of the TEST_NI_PUB service. After a short while, you should receive values for the 5 fields (DSPLY_NAME/3, OPEN_PRC/19, HST_CLOSE/21, BID/22 and ASK/25) published by the NI Provider.

    As an example, we used an Eikon Quote object to subscribe to TEST_NI_PUB/SHARE-0.

  3. After 1 second you should receive an update message for the BID and ASK fields.

    In the Eikon Quote object, the BID and ASK fields are displayed in yellow for a short while.

  4. One second later you should receive a suspect status message.

    In the Eikon Quote object, the field backgrounds turn red because of the suspect state of the item data.

  5. After 3 seconds the loop starts over and you should receive a refresh message with an Ok state again.

    Thanks to this Ok state, the field backgrounds of the Eikon Quote object turn black again.

  6. After 30 iterations, the TEST_NI_PUB service goes down and the application exits.
  7. Press a key to close the console when you are prompted to.

Troubleshooting

Q: When I build or run the tutorial, it fails with an error like:

The system cannot find the path specified

A: The JAVA_HOME environment variable is not set, or is set to the wrong path. See the Setup the development environment section of the first tutorial.

 

Q: When I build the tutorial, I get a "<path>/javac: No such file or directory" error, or when I run the tutorial, I get a "<path>/java: No such file or directory" error like:

line 59: /home/user/jdk/bin/javac: No such file or directory

A: The JAVA_HOME environment variable is not set, or is set to the wrong path. See the Setup the development environment section of the first tutorial.

 

Q: When I build or run the tutorial, I get "Finding Jar files in <path>" and "The system cannot find the path specified." errors like:

Build the NIP application with Elektron SDK Java version 1.2.x or higher.

Finding Jar files in C:\Elektron-SDK\Java\Ema\Libs\
The system cannot find the path specified.

A: There are 2 possible causes:

 

Q: When I build or run the tutorial, I get a "<path to /*jar>: No such file or directory" error like:

/home/user/Elektron-SDK1.1.1.E2.java.eload/Java/Ema/Libs/*.jar: No such file or directory

A: There are 2 possible causes:

 

Q: When I build the tutorial, I get "package ... does not exist" and "cannot find symbol" errors like:

Main.java:20: error: package com.thomsonreuters.ema.access does not exist
import com.thomsonreuters.ema.access.OmmException;
                                    ^
Main.java:56: error: cannot find symbol
                catch (OmmException exception)
                       ^
  symbol:   class OmmException
  location: class Main

A: The ELEKTRON_JAVA_HOME environment variable is not set, or is set to the wrong path. See the Setup the development environment section of the first tutorial.

 

Q: When I run the tutorial, I get a JNI error with a NoClassDefFoundError exception like:

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: com/thomsonreuters/ema/access/OmmException
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
        at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
        at java.lang.Class.getMethod0(Class.java:3018)
        at java.lang.Class.getMethod(Class.java:1784)
        at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: com.thomsonreuters.ema.access.OmmException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 7 more

A: The ELEKTRON_JAVA_HOME environment variable is not set, or is set to the wrong path. See the Setup the development environment section of the first tutorial.
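
Several of the answers above come down to an environment variable that is unset or points to the wrong place. If a java executable is available on your PATH, a quick way to check is a small program like the sketch below. It is an illustration we wrote for this purpose, not part of the SDK, and it only verifies that each variable is set and names an existing directory; it does not check what that directory contains.

```java
import java.io.File;

// Illustrative sketch; EnvCheckSketch is a hypothetical helper, not part of the SDK.
final class EnvCheckSketch {
    // Describes the state of one environment variable given its value.
    static String describe(String name, String value) {
        if (value == null || value.trim().isEmpty()) {
            return name + " is not set";
        }
        if (!new File(value).isDirectory()) {
            return name + " points to a missing directory: " + value;
        }
        return name + " looks fine: " + value;
    }

    public static void main(String[] args) {
        for (String name : new String[] { "JAVA_HOME", "ELEKTRON_JAVA_HOME" }) {
            System.out.println(describe(name, System.getenv(name)));
        }
    }
}
```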

 

Q: The application is stuck after the "Connecting Provider to ADH…" message is displayed.

After a while the application displays an error like: 

login failed (timed out after waiting 45000 milliseconds) for 10.2.43.149:14003)

A: Verify that the ADH of your TREP infrastructure is up and that you properly set the host parameter in the EmaConfig.xml file. 

You can also use the telnet command tool to verify that your NIP application machine can connect to the ADH (telnet <ADH host> <port>). If the telnet succeeds but you still can’t connect, verify that you don’t have any firewall blocking the messages sent/received by the application.  

Ultimately, ask your TREP administrator to help you to investigate with TREP monitoring tools like adhmon.
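
If telnet is not installed on the machine that runs your NIP application, the same connectivity check can be approximated from Java. The sketch below is a minimal illustration that only tests whether a TCP connection can be opened; it does not log in to the ADH. The class name, the method name and the 3-second timeout are our own choices.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative sketch; PortCheckSketch is a hypothetical helper, not part of the SDK.
final class PortCheckSketch {
    // Returns true if a TCP connection to host:port is established within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, host unknown or unreachable, or timeout expired.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("Usage: java PortCheckSketch <ADH host> <port>");
            return;
        }
        boolean reachable = canConnect(args[0], Integer.parseInt(args[1]), 3000);
        System.out.println(reachable ? "Connection succeeded" : "Connection failed");
    }
}
```

As with telnet, a successful connection only shows that the port is reachable from this machine. If the login still times out, the firewall and configuration checks described above still apply.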

 

Q: The NI Provider application can connect and publish data to the ADH, but the consumer application does not receive data and shows the following status message:

state="Closed / Suspect / None / 'Service name of 'TEST_NI_PUB' is not found.'"

Or

State: OPEN, SUSPECT, NONE,  "Waiting for service TEST_NI_PUB UP. Item recovery in progress..."

A: It means the published service does not match the NI service defined in the ADH configuration. Please contact your TREP administrator to help you check the NI service name defined in your TREP infrastructure.
