mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-06-02 06:23:37 +02:00
fix: golangci
This commit is contained in:
@@ -509,7 +509,7 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge
|
||||
// closes that race window - any source-add Jicofo emits is picked up
|
||||
// the instant it lands on the wire.
|
||||
s.wg.Add(1)
|
||||
trickleCtx, trickleCancel := context.WithCancel(context.Background())
|
||||
trickleCtx, trickleCancel := context.WithCancel(ctx)
|
||||
s.trickleCancel = trickleCancel
|
||||
go s.trickleDrainLoop(trickleCtx, pc, neg, jSess.LowLevel().Stanzas())
|
||||
|
||||
@@ -599,7 +599,9 @@ func (s *Session) rtcpKeepalive(pc *webrtc.PeerConnection) {
|
||||
// Incoming source-add stanzas (announcing other participants' SSRCs) are
|
||||
// merged into the remote SDP via neg.HandleSourceAdd so pion can route the
|
||||
// inbound RTP through OnTrack.
|
||||
func (s *Session) trickleDrainLoop(ctx context.Context, pc *webrtc.PeerConnection, neg negotiator, stanzas <-chan string) {
|
||||
func (s *Session) trickleDrainLoop(
|
||||
ctx context.Context, pc *webrtc.PeerConnection, neg negotiator, stanzas <-chan string,
|
||||
) {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
select {
|
||||
@@ -1209,19 +1211,7 @@ func (s *Session) reconnect(ctx context.Context) error {
|
||||
defer s.reconnecting.Store(false)
|
||||
|
||||
s.bridgeReady.Store(false)
|
||||
|
||||
// Close PC only - keep the XMPP session alive.
|
||||
s.pcMu.Lock()
|
||||
oldPC := s.pc
|
||||
s.pc = nil
|
||||
s.pcMu.Unlock()
|
||||
if s.trickleCancel != nil {
|
||||
s.trickleCancel()
|
||||
s.trickleCancel = nil
|
||||
}
|
||||
if oldPC != nil {
|
||||
_ = oldPC.Close()
|
||||
}
|
||||
s.teardownPC()
|
||||
|
||||
s.localEpoch.Store(randomEpoch())
|
||||
s.peerEpoch.Store(0)
|
||||
@@ -1249,22 +1239,8 @@ func (s *Session) reconnect(ctx context.Context) error {
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
|
||||
// Got session-initiate - negotiate PC and open bridge.
|
||||
sctpBridge := jSess.ColibriWS == ""
|
||||
if err := s.negotiatePC(ctx, jSess, sctpBridge); err != nil {
|
||||
logger.Warnf("jitsi: negotiate after reinitiate failed: %v - full reconnect", err)
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
if sctpBridge {
|
||||
if err := s.openBridgeSCTP(ctx, jSess); err != nil {
|
||||
logger.Warnf("jitsi: bridge after reinitiate failed: %v - full reconnect", err)
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
} else {
|
||||
if err := s.openBridgeWS(ctx, jSess); err != nil {
|
||||
logger.Warnf("jitsi: bridge after reinitiate failed: %v - full reconnect", err)
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
if err := s.reinitiateBridge(ctx, jSess); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.peerEndpoint.Store(nil)
|
||||
@@ -1284,6 +1260,42 @@ func (s *Session) reconnect(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// teardownPC closes the current PeerConnection and cancels the trickle loop.
|
||||
func (s *Session) teardownPC() {
|
||||
s.pcMu.Lock()
|
||||
oldPC := s.pc
|
||||
s.pc = nil
|
||||
s.pcMu.Unlock()
|
||||
if s.trickleCancel != nil {
|
||||
s.trickleCancel()
|
||||
s.trickleCancel = nil
|
||||
}
|
||||
if oldPC != nil {
|
||||
_ = oldPC.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// reinitiateBridge negotiates a new PeerConnection and opens the bridge channel.
|
||||
func (s *Session) reinitiateBridge(ctx context.Context, jSess *j.Session) error {
|
||||
sctpBridge := jSess.ColibriWS == ""
|
||||
if err := s.negotiatePC(ctx, jSess, sctpBridge); err != nil {
|
||||
logger.Warnf("jitsi: negotiate after reinitiate failed: %v - full reconnect", err)
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
if sctpBridge {
|
||||
if err := s.openBridgeSCTP(ctx, jSess); err != nil {
|
||||
logger.Warnf("jitsi: bridge after reinitiate failed: %v - full reconnect", err)
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
} else {
|
||||
if err := s.openBridgeWS(ctx, jSess); err != nil {
|
||||
logger.Warnf("jitsi: bridge after reinitiate failed: %v - full reconnect", err)
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// reconnectFull tears down everything and does a full rejoin (blocking on session-initiate).
|
||||
func (s *Session) reconnectFull(ctx context.Context) error {
|
||||
if old := s.jSess.Swap(nil); old != nil {
|
||||
|
||||
@@ -523,15 +523,9 @@ func (s *Server) serveSingle(ctx context.Context) {
|
||||
|
||||
stream, err := sess.AcceptStream()
|
||||
if err != nil {
|
||||
if contextDone(ctx) {
|
||||
if s.handleAcceptError(ctx, sess) {
|
||||
return
|
||||
}
|
||||
hadSession := s.handshakeReady()
|
||||
logger.Debugf("AcceptStream returned %v - reinstalling session", err)
|
||||
s.reinstallSession(sess)
|
||||
if hadSession && s.ln != nil {
|
||||
s.ln.Reconnect("liveness")
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -543,6 +537,20 @@ func (s *Server) serveSingle(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// handleAcceptError handles a failed AcceptStream. Returns true if the server should stop.
|
||||
func (s *Server) handleAcceptError(ctx context.Context, sess *smux.Session) bool {
|
||||
if contextDone(ctx) {
|
||||
return true
|
||||
}
|
||||
hadSession := s.handshakeReady()
|
||||
logger.Debugf("AcceptStream returned error - reinstalling session")
|
||||
s.reinstallSession(sess)
|
||||
if hadSession && s.ln != nil {
|
||||
s.ln.Reconnect("liveness")
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *Server) currentSessionID() string {
|
||||
s.sessMu.RLock()
|
||||
defer s.sessMu.RUnlock()
|
||||
|
||||
Reference in New Issue
Block a user