mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-06-01 22:18:52 +02:00
fix(jitsi): reuse xmpp session on reconnect. fix #82
This commit is contained in:
@@ -17,7 +17,7 @@ require (
|
||||
github.com/xtaci/kcp-go/v5 v5.6.72
|
||||
github.com/xtaci/smux v1.5.57
|
||||
github.com/zarazaex69/gr v0.1.5
|
||||
github.com/zarazaex69/j v0.0.0-20260528155819-956a22114ef6
|
||||
github.com/zarazaex69/j v0.0.0-20260529190049-6e5d2c4287ce
|
||||
golang.org/x/crypto v0.52.0
|
||||
golang.org/x/mobile v0.0.0-20260520154334-0e4426e1883d
|
||||
golang.org/x/sys v0.45.0
|
||||
|
||||
@@ -235,6 +235,12 @@ github.com/zarazaex69/gr v0.1.5 h1:Lpr4WrNXHL/ma65ZXeQH2UyFi6HziJoqqjaYCLisOQY=
|
||||
github.com/zarazaex69/gr v0.1.5/go.mod h1:hAk5j/s2QFlvr3bJkjPwakZENsMBx6VOUN1GwYzYoRw=
|
||||
github.com/zarazaex69/j v0.0.0-20260528155819-956a22114ef6 h1:0VjePKBshNBf3dWThq7DAT+Sm6uKlzBajabFkB0RNQo=
|
||||
github.com/zarazaex69/j v0.0.0-20260528155819-956a22114ef6/go.mod h1:7/ypJTenOIPx23fpo5uF7l4u+rxZqg9cFbTL/N77Ktc=
|
||||
github.com/zarazaex69/j v0.0.0-20260529182741-8fecd70f2627 h1:V6n6YSffo3I3pL31ThlvSxKQGWkP4F2anrnghIMVgyE=
|
||||
github.com/zarazaex69/j v0.0.0-20260529182741-8fecd70f2627/go.mod h1:7/ypJTenOIPx23fpo5uF7l4u+rxZqg9cFbTL/N77Ktc=
|
||||
github.com/zarazaex69/j v0.0.0-20260529185404-2981d5ea19f1 h1:Xw2LVb7GHIXWUVVWilbM9UKpuDqUlPanufRLtG2k2yA=
|
||||
github.com/zarazaex69/j v0.0.0-20260529185404-2981d5ea19f1/go.mod h1:7/ypJTenOIPx23fpo5uF7l4u+rxZqg9cFbTL/N77Ktc=
|
||||
github.com/zarazaex69/j v0.0.0-20260529190049-6e5d2c4287ce h1:jLs7a3nd3U9TR++LDKt3HyQbAruYvvgwlbdGPaV2x3w=
|
||||
github.com/zarazaex69/j v0.0.0-20260529190049-6e5d2c4287ce/go.mod h1:7/ypJTenOIPx23fpo5uF7l4u+rxZqg9cFbTL/N77Ktc=
|
||||
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
|
||||
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
|
||||
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
|
||||
|
||||
@@ -117,6 +117,7 @@ type Session struct {
|
||||
done chan struct{}
|
||||
doneOnce sync.Once
|
||||
cancel context.CancelFunc
|
||||
trickleCancel context.CancelFunc
|
||||
runCtx context.Context //nolint:containedctx // engine owns the supervisor lifetime
|
||||
wg sync.WaitGroup
|
||||
|
||||
@@ -508,7 +509,9 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge
|
||||
// closes that race window - any source-add Jicofo emits is picked up
|
||||
// the instant it lands on the wire.
|
||||
s.wg.Add(1)
|
||||
go s.trickleDrainLoop(pc, neg, jSess.LowLevel().Stanzas())
|
||||
trickleCtx, trickleCancel := context.WithCancel(context.Background())
|
||||
s.trickleCancel = trickleCancel
|
||||
go s.trickleDrainLoop(trickleCtx, pc, neg, jSess.LowLevel().Stanzas())
|
||||
|
||||
if sctpBridge {
|
||||
if err := jSess.PrepareBridgeSCTP(pc); err != nil {
|
||||
@@ -596,10 +599,12 @@ func (s *Session) rtcpKeepalive(pc *webrtc.PeerConnection) {
|
||||
// Incoming source-add stanzas (announcing other participants' SSRCs) are
|
||||
// merged into the remote SDP via neg.HandleSourceAdd so pion can route the
|
||||
// inbound RTP through OnTrack.
|
||||
func (s *Session) trickleDrainLoop(pc *webrtc.PeerConnection, neg negotiator, stanzas <-chan string) {
|
||||
func (s *Session) trickleDrainLoop(ctx context.Context, pc *webrtc.PeerConnection, neg negotiator, stanzas <-chan string) {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-s.done:
|
||||
return
|
||||
case raw, ok := <-stanzas:
|
||||
@@ -1204,22 +1209,92 @@ func (s *Session) reconnect(ctx context.Context) error {
|
||||
defer s.reconnecting.Store(false)
|
||||
|
||||
s.bridgeReady.Store(false)
|
||||
if old := s.jSess.Swap(nil); old != nil {
|
||||
_ = old.Close()
|
||||
}
|
||||
|
||||
// Close PC only — keep the XMPP session alive.
|
||||
s.pcMu.Lock()
|
||||
oldPC := s.pc
|
||||
s.pc = nil
|
||||
s.pcMu.Unlock()
|
||||
if s.trickleCancel != nil {
|
||||
s.trickleCancel()
|
||||
s.trickleCancel = nil
|
||||
}
|
||||
if oldPC != nil {
|
||||
_ = oldPC.Close()
|
||||
}
|
||||
|
||||
s.localEpoch.Store(randomEpoch())
|
||||
s.peerEpoch.Store(0)
|
||||
s.resetPeerEpochs()
|
||||
s.drainSendQueue()
|
||||
|
||||
jSess := s.jSess.Load()
|
||||
if jSess == nil {
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
|
||||
// Rejoin MUC (leave + join) without waiting for session-initiate.
|
||||
// This resets Jicofo's state for our participant so it will send
|
||||
// a fresh session-initiate when another peer arrives.
|
||||
logger.Infof("jitsi: rejoin %s/%s (non-blocking) ...", s.host, s.room)
|
||||
if err := jSess.Rejoin(ctx, s.name); err != nil {
|
||||
logger.Warnf("jitsi: rejoin failed: %v — full reconnect", err)
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
|
||||
// Wait for Jicofo to send session-initiate (when a peer joins the room).
|
||||
logger.Infof("jitsi: waiting for session-initiate in %s/%s ...", s.host, s.room)
|
||||
if _, err := jSess.WaitJingleReinitiate(ctx); err != nil {
|
||||
logger.Warnf("jitsi: wait reinitiate failed: %v — full reconnect", err)
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
|
||||
// Got session-initiate — negotiate PC and open bridge.
|
||||
sctpBridge := jSess.ColibriWS == ""
|
||||
if err := s.negotiatePC(ctx, jSess, sctpBridge); err != nil {
|
||||
logger.Warnf("jitsi: negotiate after reinitiate failed: %v — full reconnect", err)
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
if sctpBridge {
|
||||
if err := s.openBridgeSCTP(ctx, jSess); err != nil {
|
||||
logger.Warnf("jitsi: bridge after reinitiate failed: %v — full reconnect", err)
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
} else {
|
||||
if err := s.openBridgeWS(ctx, jSess); err != nil {
|
||||
logger.Warnf("jitsi: bridge after reinitiate failed: %v — full reconnect", err)
|
||||
return s.reconnectFull(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
s.peerEndpoint.Store(nil)
|
||||
s.peerVideoSSRC.Store(0)
|
||||
s.bridgeReady.Store(true)
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.recvLoop()
|
||||
|
||||
if err := s.Send(nil); err != nil {
|
||||
logger.Debugf("jitsi: epoch announce failed: %v", err)
|
||||
}
|
||||
if s.onReconnect != nil {
|
||||
s.onReconnect(nil)
|
||||
}
|
||||
logger.Infof("jitsi: reconnected %s/%s (reinitiate); colibri-ws=%s", s.host, s.room, jSess.ColibriWS)
|
||||
return nil
|
||||
}
|
||||
|
||||
// reconnectFull tears down everything and does a full rejoin (blocking on session-initiate).
|
||||
func (s *Session) reconnectFull(ctx context.Context) error {
|
||||
if old := s.jSess.Swap(nil); old != nil {
|
||||
_ = old.Close()
|
||||
}
|
||||
s.localEpoch.Store(randomEpoch())
|
||||
s.peerEpoch.Store(0)
|
||||
s.resetPeerEpochs()
|
||||
s.drainSendQueue()
|
||||
|
||||
logger.Infof("jitsi: reconnecting %s/%s as %s ...", s.host, s.room, s.name)
|
||||
logger.Infof("jitsi: full reconnect %s/%s as %s ...", s.host, s.room, s.name)
|
||||
jSess, err := s.joinAndOpenBridge(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -1235,11 +1310,10 @@ func (s *Session) reconnect(ctx context.Context) error {
|
||||
if err := s.Send(nil); err != nil {
|
||||
logger.Debugf("jitsi: epoch announce failed: %v", err)
|
||||
}
|
||||
|
||||
if s.onReconnect != nil {
|
||||
s.onReconnect(nil)
|
||||
}
|
||||
logger.Infof("jitsi: reconnected %s/%s; colibri-ws=%s", s.host, s.room, jSess.ColibriWS)
|
||||
logger.Infof("jitsi: reconnected %s/%s (full); colibri-ws=%s", s.host, s.room, jSess.ColibriWS)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -526,8 +526,12 @@ func (s *Server) serveSingle(ctx context.Context) {
|
||||
if contextDone(ctx) {
|
||||
return
|
||||
}
|
||||
hadSession := s.handshakeReady()
|
||||
logger.Debugf("AcceptStream returned %v - reinstalling session", err)
|
||||
s.reinstallSession(sess)
|
||||
if hadSession && s.ln != nil {
|
||||
s.ln.Reconnect("liveness")
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user