use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; use meshtastic::api::state::Connected; use meshtastic::api::{ConnectedStreamApi, StreamApi}; use meshtastic::protobufs::{from_radio::PayloadVariant, FromRadio}; use meshtastic::protobufs::{mesh_packet, Data}; use meshtastic::utils::{self, generate_rand_id}; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{Sender, UnboundedReceiver}; use crate::config; pub struct MeshtasticClient { decoded_listener: UnboundedReceiver, stream_api: ConnectedStreamApi, } pub(crate) async fn build( config: config::MeshtasticConfig, ) -> Result> { let stream_api = StreamApi::new(); let serial_stream = utils::stream::build_serial_stream(config.device, None, None, None)?; let (decoded_listener, stream_api) = stream_api.connect(serial_stream).await; log::info!("connected to meshtastic device"); Ok(MeshtasticClient { decoded_listener: decoded_listener, stream_api: stream_api, }) } impl MeshtasticClient { pub async fn receive( mut self: MeshtasticClient, matrix_sender: Sender, ) -> Result<(), Box> { // let config_id = utils::generate_rand_id(); // let stream_api = stream_api.configure(config_id).await?; // This loop can be broken with ctrl+c, or by disconnecting // the attached serial port. self.stream_api.configure(generate_rand_id()).await?; log::debug!("listening for messages from meshtastic"); while let Some(packet) = self.decoded_listener.recv().await { let payload_variant = match packet.payload_variant { Some(payload_variant) => payload_variant, None => { log::debug!( "Received FromRadio packet with no payload variant, not handling..." ); continue; } }; match payload_variant { PayloadVariant::Packet(packet) => { let payload_variant = match &packet.payload_variant { Some(payload_variant) => payload_variant, None => { log::debug!( "Received MeshPacket with no payload variant, not handling..." ); return Ok(()); } }; match payload_variant { mesh_packet::PayloadVariant::Decoded(decoded) => { handle_decoded_packet(&matrix_sender, &packet, decoded).await?; } mesh_packet::PayloadVariant::Encrypted(encrypted) => { log::debug!( "received encrypted packet (decryption not yet supported): {:?}", encrypted ); } }; } PayloadVariant::MyInfo(my_node_info) => { debug_message( &matrix_sender, format!("I'm alive! MyInfo: {:?}", my_node_info), ) .await? } PayloadVariant::LogRecord(record) => { log::info!( "[meshtastic] {} [{}] [{}] {}", record.level, record.time, record.source, record.message ) } PayloadVariant::Channel(channel) => { log::debug!("Channel: {:?}", channel); } // PayloadVariant::NodeInfo(node_info) => { // upsert_remote_node(db, node_info).await? // } // PayloadVariant::Decoded(decoded) => { // debug_message(&matrix_sender, format!("Decoded: {:?}", decoded)) // } _ => log::debug!("dropping unknown packet"), } } Ok(()) } } async fn handle_decoded_packet( matrix_sender: &Sender, packet: &meshtastic::protobufs::MeshPacket, decoded: &Data, ) -> Result<(), Box> { match decoded.portnum() { meshtastic::protobufs::PortNum::TextMessageApp => { log::debug!("posting packet to matrix: {:?}", packet); log::debug!("decoded: {:?}", decoded); debug_message( &matrix_sender, format!( "Text from {:?}: {}", decoded.source, std::str::from_utf8(&decoded.payload)? ), ) .await?; } _ => log::debug!( "dropping packet we havent implemented support for ({})", decoded.portnum().as_str_name() ), } Ok(()) } async fn debug_message( matrix_sender: &Sender, message: String, ) -> Result<(), SendError> { log::debug!("[mashtastic]: received {}", message); matrix_sender .send(RoomMessageEventContent::text_plain(message)) .await }