test: comprehensive bridge unit tests

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
James Rich
2026-05-06 14:58:16 -05:00
parent 6450c69820
commit 6490fc30e8
8 changed files with 917 additions and 20 deletions
@@ -36,7 +36,7 @@ internal class SdkEventBridge(
.launchIn(scope)
}
private fun handleEvent(event: MeshEvent) {
internal fun handleEvent(event: MeshEvent) {
when (event) {
is MeshEvent.DeviceRebooted -> {
Logger.i { "[SdkBridge] Device rebooted" }
@@ -28,6 +28,7 @@ import org.meshtastic.core.common.util.nowSeconds
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.util.onlineTimeThreshold
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.PortNum
import org.meshtastic.sdk.NodeChange
@@ -49,14 +50,11 @@ internal class SdkNodeBridge(
accessor.client
.flatMapLatest { client -> client?.packets ?: emptyFlow() }
.filter { it.decoded?.portnum == PortNum.NODE_STATUS_APP }
.onEach { packet ->
val status = packet.decoded?.payload?.utf8() ?: return@onEach
nodeRepository.updateNode(packet.from) { it.copy(nodeStatus = status) }
}
.onEach(::handleNodeStatusPacket)
.launchIn(scope)
}
private suspend fun handleNodeChange(change: NodeChange) {
internal suspend fun handleNodeChange(change: NodeChange) {
when (change) {
is NodeChange.Snapshot -> {
nodeRepository.clear()
@@ -75,6 +73,11 @@ internal class SdkNodeBridge(
}
}
internal fun handleNodeStatusPacket(packet: MeshPacket) {
val status = packet.decoded?.payload?.utf8() ?: return
nodeRepository.updateNode(packet.from) { it.copy(nodeStatus = status) }
}
private fun handleWentOffline(change: NodeChange.WentOffline) {
val nodeNum = change.nodeId.raw
Logger.d {
@@ -57,7 +57,7 @@ internal class SdkPacketBridge(
.launchIn(scope)
}
private suspend fun handleStoreForwardEvent(event: StoreForwardEvent) {
internal suspend fun handleStoreForwardEvent(event: StoreForwardEvent) {
when (event) {
is StoreForwardEvent.ServerDiscovered -> {
Logger.i {
@@ -23,6 +23,7 @@ import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.PortNum
import org.meshtastic.sdk.NeighborInfo
@@ -33,19 +34,21 @@ internal class SdkTopologyBridge(
accessor.client
.flatMapLatest { client -> client?.packets ?: emptyFlow() }
.filter { it.decoded?.portnum == PortNum.NEIGHBORINFO_APP }
.onEach { packet ->
val payload = packet.decoded?.payload?.toByteArray() ?: return@onEach
runCatching {
val proto = org.meshtastic.proto.NeighborInfo.ADAPTER.decode(payload)
val info = NeighborInfo.fromProto(
reportingNode = packet.from,
neighborNodeIds = proto.neighbors.map { it.node_id },
snrValues = proto.neighbors.map { it.snr },
timestamp = proto.last_sent_by_id,
)
topologyService.ingestNeighborInfo(info)
}.onFailure { e -> Logger.w(e) { "[SdkBridge] Failed to parse NeighborInfo" } }
}
.onEach(::handleNeighborInfoPacket)
.launchIn(scope)
}
internal suspend fun handleNeighborInfoPacket(packet: MeshPacket) {
val payload = packet.decoded?.payload?.toByteArray() ?: return
runCatching {
val proto = org.meshtastic.proto.NeighborInfo.ADAPTER.decode(payload)
val info = NeighborInfo.fromProto(
reportingNode = packet.from,
neighborNodeIds = proto.neighbors.map { it.node_id },
snrValues = proto.neighbors.map { it.snr },
timestamp = proto.last_sent_by_id,
)
topologyService.ingestNeighborInfo(info)
}.onFailure { e -> Logger.w(e) { "[SdkBridge] Failed to parse NeighborInfo" } }
}
}
@@ -0,0 +1,141 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.radio
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlinx.io.bytestring.ByteString as KByteString
import org.meshtastic.core.model.CongestionLevel
import org.meshtastic.core.testing.FakeServiceRepository
import org.meshtastic.proto.ClientNotification
import org.meshtastic.proto.FromRadio
import org.meshtastic.sdk.CongestionMetrics
import org.meshtastic.sdk.DroppedFlow
import org.meshtastic.sdk.Frame
import org.meshtastic.sdk.MeshEvent
import org.meshtastic.sdk.RadioClient
import org.meshtastic.sdk.TransportIdentity
import org.meshtastic.sdk.testing.FakeRadioTransport
import org.meshtastic.sdk.testing.InMemoryStorageProvider
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNull
@OptIn(ExperimentalCoroutinesApi::class)
class SdkEventBridgeTest {
@Test
fun `device rebooted event emits notification`() = runTest {
val serviceRepository = FakeServiceRepository()
val bridge = SdkEventBridge(serviceRepository)
val (transport, client) = connectedClient()
bridge.observe(TestRadioClientAccessor(client), backgroundScope)
client.connect()
runCurrent()
transport.injectFrame(encodeFromRadio(FromRadio(rebooted = true)))
runCurrent()
assertEquals("Device rebooted", serviceRepository.clientNotification.value?.message)
client.disconnect()
}
@Test
fun `congestion warning sets congestion level`() = runTest {
val serviceRepository = FakeServiceRepository()
val bridge = SdkEventBridge(serviceRepository)
bridge.handleEvent(MeshEvent.CongestionWarning(CongestionMetrics(airUtilTx = 80f, channelUtil = 30f)))
assertEquals(CongestionLevel.CRITICAL, serviceRepository.congestionLevel.value)
}
@Test
fun `duplicated public key warning is logged without changing service state`() = runTest {
val serviceRepository = FakeServiceRepository().apply {
setClientNotification(ClientNotification(message = "existing"))
setCongestionLevel(CongestionLevel.HIGH)
}
val bridge = SdkEventBridge(serviceRepository)
bridge.handleEvent(MeshEvent.SecurityWarning.DuplicatedPublicKey)
assertEquals("existing", serviceRepository.clientNotification.value?.message)
assertEquals(CongestionLevel.HIGH, serviceRepository.congestionLevel.value)
}
@Test
fun `low entropy key warning is logged without changing service state`() = runTest {
val serviceRepository = FakeServiceRepository().apply {
setCongestionLevel(CongestionLevel.MEDIUM)
}
val bridge = SdkEventBridge(serviceRepository)
bridge.handleEvent(MeshEvent.SecurityWarning.LowEntropyKey)
assertNull(serviceRepository.clientNotification.value)
assertEquals(CongestionLevel.MEDIUM, serviceRepository.congestionLevel.value)
}
@Test
fun `packets dropped event is handled without crashing`() = runTest {
val serviceRepository = FakeServiceRepository().apply {
setClientNotification(ClientNotification(message = "keep"))
}
val bridge = SdkEventBridge(serviceRepository)
bridge.handleEvent(MeshEvent.PacketsDropped(flow = DroppedFlow.Events, count = 4))
assertEquals("keep", serviceRepository.clientNotification.value?.message)
}
private fun TestScope.connectedClient(): Pair<FakeRadioTransport, RadioClient> {
val transport = FakeRadioTransport(identity = TransportIdentity("fake:event-bridge"), autoHandshake = true)
val client =
RadioClient.Builder()
.transport(transport)
.storage(InMemoryStorageProvider())
.coroutineContext(backgroundScope.coroutineContext)
.autoSyncTimeOnConnect(false)
.build()
return transport to client
}
private fun encodeFromRadio(fromRadio: FromRadio): Frame {
val proto = FromRadio.ADAPTER.encode(fromRadio)
val frameBytes = ByteArray(4 + proto.size).apply {
this[0] = 0x94.toByte()
this[1] = 0xC3.toByte()
this[2] = (proto.size shr 8).toByte()
this[3] = (proto.size and 0xFF).toByte()
proto.copyInto(this, destinationOffset = 4)
}
return Frame(KByteString(frameBytes))
}
private class TestRadioClientAccessor(client: RadioClient) : RadioClientAccessor {
override val client = MutableStateFlow<RadioClient?>(client)
override fun rebuildAndConnectAsync() = Unit
override fun disconnect() = Unit
}
}
@@ -0,0 +1,336 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.radio
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlinx.io.bytestring.ByteString as KByteString
import okio.ByteString.Companion.toByteString
import org.meshtastic.core.model.Node
import org.meshtastic.core.model.util.onlineTimeThreshold
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.core.testing.FakeNodeRepository
import org.meshtastic.proto.Data
import org.meshtastic.proto.FromRadio
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.NodeInfo
import org.meshtastic.proto.PortNum
import org.meshtastic.proto.User
import org.meshtastic.sdk.Frame
import org.meshtastic.sdk.NodeChange
import org.meshtastic.sdk.NodeId
import org.meshtastic.sdk.RadioClient
import org.meshtastic.sdk.StorageProvider
import org.meshtastic.sdk.TransportIdentity
import org.meshtastic.sdk.testing.FakeRadioTransport
import org.meshtastic.sdk.testing.InMemoryStorage
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNull
import kotlin.test.assertTrue
import kotlin.time.Clock
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
@OptIn(ExperimentalCoroutinesApi::class)
class SdkNodeBridgeTest {
@Test
fun `snapshot clears repository and reinstalls nodes`() = runTest {
val nodeRepository = RecordingNodeRepository().apply {
setNodes(
listOf(
Node(num = 0xAAAA0001.toInt(), user = User(id = "!AAAA0001", long_name = "stale")),
),
)
}
val topologyService = MeshTopologyService().apply {
ingestNeighborInfo(
org.meshtastic.sdk.NeighborInfo(
nodeId = NodeId(1),
neighbors = listOf(org.meshtastic.sdk.NeighborInfo.Neighbor(NodeId(2), 7.5f)),
),
)
}
val bridge = SdkNodeBridge(nodeRepository, topologyService)
val first = nodeInfo(0x11111111, "!11111111", "Alpha")
val second = nodeInfo(0x22222222, "!22222222", "Bravo")
bridge.handleNodeChange(
NodeChange.Snapshot(
mapOf(
NodeId(first.num) to first,
NodeId(second.num) to second,
),
),
)
assertEquals(1, nodeRepository.clearCalls)
assertEquals(listOf(false, false), nodeRepository.installCalls.map { it.second })
assertEquals(setOf(first.num, second.num), nodeRepository.nodeDBbyNum.value.keys)
assertTrue(nodeRepository.isNodeDbReady.value)
assertTrue(topologyService.edges.value.isEmpty())
assertEquals(0, topologyService.nodeCount.value)
}
@Test
fun `added event installs node with broadcast enabled`() = runTest {
val nodeRepository = RecordingNodeRepository()
val bridge = SdkNodeBridge(nodeRepository, MeshTopologyService())
val added = nodeInfo(0x33333333, "!33333333", "Added")
bridge.handleNodeChange(NodeChange.Added(added))
assertEquals(listOf(true), nodeRepository.installCalls.map { it.second })
assertTrue(nodeRepository.nodeDBbyNum.value.containsKey(added.num))
}
@Test
fun `updated event reinstalls node with broadcast enabled`() = runTest {
val nodeRepository = RecordingNodeRepository()
val bridge = SdkNodeBridge(nodeRepository, MeshTopologyService())
val updated = nodeInfo(0x44444444, "!44444444", "Updated")
bridge.handleNodeChange(NodeChange.Updated(updated, emptySet()))
assertEquals(listOf(true), nodeRepository.installCalls.map { it.second })
assertTrue(nodeRepository.nodeDBbyNum.value.containsKey(updated.num))
}
@Test
fun `removed event deletes node from repository`() = runTest {
val nodeNum = 0x55555555
val nodeRepository = RecordingNodeRepository().apply {
setNodes(listOf(Node(num = nodeNum, user = User(id = "!55555555", long_name = "Gone"))))
}
val bridge = SdkNodeBridge(nodeRepository, MeshTopologyService())
bridge.handleNodeChange(NodeChange.Removed(NodeId(nodeNum)))
assertEquals(listOf(nodeNum), nodeRepository.removeCalls)
assertFalse(nodeRepository.nodeDBbyNum.value.containsKey(nodeNum))
}
@Test
fun `went offline updates last heard and marks node offline`() = runTest {
val nodeNum = 0x66666666
val nodeRepository = RecordingNodeRepository().apply {
setNodes(
listOf(
Node(
num = nodeNum,
user = User(id = "!66666666", long_name = "Offline"),
lastHeard = Clock.System.now().epochSeconds.toInt(),
),
),
)
}
val bridge = SdkNodeBridge(nodeRepository, MeshTopologyService())
val staleLastHeard = onlineTimeThreshold() - 20
bridge.handleNodeChange(NodeChange.WentOffline(NodeId(nodeNum), staleLastHeard))
val updated = nodeRepository.nodeDBbyNum.value.getValue(nodeNum)
assertEquals(minOf(Clock.System.now().epochSeconds.toInt(), staleLastHeard, onlineTimeThreshold()), updated.lastHeard)
assertFalse(updated.isOnline)
}
@Test
fun `went offline ignores unknown node`() = runTest {
val nodeRepository = RecordingNodeRepository()
val bridge = SdkNodeBridge(nodeRepository, MeshTopologyService())
bridge.handleNodeChange(NodeChange.WentOffline(NodeId(0x77777777), onlineTimeThreshold() - 10))
assertTrue(nodeRepository.nodeDBbyNum.value.isEmpty())
}
@Test
fun `came online updates last heard and marks node online`() = runTest {
val nodeNum = 0x88888888.toInt()
val nodeRepository = RecordingNodeRepository().apply {
setNodes(
listOf(
Node(
num = nodeNum,
user = User(id = "!88888888", long_name = "Online"),
lastHeard = onlineTimeThreshold() - 120,
),
),
)
}
val bridge = SdkNodeBridge(nodeRepository, MeshTopologyService())
bridge.handleNodeChange(NodeChange.CameOnline(NodeId(nodeNum)))
val updated = nodeRepository.nodeDBbyNum.value.getValue(nodeNum)
assertTrue(updated.lastHeard >= onlineTimeThreshold())
assertTrue(updated.isOnline)
}
@Test
fun `came online ignores unknown node`() = runTest {
val nodeRepository = RecordingNodeRepository()
val bridge = SdkNodeBridge(nodeRepository, MeshTopologyService())
bridge.handleNodeChange(NodeChange.CameOnline(NodeId(0x99999999.toInt())))
assertTrue(nodeRepository.nodeDBbyNum.value.isEmpty())
}
@Test
fun `own node discovered sets my node num`() = runTest {
val myNodeNum = 0x12345678
val nodeRepository = RecordingNodeRepository()
val bridge = SdkNodeBridge(nodeRepository, MeshTopologyService())
val (transport, client) = connectedClient(myNodeNum = myNodeNum)
bridge.observe(TestRadioClientAccessor(client), backgroundScope)
client.connect()
runCurrent()
transport.injectFrame(encodeFromRadio(FromRadio(node_info = nodeInfo(myNodeNum, "!12345678", "Self"))))
runCurrent()
assertEquals(myNodeNum, nodeRepository.myNodeNum.value)
client.disconnect()
}
@Test
fun `node status packet populates node status`() = runTest {
val nodeNum = 0xABCDEF01.toInt()
val nodeRepository = RecordingNodeRepository().apply {
setNodes(listOf(Node(num = nodeNum, user = User(id = "!ABCDEF01", long_name = "Status"))))
}
val bridge = SdkNodeBridge(nodeRepository, MeshTopologyService())
bridge.handleNodeStatusPacket(
MeshPacket(
from = nodeNum,
decoded = Data(
portnum = PortNum.NODE_STATUS_APP,
payload = "nomad active".encodeToByteArray().toByteString(),
),
),
)
assertEquals("nomad active", nodeRepository.nodeDBbyNum.value.getValue(nodeNum).nodeStatus)
}
@Test
fun `node status packet with empty payload stores empty status`() = runTest {
val nodeNum = 0x0BADF00D
val nodeRepository = RecordingNodeRepository().apply {
setNodes(listOf(Node(num = nodeNum, user = User(id = "!0BADF00D", long_name = "Status"))))
}
val bridge = SdkNodeBridge(nodeRepository, MeshTopologyService())
bridge.handleNodeStatusPacket(
MeshPacket(
from = nodeNum,
decoded = Data(portnum = PortNum.NODE_STATUS_APP),
),
)
assertEquals("", nodeRepository.nodeDBbyNum.value.getValue(nodeNum).nodeStatus)
}
private fun TestScope.connectedClient(
storage: StorageProvider = NodeBridgeSeededHeartbeatStorageProvider(emptyMap()),
myNodeNum: Int = 0x11111111,
presenceTimeout: Duration = 1.seconds,
): Pair<FakeRadioTransport, RadioClient> {
val transport = FakeRadioTransport(identity = TransportIdentity("fake:node-bridge"), autoHandshake = true, nodeNum = myNodeNum)
val client =
RadioClient.Builder()
.transport(transport)
.storage(storage)
.coroutineContext(backgroundScope.coroutineContext)
.autoSyncTimeOnConnect(false)
.presenceTimeout(presenceTimeout)
.build()
return transport to client
}
private fun nodeInfo(num: Int, id: String, longName: String) =
NodeInfo(
num = num,
user = User(id = id, long_name = longName, short_name = longName.take(4)),
)
private fun encodeFromRadio(fromRadio: FromRadio): Frame {
val proto = FromRadio.ADAPTER.encode(fromRadio)
val frameBytes = ByteArray(4 + proto.size).apply {
this[0] = 0x94.toByte()
this[1] = 0xC3.toByte()
this[2] = (proto.size shr 8).toByte()
this[3] = (proto.size and 0xFF).toByte()
proto.copyInto(this, destinationOffset = 4)
}
return Frame(KByteString(frameBytes))
}
private class RecordingNodeRepository(
private val delegate: FakeNodeRepository = FakeNodeRepository(),
) : NodeRepository by delegate {
val installCalls = mutableListOf<Pair<NodeInfo, Boolean>>()
val removeCalls = mutableListOf<Int>()
var clearCalls = 0
override fun clear() {
clearCalls += 1
delegate.clear()
}
override fun installNodeInfo(info: NodeInfo, withBroadcast: Boolean) {
installCalls += info to withBroadcast
delegate.installNodeInfo(info, withBroadcast)
}
override fun removeByNodenum(nodeNum: Int) {
removeCalls += nodeNum
delegate.removeByNodenum(nodeNum)
}
fun setNodes(nodes: List<Node>) {
delegate.setNodes(nodes)
}
}
private class TestRadioClientAccessor(client: RadioClient) : RadioClientAccessor {
override val client = MutableStateFlow<RadioClient?>(client)
override fun rebuildAndConnectAsync() = Unit
override fun disconnect() = Unit
}
}
private class NodeBridgeSeededHeartbeatStorageProvider(
private val heartbeats: Map<NodeId, Long>,
) : StorageProvider {
override suspend fun activate(identity: TransportIdentity) =
InMemoryStorage().also { storage ->
heartbeats.forEach { (nodeId, heartbeatMs) ->
storage.saveHeartbeat(nodeId, heartbeatMs)
}
}
}
@@ -0,0 +1,268 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.radio
import dev.mokkery.MockMode
import dev.mokkery.mock
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import okio.ByteString.Companion.toByteString
import org.meshtastic.core.model.MessageStatus
import org.meshtastic.core.repository.PacketRepository
import org.meshtastic.core.testing.FakeNodeRepository
import org.meshtastic.core.testing.FakeServiceRepository
import org.meshtastic.proto.Data
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.PortNum
import org.meshtastic.proto.StoreForwardPlusPlus
import org.meshtastic.sdk.NodeId
import org.meshtastic.sdk.RadioClient
import org.meshtastic.sdk.StoreForwardEvent
import org.meshtastic.sdk.TransportIdentity
import org.meshtastic.sdk.testing.FakeRadioTransport
import org.meshtastic.sdk.testing.InMemoryStorageProvider
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@OptIn(ExperimentalCoroutinesApi::class)
class SdkPacketBridgeTest {
@Test
fun `packet received is emitted to service repository`() = runTest {
val serviceRepository = FakeServiceRepository()
val bridge = SdkPacketBridge(serviceRepository, lazyOf(mock(MockMode.autofill)), FakeNodeRepository())
val (transport, client) = connectedClient()
bridge.observe(TestRadioClientAccessor(client), backgroundScope)
client.connect()
runCurrent()
val packet = MeshPacket(from = 0x10101010, to = 0, decoded = Data(portnum = PortNum.TEXT_MESSAGE_APP))
val packetAwaiter = backgroundScope.async { serviceRepository.meshPacketFlow.first() }
runCurrent()
transport.injectPacket(packet)
runCurrent()
assertEquals(packet, packetAwaiter.await())
client.disconnect()
}
@Test
fun `store forward server list tracks discovered servers`() = runTest {
val serviceRepository = FakeServiceRepository()
val bridge = SdkPacketBridge(serviceRepository, lazyOf(mock(MockMode.autofill)), FakeNodeRepository())
val (transport, client) = connectedClient()
bridge.observe(TestRadioClientAccessor(client), backgroundScope)
client.connect()
runCurrent()
transport.injectStoreForwardResponse(
requestId = 0,
message = org.meshtastic.proto.StoreAndForward(
rr = org.meshtastic.proto.StoreAndForward.RequestResponse.ROUTER_HEARTBEAT,
heartbeat = org.meshtastic.proto.StoreAndForward.Heartbeat(period = 300),
),
fromNode = 0x0A0A0A0A,
)
transport.injectStoreForwardResponse(
requestId = 0,
message = org.meshtastic.proto.StoreAndForward(
rr = org.meshtastic.proto.StoreAndForward.RequestResponse.ROUTER_HEARTBEAT,
heartbeat = org.meshtastic.proto.StoreAndForward.Heartbeat(period = 300),
),
fromNode = 0x0B0B0B0B,
)
runCurrent()
assertEquals(listOf(0x0A0A0A0A, 0x0B0B0B0B), serviceRepository.storeForwardServers.value)
client.disconnect()
}
@Test
fun `sfpp confirmed link updates packet repository`() = runTest {
val packetRepository = RecordingPacketRepository()
val nodeRepository = FakeNodeRepository().apply { setMyNodeNum(0x11111111) }
val bridge = SdkPacketBridge(FakeServiceRepository(), lazyOf(packetRepository), nodeRepository)
bridge.handleStoreForwardEvent(
StoreForwardEvent.SfppLinkProvided(
packetId = 0x1234,
from = 0x55667788,
to = 0x01020304,
messageHash = byteArrayOf(1, 2, 3, 4),
confirmed = true,
),
)
val call = packetRepository.statusCalls.single()
assertEquals(0x1234, call.packetId)
assertEquals(0x55667788, call.from)
assertEquals(0x01020304, call.to)
assertContentEquals(byteArrayOf(1, 2, 3, 4), call.hash)
assertEquals(MessageStatus.SFPP_CONFIRMED, call.status)
assertEquals(0x11111111, call.myNodeNum)
}
@Test
fun `sfpp routing link updates packet repository`() = runTest {
val packetRepository = RecordingPacketRepository()
val bridge = SdkPacketBridge(FakeServiceRepository(), lazyOf(packetRepository), FakeNodeRepository())
bridge.handleStoreForwardEvent(
StoreForwardEvent.SfppLinkProvided(
packetId = 77,
from = 0x11112222,
to = 0x33334444,
messageHash = byteArrayOf(9, 8, 7),
confirmed = false,
),
)
assertEquals(MessageStatus.SFPP_ROUTING, packetRepository.statusCalls.single().status)
}
@Test
fun `sfpp link without hash is ignored`() = runTest {
val packetRepository = RecordingPacketRepository()
val bridge = SdkPacketBridge(FakeServiceRepository(), lazyOf(packetRepository), FakeNodeRepository())
bridge.handleStoreForwardEvent(
StoreForwardEvent.SfppLinkProvided(
packetId = 1,
from = 2,
to = 3,
messageHash = null,
confirmed = true,
),
)
assertTrue(packetRepository.statusCalls.isEmpty())
assertTrue(packetRepository.hashCalls.isEmpty())
}
@Test
fun `sfpp canon announce updates packet repository by hash`() = runTest {
val packetRepository = RecordingPacketRepository()
val bridge = SdkPacketBridge(FakeServiceRepository(), lazyOf(packetRepository), FakeNodeRepository())
bridge.handleStoreForwardEvent(
StoreForwardEvent.SfppCanonAnnounced(
messageHash = byteArrayOf(7, 6, 5, 4),
rxTime = 0xFEDCBA98L,
),
)
val call = packetRepository.hashCalls.single()
assertContentEquals(byteArrayOf(7, 6, 5, 4), call.hash)
assertEquals(MessageStatus.SFPP_CONFIRMED, call.status)
assertEquals(0xFEDCBA98L, call.rxTime)
}
@Test
fun `unknown packet type is handled without crashing`() = runTest {
val serviceRepository = FakeServiceRepository()
val packetRepository = RecordingPacketRepository()
val bridge = SdkPacketBridge(serviceRepository, lazyOf(packetRepository), FakeNodeRepository())
val (transport, client) = connectedClient()
bridge.observe(TestRadioClientAccessor(client), backgroundScope)
client.connect()
runCurrent()
transport.injectPacket(
MeshPacket(
from = 0x99990000.toInt(),
to = 0,
decoded = Data(portnum = PortNum.UNKNOWN_APP, payload = byteArrayOf(0x01, 0x02).toByteString()),
),
)
runCurrent()
assertTrue(packetRepository.statusCalls.isEmpty())
assertTrue(packetRepository.hashCalls.isEmpty())
assertEquals(emptyList(), serviceRepository.storeForwardServers.value)
client.disconnect()
}
private fun TestScope.connectedClient(): Pair<FakeRadioTransport, RadioClient> {
val transport = FakeRadioTransport(identity = TransportIdentity("fake:packet-bridge"), autoHandshake = true)
val client =
RadioClient.Builder()
.transport(transport)
.storage(InMemoryStorageProvider())
.coroutineContext(backgroundScope.coroutineContext)
.autoSyncTimeOnConnect(false)
.build()
return transport to client
}
private class RecordingPacketRepository(
private val delegate: PacketRepository = mock(MockMode.autofill),
) : PacketRepository by delegate {
data class StatusCall(
val packetId: Int,
val from: Int,
val to: Int,
val hash: ByteArray,
val status: MessageStatus,
val rxTime: Long,
val myNodeNum: Int?,
)
data class HashCall(
val hash: ByteArray,
val status: MessageStatus,
val rxTime: Long,
)
val statusCalls = mutableListOf<StatusCall>()
val hashCalls = mutableListOf<HashCall>()
override suspend fun updateSFPPStatus(
packetId: Int,
from: Int,
to: Int,
hash: ByteArray,
status: MessageStatus,
rxTime: Long,
myNodeNum: Int?,
) {
statusCalls += StatusCall(packetId, from, to, hash.copyOf(), status, rxTime, myNodeNum)
}
override suspend fun updateSFPPStatusByHash(hash: ByteArray, status: MessageStatus, rxTime: Long) {
hashCalls += HashCall(hash.copyOf(), status, rxTime)
}
}
private class TestRadioClientAccessor(client: RadioClient) : RadioClientAccessor {
override val client = MutableStateFlow<RadioClient?>(client)
override fun rebuildAndConnectAsync() = Unit
override fun disconnect() = Unit
}
}
@@ -0,0 +1,146 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.radio
import kotlinx.coroutines.ExperimentalCoroutinesApi
import okio.ByteString.Companion.toByteString
import org.meshtastic.proto.Data
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.Neighbor
import org.meshtastic.proto.NeighborInfo
import org.meshtastic.proto.PortNum
import org.meshtastic.sdk.NodeId
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlinx.coroutines.test.runTest
@OptIn(ExperimentalCoroutinesApi::class)
class SdkTopologyBridgeTest {
@Test
fun `neighbor info packet updates topology graph`() = runTest {
val topologyService = MeshTopologyService()
val bridge = SdkTopologyBridge(topologyService)
bridge.handleNeighborInfoPacket(
neighborInfoPacket(
from = 0x11111111,
info = NeighborInfo(
last_sent_by_id = 1234,
neighbors = listOf(
Neighbor(node_id = 0x22222222, snr = 7.5f),
Neighbor(node_id = 0x33333333, snr = -2.25f),
),
),
),
)
val edges = topologyService.edges.value
assertEquals(2, edges.size)
assertEquals(3, topologyService.nodeCount.value)
assertEquals(NodeId(0x11111111), edges[0].from)
assertEquals(NodeId(0x22222222), edges[0].to)
assertEquals(7.5f, edges[0].snr)
assertEquals(1234, edges[0].lastUpdated)
assertEquals(NodeId(0x33333333), edges[1].to)
}
@Test
fun `malformed proto is handled without crashing`() = runTest {
val topologyService = MeshTopologyService()
val bridge = SdkTopologyBridge(topologyService)
bridge.handleNeighborInfoPacket(
MeshPacket(
from = 0x44444444,
decoded = Data(
portnum = PortNum.NEIGHBORINFO_APP,
payload = byteArrayOf(0x01, 0x02, 0x03).toByteString(),
),
),
)
assertTrue(topologyService.edges.value.isEmpty())
assertEquals(0, topologyService.nodeCount.value)
}
@Test
fun `empty neighbor list tracks node without edges`() = runTest {
val topologyService = MeshTopologyService()
val bridge = SdkTopologyBridge(topologyService)
bridge.handleNeighborInfoPacket(
neighborInfoPacket(
from = 0x55555555,
info = NeighborInfo(last_sent_by_id = 999, neighbors = emptyList()),
),
)
assertTrue(topologyService.edges.value.isEmpty())
assertEquals(1, topologyService.nodeCount.value)
}
@Test
fun `subsequent reports replace edges from the same reporter`() = runTest {
val topologyService = MeshTopologyService()
val bridge = SdkTopologyBridge(topologyService)
bridge.handleNeighborInfoPacket(
neighborInfoPacket(
from = 0x66666666,
info = NeighborInfo(
neighbors = listOf(
Neighbor(node_id = 0x11110000, snr = 1f),
Neighbor(node_id = 0x22220000, snr = 2f),
),
),
),
)
bridge.handleNeighborInfoPacket(
neighborInfoPacket(
from = 0x66666666,
info = NeighborInfo(neighbors = listOf(Neighbor(node_id = 0x33330000, snr = 3f))),
),
)
val edges = topologyService.edges.value
assertEquals(1, edges.size)
assertEquals(NodeId(0x33330000), edges.single().to)
assertEquals(2, topologyService.nodeCount.value)
}
@Test
fun `empty payload tracks reporter without edges`() = runTest {
val topologyService = MeshTopologyService()
val bridge = SdkTopologyBridge(topologyService)
bridge.handleNeighborInfoPacket(MeshPacket(from = 0x77777777, decoded = Data(portnum = PortNum.NEIGHBORINFO_APP)))
assertTrue(topologyService.edges.value.isEmpty())
assertEquals(1, topologyService.nodeCount.value)
}
private fun neighborInfoPacket(from: Int, info: NeighborInfo) =
MeshPacket(
from = from,
decoded = Data(
portnum = PortNum.NEIGHBORINFO_APP,
payload = NeighborInfo.ADAPTER.encode(info).toByteString(),
),
)
}