跳到主要内容
版本:4.0.0

STOMP 通讯

HTTP API 提供一套基于 STOMP (over websocket) 协议的、供客户端/服务端通讯用的消息队列,作为 HTTP API 的一个补充(采用纯 Websocket API 的客户端自带消息推送,不需要额外使用 STOMP 协议)。

采用 HTTP API 时,客户端通过连接到媒体服务的 /v1/ws 端点,订阅所需的消息,并在收到消息时作出相应的处理。比如,打开实时音视频操作,需要订阅 流媒体状态通知,在以异步模式调用 打开实时音视频 接口后,如果返回的流尚未可用,则要等待 流媒体状态通知消息, 一旦收到标志着流已经准备好的 流媒体状态通知消息,再开始播放。

基本约定

端点/v1/ws

步骤

  1. 连接到/v1/ws端点,连接时要在url带上 POST /login接口返回的有效的token(参数名:__token)。
  2. 连接后,订阅所需要的主题。
  3. 在订阅回调函数中处理消息。

心跳:采用默认的心跳设置(heartbeat.outgoining = 10000, heartbeat.inbound = 10000)。

会话有效性: 当HTTP API的token失效时,Websocket的会话也同时失效。HTTP API的token失效的原因主要是超时一定的时间无API调用。 所以在WebSocket的错误事件处理中,如果断定token已经失效,则不应再重连。当POST /login接口再次成功调用时,才重新连接WebSocket。

消息的结构

STOMP 客户端的消息回调处理器中,message.body的结构依主题不同而不同,见下WebSocket实时消息主题一览表。

以下以 stomp-websocket 的stomp.js(2.3.4)为例:

let stompClient;
let authToken;

function connect() {
if (!authToken)
return;

const url = 'wss://n11.gratour.info:7011/v1/ws?__token=' + authToken; // 将token添加到url上
stompClient = Stomp.Client(url);
stompClient.connect({}, (frame) => {
// 成功链接,订阅需要的主题
const dest = '/user/' + authToken + '/queue/strm';
this.stomp.subscribe(dest, (frame) => {
const notif = JSON.parse(frame.body);
// ... 处理
});

}, (error) => {
// 错误处理
});
} // function connect

消息队列一览

订阅地址样式说明发送者message.body的结构
/user/{token}/queue/strm流媒体状态通知服务端StrmNotif
/user/{token}/queue/cmd终端指令状态变更通知服务端CmdStateChanged
/user/{token}/queue/av_upload录像文件上传状态通知服务端AvUploadNotif
/test调试端点,见后文客户端WsTestRec

订阅地址中的{token}要用实际的令牌字符串(POST /login接口返回的authToken)代替,如token为Ge4E1xNHSfW8NYa0VJe48A,则流媒体状态通知的订阅地址为: /user/Ge4E1xNHSfW8NYa0VJe48A/queue/strm

调试端口

为方便调试,客户端可以通过向调试地址 /test 发送一个WsTestReq结构的消息,可触发服务端向相关的队列推送消息,以便调试。 WebSocket客户端可以使用本方法来测试连接,或调试自己的websocket消息业务处理逻辑。

使用方法和过程

  1. 用Stomp客户端连接 /v1/ws 端点
  2. 订阅所要测试的列队
  3. /test 测试地址发送一个 WsTestReq 消息。
  4. 服务端收到 WsTestReq 消息后,将 WsTestReq 消息中的 msg 字符串的内容原样推送到 /user/{token}/queue/{queue} 其中 token 为登录 token,queue 为由 WsTestReq.queue 指定的名称
  5. 客户端订阅的指定队列收到服务端的推送消息

WsTestReq 结构

属性数据类型必要说明
queuestringY队列名。服务最终推送的队列名为:/user/{token}/queue/{queue}
msgstringY所要推送的消息的内容。

示例

/**
* 发送到 `/test` 测试地址的消息体
*/
class WsTestReq {
/**
* 队列名
* @type string
*/
queue;

/**
* 要服务端推到 `queue` 属性定义的队列的消息内容。
* @type string
*/
msg;

/**
*
* @param {string} queueName
* @param {Object | string} payload
*/
constructor(queueName, payload) {
this.queue = queueName;
if (payload instanceof String)
this.msg = payload;
else
this.msg = JSON.stringify(payload);
}
}

/**
* 本结构用户可以自定义,此结构的内容将在 JSON.stringify 后作为 WsTestReq.msg 的内容,在 WsTestReq 发送到 `/test` 测试地址后,服务端会
* 将本结构的内容原样推送到 `/user/{token}/queue/{queue}`,其中 token 为登录 token,queue 为由 WsTestReq.queue 指定的名称。
*/
class WsConnTestPayload {

/**
* 自定义此结构的内容
*/

// ...
}


// 订阅 mytest 队列
this.stomp.subscribe("/user/LoDpxsejQxCIeZHE83iMSA/queue/mytest", (frame) => {
/**
*
* @type {WsConnTestPayload}
*/
const connTestReq = JSON.parse(frame.body);

// 对发到的消息进行处理
// ...
});

// 测试时
const reqId = crypto.randomUUID();
const payload = new WsConnTestPayload();
payload.reqId = reqId;

const req = new WsTestReq("mytest", payload);

// 将消息发送到 `/test` 测试地址
this.stomp.send("/test", null, JSON.stringify(req));