From b19cafc2b268ec69f47428c3eab891dd5d404f06 Mon Sep 17 00:00:00 2001 From: Finn Date: Tue, 15 Oct 2024 23:22:07 -0700 Subject: [PATCH 1/2] wip --- .gitignore | 2 +- Cargo.toml | 2 +- migrations/20241011053437_init.sql | 6 ++ src/bridge.rs | 2 +- src/config.rs | 29 ++++++++- src/db.rs | 67 ++++++++++++++++++--- src/main.rs | 14 ++--- src/matrix.rs | 95 ++++++++++++++++++++++++++---- 8 files changed, 186 insertions(+), 31 deletions(-) diff --git a/.gitignore b/.gitignore index 74bdf22..d9227c5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ /target config.json -matrix-meshtastic-bridge.db* +/state diff --git a/Cargo.toml b/Cargo.toml index 5651713..49dcadd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,4 +12,4 @@ meshtastic = "0.1.6" serde = { version = "1.0.210", features = ["derive"]} serde_json = "1.0.128" sqlx = { version = "0.7.3", features = ["runtime-tokio-native-tls", "sqlite"] } -tokio = "1.40.0" +tokio = { version = "1.40.0", features = ["full"] } diff --git a/migrations/20241011053437_init.sql b/migrations/20241011053437_init.sql index e8ed445..19801ec 100644 --- a/migrations/20241011053437_init.sql +++ b/migrations/20241011053437_init.sql @@ -13,3 +13,9 @@ CREATE TABLE remote_user ( node_name VARCHAR(100) NOT NULL, room VARCHAR(100) NOT NULL ); + +CREATE TABLE matrix_state ( + id INTEGER PRIMARY KEY NOT NULL, + key TEXT UNIQUE NOT NULL, + value TEXT NOT NULL +); diff --git a/src/bridge.rs b/src/bridge.rs index aaff398..6f72e8c 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -31,7 +31,7 @@ impl Bridge { let (meshtastic_listener, meshtastic_stream_api) = stream_api.connect(serial_stream).await; // setup matrix client - let matrix_client = matrix::build(config.matrix) + let matrix_client = matrix::build(config.matrix, &db) .await .expect("error logging into matrix"); matrix_client.sync_once(SyncSettings::default()).await?; diff --git a/src/config.rs b/src/config.rs index b483c54..da8a70e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,10 +11,15 @@ pub(crate) struct Config { #[derive(serde::Deserialize, Debug)] pub(crate) struct MatrixConfig { pub(crate) username: String, + #[serde(default = "get_matrix_password")] pub(crate) password: String, - #[serde(default = "get_device_name")] + #[serde(default = "get_matrix_device_name")] pub(crate) device_name: String, pub(crate) room: String, + #[serde(default = "get_matrix_state_dir")] + pub(crate) state_dir: String, + #[serde(default = "get_matrix_state_passphrase")] + pub(crate) state_passphrase: String, } #[derive(serde::Deserialize, Debug)] @@ -22,12 +27,30 @@ pub(crate) struct MeshtasticConfig { pub(crate) device: String, } -fn get_device_name() -> String { +fn get_matrix_device_name() -> String { "meshtastic-bridge".to_string() } fn get_db_uri() -> String { - "sqlite://matrix-meshtastic-bridge.db".to_string() + "sqlite://state/matrix-meshtastic-bridge.db".to_string() +} + +fn get_matrix_password() -> String { + match std::env::var("MATRIX_PASSWORD") { + Ok(p) => p, + Err(_) => "".to_string(), + } +} + +fn get_matrix_state_dir() -> String { + "state".to_string() +} + +fn get_matrix_state_passphrase() -> String { + match std::env::var("MATRIX_STATE_PASSPHRASE") { + Ok(p) => p, + Err(_) => "a".to_string(), // I'm sorry but encryption at rest is dumb if the decryption key is stored in the same place as the encrypted data + } } pub(crate) async fn read_config() -> Config { diff --git a/src/db.rs b/src/db.rs index 0845bf0..5523229 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,9 +1,4 @@ -use meshtastic::protobufs::MyNodeInfo; -use sqlx::{ - migrate::{MigrateDatabase, Migrator}, - sqlite::SqlitePool, - Executor, Pool, Sqlite, -}; +use sqlx::{migrate::MigrateDatabase, sqlite::SqlitePool, Pool, Sqlite}; #[derive(Debug, sqlx::FromRow)] pub(crate) struct Device { @@ -24,6 +19,12 @@ pub(crate) struct RemoteUser { pub(crate) room: String, } +#[derive(Debug, sqlx::FromRow)] +struct MatrixState { + key: String, + value: String, +} + pub(crate) struct DB { pool: Pool, } @@ -51,7 +52,7 @@ pub(crate) async fn setup(db_url: String) -> Result Result, Box> { let query = sqlx::query_as::<_, Device>("SELECT * FROM devices WHERE node_num = ? LIMIT 1"); @@ -72,7 +73,57 @@ impl DB { Ok(()) } - pub async fn close(self: DB) { + pub async fn get_matrix_session( + self: &DB, + ) -> Result, Box> { + let session = match self.get_matrix_state("session".to_string()).await? { + Some(session_json) => serde_json::from_str(&session_json)?, + None => None, + }; + + Ok(session) + } + + pub async fn set_matrix_session( + self: &DB, + session: matrix_sdk::matrix_auth::MatrixSession, + ) -> anyhow::Result<()> { + let session_json = serde_json::to_string(&session)?; + + self.set_matrix_state("session".to_string(), session_json) + .await + } + + pub async fn get_matrix_sync_token(self: &DB) -> anyhow::Result> { + self.get_matrix_state("session_store".to_string()).await + } + + pub async fn set_matrix_sync_token(self: &DB, token: String) -> anyhow::Result<()> { + self.set_matrix_state("session_store".to_string(), token) + .await + } + + async fn set_matrix_state(self: &DB, key: String, value: String) -> anyhow::Result<()> { + let query = sqlx::query("INSERT INTO matrix_state (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value"); + + query.bind(key).bind(value).execute(&self.pool).await?; + + Ok(()) + } + + pub async fn get_matrix_state(self: &DB, key: String) -> anyhow::Result> { + let query = sqlx::query_as::<_, MatrixState>( + "SELECT value FROM matrix_state WHERE 'key' = ? LIMIT 1", + ); + let result = match query.bind(key).fetch_optional(&self.pool).await.unwrap() { + Some(row) => Some(row.value), + None => None, + }; + + Ok(result) + } + + pub async fn close(self: &DB) { self.pool.close().await } } diff --git a/src/main.rs b/src/main.rs index d284df8..309080c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use matrix_sdk::{config::SyncSettings, ruma::RoomId}; +use matrix_sdk::ruma::RoomId; use tokio::signal; use tokio::sync::mpsc; @@ -27,23 +27,23 @@ async fn main() -> anyhow::Result<()> { let config = config::read_config().await; - let pool = db::setup(config.db).await.expect("error connecting to db"); + let db = db::setup(config.db).await.expect("error connecting to db"); let (matrix_tx, matrix_rx) = mpsc::channel(32); let room_id = RoomId::parse(&config.matrix.room).expect("invalid room id"); log::debug!("matrix room: {:?}", room_id); - let matrix_client = matrix::build(config.matrix) + let matrix_client = matrix::build(config.matrix, &db) .await .expect("error logging into matrix"); - matrix_client.sync_once(SyncSettings::default()).await?; - let meshtastic_client = meshtastic::build(config.meshtastic) .await .expect("error connecting to meshtastic"); + log::info!("matrix and meshtatic clients ready, spawning threads"); + tokio::spawn(async { meshtastic_client .receive(matrix_tx) @@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> { }); tokio::spawn(matrix::sender(matrix_client.clone(), matrix_rx, room_id)); - tokio::spawn(matrix::sync(matrix_client.clone())); + tokio::spawn(matrix::sync(matrix_client.clone(), &db)); match signal::ctrl_c().await { Ok(()) => {} @@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<()> { } log::debug!("shutting down database connection"); - pool.close().await; + db.close().await; log::debug!("db connection closed"); Ok(()) diff --git a/src/matrix.rs b/src/matrix.rs index 2e5fbc9..8528f66 100644 --- a/src/matrix.rs +++ b/src/matrix.rs @@ -1,35 +1,58 @@ -use crate::config; +use crate::{config, db}; use log; use matrix_sdk::{ config::SyncSettings, ruma::{ + api::client::filter::FilterDefinition, events::room::{ member::StrippedRoomMemberEvent, message::{RoomMessageEventContent, SyncRoomMessageEvent}, }, OwnedRoomId, TransactionId, UserId, }, - Client, Room, + Client, Error, LoopCtrl, Room, }; use tokio::sync::mpsc::Receiver; use tokio::time::{sleep, Duration}; pub(crate) async fn build( config: config::MatrixConfig, + db: &db::DB, ) -> Result { + log::debug!("creating matrix client"); let user = UserId::parse(config.username) .expect("invalid matrix username - should be a full mxid with server"); + + log::debug!("storing matrix client state in {}", config.state_dir); let client = Client::builder() .server_name(user.server_name()) + .sqlite_store(config.state_dir, Some(&config.state_passphrase)) .build() .await?; - client - .matrix_auth() - .login_username(user, &config.password) - .initial_device_display_name(&config.device_name) - .send() - .await?; + log::debug!("checking if there's an existing session"); + match db.get_matrix_session().await.unwrap() { + Some(session) => { + log::debug!("restoring session from database"); + client.restore_session(session).await?; + } + None => { + log::debug!("no existing session. Logging in"); + let matrix_auth = client.matrix_auth(); + matrix_auth + .login_username(user, &config.password) + .initial_device_display_name(&config.device_name) + .send() + .await?; + + log::debug!("creating new session"); + let session = matrix_auth + .session() + .expect("A logged-in matrix client should have a session"); + + db.set_matrix_session(session).await.unwrap(); + } + } client.add_event_handler(|ev: SyncRoomMessageEvent| async move { log::debug!("[matrix] Received a message {:?}", ev); @@ -37,7 +60,11 @@ pub(crate) async fn build( client.add_event_handler(on_stripped_state_member); - log::debug!("connected to matrix"); + log::debug!("connected to matrix, syncing"); + + client.sync_once(SyncSettings::default()).await?; + + log::info!("connected to matrix server and synchronized state, ready to continue startup"); return Ok(client); } @@ -106,9 +133,57 @@ pub(crate) async fn sender( } } -pub(crate) async fn sync(client: Client) { +pub(crate) async fn sync(client: Client, db: &db::DB) -> anyhow::Result<()> { + let filter = FilterDefinition::with_lazy_loading(); + let mut sync_settings = SyncSettings::default().filter(filter.into()); + + if let Some(token) = db + .get_matrix_sync_token() + .await + .map_err(|err| Error::UnknownError(err.into()))? + { + sync_settings = sync_settings.token(token); + } + client .sync(SyncSettings::default()) .await .expect("error syncing matrix"); + + loop { + match client.sync_once(sync_settings.clone()).await { + Ok(response) => { + // This is the last time we need to provide this token, the sync method after + // will handle it on its own. + sync_settings = sync_settings.token(response.next_batch.clone()); + db.set_matrix_sync_token(response.next_batch).await?; + break; + } + Err(error) => { + log::error!("An error occurred during initial sync: {error}"); + log::error!("Trying again…"); + } + } + } + + log::info!("The client is ready! Listening to new messages…"); + + // Now that we've synced, let's attach a handler for incoming room messages. + // client.add_event_handler(on_room_message); + + // This loops until we kill the program or an error happens. + client + .sync_with_result_callback(sync_settings, |sync_result| async move { + let response = sync_result?; + + // We persist the token each time to be able to restore our session + db.set_matrix_sync_token(response.next_batch) + .await + .map_err(|err| Error::UnknownError(err.into()))?; + + Ok(LoopCtrl::Continue) + }) + .await?; + + Ok(()) } -- 2.45.2 From 3b9d40cd97632185daf6efdcc706f3fd6ece596a Mon Sep 17 00:00:00 2001 From: Finn Date: Thu, 17 Oct 2024 00:03:11 -0700 Subject: [PATCH 2/2] still failing :sob: --- Cargo.lock | 1 + Cargo.toml | 1 + src/db.rs | 6 ++++-- src/main.rs | 35 ++++++++++++++--------------------- src/matrix.rs | 8 ++++++-- src/meshtastic.rs | 12 ++++++++++-- 6 files changed, 36 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f096c4..46eec76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1747,6 +1747,7 @@ version = "0.1.0" dependencies = [ "anyhow", "env_logger", + "futures", "log", "matrix-sdk", "meshtastic", diff --git a/Cargo.toml b/Cargo.toml index 49dcadd..278dcf3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] anyhow = "1.0.89" env_logger = "0.11.5" +futures = "0.3.31" log = "0.4.22" matrix-sdk = { version = "0.7.1", features = ["anyhow"] } meshtastic = "0.1.6" diff --git a/src/db.rs b/src/db.rs index 5523229..666d5c4 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use sqlx::{migrate::MigrateDatabase, sqlite::SqlitePool, Pool, Sqlite}; #[derive(Debug, sqlx::FromRow)] @@ -29,7 +31,7 @@ pub(crate) struct DB { pool: Pool, } -pub(crate) async fn setup(db_url: String) -> Result> { +pub(crate) async fn setup(db_url: String) -> anyhow::Result> { if !Sqlite::database_exists(&db_url).await.unwrap_or(false) { log::debug!("Creating database {}", &db_url); match Sqlite::create_database(&db_url).await { @@ -47,7 +49,7 @@ pub(crate) async fn setup(db_url: String) -> Result anyhow::Result<()> { -// env_logger::init(); - -// let config = config::read_config().await; - -// let bridge = bridge::Bridge::setup(config).await.unwrap(); - -// bridge.run().await.unwrap(); - -// Ok(()) -// } - #[tokio::main] async fn main() -> anyhow::Result<()> { env_logger::init(); @@ -44,15 +31,15 @@ async fn main() -> anyhow::Result<()> { log::info!("matrix and meshtatic clients ready, spawning threads"); - tokio::spawn(async { - meshtastic_client - .receive(matrix_tx) - .await - .expect("error receiving message from meshtastic"); - }); + let mut tasks = vec![]; - tokio::spawn(matrix::sender(matrix_client.clone(), matrix_rx, room_id)); - tokio::spawn(matrix::sync(matrix_client.clone(), &db)); + tasks.push(tokio::spawn(matrix::sender( + matrix_client, + matrix_rx, + room_id, + ))); + tasks.push(tokio::spawn(matrix::sync(matrix_client, db))); + tasks.push(tokio::spawn(meshtastic_client.receive(matrix_tx))); match signal::ctrl_c().await { Ok(()) => {} @@ -61,6 +48,12 @@ async fn main() -> anyhow::Result<()> { } } + log::info!("shutting down tasks"); + + meshtastic_client.close().await; + + futures::future::join_all(tasks).await; + log::debug!("shutting down database connection"); db.close().await; log::debug!("db connection closed"); diff --git a/src/matrix.rs b/src/matrix.rs index 8528f66..d949ebb 100644 --- a/src/matrix.rs +++ b/src/matrix.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{config, db}; use log; use matrix_sdk::{ @@ -119,7 +121,7 @@ pub(crate) async fn sender( client: Client, mut rx: Receiver, room_id: OwnedRoomId, -) { +) -> anyhow::Result<()> { let room = match client.get_room(&room_id) { Some(room) => room, None => panic!("requested matrix room not found"), @@ -131,9 +133,11 @@ pub(crate) async fn sender( Ok(response) => log::debug!("sent message to matrix: {:?}", response), } } + + Ok(()) } -pub(crate) async fn sync(client: Client, db: &db::DB) -> anyhow::Result<()> { +pub(crate) async fn sync(client: Client, db: Arc) -> anyhow::Result<()> { let filter = FilterDefinition::with_lazy_loading(); let mut sync_settings = SyncSettings::default().filter(filter.into()); diff --git a/src/meshtastic.rs b/src/meshtastic.rs index 60593d8..06e34a6 100644 --- a/src/meshtastic.rs +++ b/src/meshtastic.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; use meshtastic::api::state::Connected; use meshtastic::api::{ConnectedStreamApi, StreamApi}; @@ -33,7 +35,7 @@ impl MeshtasticClient { pub async fn receive( mut self: MeshtasticClient, matrix_sender: Sender, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { // let config_id = utils::generate_rand_id(); // let stream_api = stream_api.configure(config_id).await?; @@ -109,13 +111,19 @@ impl MeshtasticClient { Ok(()) } + + pub async fn close(self: &mut MeshtasticClient) -> anyhow::Result<()> { + self.decoded_listener.close(); + + Ok(()) + } } async fn handle_decoded_packet( matrix_sender: &Sender, // packet: MeshPacket, decoded: Data, -) -> Result<(), Box> { +) -> anyhow::Result<()> { match decoded.portnum() { meshtastic::protobufs::PortNum::TextMessageApp => { debug_message( -- 2.45.2