fix(transport): pin peer channel after validation

This commit is contained in:
zarazaex69
2026-05-15 22:53:58 +03:00
parent 75e2674f48
commit 7f9351dad6
4 changed files with 58 additions and 28 deletions
+28 -13
View File
@@ -446,31 +446,34 @@ func (p *streamTransport) handleSample(sample []byte) {
continue
}
// Multi-party MUCs (e.g. Jitsi) can deliver frames from other
// peers — or RTP echo from previously-closed sessions to our
// PeerConnection. The first valid frame we see fixes the peer's
// channelID; later frames with a different ID are silently dropped.
if !p.acceptChannel(frame.channelID) {
// Multi-party MUCs (e.g. Jitsi) can deliver frames from other peers,
// or RTP echo from previously-closed sessions, to our PeerConnection.
// Once we've identified the real partner's channelID, drop everything
// else. We can't pin the partner from a raw frame header alone — a
// stray RTP packet might decode to a valid magic/version by chance —
// so the pin happens downstream, only after a CRC-validated payload
// (DATA) or a matching ACK waiter has confirmed the sender is ours.
if pinned := p.peerChannelID.Load(); pinned != 0 && frame.channelID != pinned {
continue
}
switch frame.typ {
case frameTypeAck:
p.resolveAck(frame.seq, frame.crc)
p.resolveAck(frame.channelID, frame.seq, frame.crc)
case frameTypeData:
p.handleInboundFrame(frame)
}
}
}
func (p *streamTransport) acceptChannel(id uint32) bool {
// pinPeerChannel commits the partner's channelID after a frame from them has
// been validated downstream. It's a one-shot CAS — later validated frames
// keep the same partner. id==0 is never accepted.
func (p *streamTransport) pinPeerChannel(id uint32) {
if id == 0 {
return false
return
}
if p.peerChannelID.CompareAndSwap(0, id) {
return true
}
return p.peerChannelID.Load() == id
p.peerChannelID.CompareAndSwap(0, id)
}
func (p *streamTransport) upsertInbound(frame transportFrame) (*inboundMessage, bool) {
@@ -511,6 +514,9 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
p.recvMu.Lock()
if crc, ok := p.delivered[frame.seq]; ok && crc == frame.crc {
p.recvMu.Unlock()
// Already-delivered duplicate: the peer is genuine (we accepted
// this seq earlier and CRC-matched it), so pin and re-ack.
p.pinPeerChannel(frame.channelID)
p.sendAck(frame.seq, frame.crc)
return
}
@@ -535,6 +541,11 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
p.delivered[frame.seq] = msg.crc
p.recvMu.Unlock()
// CRC validated end-to-end — this is our real partner. Pin their
// channelID so future stray frames from other MUC participants are
// dropped before reaching the reassembler.
p.pinPeerChannel(frame.channelID)
if p.onData != nil {
p.onData(data)
}
@@ -545,7 +556,7 @@ func (p *streamTransport) sendAck(seq, crc uint32) {
_ = p.enqueueFrame(encodeAckFrame(p.localChannelID, seq, crc), true)
}
func (p *streamTransport) resolveAck(seq, crc uint32) {
func (p *streamTransport) resolveAck(channelID, seq, crc uint32) {
p.ackMu.Lock()
waiter := p.ackWaiters[seq]
p.ackMu.Unlock()
@@ -554,6 +565,10 @@ func (p *streamTransport) resolveAck(seq, crc uint32) {
return
}
// The ACK matched a seq we're actually waiting for, so it came from our
// real partner; pin their channelID for downstream filtering.
p.pinPeerChannel(channelID)
select {
case waiter <- crc:
default:
@@ -162,7 +162,7 @@ func TestSendAckAndClosePaths(t *testing.T) {
if err != nil {
t.Fatalf("decodeTransportFrame() error = %v", err)
}
tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload))
tr.resolveAck(decoded.channelID, decoded.seq, crc32.ChecksumIEEE(payload))
case <-time.After(time.Second):
t.Fatal("Send() did not enqueue frame")
}
+28 -13
View File
@@ -547,30 +547,33 @@ func (p *streamTransport) handleFrame(frame []byte) {
return
}
// Multi-party MUCs (e.g. Jitsi) can deliver frames from other peers or
// video echo from previously-closed sessions to our PeerConnection.
// The first valid frame we see fixes the peer's channelID; later frames
// with a different ID are silently dropped.
if !p.acceptChannel(decoded.channelID) {
// Multi-party MUCs (e.g. Jitsi) can deliver frames from other peers, or
// video echo from previously-closed sessions, to our PeerConnection.
// Once we've identified the real partner's channelID, drop everything
// else. We can't pin the partner from a raw frame header alone — a stray
// video frame might decode to a valid magic/version by chance — so the
// pin happens downstream, only after a CRC-validated payload (DATA) or a
// matching ACK waiter has confirmed the sender is ours.
if pinned := p.peerChannelID.Load(); pinned != 0 && decoded.channelID != pinned {
return
}
switch decoded.typ {
case frameTypeAck:
p.resolveAck(decoded.seq, decoded.crc)
p.resolveAck(decoded.channelID, decoded.seq, decoded.crc)
case frameTypeData:
p.handleInboundFrame(decoded)
}
}
func (p *streamTransport) acceptChannel(id uint32) bool {
// pinPeerChannel commits the partner's channelID after a frame from them has
// been validated downstream. It's a one-shot CAS — later validated frames
// keep the same partner. id==0 is never accepted.
func (p *streamTransport) pinPeerChannel(id uint32) {
if id == 0 {
return false
return
}
if p.peerChannelID.CompareAndSwap(0, id) {
return true
}
return p.peerChannelID.Load() == id
p.peerChannelID.CompareAndSwap(0, id)
}
func (p *streamTransport) upsertInbound(frame transportFrame) (*inboundMessage, bool) {
@@ -611,6 +614,9 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
p.recvMu.Lock()
if crc, ok := p.delivered[frame.seq]; ok && crc == frame.crc {
p.recvMu.Unlock()
// Already-delivered duplicate: the peer is genuine (we accepted
// this seq earlier and CRC-matched it), so pin and re-ack.
p.pinPeerChannel(frame.channelID)
p.sendAck(frame.seq, frame.crc)
return
}
@@ -635,6 +641,11 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
p.delivered[frame.seq] = msg.crc
p.recvMu.Unlock()
// CRC validated end-to-end — this is our real partner. Pin their
// channelID so future stray frames from other MUC participants are
// dropped before reaching the reassembler.
p.pinPeerChannel(frame.channelID)
if p.onData != nil {
p.onData(data)
}
@@ -645,7 +656,7 @@ func (p *streamTransport) sendAck(seq, crc uint32) {
_ = p.enqueueFrame(encodeAckFrame(p.localChannelID, seq, crc), true)
}
func (p *streamTransport) resolveAck(seq, crc uint32) {
func (p *streamTransport) resolveAck(channelID, seq, crc uint32) {
p.ackMu.Lock()
waiter := p.ackWaiters[seq]
p.ackMu.Unlock()
@@ -654,6 +665,10 @@ func (p *streamTransport) resolveAck(seq, crc uint32) {
return
}
// The ACK matched a seq we're actually waiting for, so it came from our
// real partner; pin their channelID for downstream filtering.
p.pinPeerChannel(channelID)
select {
case waiter <- crc:
default:
@@ -153,7 +153,7 @@ func TestSendAckAndClosePaths(t *testing.T) {
if err != nil {
t.Fatalf("decodeTransportFrame() error = %v", err)
}
tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload))
tr.resolveAck(decoded.channelID, decoded.seq, crc32.ChecksumIEEE(payload))
case <-time.After(time.Second):
t.Fatal("Send() did not enqueue frame")
}