feat(salutejazz): support ICE and staged video offers

This commit is contained in:
zarazaex69
2026-05-14 18:39:10 +03:00
parent c6c4fd10a4
commit 1c43379448
3 changed files with 286 additions and 39 deletions
+2 -1
View File
@@ -30,7 +30,8 @@ const (
contentTypeJSON = "application/json"
jazzOrigin = "https://salutejazz.ru"
jazzReferer = jazzOrigin + "/"
jazzUA = "osName=Linux;osVersion=;appName=jazz;appVersion=26.21.7;surface=WEB;browserName=Firefox;browserVersion=150.0"
jazzUA = "osName=Linux;osVersion=;appName=jazz;appVersion=26.21.7;" +
"surface=WEB;browserName=Firefox;browserVersion=150.0"
)
var apiBase = "https://bk.salutejazz.ru" //nolint:gochecknoglobals // package-level state intentional
+255 -38
View File
@@ -18,6 +18,7 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/protect"
"github.com/pion/webrtc/v4"
"github.com/pion/webrtc/v4/pkg/media"
)
const (
@@ -35,7 +36,16 @@ const (
payloadMethod = "method"
payloadTrack = "track"
payloadType = "type"
payloadDesc = "description"
payloadSDP = "sdp"
payloadAnswer = "answer"
payloadOffer = "offer"
methodOffer = "rtc:offer"
trackTypeAudio = "AUDIO"
trackTypeVideo = "VIDEO"
trackSourceMic = "MICROPHONE"
trackSourceCam = "CAMERA"
credentialKeyPassword = "password"
@@ -47,8 +57,11 @@ const (
sendQueueTimeout = 50 * time.Millisecond
closeWaitTimeout = 2 * time.Second
subscriberOfferGap = 300 * time.Millisecond
audioFrameDuration = 20 * time.Millisecond
)
var opusSilenceFrame = []byte{0xf8, 0xff, 0xfe} //nolint:gochecknoglobals // static Opus silence frame
var (
// ErrPublisherNotInitialized is returned when the publisher peer connection is not set up.
ErrPublisherNotInitialized = errors.New("publisher peer connection not initialized")
@@ -94,12 +107,16 @@ type Session struct {
sessionCloseCh chan struct{}
videoTrackMu sync.RWMutex
videoTracks []webrtc.TrackLocal
audioTrack *webrtc.TrackLocalStaticSample
onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
subscriberReady atomic.Bool
publisherReady atomic.Bool
videoOffered atomic.Bool
subscriberConn chan struct{}
publisherConn chan struct{}
videoNegotiated chan struct{}
wg sync.WaitGroup
groupIDMu sync.RWMutex
groupID string
}
@@ -123,17 +140,18 @@ func New(_ context.Context, cfg engine.Config) (engine.Session, error) {
}
return &Session{
name: cfg.Name,
connectorURL: cfg.URL,
roomID: roomID,
password: password,
onData: cfg.OnData,
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
sessionCloseCh: make(chan struct{}),
sendQueue: make(chan []byte, defaultSendQueueSize),
subscriberConn: make(chan struct{}),
publisherConn: make(chan struct{}),
name: cfg.Name,
connectorURL: cfg.URL,
roomID: roomID,
password: password,
onData: cfg.OnData,
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
sessionCloseCh: make(chan struct{}),
sendQueue: make(chan []byte, defaultSendQueueSize),
subscriberConn: make(chan struct{}),
publisherConn: make(chan struct{}),
videoNegotiated: make(chan struct{}),
}, nil
}
@@ -145,8 +163,11 @@ func (s *Session) Capabilities() engine.Capabilities {
func (s *Session) resetMediaState() {
s.subscriberReady.Store(false)
s.publisherReady.Store(false)
s.videoOffered.Store(false)
s.subscriberConn = make(chan struct{})
s.publisherConn = make(chan struct{})
s.videoNegotiated = make(chan struct{})
s.audioTrack = nil
}
func closeSignal(ch chan struct{}) {
@@ -170,17 +191,63 @@ func (s *Session) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPRecei
}
func (s *Session) attachPendingVideoTracks() error {
s.videoTrackMu.RLock()
defer s.videoTrackMu.RUnlock()
s.videoTrackMu.Lock()
defer s.videoTrackMu.Unlock()
for _, track := range s.videoTracks {
if _, err := s.pcPub.AddTrack(track); err != nil {
return fmt.Errorf("failed to add track: %w", err)
if len(s.videoTracks) > 0 {
if err := s.ensurePublisherAudioTrackLocked(); err != nil {
return err
}
}
return nil
}
func (s *Session) ensurePublisherAudioTrackLocked() error {
if s.audioTrack != nil {
return nil
}
track, err := webrtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
},
"microphone",
"olcrtc",
)
if err != nil {
return fmt.Errorf("create audio track: %w", err)
}
if _, err := s.pcPub.AddTrack(track); err != nil {
return fmt.Errorf("add audio track: %w", err)
}
s.audioTrack = track
s.wg.Add(1)
go s.writeAudioSilence(track)
return nil
}
func (s *Session) writeAudioSilence(track *webrtc.TrackLocalStaticSample) {
defer s.wg.Done()
ticker := time.NewTicker(audioFrameDuration)
defer ticker.Stop()
for {
select {
case <-s.closeCh:
return
case <-ticker.C:
_ = track.WriteSample(media.Sample{
Data: opusSilenceFrame,
Duration: audioFrameDuration,
})
}
}
}
func defaultWebRTCConfig() webrtc.Configuration {
return webrtc.Configuration{
ICEServers: []webrtc.ICEServer{},
@@ -215,15 +282,33 @@ func (s *Session) createPeerConnections(api *webrtc.API, config webrtc.Configura
cb(track, receiver)
}
})
s.pcSub.OnICECandidate(func(candidate *webrtc.ICECandidate) {
s.sendICECandidate(candidate, "SUBSCRIBER")
})
s.pcPub, err = api.NewPeerConnection(config)
if err != nil {
return fmt.Errorf("create publisher pc: %w", err)
}
s.pcPub.OnConnectionStateChange(s.onPublisherConnectionStateChange)
s.pcPub.OnICECandidate(func(candidate *webrtc.ICECandidate) {
s.sendICECandidate(candidate, "PUBLISHER")
})
return nil
}
func (s *Session) setGroupID(groupID string) {
s.groupIDMu.Lock()
s.groupID = groupID
s.groupIDMu.Unlock()
}
func (s *Session) getGroupID() string {
s.groupIDMu.RLock()
defer s.groupIDMu.RUnlock()
return s.groupID
}
func (s *Session) createDataChannel() (chan struct{}, error) {
var err error
s.dc, err = s.pcPub.CreateDataChannel("_reliable", &webrtc.DataChannelInit{
@@ -308,7 +393,7 @@ func (s *Session) waitForMediaReady(ctx context.Context, timeout time.Duration)
}
select {
case <-s.publisherConn:
case <-s.videoNegotiated:
case <-timer.C:
return ErrPublisherMediaTimeout
case <-ctx.Done():
@@ -481,8 +566,9 @@ func (s *Session) handleSignaling(_ context.Context) {
func (s *Session) handleJoinResponse(payload map[string]any) {
group, _ := payload["participantGroup"].(map[string]any)
s.groupID, _ = group["groupId"].(string)
logger.Verbosef("[salutejazz] peer joined: groupId=%s", s.groupID)
groupID, _ := group["groupId"].(string)
s.setGroupID(groupID)
logger.Verbosef("[salutejazz] peer joined: groupId=%s", groupID)
}
func (s *Session) handleMediaOut(payload map[string]any) {
@@ -541,8 +627,8 @@ func (s *Session) handleRTCConfig(payload map[string]any) {
}
func (s *Session) handleSubscriberOffer(payload map[string]any) {
desc, _ := payload["description"].(map[string]any)
sdp, _ := desc["sdp"].(string)
desc, _ := payload[payloadDesc].(map[string]any)
sdp, _ := desc[payloadSDP].(string)
if err := s.pcSub.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
@@ -567,13 +653,13 @@ func (s *Session) handleSubscriberOffer(payload map[string]any) {
_ = s.ws.WriteJSON(map[string]any{
keyRoomID: s.roomID,
keyEvent: eventMediaIn,
keyGroupID: s.groupID,
keyGroupID: s.getGroupID(),
keyRequestID: uuid.New().String(),
keyPayload: map[string]any{
payloadMethod: "rtc:answer",
"description": map[string]any{
payloadDesc: map[string]any{
payloadType: payloadAnswer,
"sdp": answer.SDP,
payloadSDP: answer.SDP,
},
},
})
@@ -584,7 +670,7 @@ func (s *Session) handleSubscriberOffer(payload map[string]any) {
}
func (s *Session) sendPublisherOffer() {
if err := s.sendPublisherTrackAdds(); err != nil {
if err := s.sendPublisherAudioTrackAdd(); err != nil {
logger.Debugf("send publisher track add error: %v", err)
return
}
@@ -604,33 +690,92 @@ func (s *Session) sendPublisherOffer() {
_ = s.ws.WriteJSON(map[string]any{
keyRoomID: s.roomID,
keyEvent: "media-in",
"groupId": s.groupID,
"groupId": s.getGroupID(),
keyRequestID: uuid.New().String(),
keyPayload: map[string]any{
"method": "rtc:offer",
"description": map[string]any{
"type": "offer",
"sdp": offer.SDP,
payloadMethod: methodOffer,
payloadDesc: map[string]any{
payloadType: payloadOffer,
payloadSDP: offer.SDP,
},
},
})
s.wsMu.Unlock()
}
func (s *Session) sendPublisherTrackAdds() error {
func (s *Session) sendPublisherAudioTrackAdd() error {
s.videoTrackMu.RLock()
tracks := append([]webrtc.TrackLocal(nil), s.videoTracks...)
hasAudioTrack := s.audioTrack != nil
s.videoTrackMu.RUnlock()
if hasAudioTrack {
return s.sendPublisherTrackAdd(trackTypeAudio, trackSourceMic, true)
}
return nil
}
func (s *Session) sendPublisherVideoOffer() {
tracks, ok := s.addPublisherVideoTracks()
if !ok {
return
}
for _, track := range tracks {
if track == nil || track.Kind() != webrtc.RTPCodecTypeVideo {
continue
}
if err := s.sendPublisherTrackAdd("VIDEO", "CAMERA", false); err != nil {
return err
if err := s.sendPublisherTrackAdd(trackTypeVideo, trackSourceCam, false); err != nil {
logger.Debugf("send publisher video track add error: %v", err)
return
}
}
return nil
offer, err := s.pcPub.CreateOffer(nil)
if err != nil {
logger.Debugf("create pub video offer error: %v", err)
return
}
if err := s.pcPub.SetLocalDescription(offer); err != nil {
logger.Debugf("set local pub video desc error: %v", err)
return
}
s.wsMu.Lock()
_ = s.ws.WriteJSON(map[string]any{
keyRoomID: s.roomID,
keyEvent: eventMediaIn,
keyGroupID: s.getGroupID(),
keyRequestID: uuid.New().String(),
keyPayload: map[string]any{
payloadMethod: methodOffer,
payloadDesc: map[string]any{
payloadType: payloadOffer,
payloadSDP: offer.SDP,
},
},
})
s.wsMu.Unlock()
}
func (s *Session) addPublisherVideoTracks() ([]webrtc.TrackLocal, bool) {
s.videoTrackMu.Lock()
defer s.videoTrackMu.Unlock()
if s.videoOffered.Load() {
return nil, false
}
tracks := append([]webrtc.TrackLocal(nil), s.videoTracks...)
for _, track := range tracks {
if track == nil || track.Kind() != webrtc.RTPCodecTypeVideo {
continue
}
if _, err := s.pcPub.AddTrack(track); err != nil {
logger.Debugf("add publisher video track error: %v", err)
return nil, false
}
}
s.videoOffered.Store(true)
return tracks, true
}
func (s *Session) sendPublisherTrackAdd(trackType, source string, muted bool) error {
@@ -640,7 +785,7 @@ func (s *Session) sendPublisherTrackAdd(trackType, source string, muted bool) er
if err := s.ws.WriteJSON(map[string]any{
keyRoomID: s.roomID,
keyEvent: eventMediaIn,
keyGroupID: s.groupID,
keyGroupID: s.getGroupID(),
keyRequestID: uuid.New().String(),
keyPayload: map[string]any{
payloadMethod: "rtc:track:add",
@@ -657,15 +802,78 @@ func (s *Session) sendPublisherTrackAdd(trackType, source string, muted bool) er
return nil
}
func (s *Session) sendICECandidate(candidate *webrtc.ICECandidate, target string) {
if candidate == nil {
return
}
groupID := s.getGroupID()
if groupID == "" {
logger.Debugf("[salutejazz] drop local ICE candidate before group id target=%s", target)
return
}
s.wsMu.Lock()
defer s.wsMu.Unlock()
if s.ws == nil || s.closed.Load() {
return
}
if err := s.ws.WriteJSON(map[string]any{
keyRoomID: s.roomID,
keyEvent: eventMediaIn,
keyGroupID: groupID,
keyRequestID: uuid.New().String(),
keyPayload: map[string]any{
payloadMethod: "rtc:ice",
"rtcIceCandidates": []any{jazzICECandidatePayload(candidate.ToJSON(), target)},
},
}); err != nil {
logger.Debugf("[salutejazz] send local ICE candidate error: %v", err)
}
}
func jazzICECandidatePayload(candidate webrtc.ICECandidateInit, target string) map[string]any {
sdpMid := ""
if candidate.SDPMid != nil {
sdpMid = *candidate.SDPMid
}
sdpMLineIndex := uint16(0)
if candidate.SDPMLineIndex != nil {
sdpMLineIndex = *candidate.SDPMLineIndex
}
usernameFragment := ""
if candidate.UsernameFragment != nil {
usernameFragment = *candidate.UsernameFragment
}
return map[string]any{
"candidate": candidate.Candidate,
"sdpMid": sdpMid,
"sdpMLineIndex": sdpMLineIndex,
"usernameFragment": usernameFragment,
"target": target,
}
}
func (s *Session) handlePublisherAnswer(payload map[string]any) {
desc, _ := payload["description"].(map[string]any)
sdp, _ := desc["sdp"].(string)
desc, _ := payload[payloadDesc].(map[string]any)
sdp, _ := desc[payloadSDP].(string)
if err := s.pcPub.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: sdp,
}); err != nil {
logger.Debugf("set remote pub desc error: %v", err)
return
}
if s.hasLocalVideoTracks() && !s.videoOffered.Load() {
s.sendPublisherVideoOffer()
return
}
if s.videoOffered.Load() {
closeSignal(s.videoNegotiated)
}
}
@@ -787,11 +995,20 @@ func (s *Session) Close() error {
func (s *Session) AddVideoTrack(track webrtc.TrackLocal) error {
s.videoTrackMu.Lock()
s.videoTracks = append(s.videoTracks, track)
if s.pcPub != nil && s.audioTrack == nil {
if err := s.ensurePublisherAudioTrackLocked(); err != nil {
s.videoTrackMu.Unlock()
return err
}
}
s.videoTrackMu.Unlock()
if s.pcPub == nil {
return nil
}
if !s.videoOffered.Load() {
return nil
}
if _, err := s.pcPub.AddTrack(track); err != nil {
return fmt.Errorf("failed to add track: %w", err)
}
@@ -162,6 +162,35 @@ func TestSendPublisherTrackAddWritesJazzPayload(t *testing.T) {
assertJazzTrackAddPayload(t, msg[keyPayload])
}
func TestJazzICECandidatePayload(t *testing.T) {
sdpMid := "0"
sdpMLineIndex := uint16(1)
usernameFragment := "ufrag-1"
got := jazzICECandidatePayload(webrtc.ICECandidateInit{
Candidate: "candidate:1 1 udp 1 127.0.0.1 12345 typ host",
SDPMid: &sdpMid,
SDPMLineIndex: &sdpMLineIndex,
UsernameFragment: &usernameFragment,
}, "PUBLISHER")
if got["candidate"] != "candidate:1 1 udp 1 127.0.0.1 12345 typ host" {
t.Fatalf("candidate = %v", got["candidate"])
}
if got["sdpMid"] != "0" {
t.Fatalf("sdpMid = %v, want 0", got["sdpMid"])
}
if got["sdpMLineIndex"] != uint16(1) {
t.Fatalf("sdpMLineIndex = %v, want 1", got["sdpMLineIndex"])
}
if got["usernameFragment"] != "ufrag-1" {
t.Fatalf("usernameFragment = %v, want ufrag-1", got["usernameFragment"])
}
if got["target"] != "PUBLISHER" {
t.Fatalf("target = %v, want PUBLISHER", got["target"])
}
}
func assertJazzTrackAddEnvelope(t *testing.T, msg map[string]any) {
t.Helper()