使用 Websocket API 向 Refinitiv Real-Time 发布数据

Umer Nalla
Developer Advocate Developer Advocate

“我不想学习一个API,我只想将一些数据发布到Refinitiv”

在客户现场工作期间,我经常遇到一类不需要使用Refinitiv数据的开发人员。他们是一些内部生成的实时数据的所有者,他们希望共享这些数据 - 供其他用户在内部或外部使用 - 但不想学习API来共享数据。

如果您是这些开发人员之一,或者只是想了解如何向Refinitivg贡献数据,请继续阅读....

数据发布

通常,一个组织会生成一些定价数据或其他数据,他们希望或者提供给内部用户使用,或者内部使用并且共享给外部金融市场。

这些价格或其他类型的数据先在内部生成,然后提交给Refinitiv实时系统。

如果仅用于内部使用,这些数据将存储在内部实时缓存中 - 内部用户可以通过使用内部通信的RIC(代码名称)请求相关代码来使用数据。

如果是供外部使用,Refinitiv实时系统会将这些数据转发给贡献引擎,该引擎将负责将这些数据转发给Refinitiv,Refinitiv将在其Elektron实时数据源上发布价格。然后,Elektron 的外部用户可以通过请求相关的 RIC 代码来使用数据。

我在开场白中使用了“发布”这个词,但之后又使用了“贡献”这个词。这是因为许多开发人员在谈到发布时,他们的意思是贡献。在 Refinitiv Real-Time 实时数据场景中,“发布”意味着不同的东西,这超出了本指南的范围。

发布(或插入)数据

用于贡献数据的编程功能称为“发布” – 在旧版的API中也被称为“插入”。

直到最近,如果您想执行Post(或Insert),您必须花一些时间学习我们的API之一,以便执行必要的步骤来达到:

  • 连接到服务器
  • 使用您的账号密码登录
  • 将需要 Post 的有效负载编码为 API 特定的对象/格式
  • 提交发布内容到服务器
  • (可选)处理确认发布内容被接受的确认消息

很多客户端开发人员只会选取一个示例应用程序,然后根据他们的贡献需求对其进行破解 - 但没有时间了解该应用程序的工作原理。

随着我们新的Websocket接口的发布(在最新版本的Refinitiv实时数据系统上公开),这个过程变得更加简单。您不需要了解 API 细节 - 您可以使用基于标准的 Websocket 连接和 JSON 格式。

您需要了解一些 Refinitiv 特定知识 - 即您需要为服务器登录请求而编码JSON 消息的格式以及 Post 消息有效负载本身的格式。

虽然您可以使用任何支持JSON和websocket连接的编程语言,但我将在本指南中使用Python。

先决条件

为了完成本指南并成功将一些数据发布到Refinitiv或内部缓存,您将需要在PC上执行以下操作:

  • Python安装 - 我已经用Python v3.7测试了以下代码
  • 安装了“Websocket-client” Python 模块
  • 下载本文附带的示例源代码(请参阅本页右上角的链接)。

您还需要从内部市场数据团队获得以下内容:

  • 能使用 v3.2.1 或更高版本的 ADS 组件访问 Refinitiv 实时系统的权限(您将连接到的服务器)
  • ADS 上的主机名以及和ADS连接的Websocket 接口的端口号
  • DACS 用户名,具有将数据发布(贡献)到以下内容的正确权限:
    • 要发布到的服务(数据源)的名称
    • 可以供您安全发布的一个或多个测试 RIC(代码名称)

连接和登录

假设您已经满足上述所有先决条件,我们需要做的第一件事就是建立与ADS服务器的Websocket连接。

创建与服务器的 Websocket 连接

我们希望创建一个与服务器的长期连接,因此我们使用WebSocketApp(来自websocket-client模块),它是Websocket的一个封装器,提供一个事件驱动接口。

我们通过调用我在示例中定义的 connect() 方法来执行此操作:

    	
            wsa = connect("myADS",15000, "umer.nalla")
        
        
    

我正在传入 myADS 的主机名、端口 15000 和我的 DACS 用户名。此时不需要用户名,但该方法会将其存储在全局变量中以供以后使用。您显然需要将这些替换为您从市场数据团队获得的。

    	
            

def connect(hostname, port, username, pos = socket.gethostbyname(socket.gethostname()), appid = 256):

    

    """ Called to connect to server """

    

    global app_id, user, position,web_socket_app

    # appid may be allocated by your Market Data team, otherwise default to 256

    app_id = appid

    user = username

    position = pos

    # the above values are stored for later usage when we attempt login.

 

    # Start websocket handshake

    ws_address = "ws://{}:{}/WebSocket".format(hostname, port)

    print("Connecting to WebSocket " + ws_address + " ...")

    web_socket_app = websocket.WebSocketApp(ws_address, header=['User-Agent: Python'],

                                        on_message=on_message,

                                        on_error=on_error,

                                        on_close=on_close,

                                        subprotocols=['tr_json2'])

    

    # callback for once websocket is open - which will send the Login request

    web_socket_app.on_open = on_open

 

    # Create Thread for WebsocketApp processing

    wst = threading.Thread(target=web_socket_app.run_forever)

    wst.start()

首先要注意的是连接地址ws_address是使用主机名和端口形成的,例如myADS:15000,其中myADS是主机名,15000是ADS监听websocket连接请求的端口。

接下来,我们创建一个 WebsocketApp 实例,并提供被以下事件类型激发的回撤方法:

  • 一旦websocket连接建立 - on_open
  •  当websocket关闭时 – on_close
  •  如果发生错误 – on_error
  •  当从 ADS 收到消息时 – on_message

另请注意,我们将子协议设置为tr_json2 - 以确保Websocket连接的Sec-WebSocket-Protocol标头值为tr_json2 - 这是ADS所期望的。

如果您决定使用不同的websocket库/语言,则上述代码无疑会有所不同 - 但关键点是我们需要处理上述事件类型。

调用上述连接方法后,我们等待建立websocket连接并成功登录响应:

    	
            

print ("Waiting for Login response")

 while not logged_in and (not shutdown_app):

     time.sleep(1)

一旦websocket建立连接后登录ADS

一旦我们的应用程序和ADS之间建立了websocket连接,我们要做的第一件事就是向ADS发送登录请求。

    	
            

def on_open(ws):

    """ Called when handshake is complete and websocket is open. Now send login """

    global web_socket_open

    web_socket_open = True

    send_login_request(ws)

因此,一旦websocket连接建立,WebSocketApp应该马上显示on_open。在这里,我们设置一个统一化的标志来指示 websocket 已打开,然后编码并向 ADS 发送 JSON 登录请求消息。

    	
            

def send_login_request(ws, is_refresh_token=False):

    """ Generate a login request and send """

    # Set values for ADS login

    # Note StreamID is 1 and the Domain is Login

    login_json = {

        'ID': 1,

        'Domain': 'Login',

        'Key': {

            'Name': '',

            'Elements': {

                'ApplicationId': '',

                'Position': ''

            }

        }

    }

 

    login_json['Key']['Name'] = user

    login_json['Key']['Elements']['ApplicationId'] = app_id

    login_json['Key']['Elements']['Position'] = position

    

     ws.send(json.dumps(login_json))

因此,我们创建一个 JSON 对象并设置以下值:

  • 数据流ID - 应用程序和服务器之间的每个请求(&响应)的唯一标识符,使用值1作为登录请求
  • 用户名 – 通常称为 DACS 用户名(DACS 是 Refinitiv Real-Time 使用的身份验证和授权系统)。
  • 应用程序 ID – 由您的市场数据团队分配的ID,否则使用默认值 256
  • 位置 - 运行应用程序的PC的本地IP地址/主机名

有些组织的 DACS 政策坚持使用非默认应用程序 ID 来执行成功登录 - 因此请与您的市场数据团队核实要求。

然后,我们通过 websocket 将 JSON 消息发送到 ADS。

传出login_json对象应如下所示:

    	
            

{

"Domain":"Login",

  "ID":1,

  "Key":{

    "Elements":{

      "ApplicationId":"256",

      "Position":"101.43.2.193"

    },

    "Name":"umer.nalla"

  }

发送登录请求后,我们可以期望服务器以JSON消息的形式通过Websocket进行异步响应。

来自 ADS 的成功登录刷新消息将如下所示:

    	
            

{

"Domain":"Login",

 "Elements":{

   "MaxMsgSize":61430,

   "PingTimeout":30

 },

 "ID":1,

 "Key":{

   "Elements":{

     "AllowSuspectData":1,

     "ApplicationId":"256",

     "SupportViewRequests":1

   },

   "Name":"umer.nalla"

 },

 "State":{

   "Data":"Ok",

   "Stream":"Open",

   "Text":"Login accepted by host centos7-2."

 },

 "Type":"Refresh"

}

需要注意的几点:

  • 数据流ID为1,对应于我们在登录请求消息中使用的值
  • 数据状态为“正常”和“数据流状态为打开” - 确认登录请求已被接受
  • “刷新”的类型值 - ADS 发送“刷新”作为对成功请求的初始响应

如果登录请求被ADS拒绝,我们将看到如下内容:

    	
            

{

   "ID": 1,

   "Type": "Status",

   "Domain": "Login",

   "Key": {

     "Name": "fred"

   },

   "State": {

     "Stream": "Closed",

     "Data": "Suspect",

     "Code": "UserUnknownToPermSys",

     "Text": "fred, unknown to system."

   }

 }

因此,对于登录失败,请注意以下事项:

  • 根据我们发送的登录请求,数据流ID为1
  • 类型是状态(而不是刷新)
  •  数据流状态为关闭(非打开)
  • 数据状态为可疑(不是OK)

如果您确实收到状态类型响应,请联系您的内部市场数据团队,并提供状态消息的详细信息,包括代码和文本值。

如果您还记得,我们之前指定了一个回撤来处理我们通过 websocket 收到的消息:

    	
            

def on_message(ws, message):

    """ Called when message received, parse message into JSON for processing """

    print("RECEIVED: ")

    message_json = json.loads(message)

    print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))

    for singleMsg in message_json:

        process_message(ws, singleMsg)

默认情况下,ADS 可以在单个 websocket 消息中发送多个 JSON 消息。因此,我们循环访问每个消息,并为每个消息调用process_message处理:

    	
            

def process_message(ws, message_json):

    global shutdown_app

 

    """ Extract Message Type and Domain"""

    message_type = message_json['Type']

    if 'Domain' in message_json:

        message_domain = message_json['Domain']

    # check for a Login Refresh response to confirm successful login

    if message_type == "Refresh" and message_domain == "Login":

        global logged_in

        logged_in = True

        print ("LOGGED IN")

    elif message_type == "Ping":

        pong_json = { 'Type':'Pong' }

        ws.send(json.dumps(pong_json))

        print("SENT:")

        print(json.dumps(pong_json, sort_keys=True, indent=2, separators=(',', ':')))

    elif message_type == "Status" and message_domain == "Login":

        # A Login Status message usually indicates a problem - so report it and shutdown

        if message_json['State']['Stream'] != "Open" or message_json['State']['Data'] != "Ok":

            print("LOGIN REQUEST REJECTED.")

            shutdown_app = True

    elif message_type == "Error":

            shutdown_app = True

process_message方法会根据消息类型执行一些操作:

  • 如果收到刷新类型消息,我们将设置logged_in标志以指示登录成功。
  • 如果收到状态或错误消息,我们会将应用程序标记为关闭。
  • 当ADS向我们发送Ping消息时,我们会发送Pong响应 - 以确认应用程序仍在运行。

假设 Login 响应成功,回到 _main_ 方法后,由于logged_in标志设置为 True,等待循环应已退出。因此,我们现在可以继续并开始将数据发布到ADS。

将数据发布到ADS

正如我在开始时提到的,本指南针对的是不需要使用Refinitiv Real-Time的任何数据的开发人员 - 只需Post即可。针对此类要求的适宜发布技术称为“非数据流发布/非即时发布”。对于想要使用他们要发布到的工具的开发人员,还有一种称为“在线发布”的技术 - 请参阅本指南末尾,以获取涵盖这两种技术的教程的链接。

在计算机科学中,Stream通常被定义为“在一段时间内可用的列队数据 - 每个单点数据单独到达(而不是批量)”

当我们登录ADS时,我们建立了一个ID为1的数据流 - 即应用程序和ADS之间流式传输的任何与登录相关的数据都将以1的ID标识。

非即时发布

对于非即时发布,我们不需要订阅要发布的条目或者打开那个数据流。事实上,该条目甚至可能不存在,我们可能希望使用发布来创建该条目。

相反,我们将使用我们已经打开的一个数据流 - 即登录数据流来发送我们的帖子。

因此,假设登录请求成功 - 我们可以使用登录流ID 1发送离线帖子。

    	
            

svcname = "NIPROV"  # Service to Post on - Check with your Market Data team for correct value

ric = "UMER.TST"    # RIC to Post / Contribute data to - Check with your Market Data team for Test RICs

 

bid = 22.1          # A few dummy starting values

ask = 24.5

trdprc = 23.3

 

# Use a python dict to store our FieldName(key) + Value pairs

fields = { 'BID' : bid, 'ASK' : ask, 'TRDPRC_1' : trdprc, 'GEN_TEXT16' : 'some text' }

 

# Send our Refresh message to create the RIC or refresh the fields of an existing RIC

send_mp_offstream_post(svcname,ric, fields, True)

在上面的代码中,我们指定服务名称,测试RIC代码,我们创建一个带有虚拟值的几个字段的字典,然后我们调用我们的方法发送刷新类型帖子。

    	
            

def send_mp_offstream_post(svc,riccode, fields, refresh = False):

    global post_id

    """ Send an off-stream post message containing market-price content """

    mp_post_json = {

        'ID': 1,

        'Type':'Post',

        'Key': {

            'Service': svc,

            'Name': riccode

        },

        'Ack':True,

        'PostID':post_id,

        'PostUserInfo': {       # Ask you Market Data team if this is mandated by your organisation

            'Address':position,  # Use IP address as the Post User Address.

            'UserID':os.getpid() # Using process ID as the Post User Id - check with your MD team

        },

        'Message': {

            'ID': 0,

            'Type':'Refresh' if refresh else 'Update',

            'Fields':fields

        }

    }

首先要注意的是,有一个外部 Post 类型消息,其中包含内部刷新类型消息。除此之外,请注意以下事项:

  • 数据流ID为1 - 即应该打开的登录流
  • 外部消息的帖子类型
  • 关键内容包括
    • 由您的市场数据团队提供的服务
    • 名称 - 由您的市场数据团队提供的要发布的工具的RIC代码
  • Ack – 为Ture以请求ADS对此发布内容的确认
  • PostID - 对于我们发送的每个帖子,此值应该是唯一的 - 我们可以从Ack响应中提取它,以确定Ack与哪个帖子相关。
  • PostUserInfo - 您的组织出于审核目的可能需要地址和用户 ID(否则您可以省略这些值)
  • 内部消息包含您希望发布的实际数据
    • ID为0 - 我们正在发布一个值,因此我们不需要建立新的数据流
    • 类型 - 要创建新项目或刷新所有字段,应将其设置为刷新 - 否则使用Update来更新部分字段集
    •  字段 - 提供包含字段名称(关键值)+你想向其提供数据的字段的值对(Value pairs)的字典

调用上述方法应会导致将以下 JSON 消息发送到 ADS:

    	
            

{

  "Ack":true,

  "Domain":"MarketPrice",

  "ID":1,

  "Key":{

    "Name":"UMER.TST",

    "Service":"NIPROV"

  },

  "Message":{

    "Fields":{

      "ASK":24.5,

      "BID":22.1,

      "GEN_TEXT16":"some text",

      "TRDPRC_1":23.3

    },

    "ID":0,

    "Solicited":false,

    "State":{

      "Data":"Ok",

      "Stream":"Open"

    },

    "Type":"Refresh"

  },

  "PostID":1,

  "PostUserInfo":{

    "Address":"10.44.12.152",

    "UserID":7408

  },

  "Type":"Post"

}

假设发布被接受,我们应该从 ADS 获得一个异步 Ack 响应:

    	
            

{

    "AckID":1,

    "ID":1,

    "Key":{

      "Name":"UMER.TST",

      "Service":"NIPROV"

    },

    "Type":"Ack"

}

请注意 AckID 1,它与我们在刚刚发送到服务器的 Post 消息中指定的 PostID 1 相对应。我们可以使用此值来确定 Ack 与哪个 Post 相关。因此,我们应该对我们发送的每条Post消息使用唯一的PostId。

由于这只是一个示例,因此 _main_ 方法的其余部分只是按定时间隔发送具有随机价格值的 Update 类型消息。显然,您将在需要时发送“刷新”和“更新”。

发布刷新或更新?

在上面的示例中,我最初发布了一个刷新,然后是按时间间隔更新 - 纯粹是为了演示目的。但是,您选择使用哪种方法将取决于您的要求:

  • 如果要在内部缓存服务中创建新条目,可以发布刷新
  • 要添加或删除条目中包含的实际字段,您需要使用修订后的字段列表发布刷新
  • 如果您遇到一些现已解决的临时数据问题,并希望强制使用者覆盖任何本地缓存的字段,请发送 Refresh。这可确保数据的任何现有使用者获得所有字段的一组干净值
  • 要更改现有条目的一个或多个字段的值,您可以发布更新

请注意,如果我们尝试将更新发布到不存在的条目,我们将收到Ack响应,但它使用NakCode,例如,如果我们尝试发布到不存在的RIC代码 'DAVE.TST'会显示:

    	
            

{

    "ID": 1,

    "Type": "Ack",

    "AckID": 1,

    "NakCode": "SymbolUnknown",

    "Text": "F44: Unable to find item on post update.",

    "Key": {

      "Service": "NIPROV",

      "Name": "DAVE.TST"

    }

  }

本指南到此结束 - 总结一下:

  • 如果您想将数据贡献到内部缓存或Refinitiv,而无需使用数据,则可以使用Websocket接口以相对直接的方式实现这一点
  • 如果您想使用和贡献数据,您仍然可以应用上述大部分知识,通过使用在线发布来实现这一目标 - 如下面的教程链接中所述
  • 您应该能够使用任何支持Websockets的语言和编码JSON消息的能力来实现上述内容

其他资源

您可以在链接面板中找到指向源代码、API 和其他相关文章的链接