diff --git a/Cargo.lock b/Cargo.lock index 5f096c4..46eec76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1747,6 +1747,7 @@ version = "0.1.0" dependencies = [ "anyhow", "env_logger", + "futures", "log", "matrix-sdk", "meshtastic", diff --git a/Cargo.toml b/Cargo.toml index 49dcadd..278dcf3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0.89" env_logger = "0.11.5" +futures = "0.3.31" log = "0.4.22" matrix-sdk = { version = "0.7.1", features = ["anyhow"] } meshtastic = "0.1.6" diff --git a/src/db.rs b/src/db.rs index 5523229..666d5c4 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use sqlx::{migrate::MigrateDatabase, sqlite::SqlitePool, Pool, Sqlite}; #[derive(Debug, sqlx::FromRow)] @@ -29,7 +31,7 @@ pub(crate) struct DB { pool: Pool, } -pub(crate) async fn setup(db_url: String) -> Result> { +pub(crate) async fn setup(db_url: String) -> anyhow::Result> { if !Sqlite::database_exists(&db_url).await.unwrap_or(false) { log::debug!("Creating database {}", &db_url); match Sqlite::create_database(&db_url).await { @@ -47,7 +49,7 @@ pub(crate) async fn setup(db_url: String) -> Result anyhow::Result<()> { -// env_logger::init(); - -// let config = config::read_config().await; - -// let bridge = bridge::Bridge::setup(config).await.unwrap(); - -// bridge.run().await.unwrap(); - -// Ok(()) -// } - #[tokio::main] async fn main() -> anyhow::Result<()> { env_logger::init(); @@ -44,15 +31,15 @@ async fn main() -> anyhow::Result<()> { log::info!("matrix and meshtatic clients ready, spawning threads"); - tokio::spawn(async { - meshtastic_client - .receive(matrix_tx) - .await - .expect("error receiving message from meshtastic"); - }); + let mut tasks = vec![]; - tokio::spawn(matrix::sender(matrix_client.clone(), matrix_rx, room_id)); - tokio::spawn(matrix::sync(matrix_client.clone(), &db)); + tasks.push(tokio::spawn(matrix::sender( + matrix_client, + matrix_rx, + room_id, + ))); + tasks.push(tokio::spawn(matrix::sync(matrix_client, db))); + tasks.push(tokio::spawn(meshtastic_client.receive(matrix_tx))); match signal::ctrl_c().await { Ok(()) => {} @@ -61,6 +48,12 @@ async fn main() -> anyhow::Result<()> { } } + log::info!("shutting down tasks"); + + meshtastic_client.close().await; + + futures::future::join_all(tasks).await; + log::debug!("shutting down database connection"); db.close().await; log::debug!("db connection closed"); diff --git a/src/matrix.rs b/src/matrix.rs index 8528f66..d949ebb 100644 --- a/src/matrix.rs +++ b/src/matrix.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{config, db}; use log; use matrix_sdk::{ @@ -119,7 +121,7 @@ pub(crate) async fn sender( client: Client, mut rx: Receiver, room_id: OwnedRoomId, -) { +) -> anyhow::Result<()> { let room = match client.get_room(&room_id) { Some(room) => room, None => panic!("requested matrix room not found"), @@ -131,9 +133,11 @@ pub(crate) async fn sender( Ok(response) => log::debug!("sent message to matrix: {:?}", response), } } + + Ok(()) } -pub(crate) async fn sync(client: Client, db: &db::DB) -> anyhow::Result<()> { +pub(crate) async fn sync(client: Client, db: Arc) -> anyhow::Result<()> { let filter = FilterDefinition::with_lazy_loading(); let mut sync_settings = SyncSettings::default().filter(filter.into()); diff --git a/src/meshtastic.rs b/src/meshtastic.rs index 60593d8..06e34a6 100644 --- a/src/meshtastic.rs +++ b/src/meshtastic.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; use meshtastic::api::state::Connected; use meshtastic::api::{ConnectedStreamApi, StreamApi}; @@ -33,7 +35,7 @@ impl MeshtasticClient { pub async fn receive( mut self: MeshtasticClient, matrix_sender: Sender, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { // let config_id = utils::generate_rand_id(); // let stream_api = stream_api.configure(config_id).await?; @@ -109,13 +111,19 @@ impl MeshtasticClient { Ok(()) } + + pub async fn close(self: &mut MeshtasticClient) -> anyhow::Result<()> { + self.decoded_listener.close(); + + Ok(()) + } } async fn handle_decoded_packet( matrix_sender: &Sender, // packet: MeshPacket, decoded: Data, -) -> Result<(), Box> { +) -> anyhow::Result<()> { match decoded.portnum() { meshtastic::protobufs::PortNum::TextMessageApp => { debug_message(