refactor(videochannel): replace ffmpeg with Go VP8 codec

- thnks rape4me/kc for vp8 enc/dec
This commit is contained in:
zarazaex69
2026-05-26 19:11:11 +03:00
parent d7c813265b
commit 65dbb93221
4 changed files with 119 additions and 18 deletions
+105
View File
@@ -0,0 +1,105 @@
package videochannel
import (
"fmt"
"sync"
"sync/atomic"
"codeberg.org/rape4me/kc/vp8"
)
// goEncoder is a pure Go VP8 encoder replacing ffmpegEncoder.
type goEncoder struct {
enc *vp8.Encoder
width int
height int
frameSize int
closed atomic.Bool
mu sync.Mutex
}
func newGoEncoder(width, height, _ int) *goEncoder {
return &goEncoder{
enc: vp8.NewEncoder(width, height, 10),
width: width,
height: height,
frameSize: width * height,
}
}
func (e *goEncoder) EncodeFrame(frame []byte) ([]byte, error) {
if e.closed.Load() {
return nil, ErrTransportClosed
}
if len(frame) != e.frameSize {
return nil, fmt.Errorf("%w: got %d expected %d", ErrUnexpectedFrameSize, len(frame), e.frameSize)
}
e.mu.Lock()
defer e.mu.Unlock()
return e.enc.Encode(frame)
}
func (e *goEncoder) Close() error {
e.closed.Store(true)
return nil
}
// goDecoder is a pure Go VP8 decoder replacing ffmpegDecoder.
type goDecoder struct {
dec *vp8.Decoder
width int
height int
frameSize int
frames chan []byte
closed atomic.Bool
closeOnce sync.Once
closeCh chan struct{}
}
func newGoDecoder(width, height int) *goDecoder {
return &goDecoder{
dec: vp8.NewDecoder(),
width: width,
height: height,
frameSize: width * height,
frames: make(chan []byte, 32),
closeCh: make(chan struct{}),
}
}
func (d *goDecoder) PushSample(sample []byte) error {
if d.closed.Load() {
return ErrTransportClosed
}
frame, err := d.dec.Decode(sample)
if err != nil {
return fmt.Errorf("vp8 decode: %w", err)
}
gray := frame.Grayscale()
select {
case d.frames <- gray:
case <-d.closeCh:
return ErrTransportClosed
}
return nil
}
func (d *goDecoder) PopFrame() ([]byte, error) {
select {
case frame, ok := <-d.frames:
if !ok {
return nil, ErrTransportClosed
}
return frame, nil
case <-d.closeCh:
return nil, ErrTransportClosed
}
}
func (d *goDecoder) Close() error {
d.closeOnce.Do(func() {
d.closed.Store(true)
close(d.closeCh)
})
return nil
}
+9 -16
View File
@@ -58,10 +58,10 @@ type streamTransport struct {
stream videoSession
track *webrtc.TrackLocalStaticSample
codec codecSpec
encoder *ffmpegEncoder
encoder *goEncoder
encoderMu sync.Mutex
decoderMu sync.Mutex
decoders map[*ffmpegDecoder]struct{}
decoders map[*goDecoder]struct{}
onData func([]byte)
outbound chan []byte
outboundAck chan []byte
@@ -157,7 +157,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
outboundAck: make(chan []byte, 64),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
decoders: make(map[*ffmpegDecoder]struct{}),
decoders: make(map[*goDecoder]struct{}),
fragAcks: newFragAckTracker(),
reassembler: common.NewReassembler(256),
videoW: opts.Width,
@@ -189,10 +189,7 @@ func (p *streamTransport) Connect(ctx context.Context) error {
connectCtx, cancel := context.WithTimeout(ctx, defaultConnectTimeout)
defer cancel()
encoder, err := newFFmpegEncoder(ctx, p.codec, p.videoW, p.videoH, p.videoFPS, p.videoBitrate, p.videoHW)
if err != nil {
return fmt.Errorf("new encoder: %w", err)
}
encoder := newGoEncoder(p.videoW, p.videoH, p.videoFPS)
if err := p.stream.Connect(connectCtx); err != nil {
_ = encoder.Close()
@@ -390,7 +387,7 @@ func (p *streamTransport) Features() transport.Features {
}
}
func (p *streamTransport) writeIdleFrame(enc *ffmpegEncoder, frameDuration time.Duration) {
func (p *streamTransport) writeIdleFrame(enc *goEncoder, frameDuration time.Duration) {
p.idleFrameMu.Lock()
cached := p.idleFrame
p.idleFrameMu.Unlock()
@@ -415,7 +412,7 @@ func (p *streamTransport) writeIdleFrame(enc *ffmpegEncoder, frameDuration time.
_ = p.track.WriteSample(media.Sample{Data: cached, Duration: frameDuration})
}
func (p *streamTransport) writePayloadFrame(enc *ffmpegEncoder, payload []byte, frameDuration time.Duration) {
func (p *streamTransport) writePayloadFrame(enc *goEncoder, payload []byte, frameDuration time.Duration) {
rawFrame, err := p.renderFrame(payload)
if err != nil {
logger.Debugf("videochannel render error: %v", err)
@@ -521,7 +518,7 @@ func (p *streamTransport) enqueueFrame(frame []byte, priority bool) error {
}
}
func (p *streamTransport) popDecoderFrames(decoder *ffmpegDecoder) {
func (p *streamTransport) popDecoderFrames(decoder *goDecoder) {
defer func() {
p.decoderMu.Lock()
if p.decoders != nil {
@@ -549,7 +546,7 @@ func (p *streamTransport) popDecoderFrames(decoder *ffmpegDecoder) {
}
}
func (p *streamTransport) readDecoderInput(track *webrtc.TrackRemote, decoder *ffmpegDecoder, codec codecSpec) {
func (p *streamTransport) readDecoderInput(track *webrtc.TrackRemote, decoder *goDecoder, codec codecSpec) {
sb := samplebuilder.New(sampleBuilderMaxLate, codec.depacketizer(), track.Codec().ClockRate)
for {
select {
@@ -583,11 +580,7 @@ func (p *streamTransport) handleRemoteTrack(track *webrtc.TrackRemote, _ *webrtc
return
}
decoder, err := newFFmpegDecoder(p.runCtx, codec, p.videoW, p.videoH, p.videoFPS, p.videoHW)
if err != nil {
logger.Warnf("videochannel decoder init failed: %v", err)
return
}
decoder := newGoDecoder(p.videoW, p.videoH)
p.decoderMu.Lock()
if p.closed.Load() || p.decoders == nil {