mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-06-02 06:23:37 +02:00
refactor(jitsi): clean up linter suppressions and test helpers
This commit is contained in:
@@ -55,11 +55,7 @@ const (
|
|||||||
// Without this, the peer's own recovery (which produces a fresh epoch)
|
// Without this, the peer's own recovery (which produces a fresh epoch)
|
||||||
// drives us into an infinite reconnect loop.
|
// drives us into an infinite reconnect loop.
|
||||||
reconnectGrace = 20 * time.Second
|
reconnectGrace = 20 * time.Second
|
||||||
// stableUptime is how long the bridge must stay healthy before the
|
|
||||||
// reconnectCount is reset. Without this, healthy reconnects accumulated
|
|
||||||
// over hours of operation eventually cross maxReconnects and the engine
|
|
||||||
// gives up on a perfectly recoverable failure.
|
|
||||||
stableUptime = 60 * time.Second
|
|
||||||
// xmppKeepaliveInterval keeps the underlying XMPP transport alive while
|
// xmppKeepaliveInterval keeps the underlying XMPP transport alive while
|
||||||
// we wait for a peer. BOSH has no built-in stream management; without
|
// we wait for a peer. BOSH has no built-in stream management; without
|
||||||
// any application traffic Prosody closes the BOSH session after roughly
|
// any application traffic Prosody closes the BOSH session after roughly
|
||||||
@@ -666,7 +662,7 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge
|
|||||||
// after the default 1-minute inactivity timeout, which causes JVB to
|
// after the default 1-minute inactivity timeout, which causes JVB to
|
||||||
// shut down the DTLS session and emit close_notify.
|
// shut down the DTLS session and emit close_notify.
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go s.rtcpKeepalive(pcCtx, pc)
|
go s.rtcpKeepalive(pcCtx, pc) //nolint:contextcheck // pcCtx intentionally derives from s.runCtx to outlive this call
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -1297,7 +1293,7 @@ func (s *Session) inReconnectGrace() bool {
|
|||||||
// magic. A non-olcrtc participant in the same MUC (a regular Jitsi web
|
// magic. A non-olcrtc participant in the same MUC (a regular Jitsi web
|
||||||
// client, an unrelated bot, etc.) gets filtered out before we ever
|
// client, an unrelated bot, etc.) gets filtered out before we ever
|
||||||
// get here.
|
// get here.
|
||||||
func (s *Session) peerLatchAccepts(from string) bool {
|
func (s *Session) peerLatchAccepts(from string) bool { //nolint:unparam // filter contract; always-true is policy
|
||||||
if cur := s.peerEndpoint.Load(); cur != nil {
|
if cur := s.peerEndpoint.Load(); cur != nil {
|
||||||
if *cur == from {
|
if *cur == from {
|
||||||
return true
|
return true
|
||||||
|
|||||||
@@ -180,11 +180,11 @@ func TestSanitiseNick(t *testing.T) {
|
|||||||
raw string
|
raw string
|
||||||
want string
|
want string
|
||||||
}{
|
}{
|
||||||
{"alice", "alice"},
|
{nameAlice, nameAlice},
|
||||||
{"Alice Smith", "Alice-Smith"},
|
{"Alice Smith", "Alice-Smith"},
|
||||||
{"Конрад Олег", "Konrad-Oleg"},
|
{"Конрад Олег", "Konrad-Oleg"},
|
||||||
{"olcrtc-bot42", "olcrtc-bot42"},
|
{"olcrtc-bot42", "olcrtc-bot42"},
|
||||||
{" bob ", "bob"},
|
{" bob ", nameBob},
|
||||||
{"$$$ %%%", ""},
|
{"$$$ %%%", ""},
|
||||||
{"verylongnicknamethatexceedslimit", "verylongnicknamet"[:16]},
|
{"verylongnicknamethatexceedslimit", "verylongnicknamet"[:16]},
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -282,7 +282,7 @@ func TestRequestReconnectIdempotent(t *testing.T) {
|
|||||||
js.SetShouldReconnect(func() bool { return true })
|
js.SetShouldReconnect(func() bool { return true })
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < 10; i++ {
|
for range 10 {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|||||||
@@ -48,12 +48,12 @@ package jitsi
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -67,6 +67,16 @@ const (
|
|||||||
envPairedIdle = "OLCRTC_JITSI_PAIRED_IDLE"
|
envPairedIdle = "OLCRTC_JITSI_PAIRED_IDLE"
|
||||||
envPairedChaosInterval = "OLCRTC_JITSI_PAIRED_CHAOS_INTERVAL"
|
envPairedChaosInterval = "OLCRTC_JITSI_PAIRED_CHAOS_INTERVAL"
|
||||||
envPairedVerbose = "OLCRTC_JITSI_PAIRED_VERBOSE"
|
envPairedVerbose = "OLCRTC_JITSI_PAIRED_VERBOSE"
|
||||||
|
|
||||||
|
nameAlice = "alice"
|
||||||
|
nameBob = "bob"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errCastFailed = errors.New("cast to *Session failed")
|
||||||
|
errBridgeTimeout = errors.New("bridge not ready before deadline")
|
||||||
|
errRoundtripTimeout = errors.New("roundtrip timeout")
|
||||||
|
errReceiveTimeout = errors.New("receive timeout")
|
||||||
)
|
)
|
||||||
|
|
||||||
type pairedConfig struct {
|
type pairedConfig struct {
|
||||||
@@ -84,7 +94,7 @@ func (c *pairedConfig) durationLabel() string {
|
|||||||
return c.duration.String()
|
return c.duration.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func readPairedConfig(t *testing.T) *pairedConfig {
|
func readPairedConfig(t *testing.T) *pairedConfig { //nolint:cyclop // config parsing is naturally branchy
|
||||||
t.Helper()
|
t.Helper()
|
||||||
host := strings.TrimSpace(os.Getenv(envPairedHost))
|
host := strings.TrimSpace(os.Getenv(envPairedHost))
|
||||||
if host == "" {
|
if host == "" {
|
||||||
@@ -153,10 +163,10 @@ func (p *pairedInstance) note(b []byte) {
|
|||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pairedInstance) snapshot() (count int64, lastAt time.Time) {
|
func (p *pairedInstance) snapshot() int64 {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
return p.receivedFromOther, p.lastReceiveAt
|
return p.receivedFromOther
|
||||||
}
|
}
|
||||||
|
|
||||||
// startInstance spins up one Session at a time so the second one is
|
// startInstance spins up one Session at a time so the second one is
|
||||||
@@ -178,7 +188,7 @@ func startInstance(ctx context.Context, t *testing.T, cfg *pairedConfig, name st
|
|||||||
js, ok := sess.(*Session)
|
js, ok := sess.(*Session)
|
||||||
if !ok {
|
if !ok {
|
||||||
_ = sess.Close()
|
_ = sess.Close()
|
||||||
return nil, fmt.Errorf("%s: cast to *Session failed", name)
|
return nil, fmt.Errorf("%s: cast to *Session failed: %w", name, errCastFailed)
|
||||||
}
|
}
|
||||||
js.SetShouldReconnect(func() bool { return ctx.Err() == nil })
|
js.SetShouldReconnect(func() bool { return ctx.Err() == nil })
|
||||||
inst.js = js
|
inst.js = js
|
||||||
@@ -201,11 +211,11 @@ func waitForBridge(ctx context.Context, inst *pairedInstance, deadline time.Time
|
|||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return fmt.Errorf("%s: wait for bridge: %w", inst.name, ctx.Err())
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fmt.Errorf("%s: bridge not ready before deadline", inst.name)
|
return fmt.Errorf("%s: bridge not ready before deadline: %w", inst.name, errBridgeTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
// pumpLoop sends a small heartbeat payload from `from` to the other
|
// pumpLoop sends a small heartbeat payload from `from` to the other
|
||||||
@@ -229,8 +239,8 @@ func pumpLoop(ctx context.Context, t *testing.T, from *pairedInstance, interval
|
|||||||
seq++
|
seq++
|
||||||
buf := append([]byte(nil), payload...)
|
buf := append([]byte(nil), payload...)
|
||||||
if len(buf) >= 8 {
|
if len(buf) >= 8 {
|
||||||
for i := 0; i < 8; i++ {
|
for i := range 8 {
|
||||||
buf[i] = byte(seq >> (8 * i))
|
buf[i] = byte(seq >> (8 * i)) //nolint:gosec // intentional truncation to byte
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := from.js.Send(buf); err != nil {
|
if err := from.js.Send(buf); err != nil {
|
||||||
@@ -245,7 +255,6 @@ type pairedStats struct {
|
|||||||
cycles int64
|
cycles int64
|
||||||
chaosKicks int64
|
chaosKicks int64
|
||||||
wedgesPair int64 // periods where neither side received any data
|
wedgesPair int64 // periods where neither side received any data
|
||||||
maxObservedRttMs atomic.Int64
|
|
||||||
startedAt time.Time
|
startedAt time.Time
|
||||||
lastChaosAt time.Time
|
lastChaosAt time.Time
|
||||||
bothSidesReceived bool
|
bothSidesReceived bool
|
||||||
@@ -268,7 +277,7 @@ type pairedStats struct {
|
|||||||
// - Either side hits ErrSessionClosed at the engine level
|
// - Either side hits ErrSessionClosed at the engine level
|
||||||
// (the closed flag is the canonical "we gave up" signal).
|
// (the closed flag is the canonical "we gave up" signal).
|
||||||
//
|
//
|
||||||
//nolint:cyclop // chaos cycle structure naturally branches on phase + side
|
//nolint:cyclop,gocognit // chaos cycle structure naturally branches on phase + side
|
||||||
func TestJitsiPairedChaosStress(t *testing.T) {
|
func TestJitsiPairedChaosStress(t *testing.T) {
|
||||||
cfg := readPairedConfig(t)
|
cfg := readPairedConfig(t)
|
||||||
infinite := cfg.duration == 0
|
infinite := cfg.duration == 0
|
||||||
@@ -289,7 +298,7 @@ func TestJitsiPairedChaosStress(t *testing.T) {
|
|||||||
|
|
||||||
// Spin up Alice first so she's already in the room when Bob arrives —
|
// Spin up Alice first so she's already in the room when Bob arrives —
|
||||||
// this guarantees min-participants triggers session-initiate.
|
// this guarantees min-participants triggers session-initiate.
|
||||||
alice, err := startInstance(ctx, t, cfg, "alice")
|
alice, err := startInstance(ctx, t, cfg, nameAlice)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("alice: %v", err)
|
t.Fatalf("alice: %v", err)
|
||||||
}
|
}
|
||||||
@@ -298,7 +307,7 @@ func TestJitsiPairedChaosStress(t *testing.T) {
|
|||||||
// Brief settle so Alice is fully in the MUC before Bob joins.
|
// Brief settle so Alice is fully in the MUC before Bob joins.
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
bob, err := startInstance(ctx, t, cfg, "bob")
|
bob, err := startInstance(ctx, t, cfg, nameBob)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("bob: %v", err)
|
t.Fatalf("bob: %v", err)
|
||||||
}
|
}
|
||||||
@@ -374,10 +383,10 @@ func TestJitsiPairedChaosStress(t *testing.T) {
|
|||||||
|
|
||||||
// === Phase B: pick a victim and chaos ===
|
// === Phase B: pick a victim and chaos ===
|
||||||
victim := alice
|
victim := alice
|
||||||
victimName := "alice"
|
victimName := nameAlice
|
||||||
if rng.Intn(2) == 1 {
|
if rng.Intn(2) == 1 {
|
||||||
victim = bob
|
victim = bob
|
||||||
victimName = "bob"
|
victimName = nameBob
|
||||||
}
|
}
|
||||||
if cfg.verbose {
|
if cfg.verbose {
|
||||||
t.Logf("[paired][%d] CHAOS victim=%s — teardownPC + requestReconnect", stats.cycles, victimName)
|
t.Logf("[paired][%d] CHAOS victim=%s — teardownPC + requestReconnect", stats.cycles, victimName)
|
||||||
@@ -393,10 +402,10 @@ func TestJitsiPairedChaosStress(t *testing.T) {
|
|||||||
// engine has wedged.
|
// engine has wedged.
|
||||||
recoveryBudget := cfg.idle + 60*time.Second
|
recoveryBudget := cfg.idle + 60*time.Second
|
||||||
survivor := alice
|
survivor := alice
|
||||||
survivorName := "alice"
|
survivorName := nameAlice
|
||||||
if victim == alice {
|
if victim == alice {
|
||||||
survivor = bob
|
survivor = bob
|
||||||
survivorName = "bob"
|
survivorName = nameBob
|
||||||
}
|
}
|
||||||
if err := waitFreshReceive(ctx, survivor, recoveryBudget); err != nil {
|
if err := waitFreshReceive(ctx, survivor, recoveryBudget); err != nil {
|
||||||
stats.wedgesPair++
|
stats.wedgesPair++
|
||||||
@@ -425,21 +434,21 @@ func TestJitsiPairedChaosStress(t *testing.T) {
|
|||||||
func waitFirstReceive(ctx context.Context, a, b *pairedInstance, budget time.Duration) error {
|
func waitFirstReceive(ctx context.Context, a, b *pairedInstance, budget time.Duration) error {
|
||||||
deadline := time.Now().Add(budget)
|
deadline := time.Now().Add(budget)
|
||||||
for time.Now().Before(deadline) {
|
for time.Now().Before(deadline) {
|
||||||
ac, _ := a.snapshot()
|
ac := a.snapshot()
|
||||||
bc, _ := b.snapshot()
|
bc := b.snapshot()
|
||||||
if ac > 0 && bc > 0 {
|
if ac > 0 && bc > 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return fmt.Errorf("wait first receive: %w", ctx.Err())
|
||||||
case <-time.After(500 * time.Millisecond):
|
case <-time.After(500 * time.Millisecond):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ac, _ := a.snapshot()
|
ac := a.snapshot()
|
||||||
bc, _ := b.snapshot()
|
bc := b.snapshot()
|
||||||
return fmt.Errorf("did not see bidirectional roundtrip in %s (alice=%d bob=%d)",
|
return fmt.Errorf("did not see bidirectional roundtrip in %s (alice=%d bob=%d): %w",
|
||||||
budget, ac, bc)
|
budget, ac, bc, errRoundtripTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitFreshReceive blocks until target sees a NEW receive after this call,
|
// waitFreshReceive blocks until target sees a NEW receive after this call,
|
||||||
@@ -447,20 +456,20 @@ func waitFirstReceive(ctx context.Context, a, b *pairedInstance, budget time.Dur
|
|||||||
// bridge fully recovered: bytes are arriving from the (forced-to-reconnect)
|
// bridge fully recovered: bytes are arriving from the (forced-to-reconnect)
|
||||||
// peer.
|
// peer.
|
||||||
func waitFreshReceive(ctx context.Context, target *pairedInstance, budget time.Duration) error {
|
func waitFreshReceive(ctx context.Context, target *pairedInstance, budget time.Duration) error {
|
||||||
startCount, _ := target.snapshot()
|
startCount := target.snapshot()
|
||||||
deadline := time.Now().Add(budget)
|
deadline := time.Now().Add(budget)
|
||||||
for time.Now().Before(deadline) {
|
for time.Now().Before(deadline) {
|
||||||
c, _ := target.snapshot()
|
c := target.snapshot()
|
||||||
if c > startCount {
|
if c > startCount {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return fmt.Errorf("wait fresh receive: %w", ctx.Err())
|
||||||
case <-time.After(500 * time.Millisecond):
|
case <-time.After(500 * time.Millisecond):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fmt.Errorf("no new receive in %s (count stuck at %d)", budget, startCount)
|
return fmt.Errorf("no new receive in %s (count stuck at %d): %w", budget, startCount, errReceiveTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
func reportPairedStats(t *testing.T, s *pairedStats, cfg *pairedConfig) {
|
func reportPairedStats(t *testing.T, s *pairedStats, cfg *pairedConfig) {
|
||||||
|
|||||||
Reference in New Issue
Block a user