STOMP 通讯
HTTP API
提供一套基于 STOMP (over websocket) 协议的、供客户端/服务端通讯用的消息队列,作为 HTTP API
的一个补充(采用纯 Websocket API 的客户端自带消息推送,不需要额外使用 STOMP 协议)。
采用 HTTP API
时,客户端通过连接到媒体服务的 /v1/ws
端点,订阅所需的消息,并在收到消息时作出相应的处理。比如,打开实时音视频操作,需要订阅
流媒体状态通知
,在以异步模式调用 打开实时音视频
接口后,如果返回的流尚未可用,则要等待 流媒体状态通知
消息,
一旦收到标志着流已经准备好的 流媒体状态通知
消息,再开始播放。
基本约定
端点: /v1/ws
步骤:
- 连接到
/v1/ws
端点,连接时要在url带上POST /login
接口返回的有效的token(参数名:__token
)。 - 连接后,订阅所需要的主题。
- 在订阅回调函数中处理消息。
心跳:采用默认的心跳设置(heartbeat.outgoining = 10000, heartbeat.inbound = 10000)。
会话有效性: 当HTTP API的token失效时,Websocket的会话也同时失效。HTTP API的token失效的原因主要是超时一定的时间无API调用。
所以在WebSocket的错误事件处理中,如果断定token已经失效,则不应再重连。当POST /login
接口再次成功调用时,才重新连接WebSocket。
消息的结构
STOMP 客户端的消息回调处理器中,message.body的结构依主题不同而不同,见下WebSocket实时消息主题一览表。
以下以 stomp-websocket 的stomp.js(2.3.4)为例:
let stompClient;
let authToken;
function connect() {
if (!authToken)
return;
const url = 'wss://n11.gratour.info:7011/v1/ws?__token=' + authToken; // 将token添加到url上
stompClient = Stomp.Client(url);
stompClient.connect({}, (frame) => {
// 成功链接,订阅需要的主题
const dest = '/user/' + authToken + '/queue/strm';
this.stomp.subscribe(dest, (frame) => {
const notif = JSON.parse(frame.body);
// ... 处理
});
}, (error) => {
// 错误处理
});
} // function connect
消息队列一览
订阅地址样式 | 说明 | 发送者 | message.body的结构 |
---|---|---|---|
/user/{token}/queue/strm | 流媒体状态通知 | 服务端 | StrmNotif |
/user/{token}/queue/cmd | 终端指令状态变更通知 | 服务端 | CmdStateChanged |
/user/{token}/queue/av_upload | 录像文件上传状态通知 | 服务端 | AvUploadNotif |
/test | 调试端点,见后文 | 客户端 | WsTestRec |
订阅地址中的{token}
要用实际的令牌字符串(POST /login
接口返回的authToken
)代替,如token为Ge4E1xNHSfW8NYa0VJe48A
,则流媒体状态通知的订阅地址为:
/user/Ge4E1xNHSfW8NYa0VJe48A/queue/strm
。
调试端口
为方便调试,客户端可以通过向调试地址 /test
发送一个WsTestReq
结构的消息,可触发服务端向相关的队列推送消息,以便调试。
WebSocket客户端可以使用本方法来测试连接,或调试自己的websocket消息业务处理逻辑。
使用方法和过程
- 用Stomp客户端连接 /v1/ws 端点
- 订阅所要测试的列队
- 向
/test
测试地址发送一个WsTestReq
消息。 - 服务端收到
WsTestReq
消息后,将WsTestReq
消息中的msg
字符串的内容原样推送到/user/{token}/queue/{queue}
其中 token 为登录 token,queue 为由 WsTestReq.queue 指定的名称 - 客户端订阅的指定队列收到服务端的推送消息
WsTestReq 结构
属性 | 数据类型 | 必要 | 说明 |
---|---|---|---|
queue | string | Y | 队列名。服务最终推送的队列名为:/user/{token}/queue/{queue} |
msg | string | Y | 所要推送的消息的内容。 |
示例
/**
* 发送到 `/test` 测试地址的消息体
*/
class WsTestReq {
/**
* 队列名
* @type string
*/
queue;
/**
* 要服务端推到 `queue` 属性定义的队列的消息内容。
* @type string
*/
msg;
/**
*
* @param {string} queueName
* @param {Object | string} payload
*/
constructor(queueName, payload) {
this.queue = queueName;
if (payload instanceof String)
this.msg = payload;
else
this.msg = JSON.stringify(payload);
}
}
/**
* 本结构用户可以自定义,此结构的内容将在 JSON.stringify 后作为 WsTestReq.msg 的内容,在 WsTestReq 发送到 `/test` 测试地址后,服务端会
* 将本结构的内容原样推送到 `/user/{token}/queue/{queue}`,其中 token 为登录 token,queue 为由 WsTestReq.queue 指定的名称。
*/
class WsConnTestPayload {
/**
* 自定义此结构的内容
*/
// ...
}
// 订阅 mytest 队列
this.stomp.subscribe("/user/LoDpxsejQxCIeZHE83iMSA/queue/mytest", (frame) => {
/**
*
* @type {WsConnTestPayload}
*/
const connTestReq = JSON.parse(frame.body);
// 对发到的消息进行处理
// ...
});
// 测试时
const reqId = crypto.randomUUID();
const payload = new WsConnTestPayload();
payload.reqId = reqId;
const req = new WsTestReq("mytest", payload);
// 将消息发送到 `/test` 测试地址
this.stomp.send("/test", null, JSON.stringify(req));