ARTICLE

使用企业消息API(EMA)的机器可读新闻(MRN)介绍

Zoya Farberov
Developer Advocate Developer Advocate

概述

        本文研究了路孚特的机器可读新闻产品,讨论了它的特点、最常见的使用案例,并从应用开发者的角度提供了具体的实施细节。

 

机器可读新闻

        路孚特机器可读新闻(MRN)是一项对新闻进行自动订阅和系统分析的先进服务。它直接向您的应用程序提供深厚的历史新闻档案、超低延迟的结构化新闻和新闻分析。这使得算法能够运用新闻的力量来把握机会,利用市场的信息不对称并管理事件风险。

 

可能的使用案例

        那么,如何才能利用这些数据呢?

  • 相关性:大量的用例来自于将MRN数据流与另一个数据流相关联,例如,与实时价格流相关联。请参考developers.refinitiv.com上的路孚特实时SDK Java路孚特实时SDK C++来了解实时价格订阅的详细例子。
  • 归档:存储一个固定的时间段内关于一组证券的新闻,以便进一步使用。本文后面提到的一个例子说明了如何使用EclipseLink将数据存储到MySQL数据库。
  • 提醒:获得关于可被视为交易事件的异常新闻的通知。异常案例可以在中性或负面的背景下有非常积极的情绪,反之亦然,或者在短时间内有关于某个特定机构的异常数量的新闻。所有这些特征都可以被量化、具体化并建立在您的应用程序的工作流程中。

 

工具

        在这篇文章中,我们将使用企业消息API(EMA)的Java版本,这是一个数据中立的、多线程的、易于使用的API,提供对OMM(Open Message Model)和RWF(Reuters Wire Format)数据的访问。如果您不熟悉EMA,我们强烈建议您查看JavaC/C++的教程。

 

数据模型

        现在让我们开始研究具体细节,首先我们来研究MRN数据是如何形成的。

  • 一条核心MRN数据是一个UTF8的JSON字符串;
  •  JSON字符串使用gzip进行压缩;
  •  因为有些数据太大,无法装入单一的消息中,所以压缩后的JSON字符串被分割成若干片段,每个片段都适合于一个RSSL更新(Update)。
  • 数据片段作为FieldList包中的FRAGMENT字段值被添加到更新消息中。

        因此,为了解析这些数据,我们需要将这个过程倒过来。让我们考虑一下如何识别所有相关的片段,并重新组装一篇单一的新闻报道。

        以下五个字段以及RIC本身都是必要的,它们用以确定是否收到了整条数据的各个片段,以及如何将这些片段串联起来重建这条数据:

  • MRN_SRC:发布FRAGMENT的评分/处理系统的标识符。
  • GUID:该条数据的全球唯一标识符。这条数据的所有消息都有相同的GUID值。
  •  FRAGMENT:压缩的数据片段本身。
  • TOT_SIZE:分片数据的总大小,以字节为单位。
  •  FRAG_NUM:一条数据里各个片段的序列号。对于发布的每条数据的第一个片段,这个数字被设置为1;对于每条数据的后续片段,这个数字会被递增。

        一条MRN数据发布是由RIC、MRN_SRC和GUID的组合来唯一标识的。

 对于一个给定的RIC-MRN_SRC-GUID组合,当一条数据只需要一个消息时,TOT_SIZE将等于FRAGMENT的字节数,FRAG_NUM将是1。当需要多个消息时,一旦每个FRAGMENT的字节数之和等于TOT_SUM,就可以认为该条数据已经完全收到。消费者还将观察到所有FRAG_NUM的范围是从1到片段的数量,没有跳过中间的整数。换句话说,一条通过三个消息传输的数据的FRAG_NUM值是1,2,3。

 

应用程序的高层结构

        我们的应用程序是一个消费者,它将:

  1. 通过直接的接入点(路孚特实时边界设备EED),或通过分布设施,如路孚特实时分布系统(RTDS,即之前的TREP),与数据源(供应商)建立连接;
  2. 对MRN RIC发出一个或多个订阅请求;
  3.  注册以获取数据更新,并对其进行解析;
  4. 注册以获取状态事件并适当地处理它们。

        关于编写EMA消费者应用程序的更多细节,请参考《开发者指南》。

        让我们来看看使用新闻文本分析域(News Text Analytics)和MRN特定消费发布的数据。首先,我们要请求以下RIC,并订阅它们的更新:

内容集  RIC 
实时新闻 MRN_STORY
新闻分析:公司和商品&能源资产 MRN_NA_ENT0
新闻分析:宏观经济新闻和事件 MRN_NA_DOC

        例子:

    	
            

OmmConsumerConfig config = EmaFactory.createOmmConsumerConfig();

OmmConsumer consumer = EmaFactory.createOmmConsumer(config.host(_ip + ":" + _port).username(_userName));

ReqMsg reqMsg = EmaFactory.createReqMsg();

for(int i = 0; i<_ricsMRN.length; i++) {

    consumer.registerClient(reqMsg.domainType(EmaRdm.MMT_NEWS_TEXT_ANALYTICS).serviceName(

            _serviceName).name(_ricsMRN[i]), _appClient, (new Integer(i)));

 

        我们注册以获取如下更新:

  • onRefreshMsg
  • onUpdateMsg
  • onStatusMsg

 

        寻找片段

    	
            

if (fieldEntry.loadType() == DataTypes.BUFFER) {

    if (fieldEntry.fieldId() == FRAGMENT) {

如果只有一个片段,或者组装的长度等于TOT_SIZE,我们就可以转换为JSON,并可能进行格式化打印(pretty-print):

    	
            

if (fieldEntry.buffer().buffer().array().length == totalSize) {

    // there is only one segment, we are ready

    // unzip using gzip

    String strFlatFrag = unzipPayload(fieldEntry.buffer().buffer().array());

    System.out.println("=>FRAGMENT JSON STRING: " + strFlatFrag);

    try {

        JSONObject jsonResponse = new JSONObject(strFlatFrag);

        // pretty-print json response

        int spacesToIndentEachLevel = 2;

        System.out.println("FRAGMENT JSON PRETTY:\n" + jsonResponse.toString(spacesToIndentEachLevel));

    } 

    catch (Exception e) {

        System.err.println("Exception parsing json: " + e);

        e.printStackTrace(System.err);

    }

}

但如果收到的只是其中一个片段呢?我们应该保留这个片段,直到拥有所有的片段,这些片段存储在一个哈希表中。

    	
            

Hashtable<String,ArrayList<ByteBuffer>> fragBuilderHash;

...

alFrags = fragBuilderHash.get(guid);

alFrags.add(fieldEntry.buffer().buffer());

fragBuilderHash.put(guid, alFrags);

当fragBuilderHash中的片段的总和长度等于总大小时,我们就可以继续处理了。连接、解压缩、转换为JSON,并准备好将数据投入到一个好的用途。

 

相关的示例应用程序

·         控制台实例,将数据格式化打印到屏幕上是最简单和最明显的用例。请参考github上的MRN控制台实例

·         GUI浏览器,显示并排的实时新闻(报道)、新闻分析和情绪指数。请参考github上的MRN GUI Viewer

·         数据库存储,使用EclipseLink存储实现将新闻分析存储到MySQL数据库。请参考github上的MRN数据库存储

 

参考资料

·         MRN数据模型和Elektron实施指南

·         企业消息API(EMA)开发者指南