Article

Publishing content from internal databases to Thomson Reuters Enterprise Platform (TREP)

Gurpreet Bal
Platform Application Developer

Many companies have huge resources of data and information siloed around their organization. There is now a need to unleash these valuable assets from their silos to power new value chains and enable new use cases and applications - both internally and externally. Making this information accessible and moving it around the organization is typical of the Platform approach, which is also characterized by a shift toward message-oriented architectures. Accompanying this shift is the need for adapters which can extract the information from databases and provide it on a platform bus.

This article will briefly discuss various mechanisms to achieve this. We will look at techniques which can be used to extract data from a table, and develop a barebones Open Message Model (OMM) publisher using the Thomson Reuters Elektron Message API (EMA) to publish the extracted data to the Thomson Reuters Enterprise Platform (TREP) infrastructure. The sample code provided with this article is written specifically for a MySQL database, but can be extended to other databases and more complex schemas. While only a simple usage scenario is discussed here, an application developer should also consider further possibilities - for example, a transaction that rolls back after its data has already been published, leaving the published information incorrect.

Thomson Reuters also provides a product called Database Publishing System (DPS), a tightly integrated component of TREP which publishes information stored inside relational databases or in flat files (in CSV and XML formats) and makes it available to any data-consuming application.

Prerequisites

This article assumes knowledge of the TREP infrastructure and access to an ADS with publishing entitlements. The reader is assumed to have basic knowledge of database concepts like schemas, tables and triggers. A database with administrator privileges is required to test the example. The sample code presented uses SQL and Java. Familiarity with, and access to, the EMA API toolkit and a MySQL database is required.

Methodologies

The fundamental issue with publishing database contents is how to extract that data in the first place. Once extracted, it is easy to publish this data using Thomson Reuters APIs, and there are numerous publishing examples demonstrating that. Unlike SQL itself, the data extraction process and techniques are not standardized across database providers. There are a few ways this can be accomplished, and there is no single best technique, nor one which can be used with every database provider. The technique a developer ends up using will depend upon the database in question, the ability to access and/or modify the database schema, and so on. Common techniques to achieve this are:

  1. Periodic polling of database table
  2. Polling and updating the database table
  3. In-database actions - executable code
  4. Database triggers

Let us take a brief look at these.

Periodic polling of database table

This technique involves an external application which uses a database connector (JDBC/ODBC) and extracts all the data from the desired schema. The extracted data is hashed and kept in memory or on the local filesystem, depending on its size. The extraction process is repeated periodically; the results are compared with the cache and any differences are published. The cache is updated with the new information after publishing.
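
As an illustration of this approach, here is a minimal polling sketch using JDBC. The table and column names follow the finance.quotes example used later in this article, and the publish routine is a hypothetical placeholder:

import java.sql.*;
import java.util.*;

public class PollingPublisher {
    // cache of the last published values, keyed by instrument name
    private final Map<String, String> cache = new HashMap<>();

    // re-read the table, publish rows that are new or changed, then refresh the cache
    public void pollOnce(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT RIC, BID, ASK FROM finance.quotes")) {
            while (rs.next()) {
                String ric = rs.getString("RIC");
                String row = rs.getString("BID") + "," + rs.getString("ASK");
                if (!row.equals(cache.get(ric))) {   // new or changed since the last poll
                    publish(ric, row);               // hypothetical publishing routine
                    cache.put(ric, row);
                }
            }
        }
    }

    private void publish(String ric, String row) {
        System.out.println("Publishing " + ric + " -> " + row);
    }
}

In practice, pollOnce would be driven by a timer, for example a ScheduledExecutorService.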

Pros:

A completely external application which requires no changes to the database setup.

It works with all database providers.

Cons:

Additional load on the database due to periodic polling and a complete re-read of the entire schema.

It may not be feasible to maintain an external cache for extremely large datasets.

Polling and updating the database table

This technique involves an external application which uses a database connector (JDBC/ODBC) and extracts/updates the desired schemas. Unlike the previous technique, this application only selects the new or changed information from the database, and then marks that information as already processed. This can be accomplished with SQL statements along the following lines:

// BEGIN Transaction
// extract data
unpubIDs = execute("SELECT * FROM table WHERE published = 'f'");
// publish data
for each (id : unpubIDs) {
  publishMessage(id, data);
}
// update table
execute("UPDATE table SET published = 't' WHERE published = 'f'");
// COMMIT Transaction
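
A runnable sketch of the same pattern using JDBC is shown below; the finance.quotes table with a published flag column, and the publishMessage routine, are assumptions made for illustration only:

import java.sql.*;
import java.util.*;

public class MarkAndPublish {
    public void publishPending(Connection conn) throws SQLException {
        conn.setAutoCommit(false);                          // BEGIN transaction
        try (Statement stmt = conn.createStatement()) {
            // extract the not-yet-published rows
            List<String[]> pending = new ArrayList<>();
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT RIC, BID, ASK FROM finance.quotes WHERE published = 'f'")) {
                while (rs.next()) {
                    pending.add(new String[] {
                            rs.getString("RIC"), rs.getString("BID"), rs.getString("ASK") });
                }
            }
            // publish the extracted rows
            for (String[] row : pending) {
                publishMessage(row[0], row[1], row[2]);     // hypothetical publishing routine
            }
            // mark the rows as already processed
            stmt.executeUpdate("UPDATE finance.quotes SET published = 't' WHERE published = 'f'");
            conn.commit();                                  // COMMIT transaction
        } catch (SQLException e) {
            conn.rollback();
            throw e;
        } finally {
            conn.setAutoCommit(true);
        }
    }

    private void publishMessage(String ric, String bid, String ask) {
        System.out.println("Publishing " + ric + ": bid=" + bid + " ask=" + ask);
    }
}

Note that the final UPDATE can still race with rows inserted between the SELECT and the UPDATE; a stricter implementation would restrict the UPDATE to the rows actually read.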

Pros:

Application design is simple and this approach works with all database providers.

There is no need to maintain a huge external cache.

Cons:

The database schema needs modification, and write access must be granted to an external application, which is not always possible.

There is a possibility of race conditions with non-transactional databases.

In-database actions - executable code

This technique exploits the in-database code execution capabilities provided by some database vendors. Oracle (using Java and onDatabaseChangeNotification) and PostgreSQL (using Listen/Notify) are two such providers. The user code is allowed to inspect and act on the data that has changed or is about to change. This user code can then invoke the API calls required to publish the data directly from within the database.
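
For instance, with the PostgreSQL JDBC driver the Listen/Notify route can look roughly like the sketch below. The quotes_changed channel name, connection details and the in-database trigger that issues the corresponding NOTIFY are assumptions made for illustration:

import java.sql.*;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;

public class NotifyListener {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost/finance", "user", "password")) {
            // subscribe to a channel that an in-database trigger NOTIFYs on data changes
            try (Statement stmt = conn.createStatement()) {
                stmt.execute("LISTEN quotes_changed");
            }
            PGConnection pgConn = conn.unwrap(PGConnection.class);
            while (true) {
                PGNotification[] notifications = pgConn.getNotifications();
                if (notifications != null) {
                    for (PGNotification n : notifications) {
                        // the payload is set by the trigger, e.g. the changed RIC
                        System.out.println("Change notified on " + n.getName() + ": " + n.getParameter());
                        // look up the changed row and publish it from here
                    }
                }
                Thread.sleep(1000);     // simple poll interval for pending notifications
            }
        }
    }
}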

Pros:

Direct interface to data without any overhead of external extraction.

Cons:

Very provider-specific and may not work with all providers.

In-database code often executes with system-level privileges, leading to security risks.

The concept of a persistent session which stays connected to the publishing infrastructure may not be available.

User code consumes additional CPU resources, which puts additional load on the database.

Database triggers

Database triggers provide the ability to execute SQL statements within the database when certain trigger conditions are met. The trigger SQL code executes within the context of the changed dataset. It is a standard concept, although its implementation and capabilities vary across database providers. It is possible to write a database trigger which extracts the modified data and writes it to an external file, which is then picked up by an external application and published.

Pros:

The external application can maintain sessions and use any API; it is not constrained by the provider's in-database code capabilities.

The application does not need any read/write access to the database.

Does not exert much additional load on the database.

Cons:

Not standard across all database providers.

Some databases may not have the ability to pass on the triggered dataset outside the database.

This technique of using database triggers is explored in this article, using a MySQL database and the Elektron Message API.

Solution

We will develop an adapter application which reads database information and appears to TREP as a non-interactive OMM provider. Since the database can be updated at any time, the database trigger mechanism described above will be used. In our case, the trigger will extract the new or updated data into an external file, which the adapter code will read and publish to the TREP infrastructure.

The data model of the published content will be specific to the information within the database. For the purposes of this article, we will assume a flat, non-relational schema which will be encoded and published as an OMM-based Market Price message. Supporting trigger code will reside in our database and will be invoked whenever the data changes in the monitored table. This trigger will write the changed information (to be published) to a flat file.

A Java application built using the EMA API will monitor the filesystem for this flat file, then parse and publish the extracted data.
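
A minimal sketch of the file-monitoring part, using the standard java.nio.file.WatchService, is shown below. The directory path and the parseAndPublish step are placeholders, and the attached sample may implement this differently:

import java.nio.file.*;

public class DirectoryMonitor {
    public static void main(String[] args) throws Exception {
        Path dir = Paths.get(args.length > 0 ? args[0] : "/tmp/pub");
        WatchService watcher = FileSystems.getDefault().newWatchService();
        // watch for files created or modified by the database trigger
        dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
        System.out.println("Started monitoring: " + dir);
        while (true) {
            WatchKey key = watcher.take();                  // blocks until an event arrives
            for (WatchEvent<?> event : key.pollEvents()) {
                if (event.kind() == StandardWatchEventKinds.OVERFLOW) continue;
                Path file = dir.resolve((Path) event.context());
                System.out.println("Processing file: " + file.getFileName());
                parseAndPublish(file);                      // placeholder: parse the CSV and publish via EMA
            }
            key.reset();
        }
    }

    private static void parseAndPublish(Path file) {
        // see the Scanner and Provider discussion below
    }
}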

Let's dig deeper into the setup.

Database:

The database schema name we are going to use is finance, and the table of interest is called quotes, which contains three fields - Instrument name, Bid price and Ask price - with sample data as shown:

finance.quotes

RIC       BID      ASK
------    -----    -----
INSTR1    38.44    38.60
INSTR2    44.79    45.02
INSTR3    14.30    14.90

We need to publish any insert or update to the existing dataset. The SQL commands in the sample code accompanying this article will create and populate this schema and table.
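
For reference, a schema along these lines can be created and populated with statements like the ones below, shown here executed over JDBC. The connection details are placeholders and the exact SQL in the attached sample may differ:

import java.sql.*;

public class SchemaSetup {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost/", "root", "password");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS finance");
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS finance.quotes ("
                    + " RIC VARCHAR(32) PRIMARY KEY,"
                    + " BID DECIMAL(10,2),"
                    + " ASK DECIMAL(10,2))");
            stmt.executeUpdate("INSERT INTO finance.quotes VALUES"
                    + " ('INSTR1', 38.44, 38.60), ('INSTR2', 44.79, 45.02), ('INSTR3', 14.30, 14.90)");
        }
    }
}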

Trigger:

A trigger will monitor this table for changes. In MySQL, trigger code can also output the triggered data to a file. Define a trigger in finance as:

CREATE TRIGGER autopublish1
AFTER INSERT ON finance.quotes
FOR EACH ROW
BEGIN
  SELECT * INTO OUTFILE 'tmp/pub/newData.csv'
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  FROM finance.quotes
  WHERE RIC = new.RIC;
END;

This will write the new data (from a database insert) into the 'newData.csv' file. A similar trigger is created for the SQL UPDATE event as well. The complete trigger code is in the attached sample code. The Scanner class from the application loads and parses the information contained in this file. In the attached code the Scanner class hard-segregates the data into predefined BID and ASK buckets. It can be modified to be driven by application configuration, or the published field names can also be extracted from the database.
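
A minimal sketch of that parsing step is shown below, assuming the RIC,BID,ASK layout written by the trigger above and the standard field IDs 22 (BID) and 25 (ASK). Prices are scaled to match the EXPONENT_NEG_2 encoding used later, and the attached Scanner class may differ in detail:

import java.io.IOException;
import java.nio.file.*;
import java.util.*;

public class CsvScanner {
    // parse "RIC,BID,ASK" lines into a map of RIC -> (field ID -> scaled price)
    public static Map<String, Map<Integer, Long>> parse(Path csvFile) throws IOException {
        Map<String, Map<Integer, Long>> parsed = new LinkedHashMap<>();
        for (String line : Files.readAllLines(csvFile)) {
            String[] parts = line.split(",");
            if (parts.length < 3) continue;                                   // skip malformed lines
            Map<Integer, Long> fields = new HashMap<>();
            fields.put(22, Math.round(Double.parseDouble(parts[1]) * 100));   // BID
            fields.put(25, Math.round(Double.parseDouble(parts[2]) * 100));   // ASK
            parsed.put(parts[0].trim(), fields);
        }
        return parsed;
    }
}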

Provider

The published data is a Reuters Domain Model (RDM) Market Price message, which is an OMM FieldList where the field entries are name-value data pairs. We will use a custom service to publish the sample data derived from the database and distinguish it from market data provided by Thomson Reuters. The Provider class in our application handles all the publishing tasks. It maintains a handle to all the published items, and publishes an Image or an Update message.

FieldList fieldList = EmaFactory.createFieldList();

data.forEach((key, value) -> {
  int fid = key.intValue();
  int dValue = value.intValue();
  // add the read data into the OMM FieldList
  fieldList.add(EmaFactory.createFieldEntry().real(fid, dValue, OmmReal.MagnitudeType.EXPONENT_NEG_2));
});

Here the data from the file, which has been parsed into a key/value map by the Scanner class, is marshaled into an OMM FieldList, which is then packaged as a Refresh or Update message and published by the OmmProvider.
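
To complete the picture, here is a minimal non-interactive provider sketch that submits a Refresh followed by an Update, mirroring the output shown in the Testing section below. The ADS host, user name and item handle are placeholders, and the attached Provider class wraps the same EMA calls with its own item-handle bookkeeping:

import com.thomsonreuters.ema.access.*;

public class NiProviderSketch {
    public static void main(String[] args) {
        OmmProvider provider = null;
        try {
            // connect to the ADS as a non-interactive provider (host and user are placeholders)
            provider = EmaFactory.createOmmProvider(EmaFactory.createOmmNiProviderConfig()
                    .host("adshost:14003").username("user"));
            long itemHandle = 5;                       // client-assigned handle for this item

            FieldList fieldList = EmaFactory.createFieldList();
            fieldList.add(EmaFactory.createFieldEntry().real(22, 1476, OmmReal.MagnitudeType.EXPONENT_NEG_2));
            fieldList.add(EmaFactory.createFieldEntry().real(25, 1492, OmmReal.MagnitudeType.EXPONENT_NEG_2));

            // initial image for the item on our custom service
            provider.submit(EmaFactory.createRefreshMsg()
                    .serviceName("TD").name("INSTR3")
                    .state(OmmState.StreamState.OPEN, OmmState.DataState.OK,
                           OmmState.StatusCode.NONE, "UnSolicited Refresh Completed")
                    .payload(fieldList).complete(true), itemHandle);

            // subsequent changes go out as updates on the same handle
            FieldList updateFields = EmaFactory.createFieldList();
            updateFields.add(EmaFactory.createFieldEntry().real(22, 1442, OmmReal.MagnitudeType.EXPONENT_NEG_2));
            updateFields.add(EmaFactory.createFieldEntry().real(25, 1745, OmmReal.MagnitudeType.EXPONENT_NEG_2));
            provider.submit(EmaFactory.createUpdateMsg()
                    .serviceName("TD").name("INSTR3").payload(updateFields), itemHandle);
        } catch (OmmException e) {
            System.out.println(e.getMessage());
        } finally {
            if (provider != null) provider.uninitialize();
        }
    }
}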

Sample code

The sample code for this article is available in the src directory. The sample uses Java, EMA and a MySQL database.

Testing

To check the end-to-end process flow, start the sample by providing a TREP service name, ADS host name, etc. on the command line. You must be entitled to publish on this service; refer to [EMA Quick Start and Tutorials - NI Provider](https://developers.refinitiv.com/elektron/elektron-sdk-java/learning?content=12016&type=learning_material_item) for how to set this up. Upon successful startup, the console window should display API messages showing a successful login to the ADS, and that the application is monitoring the target directory for database files. Whenever new data is inserted into the target table, our trigger autopublish1 will be invoked, which will write it to a file; that data will then be picked up by the Scanner and published out to TREP.

Started monitoring: C:\Temp
Processing file: newData3.csv
Publishing: INSTR3, data: {22=1476, 25=1492}
Outgoing Reactor message (Tue Jan 17 10:41:48 EST 2017):
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<REFRESH domainType="MARKET_PRICE" streamId="-1" containerType="FIELD_LIST" flags="0x48 (HAS_MSG_KEY|REFRESH_COMPLETE)"
groupId="0" State: Open/Ok/None - text: "UnSolicited Refresh Completed" dataSize="15">
    <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="0" name="INSTR3"/>
    <dataBody>
        <fieldList flags="0x08 (HAS_STANDARD_DATA)">
            <fieldEntry fieldId="22" data="0C05 96"/>
            <fieldEntry fieldId="25" data="0C06 BA"/>
        </fieldList>
    </dataBody>
</REFRESH>

Processing file: newData3.csv
Publishing: INSTR3, data: {22=1442, 25=1745}
Outgoing Reactor message (Tue Jan 17 10:43:23 EST 2017):
<!-- rwfMajorVer="14" rwfMinorVer="1" -->
<UPDATE domainType="MARKET_PRICE" streamId="-1" containerType="FIELD_LIST" flags="0x08 (HAS_MSG_KEY)" updateType="0" dataSize="15">
    <key flags="0x03 (HAS_SERVICE_ID|HAS_NAME)" serviceId="0" name="INSTR3"/>
    <dataBody>
        <fieldList flags="0x08 (HAS_STANDARD_DATA)">
            <fieldEntry fieldId="22" data="0C05 96"/>
            <fieldEntry fieldId="25" data="0C06 BA"/>
        </fieldList>
    </dataBody>
</UPDATE>

Any market data subscriber is able to receive the resulting image and update messages from TREP.

Jan 17, 2017 10:43:15 AM com.thomsonreuters.ema.access.ChannelCallbackClient reactorChannelEventCallback
INFO: loggerMsg
    ClientName: ChannelCallbackClient
    Severity: Info
    Text:    Received ChannelUp event on channel Channel
        Instance Name EmaConsumer_1
        Component Version ads2.6.1.L1.linux.rrg 32-bit
loggerMsgEnd

Item Name: INSTR3
Service Name: TD
Item State: Open / Ok / None / 'UnSolicited Refresh Completed'
BID(22): 14.76
ASK(25): 14.92

Item Name: INSTR3
Service Name: TD
BID(22): 14.42
ASK(25): 17.45

Glossary

TREP: Thomson Reuters Enterprise Platform

ADS: Advanced Distribution Server

EMA: Elektron Message API

JDBC: Java Database Connectivity

ODBC: Open Database Connectivity

OMM: Open Message Model