feat(telemost): Add connection monitoring and automatic reconnection

This commit is contained in:
zarazaex59
2026-04-07 01:15:44 +03:00
parent 66c272950c
commit d614e748ea
4 changed files with 105 additions and 15 deletions
+1 -1
View File
@@ -1,6 +1,6 @@
module github.com/zarazaex69/olcrtc module github.com/zarazaex69/olcrtc
go 1.23 go 1.24.0
require ( require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
+4 -1
View File
@@ -64,11 +64,14 @@ func Run(roomURL, keyHex string, socksPort int) error {
c.peer = peer c.peer = peer
log.Println("Connecting to Telemost...") log.Println("Connecting to Telemost...")
if err := peer.Connect(context.Background()); err != nil { ctx := context.Background()
if err := peer.Connect(ctx); err != nil {
return err return err
} }
log.Println("Connected to Telemost") log.Println("Connected to Telemost")
go peer.WatchConnection(ctx)
return c.runSOCKS5(socksPort) return c.runSOCKS5(socksPort)
} }
+4 -1
View File
@@ -71,11 +71,14 @@ func Run(roomURL, keyHex string) error {
s.peer = peer s.peer = peer
log.Println("Connecting to Telemost...") log.Println("Connecting to Telemost...")
if err := peer.Connect(context.Background()); err != nil { ctx := context.Background()
if err := peer.Connect(ctx); err != nil {
return err return err
} }
log.Println("Connected to Telemost") log.Println("Connected to Telemost")
go peer.WatchConnection(ctx)
return s.run() return s.run()
} }
+96 -12
View File
@@ -13,14 +13,16 @@ import (
) )
type Peer struct { type Peer struct {
roomURL string roomURL string
name string name string
conn *ConnectionInfo conn *ConnectionInfo
ws *websocket.Conn ws *websocket.Conn
pcSub *webrtc.PeerConnection pcSub *webrtc.PeerConnection
pcPub *webrtc.PeerConnection pcPub *webrtc.PeerConnection
dc *webrtc.DataChannel dc *webrtc.DataChannel
onData func([]byte) onData func([]byte)
reconnectCh chan struct{}
closeCh chan struct{}
} }
func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) { func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) {
@@ -30,10 +32,12 @@ func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) {
} }
return &Peer{ return &Peer{
roomURL: roomURL, roomURL: roomURL,
name: name, name: name,
conn: conn, conn: conn,
onData: onData, onData: onData,
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
}, nil }, nil
} }
@@ -90,6 +94,13 @@ func (p *Peer) Connect(ctx context.Context) error {
} }
p.ws = ws p.ws = ws
ws.SetPongHandler(func(string) error {
ws.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
go p.keepAlive()
if err := p.sendHello(); err != nil { if err := p.sendHello(); err != nil {
return err return err
} }
@@ -162,6 +173,10 @@ func (p *Peer) handleSignaling() {
var msg map[string]interface{} var msg map[string]interface{}
if err := p.ws.ReadJSON(&msg); err != nil { if err := p.ws.ReadJSON(&msg); err != nil {
log.Printf("WS read error: %v", err) log.Printf("WS read error: %v", err)
select {
case p.reconnectCh <- struct{}{}:
default:
}
return return
} }
@@ -320,6 +335,7 @@ func (p *Peer) setupICEHandlers() {
} }
func (p *Peer) Close() error { func (p *Peer) Close() error {
close(p.closeCh)
if p.ws != nil { if p.ws != nil {
p.ws.Close() p.ws.Close()
} }
@@ -331,3 +347,71 @@ func (p *Peer) Close() error {
} }
return nil return nil
} }
func (p *Peer) keepAlive() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if p.ws != nil {
if err := p.ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil {
log.Printf("Ping error: %v", err)
select {
case p.reconnectCh <- struct{}{}:
default:
}
return
}
}
case <-p.closeCh:
return
}
}
}
func (p *Peer) reconnect(ctx context.Context) error {
log.Println("Reconnecting...")
if p.ws != nil {
p.ws.Close()
}
if p.pcSub != nil {
p.pcSub.Close()
}
if p.pcPub != nil {
p.pcPub.Close()
}
time.Sleep(2 * time.Second)
conn, err := GetConnectionInfo(p.roomURL, p.name)
if err != nil {
return err
}
p.conn = conn
return p.Connect(ctx)
}
func (p *Peer) WatchConnection(ctx context.Context) {
for {
select {
case <-p.reconnectCh:
for {
if err := p.reconnect(ctx); err != nil {
log.Printf("Reconnect failed: %v, retrying in 5s...", err)
time.Sleep(5 * time.Second)
continue
}
log.Println("Reconnected successfully")
break
}
case <-p.closeCh:
return
case <-ctx.Done():
return
}
}
}