Overview

The Elektron Message API is a data-neutral, multi-threaded, ease-of-use API providing access to OMM and RWF data. As part of the Elektron Software Development Kit, or Elektron SDK, the Elektron Message API allows applications to consume and provide OMM data at the message level of the API stack. The message level is set on top of the transport level which is handled by the Elektron Transport API (also known as the UPA).

Kotlin is a statically typed programming language developed by JetBrains that runs on the Java Virtual Machine (JVM). Kotlin is a first-class programming language on Android OS. Kotlin interoperates with Java code and relies on the existing Java Class Library/Framework. Its syntax aims to reduce the verbosity and complexity of the Java language. Because the language is designed with Java interoperability in mind, existing Java code (including the EMA Java libraries) can be called from Kotlin in a natural way.
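
As a small illustration of that interoperability, the sketch below calls classes from the standard Java Class Library directly from Kotlin. It uses only JDK types; the function name formatPrices is made up for this example and is not part of EMA.

```kotlin
import java.math.BigDecimal
import java.math.RoundingMode
import java.util.ArrayList

// Hypothetical helper for illustration only; it is not part of the EMA API.
// It uses plain JDK classes to show that Java types need no wrappers in Kotlin.
fun formatPrices(prices: List<String>): String {
    // java.util.ArrayList is instantiated like any Kotlin class (no 'new' keyword).
    val parsed = ArrayList<BigDecimal>()
    for (price in prices) {
        parsed.add(BigDecimal(price))
    }
    // Java methods such as setScale() and toPlainString() are called directly,
    // and Kotlin's joinToString() extension works on the Java collection.
    return parsed.joinToString(", ") { it.setScale(2, RoundingMode.HALF_UP).toPlainString() }
}
```

Calling formatPrices(listOf("39", "39.9")) returns the text 39.00, 39.90. The EMA Java classes used later in this article are called from Kotlin in the same way.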

This example project shows how to implement EMA Java OMM Consumer and OMM Provider applications in the Kotlin programming language. The application source code is written in Kotlin and uses the EMA Java library. All source code is compiled to Java Virtual Machine (JVM) compatible Java classes and runs in a console.

Note: Although Kotlin source code can also be compiled to JavaScript and native code, this example project focuses only on the JVM target environment.

Application files

The example project contains one Interactive-Provider application and two Consumer applications.

  • The Kotlin_IProvider_200 shows how to implement a basic RSSL Interactive Provider application.
  • The Kotlin_Consumer_100 shows how to implement a basic RSSL Consumer application.
  • The Kotlin_Consumer_220 shows how to implement an RSSL Consumer application that handles incoming data for each FID type.

The consumer applications can consume data from the Kotlin_IProvider_200 application or from other Elektron data sources (ADS server, Elektron Test Data, etc.).

You can find more detail on the OMM Consumer and OMM Interactive Provider interaction in the EMA Java RDM Usage Guide, sections 2.4 OMM Consumer / OMM Interactive Provider Initial Interaction and 2.5 Sending and Receiving Content. The EMA Java RDM Usage Guide is available on the Elektron SDK - Java: Documentation page.

The project includes complete Kotlin source code, the EMA Java configuration file and IntelliJ IDEA Community Edition project files. You can download the latest version of the Elektron SDK package from the Elektron SDK - Java: Download page.

  • src/ folder: Application source code folder
      • Kotlin_Consumer_100.kt: A basic consumer application source code
      • Kotlin_Consumer_220.kt: A consumer application source code that shows how to handle incoming data for each FID type
      • Kotlin_IProvider_200.kt: An Interactive Provider application source code
  • libs/ folder: Elektron SDK library files folder
  • etc/ folder: Elektron Data Dictionary files folder (RDMFieldDictionary and enumtype.def files)
  • EmaConfig.xml: Elektron SDK Java configuration file
  • .idea/ folder, Kotlin_EMA.iml: IntelliJ IDEA project folder and file
  • LICENSE.md: License declaration file
  • README.md: readme file
  • build.xml: ANT build and run file

This example project and source code are compatible with Kotlin version 1.2.50 and above, IntelliJ IDEA Java IDE versions 2017/2018/2019 and Elektron SDK - Java edition 1.2.x and 1.3.x versions.

Consumer Application Code Walkthrough

This section walks through the Kotlin_Consumer_220.kt application code. Please note that all Server, Data Dictionary and Service name settings are configured in the EmaConfig.xml file.

Consumer Creation and Configuration

The Kotlin_Consumer_220.kt file implements a standard EMA Java Consumer application. First, we create the OmmConsumer and OmmConsumerConfig objects. The OmmConsumerConfig object is configured to use the "Consumer_1" configuration in the EmaConfig.xml file. The application then passes the OmmConsumerConfig object to the OmmConsumer object. By default, EMA launches a background thread to manage communication and message dispatching when the application initializes an OmmConsumer object.

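import com.thomsonreuters.ema.access.EmaFactory
import com.thomsonreuters.ema.access.OmmConsumer
import com.thomsonreuters.ema.access.OmmException

fun main(args: Array<String>) {

    //Nullable, so the finally block stays safe if consumer creation fails
    var consumer: OmmConsumer? = null

    try {
        println("Starting Kotlin_Consumer_220 application")
        //OmmConsumer, OmmConsumerConfig creation and establish communication.
        consumer = EmaFactory.createOmmConsumer(EmaFactory.createOmmConsumerConfig().consumerName("Consumer_1"))

    } catch (excp: InterruptedException) {
        println(excp.message)
    } catch (excp: OmmException) {
        println(excp.message)
    } finally {
        //Release EMA resources only if the consumer was actually created
        consumer?.uninitialize()
    }
}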

Create the Client application class

Next, we create the AppclientFieldListWalk class, which implements the OmmConsumerClient interface. The AppclientFieldListWalk class defines a number of mandatory callback functions that capture the different events generated when the application registers interest in an item.

//Client class, implements OmmConsumerClient interface
class AppclientFieldListWalk : OmmConsumerClient {

    override fun onRefreshMsg(refreshMsg: RefreshMsg, event: OmmConsumerEvent) {
        println(refreshMsg)
    }

    override fun onUpdateMsg(updateMsg: UpdateMsg, event: OmmConsumerEvent) {
        println(updateMsg)
    }

    override fun onStatusMsg(statusMsg: StatusMsg, event: OmmConsumerEvent) {
        println(statusMsg)
    }

    override fun onGenericMsg(genericMsg: GenericMsg, event: OmmConsumerEvent): Unit {}

    override fun onAckMsg(ackMsg: AckMsg, event: OmmConsumerEvent): Unit {}

    override fun onAllMsg(msg: Msg, event: OmmConsumerEvent): Unit {}
}

Sending Market Price Item Request Message

Next, the application subscribes to Euro currency Market Price data (the EUR= RIC) from the OMM Provider via the EMA Java OmmConsumer.registerClient() function. The application uses the default OmmConsumerConfig.OperationModel.API_DISPATCH dispatch model, so it sleeps for one minute to let the API dispatch all incoming messages to the AppclientFieldListWalk class.

fun main(args: Array<String>) {

    //Nullable, so the finally block stays safe if consumer creation fails
    var consumer: OmmConsumer? = null

    val appClient = AppclientFieldListWalk()
    try {
        println("Starting Kotlin_Consumer_220 application")
        //OmmConsumer, OmmConsumerConfig creation and establish communication.
        consumer = EmaFactory.createOmmConsumer(EmaFactory.createOmmConsumerConfig().consumerName("Consumer_1"))

        println("Kotlin_Consumer_220: Send item request message")
        consumer.registerClient(EmaFactory.createReqMsg().serviceName("DIRECT_FEED").name("EUR="), appClient) //Subscribe for EUR= RIC from DIRECT_FEED service

        Thread.sleep(60000) //Let the API dispatch incoming messages for one minute
    } catch (excp: InterruptedException) {
        println(excp.message)
    } catch (excp: OmmException) {
        println(excp.message)
    } finally {
        //Release EMA resources only if the consumer was actually created
        consumer?.uninitialize()
    }
}

Handling incoming data

All messages (Refresh, Update, Status, etc.) from the OMM Provider are delivered through the associated AppclientFieldListWalk callback functions. We implement a decode() function that iterates over each incoming OMM FieldList, parses each FieldEntry according to its data type, and displays it in the console. This function receives the FieldList payload of Refresh and Update messages as a parameter.

class AppclientFieldListWalk : OmmConsumerClient {
    //EMA callbacks functions

    //Iterates OMM FieldList, then parse each OMM FieldEntry based on FID type
    fun decode(fieldList: FieldList): Unit {
        for (fieldEntry: FieldEntry in fieldList) {
            print("Fid: ${fieldEntry.fieldId()} Name = ${fieldEntry.name()} DataType: ${DataType.asString(fieldEntry.load().dataType())} Value: ")

            if (fieldEntry.code() == Data.DataCode.BLANK) {
                println(" blank")
            } else {
                when (fieldEntry.loadType()) {
                    DataTypes.REAL -> println(fieldEntry.real().asDouble())
                    DataTypes.DATE -> println("${fieldEntry.date().day()} / ${fieldEntry.date().month()} / ${fieldEntry.date().year()}")
                    DataTypes.TIME -> println("${fieldEntry.time().hour()} : ${fieldEntry.time().minute()} : ${fieldEntry.time().second()} : ${fieldEntry.time().millisecond()}")
                    DataTypes.INT -> println(fieldEntry.intValue())
                    DataTypes.UINT -> println(fieldEntry.uintValue())
                    DataTypes.ASCII -> println(fieldEntry.ascii())
                    DataTypes.ENUM -> println("${if(fieldEntry.hasEnumDisplay()) fieldEntry.enumDisplay() else fieldEntry.enumValue() }")
                    DataTypes.ERROR -> println("(${fieldEntry.error().errorCodeAsString()})")
                    else -> println("")
                }
            }
        }
    }
}
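
The branch-by-type idea in decode() can be tried without an EMA connection. The sketch below is a simplified model only: FieldValue and its subclasses are stand-in types invented for this illustration, not EMA classes, and render() returns the text instead of printing it.

```kotlin
// Stand-in types for illustration only; they are NOT EMA classes.
sealed class FieldValue
object Blank : FieldValue()
data class RealValue(val value: Double) : FieldValue()
data class DateValue(val day: Int, val month: Int, val year: Int) : FieldValue()
data class AsciiValue(val text: String) : FieldValue()
data class EnumValue(val code: Int, val display: String?) : FieldValue()

// Mirrors the shape of decode(): handle blank data first, then branch on the type.
fun render(fid: Int, value: FieldValue): String {
    val text = when (value) {
        is Blank -> "blank"
        is RealValue -> value.value.toString()
        is DateValue -> "${value.day} / ${value.month} / ${value.year}"
        is AsciiValue -> value.text
        // Prefer the display string when one is available, else show the raw code.
        is EnumValue -> value.display ?: value.code.toString()
    }
    return "Fid: $fid Value: $text"
}
```

Because FieldValue is a sealed class, the compiler checks that the when expression covers every subtype. The real decode() function cannot rely on that check, since it branches on integer DataTypes constants, which is why it keeps an else branch.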

Next, we update the onRefreshMsg() and onUpdateMsg() callback functions to pass the incoming FieldList payload to the decode() function, which parses it and displays it in the console.

class AppclientFieldListWalk : OmmConsumerClient {
    override fun onRefreshMsg(refreshMsg: RefreshMsg, event: OmmConsumerEvent): Unit {

        if (refreshMsg.hasName()) println("Refresh: Item Name: ${refreshMsg.name()}")

        if (refreshMsg.hasServiceName()) println("Refresh: Service Name: ${refreshMsg.serviceName()}")

        println("Refresh:  Item State: ${refreshMsg.state()}")

        if(DataType.DataTypes.FIELD_LIST == refreshMsg.payload().dataType()) decode(refreshMsg.payload().fieldList())

        println("")
    }

    override fun onUpdateMsg(updateMsg: UpdateMsg, event: OmmConsumerEvent): Unit {

        if (updateMsg.hasName()) println("Update: Item Name: ${updateMsg.name()}")

        if (updateMsg.hasServiceName()) println("Update: Service Name: ${updateMsg.serviceName()}")

        if(DataType.DataTypes.FIELD_LIST == updateMsg.payload().dataType()) decode(updateMsg.payload().fieldList())

        println("")
    }

    override fun onStatusMsg(statusMsg: StatusMsg, event: OmmConsumerEvent): Unit {

        if (statusMsg.hasName()) println("Status: Item Name: ${statusMsg.name()}")

        if (statusMsg.hasServiceName()) println("Status: Service Name: ${statusMsg.serviceName()}")

        if(statusMsg.hasState()) println("Status: Item State: ${statusMsg.state()}")
    }

    override fun onGenericMsg(genericMsg: GenericMsg, event: OmmConsumerEvent): Unit {}

    override fun onAckMsg(ackMsg: AckMsg, event: OmmConsumerEvent): Unit {}

    override fun onAllMsg(msg: Msg, event: OmmConsumerEvent): Unit {}

    fun decode(fieldList: FieldList): Unit {
        // Code from previous step
        for (fieldEntry: FieldEntry in fieldList) {
            //...
        }
    }
}

Now the Kotlin_Consumer_220.kt application is ready to connect to an OMM Provider and consume Market Price data from it.

Interactive Provider Application Code Walkthrough

This section walks through the Kotlin_IProvider_200.kt application code. Please note that all Server, Data Dictionary and Service name settings are configured in the EmaConfig.xml file.

Define the OmmProviderClient interface class

The Kotlin_IProvider_200.kt file implements a standard EMA Java Interactive Provider application. First, we create the IProviderAppClient class, which implements the OmmProviderClient interface. This IProviderAppClient class is responsible for the following tasks:

  • Processing incoming Login/Market Price request messages from the consumer application.
  • Sending Market Price Refresh message back to the consumer.

We define a number of mandatory OmmProviderClient callback functions to handle the different consumer request events.

//Client class, implements OmmProviderClient interface
class IProviderAppClient : OmmProviderClient {
    override fun onReqMsg(reqMsg: ReqMsg, providerEvent: OmmProviderEvent) {}

    override fun onRefreshMsg(refreshMsg: RefreshMsg, providerEvent: OmmProviderEvent) {}

    override fun onStatusMsg(statusMsg: StatusMsg, providerEvent: OmmProviderEvent) {}

    override fun onAllMsg(msg: Msg, providerEvent: OmmProviderEvent) {}

    override fun onClose(closeMsg: ReqMsg, providerEvent: OmmProviderEvent) {}

    override fun onGenericMsg(genericMsg: GenericMsg, providerEvent: OmmProviderEvent) {}

    override fun onPostMsg(postMsg: PostMsg, providerEvent: OmmProviderEvent) {}

    override fun onReissue(reissueMsg: ReqMsg, providerEvent: OmmProviderEvent) {}
}


fun main(args: Array<String>){
    val appCient = IProviderAppClient()
}

Provider Creation and Configuration

Next, we create an OmmProvider object with the EmaFactory.createOmmProvider() function and configure it to use the "Provider_1" configuration in the EmaConfig.xml file. This Interactive Provider application uses the OmmIProviderConfig.OperationModel.USER_DISPATCH dispatch model, so the application calls the OmmProvider.dispatch() function manually to dispatch received messages to the IProviderAppClient class. By default, EMA launches a background thread to manage communication and opens a server port when the application initializes an OmmProvider object.

fun main(args: Array<String>){
    //Nullable, so the finally block stays safe if provider creation fails
    var provider: OmmProvider? = null
    val appCient = IProviderAppClient()
    try{

        println("Starting Kotlin_IProvider_200 application, waiting for a consumer application")

        //OMMProvider creation and establish server port.
        provider = EmaFactory.createOmmProvider(EmaFactory.createOmmIProviderConfig().providerName("Provider_1").operationModel(OmmIProviderConfig.OperationModel.USER_DISPATCH), appCient)

        //Dispatch manually until a consumer has requested an item
        while(appCient.itemHandle.toInt() == 0){
            provider.dispatch(1000L)
            Thread.sleep(1000L)
        }

    } catch (excp:OmmException ){
        println(excp.message)
    } catch (excp:InterruptedException ){
        println(excp.message)
    } finally {
        //Release EMA resources only if the provider was actually created
        provider?.uninitialize()
    }
}
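
To make the USER_DISPATCH idea concrete without a running provider, here is a minimal model of it. ManualDispatcher and waitForFirstHandle are hypothetical names invented for this sketch and are not EMA classes; the only point is that queued events reach the callback when the application itself calls dispatch(), and not before.

```kotlin
import java.util.ArrayDeque

// Stand-in for illustration only; this is NOT the EMA OmmProvider class.
// Queued events reach the callback only when the application calls dispatch().
class ManualDispatcher(private val callback: (Long) -> Unit) {
    private val pending = ArrayDeque<Long>()

    // Simulates a request arriving from the network; nothing is delivered yet.
    fun enqueue(handle: Long) {
        pending.add(handle)
    }

    // Delivers every queued event on the caller's thread and returns the count.
    fun dispatch(): Int {
        var delivered = 0
        while (true) {
            val handle = pending.poll() ?: break
            callback(handle)
            delivered++
        }
        return delivered
    }
}

// Polls until the callback has recorded a non-zero handle, like the loop in main().
fun waitForFirstHandle(events: List<Long>): Long {
    var itemHandle = 0L
    val dispatcher = ManualDispatcher { handle ->
        if (itemHandle == 0L) itemHandle = handle // keep only the first handle
    }
    events.forEach { dispatcher.enqueue(it) }
    while (itemHandle == 0L) {
        if (dispatcher.dispatch() == 0) break // nothing left to deliver
    }
    return itemHandle
}
```

In this model, waitForFirstHandle(listOf(7L, 9L)) returns 7. In the real application the loop does not give up when nothing is pending; it keeps calling provider.dispatch() until a consumer sends a request.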

Handling incoming Login Request Message

All request messages from Consumer applications are delivered to the IProviderAppClient class via the onReqMsg() callback function. We check whether the domain type of the incoming request message is Login and, if so, pass the message to the processLoginRequest() function. The processLoginRequest() function then sends a Login Refresh Response message via the OmmProvider.submit() function to accept the Login request.

class IProviderAppClient : OmmProviderClient {

    var itemHandle: Long = 0L // Initial item Handle
    override fun onReqMsg(reqMsg: ReqMsg, providerEvent: OmmProviderEvent) {
        //Verify incoming message type
        when (reqMsg.domainType()) {
            EmaRdm.MMT_LOGIN -> processLoginRequest(reqMsg,providerEvent) //Handle Login Request Message
            else -> {
               println("Wrong Request")
            }
        }
    }

    //OmmProviderClient callbacks functions.

    private fun processLoginRequest(reqMsg: ReqMsg, event: OmmProviderEvent) {

        println("Receive Login Request message from ${reqMsg.name()}, send Login Refresh")
        //Send Login REFRESH_RESP message to consumer
        event.provider().submit(
                EmaFactory.createRefreshMsg()
                        .domainType(EmaRdm.MMT_LOGIN)
                        .name(reqMsg.name())
                        .nameType(EmaRdm.USER_NAME)
                        .complete(true)
                        .solicited(true)
                        .state(OmmState.StreamState.OPEN, OmmState.DataState.OK, OmmState.StatusCode.NONE, "Login accept")
                        .attrib(EmaFactory.createElementList())
                , event.handle())

    }
}

Handling incoming Market Price Request Message

Next, we update the onReqMsg() callback function to support Market Price request messages. The application checks whether the domain type of the incoming request message is Market Price and, if so, passes the message to the processMarketPriceRequest() function.

The processMarketPriceRequest() function creates a Market Price payload. First, we create an OMM FieldList object via the EmaFactory.createFieldList() function, set each FID and field value in an OMM FieldEntry object, and add each FieldEntry object to the FieldList object. Finally, the application sends a Market Price Refresh Response message with the FieldList payload to the Consumer application via the OmmProvider.submit() function.

class IProviderAppClient : OmmProviderClient {

    var itemHandle: Long = 0L // Initial item Handle
    override fun onReqMsg(reqMsg: ReqMsg, providerEvent: OmmProviderEvent) {
        //Verify incoming message type
        when (reqMsg.domainType()) {
            EmaRdm.MMT_LOGIN -> processLoginRequest(reqMsg,providerEvent) //Handle Login Request Message
            EmaRdm.MMT_MARKET_PRICE -> processMarketPriceRequest(reqMsg,providerEvent) //Handle Market Price Request Message
            else -> {
                println("Wrong Request")
            }
        }
    }

    //OmmProviderClient callbacks functions.

    fun processLoginRequest(reqMsg: ReqMsg, event: OmmProviderEvent) {
        //Handle Login Request Message
    }

    private fun processMarketPriceRequest(reqMsg: ReqMsg,event: OmmProviderEvent){

        println("Kotlin_IProvider_200: Receive Market Price Request message")


        //Creates Market Price Payload and Send Refresh Message
        val fieldList:FieldList = EmaFactory.createFieldList()

        //Add each FID data to the message
        fieldList.add(EmaFactory.createFieldEntry().ascii(3, reqMsg.name()))
        fieldList.add(EmaFactory.createFieldEntry().enumValue(15, 840))
        fieldList.add(EmaFactory.createFieldEntry().real(21, 3900, OmmReal.MagnitudeType.EXPONENT_NEG_2))
        fieldList.add(EmaFactory.createFieldEntry().real(22, 3990, OmmReal.MagnitudeType.EXPONENT_NEG_2))
        fieldList.add(EmaFactory.createFieldEntry().real(25, 3994, OmmReal.MagnitudeType.EXPONENT_NEG_2))
        fieldList.add(EmaFactory.createFieldEntry().real(30, 9 , OmmReal.MagnitudeType.EXPONENT_0))
        fieldList.add(EmaFactory.createFieldEntry().real(31, 19 , OmmReal.MagnitudeType.EXPONENT_0))

        println("Kotlin_IProvider_200: Send  Market Price Refresh message")

        //Send Market Price REFRESH_RESP message to consumer
        event.provider().submit(
                EmaFactory.createRefreshMsg()
                        .serviceName(reqMsg.serviceName())
                        .name(reqMsg.name())
                        .state(OmmState.StreamState.OPEN, OmmState.DataState.OK, OmmState.StatusCode.NONE , "Refresh Completed")
                        .solicited(true)
                        .payload(fieldList)
                        .complete(true)
                , event.handle())

        itemHandle = event.handle()
    }

}
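
The real() calls above take an integer mantissa plus a magnitude type rather than a decimal number. As a rough illustration of how the two combine, the sketch below scales a mantissa by a power of ten. The helper realToDecimal is hypothetical and not part of EMA, and its exponent parameter is assumed to be the power of ten implied by the magnitude type (for example -2 for EXPONENT_NEG_2 and 0 for EXPONENT_0).

```kotlin
import java.math.BigDecimal

// Hypothetical helper for illustration; it is not an EMA function.
// 'exponent' is assumed to be the power of ten implied by the magnitude type,
// e.g. -2 for EXPONENT_NEG_2 and 0 for EXPONENT_0.
fun realToDecimal(mantissa: Long, exponent: Int): BigDecimal =
    BigDecimal.valueOf(mantissa).scaleByPowerOfTen(exponent)
```

Under that assumption, realToDecimal(3900, -2) yields 39.00 and realToDecimal(9, 0) yields 9, which matches what a consumer would read back for FID 21 and FID 30 through real().asDouble().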

Sending Market Price Update Message

The application sends Market Price Update messages to the Consumer application from the main() function. First, the application creates OMM FieldList and Update Message objects via the EmaFactory class. It then builds the Update Message payload from OMM FieldList and OMM FieldEntry objects, and sends the Update Message with that payload via the OmmProvider.submit() function.

We iterate an integer value from 1 to 59 and add it to each field value in the Update message payload. This simulates Elektron Update Message behavior, in which each tick contains different updated values.

fun main(args: Array<String>){

    //OMMProvider creation and establish server port.

    provider = EmaFactory.createOmmProvider(EmaFactory.createOmmIProviderConfig().providerName("Provider_1").operationModel(OmmIProviderConfig.OperationModel.USER_DISPATCH), appCient)

    //Creates OMM FieldList and Update Message objects
    val fieldList: FieldList = EmaFactory.createFieldList()
    val updateMsg: UpdateMsg = EmaFactory.createUpdateMsg()

    for(index in 1..59){
        val startTime: Long = System.currentTimeMillis()

        provider.dispatch(1000L)

        //Add each FID data to the message
        fieldList.clear()
        fieldList.add(EmaFactory.createFieldEntry().real(22, 3991 + index.toLong(), OmmReal.MagnitudeType.EXPONENT_NEG_2))
        fieldList.add(EmaFactory.createFieldEntry().real(25, 3994 + index.toLong(), OmmReal.MagnitudeType.EXPONENT_NEG_2))
        fieldList.add(EmaFactory.createFieldEntry().real(30, 10 + index.toLong(), OmmReal.MagnitudeType.EXPONENT_0))
        fieldList.add(EmaFactory.createFieldEntry().real(31, 19 + index.toLong(), OmmReal.MagnitudeType.EXPONENT_0))

        //Send Market Price UPDATE_RESP message to consumer
        provider.submit(updateMsg.clear().payload(fieldList), appCient.itemHandle)

        println("Kotlin_IProvider_200: Send  Market Price Update messages")
        while(System.currentTimeMillis() - startTime < 1000){}
    }
}
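
The empty while loop at the end of each iteration keeps a CPU core busy until the second has elapsed. One possible alternative, shown here only as a sketch, is to sleep for whatever remains of the interval. The helper runPaced is a hypothetical name and is not part of EMA or of the example project.

```kotlin
// Hypothetical helper, not part of EMA: runs 'action' once per tick and sleeps
// for whatever is left of the interval instead of spinning in an empty loop.
fun runPaced(ticks: Int, intervalMillis: Long, action: (Int) -> Unit) {
    for (index in 1..ticks) {
        val startTime = System.currentTimeMillis()
        action(index)
        val remaining = intervalMillis - (System.currentTimeMillis() - startTime)
        if (remaining > 0) Thread.sleep(remaining)
    }
}
```

With such a helper, the update loop could be written as runPaced(59, 1000L) { index -> ... }, with the dispatch, FieldList and submit calls placed inside the lambda. Note that Thread.sleep() can throw InterruptedException, which the surrounding try block in main() would need to catch.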

Handling Invalid Request Messages

The application also needs to handle invalid request messages from Consumer applications. We create a processInvalidItemRequest() function that sends an OMM Status message with Stream State: Closed and Data State: Suspect to the Consumer application in the following cases:

  • The incoming request message is for a domain other than Login or Market Price.
  • A Market Price request arrives while the item handle is already non-zero, that is, an item stream is already open.

class IProviderAppClient : OmmProviderClient {

    var itemHandle: Long = 0L // Initial item Handle
    override fun onReqMsg(reqMsg: ReqMsg, providerEvent: OmmProviderEvent) {
        when (reqMsg.domainType()) {
            EmaRdm.MMT_LOGIN -> processLoginRequest(reqMsg,providerEvent) //Handle Login Request Message
            EmaRdm.MMT_MARKET_PRICE -> processMarketPriceRequest(reqMsg,providerEvent) //Handle Market Price Request Message
            else -> {
                processInvalidItemRequest(reqMsg,providerEvent)
            }
        }
    }

    //OmmProviderClient callbacks functions.

    fun processLoginRequest(reqMsg: ReqMsg, event: OmmProviderEvent) {
        //Handle Login Request Message
    }

    private fun processMarketPriceRequest(reqMsg: ReqMsg,event: OmmProviderEvent){

        if(itemHandle.toInt() != 0){
            processInvalidItemRequest(reqMsg,event)
            return
        }

        //Creates Market Price Payload and Send Refresh Message
    }

    private fun processInvalidItemRequest(reqMsg: ReqMsg, event: OmmProviderEvent){
        event.provider().submit(
                EmaFactory.createStatusMsg()
                        .name(reqMsg.name())
                        .serviceName(reqMsg.serviceName())
                        .domainType(reqMsg.domainType())
                        .state(OmmState.StreamState.CLOSED, OmmState.DataState.SUSPECT, OmmState.StatusCode.NOT_FOUND, "Item not found")
                , event.handle())
    }

}
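
The routing rules above can be summarised as a small pure function, which makes them easy to check in isolation. This is a sketch only: the Action enum and route() are invented for this illustration, and the two constants are assumed to match the numeric values of EmaRdm.MMT_LOGIN and EmaRdm.MMT_MARKET_PRICE.

```kotlin
// Assumed to match EmaRdm.MMT_LOGIN and EmaRdm.MMT_MARKET_PRICE.
const val LOGIN_DOMAIN = 1
const val MARKET_PRICE_DOMAIN = 6

// Hypothetical outcome type for illustration; not part of EMA.
enum class Action { ACCEPT_LOGIN, SEND_REFRESH, REJECT }

// Mirrors onReqMsg() plus the item handle check in processMarketPriceRequest().
fun route(domainType: Int, currentItemHandle: Long): Action = when {
    domainType == LOGIN_DOMAIN -> Action.ACCEPT_LOGIN
    // Only one Market Price stream is served; a non-zero handle means it is taken.
    domainType == MARKET_PRICE_DOMAIN && currentItemHandle == 0L -> Action.SEND_REFRESH
    else -> Action.REJECT
}
```

For example, route(6, 0L) gives SEND_REFRESH, while route(6, 5L) and route(4, 0L) both give REJECT, which corresponds to the two cases handled by processInvalidItemRequest().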

Now the Kotlin_IProvider_200.kt application is ready to publish Market Price data to an OMM Consumer.

The application source code and IntelliJ IDEA Community Edition project files are available on GitHub. You can get them via the following git command:

$>git clone git@github.com:TR-API-Samples/Example.EMA.Java.Kotlin.git

This example requires the following software dependencies and runtime:

  1. Java 8 SDK
  2. IntelliJ IDEA Java IDE version 2017 or above. You can download IntelliJ IDEA Community Edition from this page.
  3. Elektron SDK Java libraries from the Maven Central web site. You can also download the SDK documents and examples from the Elektron SDK - Java: Download page or the GitHub page.

Please check the README.md in the source code project for more detail.

How to run this example with IntelliJ IDEA

  1. Get the project via the above git command, or download a zip file from the GitHub page, into a directory of your choice (for example, D:/code/Kotlin_proj).
  2. Open IntelliJ IDEA Java IDE and select Create New Project.
  3. Select Kotlin/JVM in the Additional Libraries and Frameworks window, then click the Next button.
  4. Set the Project location folder to the directory from step 1.
  5. The example project and its subfolders will be available in IntelliJ IDEA Java IDE.
  6. Click the File menu, then choose Project Structure... to set the project libraries.
  7. Select the Libraries menu, then click the + button and select the New Project Library from Maven... setting.
  8. For development purposes, enter com.thomsonreuters.ema:ema:3.3.0.1 for EMA Java 3.3.0 G1, which is the latest development version in the Maven Central repository. Then set IntelliJ to download all required libraries to the D:\code\Kotlin_proj\libs folder. For production purposes, I recommend com.thomsonreuters.ema:ema:3.3.0.0 for EMA Java 3.3.0 L1, which is an official stable version.
  9. IntelliJ will automatically download all EMA libraries and dependencies to the D:\code\Kotlin_proj\libs folder.
  10. Click the Build menu, then choose Build Project to build the project (you may choose Rebuild Project if you change any source code).
  11. Open the Kotlin_IProvider_200.kt file, right-click and choose Run to start the Kotlin_IProvider_200 provider application.
  12. The Kotlin_IProvider_200 application will start and wait for a consumer application.
  13. Open the Kotlin_Consumer_220.kt file, right-click and choose Run to start the Kotlin_Consumer_220 consumer application.
  14. The Kotlin_Consumer_220 application will start, then connect to the Kotlin_IProvider_200 application and consume data from it.

 

References

For further details, please check out the following resources:

For any question related to this article or Elektron Message API page, please use the Developer Community Q&A Forum.