WIP: matrix session restore #4

Draft
finn wants to merge 2 commits from matrix-session-restore into main
6 changed files with 36 additions and 27 deletions
Showing only changes of commit 3b9d40cd97 - Show all commits

1
Cargo.lock generated
View file

@ -1747,6 +1747,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"env_logger", "env_logger",
"futures",
"log", "log",
"matrix-sdk", "matrix-sdk",
"meshtastic", "meshtastic",

View file

@ -6,6 +6,7 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.89" anyhow = "1.0.89"
env_logger = "0.11.5" env_logger = "0.11.5"
futures = "0.3.31"
log = "0.4.22" log = "0.4.22"
matrix-sdk = { version = "0.7.1", features = ["anyhow"] } matrix-sdk = { version = "0.7.1", features = ["anyhow"] }
meshtastic = "0.1.6" meshtastic = "0.1.6"

View file

@ -1,3 +1,5 @@
use std::sync::Arc;
use sqlx::{migrate::MigrateDatabase, sqlite::SqlitePool, Pool, Sqlite}; use sqlx::{migrate::MigrateDatabase, sqlite::SqlitePool, Pool, Sqlite};
#[derive(Debug, sqlx::FromRow)] #[derive(Debug, sqlx::FromRow)]
@ -29,7 +31,7 @@ pub(crate) struct DB {
pool: Pool<Sqlite>, pool: Pool<Sqlite>,
} }
pub(crate) async fn setup(db_url: String) -> Result<DB, Box<dyn std::error::Error>> { pub(crate) async fn setup(db_url: String) -> anyhow::Result<Arc<DB>> {
if !Sqlite::database_exists(&db_url).await.unwrap_or(false) { if !Sqlite::database_exists(&db_url).await.unwrap_or(false) {
log::debug!("Creating database {}", &db_url); log::debug!("Creating database {}", &db_url);
match Sqlite::create_database(&db_url).await { match Sqlite::create_database(&db_url).await {
@ -47,7 +49,7 @@ pub(crate) async fn setup(db_url: String) -> Result<DB, Box<dyn std::error::Erro
.await .await
.expect("error running db migrations"); .expect("error running db migrations");
Ok(DB { pool: pool }) Ok(Arc::new(DB { pool: pool }))
} }
impl DB { impl DB {

View file

@ -8,19 +8,6 @@ mod db;
mod matrix; mod matrix;
mod meshtastic; mod meshtastic;
// #[tokio::main]
// async fn main() -> anyhow::Result<()> {
// env_logger::init();
// let config = config::read_config().await;
// let bridge = bridge::Bridge::setup(config).await.unwrap();
// bridge.run().await.unwrap();
// Ok(())
// }
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
env_logger::init(); env_logger::init();
@ -44,15 +31,15 @@ async fn main() -> anyhow::Result<()> {
log::info!("matrix and meshtatic clients ready, spawning threads"); log::info!("matrix and meshtatic clients ready, spawning threads");
tokio::spawn(async { let mut tasks = vec![];
meshtastic_client
.receive(matrix_tx)
.await
.expect("error receiving message from meshtastic");
});
tokio::spawn(matrix::sender(matrix_client.clone(), matrix_rx, room_id)); tasks.push(tokio::spawn(matrix::sender(
tokio::spawn(matrix::sync(matrix_client.clone(), &db)); matrix_client,
matrix_rx,
room_id,
)));
tasks.push(tokio::spawn(matrix::sync(matrix_client, db)));
tasks.push(tokio::spawn(meshtastic_client.receive(matrix_tx)));
match signal::ctrl_c().await { match signal::ctrl_c().await {
Ok(()) => {} Ok(()) => {}
@ -61,6 +48,12 @@ async fn main() -> anyhow::Result<()> {
} }
} }
log::info!("shutting down tasks");
meshtastic_client.close().await;
futures::future::join_all(tasks).await;
log::debug!("shutting down database connection"); log::debug!("shutting down database connection");
db.close().await; db.close().await;
log::debug!("db connection closed"); log::debug!("db connection closed");

View file

@ -1,3 +1,5 @@
use std::sync::Arc;
use crate::{config, db}; use crate::{config, db};
use log; use log;
use matrix_sdk::{ use matrix_sdk::{
@ -119,7 +121,7 @@ pub(crate) async fn sender(
client: Client, client: Client,
mut rx: Receiver<RoomMessageEventContent>, mut rx: Receiver<RoomMessageEventContent>,
room_id: OwnedRoomId, room_id: OwnedRoomId,
) { ) -> anyhow::Result<()> {
let room = match client.get_room(&room_id) { let room = match client.get_room(&room_id) {
Some(room) => room, Some(room) => room,
None => panic!("requested matrix room not found"), None => panic!("requested matrix room not found"),
@ -131,9 +133,11 @@ pub(crate) async fn sender(
Ok(response) => log::debug!("sent message to matrix: {:?}", response), Ok(response) => log::debug!("sent message to matrix: {:?}", response),
} }
} }
Ok(())
} }
pub(crate) async fn sync(client: Client, db: &db::DB) -> anyhow::Result<()> { pub(crate) async fn sync(client: Client, db: Arc<db::DB>) -> anyhow::Result<()> {
let filter = FilterDefinition::with_lazy_loading(); let filter = FilterDefinition::with_lazy_loading();
let mut sync_settings = SyncSettings::default().filter(filter.into()); let mut sync_settings = SyncSettings::default().filter(filter.into());

View file

@ -1,3 +1,5 @@
use std::sync::Arc;
use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
use meshtastic::api::state::Connected; use meshtastic::api::state::Connected;
use meshtastic::api::{ConnectedStreamApi, StreamApi}; use meshtastic::api::{ConnectedStreamApi, StreamApi};
@ -33,7 +35,7 @@ impl MeshtasticClient {
pub async fn receive( pub async fn receive(
mut self: MeshtasticClient, mut self: MeshtasticClient,
matrix_sender: Sender<RoomMessageEventContent>, matrix_sender: Sender<RoomMessageEventContent>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> anyhow::Result<()> {
// let config_id = utils::generate_rand_id(); // let config_id = utils::generate_rand_id();
// let stream_api = stream_api.configure(config_id).await?; // let stream_api = stream_api.configure(config_id).await?;
@ -109,13 +111,19 @@ impl MeshtasticClient {
Ok(()) Ok(())
} }
pub async fn close(self: &mut MeshtasticClient) -> anyhow::Result<()> {
self.decoded_listener.close();
Ok(())
}
} }
async fn handle_decoded_packet( async fn handle_decoded_packet(
matrix_sender: &Sender<RoomMessageEventContent>, matrix_sender: &Sender<RoomMessageEventContent>,
// packet: MeshPacket, // packet: MeshPacket,
decoded: Data, decoded: Data,
) -> Result<(), Box<dyn std::error::Error>> { ) -> anyhow::Result<()> {
match decoded.portnum() { match decoded.portnum() {
meshtastic::protobufs::PortNum::TextMessageApp => { meshtastic::protobufs::PortNum::TextMessageApp => {
debug_message( debug_message(