tp框架做的图片网站/重庆森林电影
【网络通信 -- 直播】SRS 实战记录 -- 客户端发布与接收媒体流
【1】基于 SRS 的 WebRTC 通讯
【1.1】SRS WebRTC 通讯架构
【1.2】SRS WebRTC 通讯交互流程
【2】基于 SRS 流媒体服务器的 WebRTC 客户端代码分析
【2.1】WebRTC API 简介
/*** 函数 : addTransceiver* 功能 : 创建一个 RTCRtpTransceiver 对象并将该对象添加到与 RTCPeerConnection 实例关联的收发器集合中* RTCRtpTransceiver 代表一个双向的流,存在对应的发送端与接收端* 原型 : rtpTransceiver = RTCPeerConnection.addTransceiver(trackOrKind, init);* 参数 : * trackOrKind,收发器相关联的媒体轨* init,初始化的属性字典,标识属性* 方向、发送端的编码、媒体流*//*** 函数 : getUserMedia* 功能 : 申请本地输入媒体的权限* 原型 : var promise = navigator.mediaDevices.getUserMedia(constraints);* 参数 : * constraints,带获取媒体流的约束条件* 返回 :* 包含媒体流实例的 Promise 对象*//*** 函数 : getTracks* 功能 : 获取媒体流轨道实例* 返回 :* 媒体流轨道数组*//*** 函数 : addTrack* 功能 : 向往对端发送的媒体轨道中添加一个新的轨道* 原型 : rtpSender = rtcPeerConnection.addTrack(track, stream...);* 参数 :* track,标识待添加到对等连接中的媒体轨道实例* stram,本地媒体流实例* 返回 :* rtpSender 用于传输媒体数据的实例*//*** 函数 : createOffer* 功能 : 创建 SDP 描述,用于启动一个与对端的新的 WebRTC 连接* 原型 : myPeerConnection.createOffer(successCallback, failureCallback, [options])* 返回 :* 基于 RTCSessionDescriptionInit 的以约束 SDP 描述生成 offer 的实例*//*** 函数 : setLocalDescription* 功能 : 设置与连接相关的本地描述*//*** 函数 : setRemoteDescription* 功能 : 将特定的会话描述设置为对端当前的 offer / answer*//*** RTCPeerConnection.close() : 关闭当前的连接*/
【2.2】基于 SRS 流媒体 WebRTC 通讯关键代码分析
信令处理
SrsRtcSignalingAsync 实例
作用 : 处理信令交互connect : 连接信令服务器 wss://xxx/sig/v1/rtcsend : 向信令服务器发送消息close : 关闭 websocket 连接onmessage : 处理接收到的消息SrsRtcSignalingParse
作用 : 解析 url 中 query(? 后的字符串)
示例结果如下
{ "query": "","rawQuery": "?autostart=true&room=4a02515","wsSchema": "wss","wsHost": "120.27.131.197","host": "120.27.131.197","room": "4a02515","display": "4186e4f","autostart": true
}
媒体流处理
SrsRtcPublisherAsync
作用 : 发布媒体流publish : 发布媒体流close : 关闭媒体流发布端ontrack : 添加媒体轨时触发的回调函数,回调函数中将媒体轨添加到本地流中SrsRtcPlayerAsync
作用 : 播放媒体流play : 播放媒体流close : 关闭媒体播放端ontrack : 添加媒体轨时触发的回调函数,回调函数中将媒体轨添加到本地流中
【3】基于 SRS 流媒体服务器的信令服务端代码分析
【3.1】消息结构
// 房间参与者 Participant 结构体,关联 Json
type Participant struct {Room *Room `json:"-"`Display string `json:"display"`Publishing bool `json:"publishing"`// 参与者的 Out 为 outMessagesOut chan []byte `json:"-"`
}// 房间 Room 结构体,关联 Json
type Room struct {Name string `json:"room"`Participants []*Participant `json:"participants"`lock sync.RWMutex `json:"-"`
}
【3.2】关键函数分析
房间管理相关方法
// Room 结构体的方法
// 向房间中添加参与者实例
func (v *Room) Add(p *Participant) error// Room 结构体的方法
// 获取房间中的参与者实例
func (v *Room) Get(display string) *Participant// Room 结构体的方法
// 移除房间中的参与者实例
func (v *Room) Remove(p *Participant)// Room 结构体的方法
// 遍历房间中的参与者,广播消息
func (v *Room) Notify(ctx context.Context, peer *Participant, event, param, data string)
WebSocket 处理方法
// 关联 "/sig/v1/rtc" 与 websocket 处理句柄
http.Handle("/sig/v1/rtc", websocket.Handler(func(c *websocket.Conn)// 协程 1 : 当连接关断后广播 leave 事件
go func() {// 从 ctx.Done() channel 中获取数据,该操作会一直阻塞直到读取到数据<-ctx.Done()if self == nil {return}// Notify other peers that we're quiting.// @remark The ctx(of self) is done, so we must use a new context.go self.Room.Notify(context.Background(), self, "leave", "", "")self.Room.Remove(self)logger.Tf(ctx, "Remove client %v", self)
}()// 协程 2 : 接收客户端发送的消息
go func() {defer cancel()buf := make([]byte, 16384)for {n, err := c.Read(buf)if err != nil {logger.Wf(ctx, "Ignore err %v for %v", err, r.RemoteAddr)break}// select 随机执行一个可运行的 case// 如果没有 case 可运行,它将阻塞,直到有 case 可运行select {// 协程结束则 context Donecase <-ctx.Done():// buf 收到的消息发送给 inMessagescase inMessages <- buf[:n]:}}
}()// 协程 3 : 处理客户端消息并发送响应
go func() {defer cancel()// WebSocket 接受到的消息处理函数handleMessage := func(m []byte) error {// 解析客户端信令的 json 结构体// 即 {action:action_val, room:room_val, display:display_val}action := struct {TID string `json:"tid"`Message struct {Action string `json:"action"`} `json:"msg"`}{}if err := json.Unmarshal(m, &action); err != nil {return errors.Wrapf(err, "Unmarshal %s", m)}// 响应的消息var res interface{}// "join" 事件处理if action.Message.Action == "join" {// join_1 : 解析客户端发送的消息// join_2 : 获取房间并添加参加者// join_3 : 构造响应信息// join_4 : 通知 join 事件// "publish" 事件处理} else if action.Message.Action == "publish" {// publish_1 : 解析客户端发送的消息// publish_2 : 获取房间中指定的参与者并修改其发布状态// publish_3 : 通知 publish 事件// "control" 事件处理} else if action.Message.Action == "control" {// control_1 : 解析客户端发送的消息// control_2 : 获取房间中指定的参与者// control_3 : 通知 control 事件} else {return errors.Errorf("Invalid message %s", m)}// 封装响应消息...return nil}// 处理接受到的消息for m := range inMessages {if err := handleMessage(m); err != nil {logger.Wf(ctx, "Handle %s err %v", m, err)break}}
}()// WebSocket 发送响应消息
for m := range outMessages {if _, err := c.Write(m); err != nil {logger.Wf(ctx, "Ignore err %v for %v", err, r.RemoteAddr)break}
}
参考与致谢
本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。
【1】WebRTC Native APIs
【2】WebRTC API
【3】webRTC专题