Elektron SDK - Java

API Family: Elektron

EMA NI Provider - Efficiently publishing multiple data items

Last update January 2017
Compilers

JDK 1.7.x, JDK 1.8.x

Prerequisites

EMA NI Provider - Publishing item statuses

Declare the NI_PUB and TEST_NI_PUB services in your TREP (see Before you start).

Tutorial purpose

In this tutorial you will learn how to efficiently use EMA to publish refresh and update messages for a number of data items.

Introduction

Although EMA is designed for ease of use, it is also an efficient API that enables you to build high-performance applications. When it comes to performance, using an efficient API is one thing, but you also have to use it the right way. In this tutorial, you will learn how to use EMA to publish a large number of messages with the best performance the API can offer.

EMA objects reuse

In the previous tutorials of this series we did not pay too much attention to performance and, for the sake of simplicity, we generally constructed new EMA objects each time we needed them. For example, to send an update message we wrote something like this:

    FieldList fieldList = EmaFactory.createFieldList();
    fieldList.add(EmaFactory.createFieldEntry()
            .real(
                BID, 
                item.getBidPrice(), OmmReal.MagnitudeType.EXPONENT_NEG_2));
    fieldList.add(EmaFactory.createFieldEntry()
            .real(
                ASK, 
                item.getAskPrice(), OmmReal.MagnitudeType.EXPONENT_NEG_2));

    provider.submit(
                EmaFactory.createUpdateMsg()
                    .payload(fieldList), 
                item.getHandle());

This works fine, but you may have noticed that each time we send an update we build a new UpdateMsg object and a new FieldList object. Taken individually, these object creations do not cost much, but they start to matter as soon as you send a large number of messages. For this reason, it is always a good idea to reuse EMA objects when you can. The API helps here by providing a clear() method on all its reusable objects. When called, this method clears all the object’s values and resets them to their defaults.

So, instead of re-creating RefreshMsg, UpdateMsg and FieldList objects each time we need them, we can just preserve one instance of each and call the clear() method whenever we need to reuse it. Here is an example with the code snippet above refactored for EMA objects reuse:

    fieldList.clear();
    updateMessage.clear();

    fieldList.add(EmaFactory.createFieldEntry()
            .real(
                BID, 
                item.getBidPrice(), OmmReal.MagnitudeType.EXPONENT_NEG_2));
    fieldList.add(EmaFactory.createFieldEntry()
            .real(
                ASK, 
                item.getAskPrice(), OmmReal.MagnitudeType.EXPONENT_NEG_2));

    provider.submit(
                updateMessage
                    .payload(fieldList), 
                item.getHandle());  

This is an optimization we implemented in this tutorial. In the next sections we will explain how we refactored the application to achieve this goal.
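To make the clear-and-reuse contract concrete without requiring the EMA libraries, here is a minimal sketch. ReusableFieldList and ReuseSketch are hypothetical stand-ins written for this illustration; they are not EMA classes and only mimic the way a preserved object is cleared and refilled for each message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a reusable EMA container such as FieldList.
// It is NOT an EMA class; it only mimics the clear()-and-refill contract.
class ReusableFieldList {
    private final List<String> entries = new ArrayList<>();

    void clear() { entries.clear(); }
    void add(String name, double value) { entries.add(name + "=" + value); }
    int size() { return entries.size(); }
    @Override public String toString() { return entries.toString(); }
}

class ReuseSketch {
    // One instance preserved for the lifetime of the publisher
    private final ReusableFieldList fieldList = new ReusableFieldList();

    // Builds the payload of one update, reusing the preserved instance
    ReusableFieldList buildUpdate(double bid, double ask) {
        fieldList.clear();          // drop the entries of the previous message
        fieldList.add("BID", bid);
        fieldList.add("ASK", ask);
        return fieldList;
    }

    public static void main(String[] args) {
        ReuseSketch sketch = new ReuseSketch();
        ReusableFieldList first = sketch.buildUpdate(10.0, 10.5);
        System.out.println(first);
        ReusableFieldList second = sketch.buildUpdate(11.0, 11.5);
        System.out.println(second);
        // Both references point to the same object
        System.out.println(first == second);
    }
}
```

Two things are worth noticing in this sketch. Without the call to clear(), the second message would still carry the entries of the first one. And because the same instance is returned each time, a message must be fully submitted before the next one is built.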

About performance and optimization

Obviously, performance and optimization are not just about following guidelines like the one described above. They are all about testing and measuring, in isolated or real environments, to understand which specific parts of your software or system degrade performance. This requires “off the shelf” measurement tools and possibly bespoke tools that help you measure very specific aspects of your application. For example, as this tutorial series is about publishing data items, it may be interesting to know how fast our application is able to publish images and updates.

This is what we will find out in this tutorial, by adopting a simplistic approach that consists of measuring the time spent publishing messages. Obviously, this only measures publication performance at the provider level. It does not tell us anything about the performance of the complete distribution chain (Provider => TREP => Consumer), nor about other types of performance like message latency. But this is a start. The other aspects go beyond the scope of this tutorial. If you want to learn more about the performance of your system, you may be interested in the EMA PerfTools and the ETA PerfTools included in the Elektron SDK examples.

NI Provider refactoring

Publishing multiple data items requires some refactoring of our application. Most of this refactoring takes place in the NiProvider class, as described below.

 

The items collection

Because we need the provider to be able to handle several data items, we got rid of the theOnlyOneItem member and replaced it with a collection of items named items. The items member is actually a HashMap that uses the item name as the key and the corresponding item object as the value. That way, we can efficiently find an item by its name like this:

    Item item = items.get("SHARE-0");

This is required for the refresh(String itemName) and update(String itemName) methods that send a single message to items identified by their name. The items collection can also be efficiently iterated to send messages to all items.
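The sketch below illustrates both access patterns on a HashMap keyed by item name. It is a simplified illustration, not the tutorial's NiProvider class: the nested Item class is a name-only stand-in, and the handling of unknown names (returning false) is an assumption made for this example. Keep in mind that HashMap.get() returns null for an unknown key and that HashMap does not guarantee any iteration order.

```java
import java.util.HashMap;
import java.util.Map;

class ItemsLookupSketch {
    // Simplified stand-in for the tutorial's Item class (name only)
    static final class Item {
        private final String name;
        Item(String name) { this.name = name; }
        String getName() { return name; }
    }

    private final Map<String, Item> items = new HashMap<>();

    void createItems(String prefix, int count) {
        items.clear();
        for (int id = 0; id < count; ++id) {
            String name = prefix + id;
            items.put(name, new Item(name));
        }
    }

    // Looks an item up by name; returns false when the name is unknown
    boolean update(String itemName) {
        Item item = items.get(itemName);   // null if the key is absent
        if (item == null) {
            System.out.println("  Unknown item: " + itemName);
            return false;
        }
        System.out.println("  Updating " + item.getName());
        return true;
    }

    // Iterates over all items (in no particular order); returns the count
    int updateAll() {
        int count = 0;
        for (Item item : items.values()) {
            System.out.println("  Updating " + item.getName());
            ++count;
        }
        return count;
    }

    public static void main(String[] args) {
        ItemsLookupSketch sketch = new ItemsLookupSketch();
        sketch.createItems("SHARE-", 3);
        sketch.update("SHARE-1");
        sketch.update("SHARE-42");
        sketch.updateAll();
    }
}
```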

 

The createItems() method

In order to build the collection of item objects, we added the createItems() method that takes two parameters: a name prefix and the number of items to create. The method creates the requested number of item objects and adds them to the items collection. The name of each item is built from the name prefix and the item ID. For example, if the prefix is “SHARE-” and the number of items is 100, then the items will be named “SHARE-0”, “SHARE-1”, “SHARE-2” and so on up to “SHARE-99”. The base price of each item is also derived from its ID.

Here are the details of the method:

public void createItems(String itemNamePrefix, int itemCount)
{
    // Clears the items collection
    items.clear();

    for (int itemId = 0; itemId < itemCount; ++itemId)
    {
        // Generates the item name
        String itemName = itemNamePrefix + itemId;

        // Compute a base price for this item 
        int itemBasePrice = FLOOR_BASE_PRICE + itemId * 100;

        // Creates the item and preserves it in the collection
        Item newItem = new Item(itemName, itemBasePrice);
        items.put(itemName, newItem);
    }
}

 

The refreshAll() and updateAll() methods

In order to efficiently send refresh and update messages for all these items, we created a refreshAll() method and an updateAll() method that respectively refresh and update all the items of the items collection.

To avoid duplication, we factored the code that actually builds and sends the refresh and update messages into two new private methods: refresh(Item item) and update(Item item) (see below for details).

As an example, here are the details of the updateAll() method (refreshAll() is built on the same model):

public void updateAll()
{
    System.out.println("  Updating all items");

    for(Item item : items.values())
    {
        update(item);
    }
}

 

The new refresh(Item item) and update(Item item) methods

These two new methods take advantage of the good practice described in the EMA objects reuse section above. They build and send EMA messages by reusing the EMA objects we preserved as members of the NiProvider class. These methods are private to the NiProvider class and are used by the following public methods: refresh(String itemName), refreshAll(), update(String itemName) and updateAll().

As an example, here is the source code of the update() method:

private void update(Item item)
{
    FieldList fieldList;
    UpdateMsg updateMessage;

    if(REUSE_EMA_OBJECTS)
    {   // Reuse EMA objects for better performance

        fieldList = this.fieldList;
        fieldList.clear();

        updateMessage = this.updateMessage;
        updateMessage.clear();
    }
    else
    {   // EMA objects created for each update

        fieldList = EmaFactory.createFieldList();
        updateMessage = EmaFactory.createUpdateMsg();
    }

    item.generateNextTick();

    fieldList.add(EmaFactory.createFieldEntry()
            .real(
                BID, 
                item.getBidPrice(), OmmReal.MagnitudeType.EXPONENT_NEG_2));
    fieldList.add(EmaFactory.createFieldEntry()
            .real(
                ASK, 
                item.getAskPrice(), OmmReal.MagnitudeType.EXPONENT_NEG_2));

    provider.submit(
                updateMessage
                    .serviceName(getServiceName())
                    .name(item.getName())
                    .payload(fieldList), 
                item.getHandle());        
}

As you can see, this method is implemented in two different ways, depending on the value of the REUSE_EMA_OBJECTS static member. If REUSE_EMA_OBJECTS is true, the refresh() and update() methods reuse the preserved EMA objects. If it is false, the methods create new EMA objects on each call. By default, REUSE_EMA_OBJECTS is set to true, so EMA objects are reused. It is set on line 43 of the NiProvider.java file:

static boolean REUSE_EMA_OBJECTS = true;

This gives you the opportunity to measure the performance difference between the two modes (EMA objects created vs EMA objects reused). For more details please refer to the related note in the Build and run the application section.
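If you would rather switch modes without rebuilding, one possible variation (not part of the tutorial source) is to read the flag from a JVM system property. In the sketch below, the property name reuseEmaObjects and the ReuseFlagSketch class are hypothetical names chosen for this example.

```java
class ReuseFlagSketch {
    // Hypothetical property name, chosen for this sketch only
    static final String PROPERTY_NAME = "reuseEmaObjects";

    // Returns the value of the property, or true when it is not set.
    // Boolean.parseBoolean() yields true only for "true" (case-insensitive).
    static boolean readReuseFlag() {
        return Boolean.parseBoolean(System.getProperty(PROPERTY_NAME, "true"));
    }

    public static void main(String[] args) {
        System.out.println("REUSE_EMA_OBJECTS = " + readReuseFlag());
    }
}
```

With this approach, running with java -DreuseEmaObjects=false ReuseFlagSketch selects the "create new objects" mode, while omitting the option keeps the default.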

The main workflow

We changed the main method so that it implements a workflow equivalent to that of the Publishing updates tutorial, but for 10,000 items instead of one.

public static void main(String[] args)
{
    .
    .
    .
        NiProvider provider = new NiProvider(); 
        int itemsCount = 10000;

        provider.setServiceName("TEST_NI_PUB");
        provider.createItems("SHARE-", itemsCount);
        provider.connectAs("YOUR_PROVIDER_USER_NAME");
        waitFor(5);

        provider.refreshAll();
        waitFor(1);

        for (int i = 0; i < 30; ++i)
        {
            provider.updateAll();
            waitFor(1);
        }

        provider.disconnect();
    .
    .
    .
}

We also added time measurement and statistics display for the refresh phase and for the update phase. These statistics rely on the currentTimeMillis() System method and the printStatistics() static method also defined in Main.java.

This is the main workflow with the additional statistics:

public static void main(String[] args)
{
    .
    .
    .
        NiProvider provider = new NiProvider(); 
        int itemsCount = 10000;
        long startTime = 0;
        long endTime = 0;

        provider.setServiceName("TEST_NI_PUB");
        provider.createItems("SHARE-", itemsCount);
        provider.connectAs("YOUR_PROVIDER_USER_NAME");
        waitFor(5);

        startTime = System.currentTimeMillis();
        provider.refreshAll();
        endTime = System.currentTimeMillis();
        printStatistics(itemsCount, startTime, endTime);
        waitFor(1);

        for (int i = 0; i < 30; ++i)
        {
            startTime = System.currentTimeMillis();
            provider.updateAll();
            endTime = System.currentTimeMillis();
            printStatistics(itemsCount, startTime, endTime);
            waitFor(1);
        }

        provider.disconnect();
    .
    .
    .
}

The printStatistics() static method displays the time spent to publish the 10,000 refresh or update messages. It also displays the corresponding output rate.

Here are the details of this method:

static private void printStatistics(int itemsCount, long startTime, long endTime)
{
    float timeSpent = (float)(endTime - startTime) / (float)1000;

    System.out.println("\tMessage count: " + itemsCount);
    System.out.println("\tTotal time   : " + timeSpent + " sec");
    System.out.println("\tMessage rate : " + (float)itemsCount / timeSpent + " msg/sec");
}   
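Note that with millisecond timestamps, a very fast run can yield a time spent of 0, in which case the float division above produces Infinity. As a possible refinement (an assumption of ours, not the tutorial's code), the sketch below measures elapsed time with System.nanoTime() and guards against a zero duration. PublishStatsSketch and messageRate() are hypothetical names, and the loop in main() is only placeholder work standing in for refreshAll() or updateAll().

```java
class PublishStatsSketch {
    // Returns the rate in messages per second,
    // or NaN when no measurable time elapsed.
    static double messageRate(int messageCount, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            return Double.NaN;
        }
        return messageCount * 1_000_000_000.0 / elapsedNanos;
    }

    public static void main(String[] args) {
        int messageCount = 10000;

        long start = System.nanoTime();
        long checksum = 0;
        for (int i = 0; i < messageCount; ++i) {
            checksum += i;   // placeholder for the publication of one message
        }
        long elapsedNanos = System.nanoTime() - start;

        System.out.println("\tMessage count: " + messageCount);
        System.out.println("\tTotal time   : " + elapsedNanos / 1_000_000_000.0 + " sec");
        System.out.printf("\tMessage rate : %.1f msg/sec%n",
                messageRate(messageCount, elapsedNanos));
        System.out.println("\t(placeholder checksum: " + checksum + ")");
    }
}
```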

Build and run the application

Build the application and start it. Please refer to the Build and Run section within the first tutorial of this series (A barebones EMA NIP application shell) for detailed instructions.

This is what you should get:

  1. The application should display something like:
-------------------------------------------------------------------------------
|                    Non Interactive Provider EMA Tutorial                    |
|                                                                             |
|                 Tutorial 9 - Publishing multiple data items                 |
-------------------------------------------------------------------------------
  Provider created
  Connecting Provider to the ADH as YOUR_PROVIDER_USER_NAME
  Waiting for 5 seconds...
  Provider is connected. OmmState:Open / Ok / None / 'Refresh Completed'
  Refreshing all items
        Message count: 10000
        Total time   : 0.08 sec
        Message rate : 125000 msg/sec
  Waiting for 1 seconds...
  Updating all items
        Message count: 10000
        Total time   : 0.157 sec
        Message rate : 63694.3 msg/sec
  Waiting for 1 seconds...
  Updating all items
        Message count: 10000
        Total time   : 0.152 sec
        Message rate : 65789.5 msg/sec
  Waiting for 1 seconds...
    .
    .    
    .
  Updating all items
        Message count: 10000
        Total time   : 0.151 sec
        Message rate : 66225.2 msg/sec
  Waiting for 1 seconds...
  Disconnecting...
  Exiting the application
  2. Open a TREP consuming application and subscribe to the SHARE-1 market price item of the TEST_NI_PUB service. After a short while, you should receive values for the 5 fields (DSPLY_NAME/3, OPEN_PRC/19, HST_CLOSE/21, BID/22 and ASK/25) published by the NI Provider. Then, every second, the BID and ASK fields are updated.

    As an example, this is a screenshot of the Eikon Quote object that we used to subscribe to TEST_NI_PUB/SHARE-1. In Eikon, updated fields are displayed in yellow for a short while:

  3. You can also open any of the SHARE-0 to SHARE-9999 data items. You will see that all of them are streaming (one update every second).

    This is a screenshot of SHARE-9999 opened in the Eikon Quote object:

  4. After the 30 updates sent for each of the 10,000 items, the TEST_NI_PUB service goes down and the application exits.

Note: If you want to measure the performance difference between the two modes (EMA objects created for each message vs EMA objects reused), you can build two versions of the application (one with REUSE_EMA_OBJECTS set to true and one with it set to false; REUSE_EMA_OBJECTS is defined in NiProvider.java), run them and compare the printed statistics.

Troubleshooting

Q: When I build or run the tutorial, it fails with an error like:

The system cannot find the path specified

A: The JAVA_HOME environment variable is not set, or is set to the wrong path. See the Setup the development environment section of the first tutorial.

 

Q: When I build the tutorial, I get "<path>/javac: No such file or directory", or when I run the tutorial, I get a "<path>/java: No such file or directory" error like:

line 59: /home/user/jdk/bin/javac: No such file or directory

A: The JAVA_HOME environment variable is not set, or is set to the wrong path. See the Setup the development environment section of the first tutorial.

 

Q: When I build or run the tutorial, I get "Finding Jar files in <path>" and "The system cannot find the path specified." errors like:

Build the NIP application with Elektron SDK Java version 1.2.x or higher.

Finding Jar files in C:\Elektron-SDK\Java\Ema\Libs\
The system cannot find the path specified.

A: There are 2 possible causes:

 

Q: When I build or run the tutorial, I get a "<path to /*jar> : No such file or directory" error like:

/home/user/Elektron-SDK1.1.1.E2.java.eload/Java/Ema/Libs/*.jar: No such file or directory

A: There are 2 possible causes:

 

Q: When I build the tutorial, I get "package ... does not exist" and "cannot find symbol" errors like:

Main.java:20: error: package com.thomsonreuters.ema.access does not exist
import com.thomsonreuters.ema.access.OmmException;
                                    ^
Main.java:56: error: cannot find symbol
                catch (OmmException exception)
                       ^
  symbol:   class OmmException
  location: class Main

A: The ELEKTRON_JAVA_HOME environment variable is not set, or is set to the wrong path. See the Setup the development environment section of the first tutorial.

 

Q: When I run the tutorial, I get a JNI error with a NoClassDefFoundError exception like:

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: com/thomsonreuters/ema/access/OmmException
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
        at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
        at java.lang.Class.getMethod0(Class.java:3018)
        at java.lang.Class.getMethod(Class.java:1784)
        at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: com.thomsonreuters.ema.access.OmmException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 7 more

A: The ELEKTRON_JAVA_HOME environment variable is not set, or is set to the wrong path. See the Setup the development environment section of the first tutorial.

 

Q: The application is stuck after the "Connecting Provider to ADH…" message is displayed.

After a while the application displays an error like: 

login failed (timed out after waiting 45000 milliseconds) for 10.2.43.149:14003)

A: Verify that the ADH of your TREP infrastructure is up and that you properly set the host parameter in the EmaConfig.xml file. 

You can also use the telnet command tool to verify that your NIP application machine can connect to the ADH (telnet <ADH host> <port>). If the telnet succeeds but you still can’t connect, verify that you don’t have any firewall blocking the messages sent/received by the application.  

Ultimately, ask your TREP administrator to help you to investigate with TREP monitoring tools like adhmon.
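If telnet is not available on your machine, the same basic TCP check can be done from Java. The sketch below is only an illustration: AdhConnectivityCheck and canConnect() are hypothetical names, and the default port 14003 is simply the one that appears in the sample error above, so replace the host and port with those of your own ADH. A successful connection only shows that the port is reachable; it does not validate the login.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class AdhConnectivityCheck {
    // Returns true if a TCP connection to host:port can be opened
    // within the given timeout, false otherwise.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unknown hosts
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port are placeholders; pass your own ADH host and port
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 14003;

        boolean reachable = canConnect(host, port, 3000);
        System.out.println(host + ":" + port
                + (reachable ? " is reachable" : " is NOT reachable"));
    }
}
```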

 

Q: My provider logs an error message that says: "channel out of buffers".

Details of the error message:

janv. 24, 2017 12:21:50 PM com.thomsonreuters.ema.access.OmmNiProviderImpl submit
SEVERE: loggerMsg
    ClientName: NiProvider_1_1
    Severity: Error
    Text:    Internal error: rsslChannel.submit() failed in OmmNiProviderImpl.submit(RefreshMsg)RsslChannel 0
        Error Id -3
        Internal sysError 0
        Error Location Reactor.submitChannel
        Error Text channel out of buffers chnl=java.nio.channels.SocketChannel[connected local=/10.44.25.159:61920 remote=/10.2.43.49:14003] 
        errorId=-4 errorText=channel out of buffers
loggerMsgEnd

Failed to submit RefreshMsg. Reason: ReactorReturnCodes.NO_BUFFERS. 
    Error text: channel out of buffers chnl=java.nio.channels.SocketChannel[connected local=/10.44.25.159:61920 remote=/10.2.43.49:14003] 
    errorId=-4 errorText=channel out of buffers

A: This indicates that the output buffer of the EMA/ETA library has overflowed. This happens if the ADH is too slow to consume the messages sent by your provider. This may be the sign of a slow connection between your Provider and the ADH (e.g. if you are connected via a VPN). You can work around this issue by decreasing the number of items created by your provider (try 1000 and then increase to adjust to your available bandwidth). 
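Another approach you could experiment with, besides lowering the item count, is to publish in smaller bursts with a short pause between them. This is not something the tutorial application does, and whether it avoids the error depends on your environment. The sketch below only shows the batching logic: BatchedPublishSketch and publishInBatches() are hypothetical names, and the publishOne callback stands in for the call that sends one message.

```java
import java.util.function.IntConsumer;

class BatchedPublishSketch {
    // Calls publishOne for each item index, sleeping pauseMillis after every
    // full batch except the last one. Returns the number of pauses taken.
    // If the thread is interrupted while sleeping, publication stops early.
    static int publishInBatches(int itemCount, int batchSize,
                                long pauseMillis, IntConsumer publishOne) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int pauses = 0;
        for (int i = 0; i < itemCount; ++i) {
            publishOne.accept(i);   // placeholder for sending one message

            boolean endOfBatch = (i + 1) % batchSize == 0;
            boolean moreToSend = i + 1 < itemCount;
            if (endOfBatch && moreToSend) {
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return pauses;
                }
                ++pauses;
            }
        }
        return pauses;
    }

    public static void main(String[] args) {
        int pauses = publishInBatches(10, 4, 10,
                id -> System.out.println("  Publishing SHARE-" + id));
        System.out.println("  Pauses taken: " + pauses);
    }
}
```

The trade-off is a lower overall publication rate in exchange for smaller bursts, so the batch size and pause need to be tuned against your available bandwidth.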

 

Q: The NI Provider application can connect and publish data to the ADH, but the consumer application does not receive data and shows one of the following status messages:

state="Closed / Suspect / None / 'Service name of 'TEST_NI_PUB' is not found.'"

Or

State: OPEN, SUSPECT, NONE,  "Waiting for service TEST_NI_PUB UP. Item recovery in progress..."

A: It means that the published service does not match the NI service defined in the ADH configuration. Please ask your TREP administrator to help you check the NI service name defined in your TREP infrastructure.

Tutorial Group: 
EMA NI Provider