use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; use meshtastic::api::state::Connected; use meshtastic::api::{ConnectedStreamApi, StreamApi}; use meshtastic::protobufs::{from_radio::PayloadVariant, FromRadio}; use meshtastic::protobufs::{mesh_packet, Data}; use meshtastic::utils::{self, generate_rand_id}; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{Sender, UnboundedReceiver}; use crate::config; pub struct MeshtasticClient { decoded_listener: UnboundedReceiver, stream_api: ConnectedStreamApi, } pub(crate) async fn build( config: config::MeshtasticConfig, ) -> Result> { let stream_api = StreamApi::new(); let tcp_stream = utils::stream::build_tcp_stream(config.hostname).await?; let (decoded_listener, stream_api) = stream_api.connect(tcp_stream).await; log::info!("connected to meshtastic device"); Ok(MeshtasticClient { decoded_listener: decoded_listener, stream_api: stream_api, }) } impl MeshtasticClient { pub async fn receive( mut self: MeshtasticClient, matrix_sender: Sender, ) -> Result<(), Box> { // let config_id = utils::generate_rand_id(); // let stream_api = stream_api.configure(config_id).await?; // This loop can be broken with ctrl+c, or by disconnecting // the attached serial port. self.stream_api.configure(generate_rand_id()).await?; log::debug!("listening for messages from meshtastic"); while let Some(packet) = self.decoded_listener.recv().await { let payload_variant = match packet.payload_variant { Some(payload_variant) => payload_variant, None => { log::debug!( "Received FromRadio packet with no payload variant, not handling..." ); continue; } }; match payload_variant { PayloadVariant::Packet(packet) => { let payload_variant = match &packet.payload_variant { Some(payload_variant) => payload_variant, None => { log::debug!( "Received MeshPacket with no payload variant, not handling..." ); return Ok(()); } }; match payload_variant { mesh_packet::PayloadVariant::Decoded(decoded) => { handle_decoded_packet(&matrix_sender, &packet, decoded).await?; } mesh_packet::PayloadVariant::Encrypted(encrypted) => { log::debug!( "received encrypted packet (decryption not yet supported): {:?}", encrypted ); } }; } PayloadVariant::MyInfo(my_node_info) => { debug_message( &matrix_sender, format!("I'm alive! MyInfo: {:?}", my_node_info), ) .await? } PayloadVariant::LogRecord(record) => { log::info!( "[meshtastic] {} [{}] [{}] {}", record.level, record.time, record.source, record.message ) } PayloadVariant::Channel(channel) => { log::debug!("Channel: {:?}", channel); } PayloadVariant::Config(config) => { log::debug!("config: {:?}", config); } PayloadVariant::ConfigCompleteId(config_complete_id) => { log::debug!("config complete ID: {:?}", config_complete_id); } PayloadVariant::NodeInfo(node_info) => { log::debug!("node info: {:?}", node_info); } PayloadVariant::Rebooted(rebooted) => { log::debug!("rebooted: {:?}", rebooted); } PayloadVariant::ModuleConfig(module_config) => { log::debug!("module config: {:?}", module_config); } PayloadVariant::QueueStatus(queue_status) => { log::debug!("queue status: {:?}", queue_status); } PayloadVariant::XmodemPacket(xmodem_packet) => { log::debug!("xmodem packet: {:?}", xmodem_packet); } PayloadVariant::Metadata(metadata) => { log::debug!("metadata message: {:?}", metadata); } PayloadVariant::MqttClientProxyMessage(mqtt_client_proxy_message) => { log::debug!("mqtt client proxy message: {:?}", mqtt_client_proxy_message); } } } Ok(()) } } async fn handle_decoded_packet( matrix_sender: &Sender, packet: &meshtastic::protobufs::MeshPacket, decoded: &Data, ) -> Result<(), Box> { match decoded.portnum() { meshtastic::protobufs::PortNum::TextMessageApp => { log::debug!("posting packet to matrix: {:?}", packet); log::debug!("decoded: {:?}", decoded); let message = format!( "text from {:02x}: {} (snr: {}, rssi: {}, hop limit: {}, hop start: {})", packet.from, std::str::from_utf8(&decoded.payload)?, packet.rx_snr, packet.rx_rssi, packet.hop_limit, packet.hop_start ); debug_message(&matrix_sender, message).await?; } _ => log::debug!( "dropping packet we havent implemented support for ({})", decoded.portnum().as_str_name() ), } Ok(()) } async fn debug_message( matrix_sender: &Sender, message: String, ) -> Result<(), SendError> { log::debug!("[mashtastic]: received {}", message); matrix_sender .send(RoomMessageEventContent::text_plain(message)) .await }