refactor(ble): improve connection lifecycle and enhance OTA reliability (#4721)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich
2026-03-05 12:58:34 -06:00
committed by GitHub
parent 5a5aa1f026
commit 68b2b6d88e
19 changed files with 741 additions and 537 deletions
+2 -1
View File
@@ -42,11 +42,12 @@ The `:feature:firmware` module provides a unified interface for updating Meshtas
Meshtastic-Android supports three primary firmware update flows:
#### 1. ESP32 Unified OTA (WiFi & BLE)
Used for modern ESP32 devices (e.g., Heltec V3, T-Beam S3). This method utilizes the **Unified OTA Protocol**, which enables high-speed transfers over TCP (port 3232) or BLE. The BLE transport uses the **Nordic Semiconductor Kotlin-BLE-Library** for architectural consistency with the rest of the application.
Used for modern ESP32 devices (e.g., Heltec V3, T-Beam S3). This method utilizes the **Unified OTA Protocol**, which enables high-speed transfers over TCP (port 3232) or BLE. The BLE transport uses the **Nordic Semiconductor Kotlin-BLE-Library** for architectural consistency and modern coroutine support.
**Key Features:**
- **Pre-shared Hash Verification**: The app sends the firmware SHA256 hash in an initial `AdminMessage` trigger. The device stores this in NVS and verifies the incoming stream against it.
- **Connection Retry**: Robust logic to wait for the device to reboot and start the OTA listener.
- **Automatic MTU Handling & Fragmentation**: The BLE transport automatically detects the negotiated MTU and fragments data chunks into packets that fit. It carefully manages acknowledgments for each fragmented packet to ensure reliability even on congested connections.
```mermaid
sequenceDiagram
@@ -32,13 +32,16 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.withTimeout
import no.nordicsemi.kotlin.ble.client.RemoteCharacteristic
import no.nordicsemi.kotlin.ble.client.android.CentralManager
import no.nordicsemi.kotlin.ble.client.android.ConnectionPriority
import no.nordicsemi.kotlin.ble.client.android.Peripheral
import no.nordicsemi.kotlin.ble.core.ConnectionState
import no.nordicsemi.kotlin.ble.core.WriteType
import org.meshtastic.core.ble.BleConnection
import org.meshtastic.core.ble.BleScanner
import org.meshtastic.core.ble.MeshtasticBleConstants.OTA_NOTIFY_CHARACTERISTIC
import org.meshtastic.core.ble.MeshtasticBleConstants.OTA_SERVICE_UUID
import org.meshtastic.core.ble.MeshtasticBleConstants.OTA_WRITE_CHARACTERISTIC
import kotlin.time.Duration.Companion.seconds
import kotlin.uuid.Uuid
/**
* BLE transport implementation for ESP32 Unified OTA protocol. Uses Nordic Kotlin-BLE-Library for modern coroutine
@@ -161,57 +164,81 @@ class BleOtaTransport(
Logger.i { "BLE OTA: Connected to ${p.address}, discovering services..." }
// Discover services
val chars =
bleConnection.discoverCharacteristics(SERVICE_UUID, listOf(OTA_CHARACTERISTIC_UUID, TX_CHARACTERISTIC_UUID))
?: throw OtaProtocolException.ConnectionFailed("Required OTA service or characteristics not found")
// Increase connection priority for OTA
bleConnection.requestConnectionPriority(ConnectionPriority.HIGH)
otaCharacteristic = chars[OTA_CHARACTERISTIC_UUID]
val txChar = chars[TX_CHARACTERISTIC_UUID]
if (otaCharacteristic == null || txChar == null) {
throw OtaProtocolException.ConnectionFailed("Required characteristics not found")
}
// Enable notifications and collect responses
val subscribed = CompletableDeferred<Unit>()
txChar
.subscribe {
Logger.d { "BLE OTA: TX characteristic subscribed" }
subscribed.complete(Unit)
}
.onEach { notifyBytes ->
try {
val response = notifyBytes.decodeToString()
Logger.d { "BLE OTA: Received response: $response" }
responseChannel.trySend(response)
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
Logger.e(e) { "BLE OTA: Failed to decode response bytes" }
// Discover services using our unified profile helper
bleConnection.profile(OTA_SERVICE_UUID) { service ->
val ota =
requireNotNull(service.characteristics.firstOrNull { it.uuid == OTA_WRITE_CHARACTERISTIC }) {
"OTA characteristic not found"
}
val txChar =
requireNotNull(service.characteristics.firstOrNull { it.uuid == OTA_NOTIFY_CHARACTERISTIC }) {
"TX characteristic not found"
}
}
.catch { e ->
if (!subscribed.isCompleted) subscribed.completeExceptionally(e)
Logger.e(e) { "BLE OTA: Error in TX characteristic subscription" }
}
.launchIn(transportScope)
subscribed.await()
Logger.i { "BLE OTA: Service discovered and ready" }
otaCharacteristic = ota
// Log negotiated MTU for diagnostics
val maxLen = bleConnection.maximumWriteValueLength(WriteType.WITHOUT_RESPONSE)
Logger.i { "BLE OTA: Service ready. Max write value length: $maxLen bytes" }
// Enable notifications and collect responses
val subscribed = CompletableDeferred<Unit>()
txChar
.subscribe {
Logger.d { "BLE OTA: TX characteristic subscribed" }
subscribed.complete(Unit)
}
.onEach { notifyBytes ->
try {
val response = notifyBytes.decodeToString()
Logger.d { "BLE OTA: Received response: $response" }
responseChannel.trySend(response)
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
Logger.e(e) { "BLE OTA: Failed to decode response bytes" }
}
}
.catch { e ->
if (!subscribed.isCompleted) subscribed.completeExceptionally(e)
Logger.e(e) { "BLE OTA: Error in TX characteristic subscription" }
}
.launchIn(this)
subscribed.await()
Logger.i { "BLE OTA: Service discovered and ready" }
}
}
/**
* Initiates the OTA update by sending the size and hash.
*
* Note: If the start command is fragmented into multiple BLE packets, the protocol may send multiple responses
* (usually one ACK per packet followed by a final OK/ERASING).
*/
@Suppress("CyclomaticComplexMethod")
override suspend fun startOta(
sizeBytes: Long,
sha256Hash: String,
onHandshakeStatus: suspend (OtaHandshakeStatus) -> Unit,
): Result<Unit> = runCatching {
val command = OtaCommand.StartOta(sizeBytes, sha256Hash)
sendCommand(command)
val packetsSent = sendCommand(command)
var handshakeComplete = false
var responsesReceived = 0
while (!handshakeComplete) {
val response = waitForResponse(ERASING_TIMEOUT_MS)
responsesReceived++
when (val parsed = OtaResponse.parse(response)) {
is OtaResponse.Ok -> handshakeComplete = true
is OtaResponse.Ok -> {
// Only consider handshake complete after consuming all potential fragmented responses
if (responsesReceived >= packetsSent) {
handshakeComplete = true
}
}
is OtaResponse.Erasing -> {
Logger.i { "BLE OTA: Device erasing flash..." }
onHandshakeStatus(OtaHandshakeStatus.Erasing)
@@ -231,6 +258,14 @@ class BleOtaTransport(
}
}
/**
* Streams the firmware data in chunks.
*
* Each chunk is potentially fragmented into multiple BLE packets based on the negotiated MTU. The transport ensures
* that every fragmented packet is acknowledged by the device before proceeding, preventing buffer overflows on the
* radio.
*/
@Suppress("CyclomaticComplexMethod")
override suspend fun streamFirmware(
data: ByteArray,
chunkSize: Int,
@@ -248,43 +283,49 @@ class BleOtaTransport(
val currentChunkSize = minOf(chunkSize, remainingBytes)
val chunk = data.copyOfRange(sentBytes, sentBytes + currentChunkSize)
// Write chunk
writeData(chunk, WriteType.WITHOUT_RESPONSE)
// Write chunk (potentially fragmented into multiple BLE packets)
val packetsSentForChunk = writeData(chunk, WriteType.WITHOUT_RESPONSE)
// Wait for response (ACK or OK for last chunk)
val response = waitForResponse(ACK_TIMEOUT_MS)
// Wait for responses (The protocol expects one response per GATT write)
val nextSentBytes = sentBytes + currentChunkSize
when (val parsed = OtaResponse.parse(response)) {
is OtaResponse.Ack -> {
// Normal chunk success
}
repeat(packetsSentForChunk) { i ->
val response = waitForResponse(ACK_TIMEOUT_MS)
val isLastPacketOfChunk = i == packetsSentForChunk - 1
is OtaResponse.Ok -> {
// OK indicates completion (usually on last chunk)
if (nextSentBytes >= totalBytes) {
sentBytes = nextSentBytes
onProgress(1.0f)
return@runCatching Unit
} else {
throw OtaProtocolException.TransferFailed("Premature OK received at offset $nextSentBytes")
when (val parsed = OtaResponse.parse(response)) {
is OtaResponse.Ack -> {
// Normal packet success
}
}
is OtaResponse.Error -> {
if (parsed.message.contains("Hash Mismatch", ignoreCase = true)) {
throw OtaProtocolException.VerificationFailed("Firmware hash mismatch after transfer")
is OtaResponse.Ok -> {
// OK indicates completion (usually on last packet of last chunk)
if (nextSentBytes >= totalBytes && isLastPacketOfChunk) {
sentBytes = nextSentBytes
onProgress(1.0f)
return@runCatching Unit
} else if (!isLastPacketOfChunk) {
// Intermediate OK might happen if the device treats packets as chunks
} else {
throw OtaProtocolException.TransferFailed("Premature OK received at offset $nextSentBytes")
}
}
throw OtaProtocolException.TransferFailed("Transfer failed: ${parsed.message}")
}
else -> throw OtaProtocolException.TransferFailed("Unexpected response: $response")
is OtaResponse.Error -> {
if (parsed.message.contains("Hash Mismatch", ignoreCase = true)) {
throw OtaProtocolException.VerificationFailed("Firmware hash mismatch after transfer")
}
throw OtaProtocolException.TransferFailed("Transfer failed: ${parsed.message}")
}
else -> throw OtaProtocolException.TransferFailed("Unexpected response: $response")
}
}
sentBytes = nextSentBytes
onProgress(sentBytes.toFloat() / totalBytes)
}
// If we finished the loop without receiving OK, wait for it now
// If we finished the loop without receiving OK, wait for it now (verification stage)
val finalResponse = waitForResponse(VERIFICATION_TIMEOUT_MS)
when (val parsed = OtaResponse.parse(finalResponse)) {
is OtaResponse.Ok -> Unit
@@ -305,20 +346,37 @@ class BleOtaTransport(
transportScope.cancel()
}
private suspend fun sendCommand(command: OtaCommand) {
private suspend fun sendCommand(command: OtaCommand): Int {
val data = command.toString().toByteArray()
writeData(data, WriteType.WITH_RESPONSE)
return writeData(data, WriteType.WITH_RESPONSE)
}
private suspend fun writeData(data: ByteArray, writeType: WriteType) {
/**
* Writes data to the OTA characteristic, fragmenting the data into multiple BLE packets if it exceeds the
* negotiated MTU (maximum write length).
*
* @return The number of packets sent.
*/
private suspend fun writeData(data: ByteArray, writeType: WriteType): Int {
val characteristic =
otaCharacteristic ?: throw OtaProtocolException.ConnectionFailed("OTA characteristic not available")
val maxLen = bleConnection.maximumWriteValueLength(writeType) ?: data.size
var offset = 0
var packetsSent = 0
try {
characteristic.write(data, writeType = writeType)
while (offset < data.size) {
val chunkSize = minOf(data.size - offset, maxLen)
val packet = data.copyOfRange(offset, offset + chunkSize)
characteristic.write(packet, writeType = writeType)
offset += chunkSize
packetsSent++
}
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
throw OtaProtocolException.TransferFailed("Failed to write data", e)
throw OtaProtocolException.TransferFailed("Failed to write data at offset $offset", e)
}
return packetsSent
}
private suspend fun waitForResponse(timeoutMs: Long): String = try {
@@ -328,11 +386,6 @@ class BleOtaTransport(
}
companion object {
// Service and Characteristic UUIDs from ESP32 Unified OTA spec
private val SERVICE_UUID = Uuid.parse("4FAFC201-1FB5-459E-8FCC-C5C9C331914B")
private val OTA_CHARACTERISTIC_UUID = Uuid.parse("62ec0272-3ec5-11eb-b378-0242ac130005")
private val TX_CHARACTERISTIC_UUID = Uuid.parse("62ec0272-3ec5-11eb-b378-0242ac130003")
// Timeouts and retries
private val SCAN_TIMEOUT = 10.seconds
private const val CONNECTION_TIMEOUT_MS = 15_000L
@@ -0,0 +1,209 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.feature.firmware.ota
import co.touchlab.kermit.Logger
import co.touchlab.kermit.Severity
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.runTest
import no.nordicsemi.kotlin.ble.client.android.CentralManager
import no.nordicsemi.kotlin.ble.client.android.mock.mock
import no.nordicsemi.kotlin.ble.client.mock.ConnectionResult
import no.nordicsemi.kotlin.ble.client.mock.PeripheralSpec
import no.nordicsemi.kotlin.ble.client.mock.PeripheralSpecEventHandler
import no.nordicsemi.kotlin.ble.client.mock.Proximity
import no.nordicsemi.kotlin.ble.core.CharacteristicProperty
import no.nordicsemi.kotlin.ble.core.LegacyAdvertisingSetParameters
import no.nordicsemi.kotlin.ble.core.Permission
import no.nordicsemi.kotlin.ble.core.and
import no.nordicsemi.kotlin.ble.environment.android.mock.MockAndroidEnvironment
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test
import kotlin.time.Duration.Companion.milliseconds
import kotlin.uuid.Uuid
private val SERVICE_UUID = Uuid.parse("4FAFC201-1FB5-459E-8FCC-C5C9C331914B")
private val OTA_CHARACTERISTIC_UUID = Uuid.parse("62ec0272-3ec5-11eb-b378-0242ac130005")
private val TX_CHARACTERISTIC_UUID = Uuid.parse("62ec0272-3ec5-11eb-b378-0242ac130003")
/**
* Tests for BleOtaTransport service discovery via Nordic's Peripheral.profile() API. These validate the refactored
* connect() path that replaced discoverCharacteristics().
*/
@OptIn(ExperimentalCoroutinesApi::class)
class BleOtaTransportServiceDiscoveryTest {
private val testDispatcher = StandardTestDispatcher()
private val address = "00:11:22:33:44:55"
@Before
fun setup() {
Logger.setLogWriters(
object : co.touchlab.kermit.LogWriter() {
override fun log(severity: Severity, message: String, tag: String, throwable: Throwable?) {
println("[$severity] $tag: $message")
throwable?.printStackTrace()
}
},
)
}
@Test
fun `connect fails when OTA service not found on device`() = runTest(testDispatcher) {
val mockEnvironment = MockAndroidEnvironment.Api31(isBluetoothEnabled = true)
val centralManager = CentralManager.mock(mockEnvironment, scope = backgroundScope)
// Create a peripheral with a DIFFERENT service UUID (not the OTA service)
val wrongServiceUuid = Uuid.parse("0000180A-0000-1000-8000-00805F9B34FB") // Device Info
val otaPeripheral =
PeripheralSpec.simulatePeripheral(identifier = address, proximity = Proximity.IMMEDIATE) {
advertising(
parameters = LegacyAdvertisingSetParameters(connectable = true, interval = 100.milliseconds),
) {
CompleteLocalName("ESP32-OTA")
}
connectable(
name = "ESP32-OTA",
eventHandler = object : PeripheralSpecEventHandler {},
isBonded = true,
) {
Service(uuid = wrongServiceUuid) {
Characteristic(
uuid = OTA_CHARACTERISTIC_UUID,
properties =
CharacteristicProperty.WRITE and CharacteristicProperty.WRITE_WITHOUT_RESPONSE,
permission = Permission.WRITE,
)
}
}
}
centralManager.simulatePeripherals(listOf(otaPeripheral))
val transport = BleOtaTransport(centralManager, address, testDispatcher)
val result = transport.connect()
assertTrue("Connect should fail when OTA service is missing", result.isFailure)
transport.close()
}
@Test
fun `connect fails when TX characteristic is missing`() = runTest(testDispatcher) {
val mockEnvironment = MockAndroidEnvironment.Api31(isBluetoothEnabled = true)
val centralManager = CentralManager.mock(mockEnvironment, scope = backgroundScope)
// Create a peripheral with the OTA service but only the OTA characteristic (no TX)
val otaPeripheral =
PeripheralSpec.simulatePeripheral(identifier = address, proximity = Proximity.IMMEDIATE) {
advertising(
parameters = LegacyAdvertisingSetParameters(connectable = true, interval = 100.milliseconds),
) {
CompleteLocalName("ESP32-OTA")
}
connectable(
name = "ESP32-OTA",
eventHandler = object : PeripheralSpecEventHandler {},
isBonded = true,
) {
Service(uuid = SERVICE_UUID) {
Characteristic(
uuid = OTA_CHARACTERISTIC_UUID,
properties =
CharacteristicProperty.WRITE and CharacteristicProperty.WRITE_WITHOUT_RESPONSE,
permission = Permission.WRITE,
)
// TX_CHARACTERISTIC intentionally omitted
}
}
}
centralManager.simulatePeripherals(listOf(otaPeripheral))
val transport = BleOtaTransport(centralManager, address, testDispatcher)
val result = transport.connect()
assertTrue("Connect should fail when TX characteristic is missing", result.isFailure)
transport.close()
}
@Test
fun `connect fails when device is not found during scan`() = runTest(testDispatcher) {
val mockEnvironment = MockAndroidEnvironment.Api31(isBluetoothEnabled = true)
val centralManager = CentralManager.mock(mockEnvironment, scope = backgroundScope)
// Don't simulate any peripherals — scan will find nothing
val transport = BleOtaTransport(centralManager, address, testDispatcher)
val result = transport.connect()
assertTrue("Connect should fail when device is not found", result.isFailure)
val exception = result.exceptionOrNull()
assertTrue(
"Should be ConnectionFailed, got: $exception",
exception is OtaProtocolException.ConnectionFailed,
)
transport.close()
}
@Test
fun `connect succeeds with valid OTA service and characteristics`() = runTest(testDispatcher) {
val mockEnvironment = MockAndroidEnvironment.Api31(isBluetoothEnabled = true)
val centralManager = CentralManager.mock(mockEnvironment, scope = backgroundScope)
val otaPeripheral =
PeripheralSpec.simulatePeripheral(identifier = address, proximity = Proximity.IMMEDIATE) {
advertising(
parameters = LegacyAdvertisingSetParameters(connectable = true, interval = 100.milliseconds),
) {
CompleteLocalName("ESP32-OTA")
}
connectable(
name = "ESP32-OTA",
eventHandler =
object : PeripheralSpecEventHandler {
override fun onConnectionRequest(
preferredPhy: List<no.nordicsemi.kotlin.ble.core.Phy>,
): ConnectionResult = ConnectionResult.Accept
},
isBonded = true,
) {
Service(uuid = SERVICE_UUID) {
Characteristic(
uuid = OTA_CHARACTERISTIC_UUID,
properties =
CharacteristicProperty.WRITE and CharacteristicProperty.WRITE_WITHOUT_RESPONSE,
permission = Permission.WRITE,
)
Characteristic(
uuid = TX_CHARACTERISTIC_UUID,
property = CharacteristicProperty.NOTIFY,
permission = Permission.READ,
)
}
}
}
centralManager.simulatePeripherals(listOf(otaPeripheral))
val transport = BleOtaTransport(centralManager, address, testDispatcher)
val result = transport.connect()
assertTrue("Connect should succeed: ${result.exceptionOrNull()}", result.isSuccess)
transport.close()
}
}