diff --git a/go.mod b/go.mod index 1b94c8c..7ff2369 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/xtaci/kcp-go/v5 v5.6.72 github.com/xtaci/smux v1.5.57 github.com/zarazaex69/gr v0.1.5 - github.com/zarazaex69/j v0.0.0-20260528155819-956a22114ef6 + github.com/zarazaex69/j v0.0.0-20260529190049-6e5d2c4287ce golang.org/x/crypto v0.52.0 golang.org/x/mobile v0.0.0-20260520154334-0e4426e1883d golang.org/x/sys v0.45.0 diff --git a/go.sum b/go.sum index 4b38b22..3d9ea0d 100644 --- a/go.sum +++ b/go.sum @@ -235,6 +235,12 @@ github.com/zarazaex69/gr v0.1.5 h1:Lpr4WrNXHL/ma65ZXeQH2UyFi6HziJoqqjaYCLisOQY= github.com/zarazaex69/gr v0.1.5/go.mod h1:hAk5j/s2QFlvr3bJkjPwakZENsMBx6VOUN1GwYzYoRw= github.com/zarazaex69/j v0.0.0-20260528155819-956a22114ef6 h1:0VjePKBshNBf3dWThq7DAT+Sm6uKlzBajabFkB0RNQo= github.com/zarazaex69/j v0.0.0-20260528155819-956a22114ef6/go.mod h1:7/ypJTenOIPx23fpo5uF7l4u+rxZqg9cFbTL/N77Ktc= +github.com/zarazaex69/j v0.0.0-20260529182741-8fecd70f2627 h1:V6n6YSffo3I3pL31ThlvSxKQGWkP4F2anrnghIMVgyE= +github.com/zarazaex69/j v0.0.0-20260529182741-8fecd70f2627/go.mod h1:7/ypJTenOIPx23fpo5uF7l4u+rxZqg9cFbTL/N77Ktc= +github.com/zarazaex69/j v0.0.0-20260529185404-2981d5ea19f1 h1:Xw2LVb7GHIXWUVVWilbM9UKpuDqUlPanufRLtG2k2yA= +github.com/zarazaex69/j v0.0.0-20260529185404-2981d5ea19f1/go.mod h1:7/ypJTenOIPx23fpo5uF7l4u+rxZqg9cFbTL/N77Ktc= +github.com/zarazaex69/j v0.0.0-20260529190049-6e5d2c4287ce h1:jLs7a3nd3U9TR++LDKt3HyQbAruYvvgwlbdGPaV2x3w= +github.com/zarazaex69/j v0.0.0-20260529190049-6e5d2c4287ce/go.mod h1:7/ypJTenOIPx23fpo5uF7l4u+rxZqg9cFbTL/N77Ktc= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go index 0981b92..9060648 100644 --- a/internal/engine/jitsi/jitsi.go +++ b/internal/engine/jitsi/jitsi.go @@ -117,6 +117,7 @@ type Session struct { done chan struct{} doneOnce sync.Once cancel context.CancelFunc + trickleCancel context.CancelFunc runCtx context.Context //nolint:containedctx // engine owns the supervisor lifetime wg sync.WaitGroup @@ -508,7 +509,9 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge // closes that race window - any source-add Jicofo emits is picked up // the instant it lands on the wire. s.wg.Add(1) - go s.trickleDrainLoop(pc, neg, jSess.LowLevel().Stanzas()) + trickleCtx, trickleCancel := context.WithCancel(context.Background()) + s.trickleCancel = trickleCancel + go s.trickleDrainLoop(trickleCtx, pc, neg, jSess.LowLevel().Stanzas()) if sctpBridge { if err := jSess.PrepareBridgeSCTP(pc); err != nil { @@ -596,10 +599,12 @@ func (s *Session) rtcpKeepalive(pc *webrtc.PeerConnection) { // Incoming source-add stanzas (announcing other participants' SSRCs) are // merged into the remote SDP via neg.HandleSourceAdd so pion can route the // inbound RTP through OnTrack. -func (s *Session) trickleDrainLoop(pc *webrtc.PeerConnection, neg negotiator, stanzas <-chan string) { +func (s *Session) trickleDrainLoop(ctx context.Context, pc *webrtc.PeerConnection, neg negotiator, stanzas <-chan string) { defer s.wg.Done() for { select { + case <-ctx.Done(): + return case <-s.done: return case raw, ok := <-stanzas: @@ -1204,22 +1209,92 @@ func (s *Session) reconnect(ctx context.Context) error { defer s.reconnecting.Store(false) s.bridgeReady.Store(false) - if old := s.jSess.Swap(nil); old != nil { - _ = old.Close() - } + + // Close PC only — keep the XMPP session alive. s.pcMu.Lock() oldPC := s.pc s.pc = nil s.pcMu.Unlock() + if s.trickleCancel != nil { + s.trickleCancel() + s.trickleCancel = nil + } if oldPC != nil { _ = oldPC.Close() } + + s.localEpoch.Store(randomEpoch()) + s.peerEpoch.Store(0) + s.resetPeerEpochs() + s.drainSendQueue() + + jSess := s.jSess.Load() + if jSess == nil { + return s.reconnectFull(ctx) + } + + // Rejoin MUC (leave + join) without waiting for session-initiate. + // This resets Jicofo's state for our participant so it will send + // a fresh session-initiate when another peer arrives. + logger.Infof("jitsi: rejoin %s/%s (non-blocking) ...", s.host, s.room) + if err := jSess.Rejoin(ctx, s.name); err != nil { + logger.Warnf("jitsi: rejoin failed: %v — full reconnect", err) + return s.reconnectFull(ctx) + } + + // Wait for Jicofo to send session-initiate (when a peer joins the room). + logger.Infof("jitsi: waiting for session-initiate in %s/%s ...", s.host, s.room) + if _, err := jSess.WaitJingleReinitiate(ctx); err != nil { + logger.Warnf("jitsi: wait reinitiate failed: %v — full reconnect", err) + return s.reconnectFull(ctx) + } + + // Got session-initiate — negotiate PC and open bridge. + sctpBridge := jSess.ColibriWS == "" + if err := s.negotiatePC(ctx, jSess, sctpBridge); err != nil { + logger.Warnf("jitsi: negotiate after reinitiate failed: %v — full reconnect", err) + return s.reconnectFull(ctx) + } + if sctpBridge { + if err := s.openBridgeSCTP(ctx, jSess); err != nil { + logger.Warnf("jitsi: bridge after reinitiate failed: %v — full reconnect", err) + return s.reconnectFull(ctx) + } + } else { + if err := s.openBridgeWS(ctx, jSess); err != nil { + logger.Warnf("jitsi: bridge after reinitiate failed: %v — full reconnect", err) + return s.reconnectFull(ctx) + } + } + + s.peerEndpoint.Store(nil) + s.peerVideoSSRC.Store(0) + s.bridgeReady.Store(true) + + s.wg.Add(1) + go s.recvLoop() + + if err := s.Send(nil); err != nil { + logger.Debugf("jitsi: epoch announce failed: %v", err) + } + if s.onReconnect != nil { + s.onReconnect(nil) + } + logger.Infof("jitsi: reconnected %s/%s (reinitiate); colibri-ws=%s", s.host, s.room, jSess.ColibriWS) + return nil +} + +// reconnectFull tears down everything and does a full rejoin (blocking on session-initiate). +func (s *Session) reconnectFull(ctx context.Context) error { + if old := s.jSess.Swap(nil); old != nil { + _ = old.Close() + } s.localEpoch.Store(randomEpoch()) s.peerEpoch.Store(0) s.resetPeerEpochs() s.drainSendQueue() - logger.Infof("jitsi: reconnecting %s/%s as %s ...", s.host, s.room, s.name) + logger.Infof("jitsi: full reconnect %s/%s as %s ...", s.host, s.room, s.name) jSess, err := s.joinAndOpenBridge(ctx) if err != nil { return err @@ -1235,11 +1310,10 @@ func (s *Session) reconnect(ctx context.Context) error { if err := s.Send(nil); err != nil { logger.Debugf("jitsi: epoch announce failed: %v", err) } - if s.onReconnect != nil { s.onReconnect(nil) } - logger.Infof("jitsi: reconnected %s/%s; colibri-ws=%s", s.host, s.room, jSess.ColibriWS) + logger.Infof("jitsi: reconnected %s/%s (full); colibri-ws=%s", s.host, s.room, jSess.ColibriWS) return nil } diff --git a/internal/server/server.go b/internal/server/server.go index a3dffdc..5c07cad 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -526,8 +526,12 @@ func (s *Server) serveSingle(ctx context.Context) { if contextDone(ctx) { return } + hadSession := s.handshakeReady() logger.Debugf("AcceptStream returned %v - reinstalling session", err) s.reinstallSession(sess) + if hadSession && s.ln != nil { + s.ln.Reconnect("liveness") + } continue }