matrix-meshtastic-bridge/src/meshtastic.rs

176 lines
6.5 KiB
Rust
Raw Normal View History

2024-10-10 17:00:44 +00:00
use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
use meshtastic::api::state::Connected;
use meshtastic::api::{ConnectedStreamApi, StreamApi};
use meshtastic::protobufs::{from_radio::PayloadVariant, FromRadio};
use meshtastic::protobufs::{mesh_packet, Data};
use meshtastic::utils::{self, generate_rand_id};
use tokio::sync::mpsc::error::SendError;
2024-10-10 17:00:44 +00:00
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
use crate::config;
pub struct MeshtasticClient {
decoded_listener: UnboundedReceiver<FromRadio>,
stream_api: ConnectedStreamApi<Connected>,
}
pub(crate) async fn build(
config: config::MeshtasticConfig,
) -> Result<MeshtasticClient, Box<dyn std::error::Error>> {
let stream_api = StreamApi::new();
2024-10-18 05:44:10 +00:00
let tcp_stream = utils::stream::build_tcp_stream(config.address).await?;
2024-10-18 05:44:10 +00:00
let (decoded_listener, stream_api) = stream_api.connect(tcp_stream).await;
2024-10-10 17:00:44 +00:00
log::info!("connected to meshtastic device");
2024-10-10 17:00:44 +00:00
Ok(MeshtasticClient {
decoded_listener: decoded_listener,
stream_api: stream_api,
})
}
impl MeshtasticClient {
pub async fn receive(
mut self: MeshtasticClient,
matrix_sender: Sender<RoomMessageEventContent>,
) -> Result<(), Box<dyn std::error::Error>> {
// let config_id = utils::generate_rand_id();
// let stream_api = stream_api.configure(config_id).await?;
// This loop can be broken with ctrl+c, or by disconnecting
// the attached serial port.
self.stream_api.configure(generate_rand_id()).await?;
2024-10-10 17:00:44 +00:00
log::debug!("listening for messages from meshtastic");
while let Some(packet) = self.decoded_listener.recv().await {
let payload_variant = match packet.payload_variant {
Some(payload_variant) => payload_variant,
None => {
log::debug!(
"Received FromRadio packet with no payload variant, not handling..."
);
continue;
}
};
2024-10-10 17:00:44 +00:00
match payload_variant {
PayloadVariant::Packet(packet) => {
2024-10-17 07:08:14 +00:00
let payload_variant = match &packet.payload_variant {
Some(payload_variant) => payload_variant,
None => {
log::debug!(
"Received MeshPacket with no payload variant, not handling..."
);
return Ok(());
}
};
match payload_variant {
mesh_packet::PayloadVariant::Decoded(decoded) => {
2024-10-17 07:08:14 +00:00
handle_decoded_packet(&matrix_sender, &packet, decoded).await?;
}
mesh_packet::PayloadVariant::Encrypted(encrypted) => {
log::debug!(
"received encrypted packet (decryption not yet supported): {:?}",
encrypted
);
}
};
}
PayloadVariant::MyInfo(my_node_info) => {
debug_message(
&matrix_sender,
format!("I'm alive! MyInfo: {:?}", my_node_info),
)
.await?
}
PayloadVariant::LogRecord(record) => {
log::info!(
"[meshtastic] {} [{}] [{}] {}",
record.level,
record.time,
record.source,
record.message
)
}
PayloadVariant::Channel(channel) => {
log::debug!("Channel: {:?}", channel);
}
2024-10-17 21:42:25 +00:00
PayloadVariant::Config(config) => {
log::debug!("config: {:?}", config);
}
PayloadVariant::ConfigCompleteId(config_complete_id) => {
log::debug!("config complete ID: {:?}", config_complete_id);
}
PayloadVariant::NodeInfo(node_info) => {
log::debug!("node info: {:?}", node_info);
}
PayloadVariant::Rebooted(rebooted) => {
log::debug!("rebooted: {:?}", rebooted);
}
PayloadVariant::ModuleConfig(module_config) => {
log::debug!("module config: {:?}", module_config);
}
PayloadVariant::QueueStatus(queue_status) => {
log::debug!("queue status: {:?}", queue_status);
}
PayloadVariant::XmodemPacket(xmodem_packet) => {
log::debug!("xmodem packet: {:?}", xmodem_packet);
}
PayloadVariant::Metadata(metadata) => {
log::debug!("metadata message: {:?}", metadata);
}
PayloadVariant::MqttClientProxyMessage(mqtt_client_proxy_message) => {
log::debug!("mqtt client proxy message: {:?}", mqtt_client_proxy_message);
}
}
2024-10-10 17:00:44 +00:00
}
Ok(())
}
}
async fn handle_decoded_packet(
matrix_sender: &Sender<RoomMessageEventContent>,
2024-10-17 07:08:14 +00:00
packet: &meshtastic::protobufs::MeshPacket,
decoded: &Data,
) -> Result<(), Box<dyn std::error::Error>> {
match decoded.portnum() {
meshtastic::protobufs::PortNum::TextMessageApp => {
2024-10-17 07:08:14 +00:00
log::debug!("posting packet to matrix: {:?}", packet);
log::debug!("decoded: {:?}", decoded);
let message = format!(
"text from {:02x}: {} (snr: {}, rssi: {}, hop limit: {}, hop start: {})",
packet.from,
std::str::from_utf8(&decoded.payload)?,
packet.rx_snr,
packet.rx_rssi,
packet.hop_limit,
packet.hop_start
);
debug_message(&matrix_sender, message).await?;
}
_ => log::debug!(
"dropping packet we havent implemented support for ({})",
decoded.portnum().as_str_name()
),
}
Ok(())
}
async fn debug_message(
matrix_sender: &Sender<RoomMessageEventContent>,
message: String,
) -> Result<(), SendError<RoomMessageEventContent>> {
log::debug!("[mashtastic]: received {}", message);
matrix_sender
.send(RoomMessageEventContent::text_plain(message))
.await
}