diff --git a/Cargo.toml b/Cargo.toml index 9e2843f..f548ef5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,6 @@ tracing-subscriber = "0.3.18" thiserror = "1.0.50" turf = "0.7.0" tokio = "1.34.0" -flume = "0.11.0" log = "0.4.20" tracing = "0.1.40" futures-util = "0.3.29" diff --git a/src/base.rs b/src/base.rs index d1b84e4..b0f0c1c 100644 --- a/src/base.rs +++ b/src/base.rs @@ -1,5 +1,3 @@ -use futures::select; - // Cf. https://dioxuslabs.com/learn/0.4/reference/use_coroutine // In order to use/run the rx.next().await statement you will need to extend the [Stream] trait // (used by [UnboundedReceiver]) by adding 'futures_util' as a dependency to your project @@ -12,34 +10,36 @@ use fermi::*; use matrix_sdk::room::Room as MatrixRoom; use matrix_sdk::{ room::RoomMember, - ruma::{OwnedMxcUri, OwnedRoomId, OwnedUserId}, + ruma::{OwnedRoomId, OwnedUserId}, }; +use tokio::select; use tracing::{debug, error, warn}; use crate::matrix_interface::client::{Client, RoomTopicEvent}; +use crate::matrix_interface::requester::Receivers; use crate::matrix_interface::requester::Requester; use crate::matrix_interface::worker_tasks::LoginStyle; -#[derive(Clone, Debug)] -pub struct UserInfo { - pub avatar_url: Option, - pub display_name: Option, - pub blurhash: Option, -} +// #[derive(Clone, Debug)] +// pub struct UserInfo { +// pub avatar_url: Option, +// pub display_name: Option, +// pub blurhash: Option, +// } -impl UserInfo { - pub fn new( - avatar_url: Option, - display_name: Option, - blurhash: Option, - ) -> Self { - Self { - avatar_url, - display_name, - blurhash, - } - } -} +// impl UserInfo { +// pub fn new( +// avatar_url: Option, +// display_name: Option, +// blurhash: Option, +// ) -> Self { +// Self { +// avatar_url, +// display_name, +// blurhash, +// } +// } +// } #[derive(Clone, Debug)] pub struct Room { @@ -80,106 +80,101 @@ impl PartialEq for Room { } pub type ByIdRooms = HashMap>; -pub type ByIdUserInfos = HashMap; +// pub type ByIdUserInfos = HashMap; -#[derive(Clone)] -pub struct Store { - pub is_logged: bool, - pub rooms: ByIdRooms, - pub user_infos: ByIdUserInfos, - pub user_id: Option, -} +// #[derive(Clone)] +// pub struct Store { +// pub is_logged: bool, +// pub rooms: ByIdRooms, +// pub user_infos: ByIdUserInfos, +// pub user_id: Option, +// } -impl Store { - pub fn new() -> Self { - Self { - is_logged: false, - rooms: HashMap::new(), - user_infos: HashMap::new(), - user_id: None, - } - } -} +// impl Store { +// pub fn new() -> Self { +// Self { +// is_logged: false, +// rooms: HashMap::new(), +// user_infos: HashMap::new(), +// user_id: None, +// } +// } +// } -impl PartialEq for Store { - fn eq(&self, other: &Self) -> bool { - self.is_logged == other.is_logged - && self.user_id == other.user_id - && self.user_infos.len() == other.user_infos.len() - && self - .user_infos - .keys() - .all(|k| other.user_infos.contains_key(k)) - && self.rooms.len() == other.rooms.len() - && self.rooms.keys().all(|k| other.rooms.contains_key(k)) - } -} +// impl PartialEq for Store { +// fn eq(&self, other: &Self) -> bool { +// self.is_logged == other.is_logged +// && self.user_id == other.user_id +// && self.user_infos.len() == other.user_infos.len() +// && self +// .user_infos +// .keys() +// .all(|k| other.user_infos.contains_key(k)) +// && self.rooms.len() == other.rooms.len() +// && self.rooms.keys().all(|k| other.rooms.contains_key(k)) +// } +// } -impl Eq for Store {} +// impl Eq for Store {} pub struct AppSettings { - requester: Option>, - pub store: Store, + pub requester: Option>, } impl AppSettings { pub fn new() -> Self { - Self { - requester: None, - store: Store::new(), - } + Self { requester: None } } - pub fn set_requester(&mut self, requester: Box) { + pub fn set_requester(&mut self, requester: RefCell) { self.requester = Some(requester); } } pub static APP_SETTINGS: AtomRef = AtomRef(|_| AppSettings::new()); -async fn on_room(room_option: Option, rooms_ref: &UseAtomRef) { - if let Some(room) = room_option { - let room_id = room.id(); +async fn on_room(room: Room, rooms_ref: &UseAtomRef) { + let room_id = room.id(); - // TODO: Update rooms - rooms_ref - .write() - .insert(room_id, RefCell::::new(room)); - } + // TODO: Update rooms + rooms_ref + .write() + .insert(room_id, RefCell::::new(room)); } -async fn on_room_topic( - room_topic_event_option: Option, - rooms_ref: &UseAtomRef, -) { - if let Some(room_topic_event) = room_topic_event_option { - let room_id = room_topic_event.0; +pub async fn on_room_topic(room_topic_event: RoomTopicEvent, rooms_ref: &UseAtomRef) { + let room_id = room_topic_event.0; - if let Some(room_ref) = rooms_ref.read().get(&room_id) { - let topic = room_topic_event.1; + if let Some(room_ref) = rooms_ref.read().get(&room_id) { + let topic = room_topic_event.1; - let mut room = room_ref.borrow_mut(); - room.topic = Some(RefCell::new(topic)); - } else { - warn!("No room found with the \"{}\" id", room_id); - } + let mut room = room_ref.borrow_mut(); + room.topic = Some(RefCell::new(topic)); + } else { + warn!("No room found with the \"{}\" id", room_id); } } pub async fn sync_rooms( mut rx: UnboundedReceiver, - app_settings_ref: UseAtomRef, + receivers: Receivers, rooms_ref: UseAtomRef, ) { while let Some(_is_logged) = rx.next().await { - if let Some(requester) = &app_settings_ref.read().requester { - let mut room_stream = requester.rooms_receiver.stream(); - let mut room_topic_stream = requester.room_topic_receiver.stream(); + let mut rooms_receiver = receivers.rooms_receiver.borrow_mut(); + let mut room_topic_receiver = receivers.room_topic_receiver.borrow_mut(); - loop { - select! { - room = room_stream.next() => on_room(room, &rooms_ref).await, - room_topic_event = room_topic_stream.next() => on_room_topic(room_topic_event, &rooms_ref).await, + loop { + select! { + res = rooms_receiver.recv() => { + if let Ok(room) = res { + on_room(room, &rooms_ref).await; + } + }, + res = room_topic_receiver.recv() => { + if let Ok(room_topic_event) = res { + on_room_topic(room_topic_event, &rooms_ref).await; + } } } } @@ -215,7 +210,7 @@ pub async fn login( // invalid_login.modify(|_| true); } } - app_settings_ref.write().set_requester(Box::new(client)); + app_settings_ref.write().set_requester(RefCell::new(client)); } else { warn!("At least one of the following values is/are invalid: homeserver, username or password"); } diff --git a/src/components/chats_window/chats_window.rs b/src/components/chats_window/chats_window.rs index 2c5852a..e9e8ff2 100644 --- a/src/components/chats_window/chats_window.rs +++ b/src/components/chats_window/chats_window.rs @@ -1,25 +1,40 @@ use dioxus::prelude::*; +use fermi::*; use tracing::debug; +use crate::base::{sync_rooms, ROOMS}; use crate::components::avatar_selector::AvatarSelector; -use crate::components::chats_window::edit_section::EditSection; use crate::components::icons::DownArrowIcon; +use crate::matrix_interface::requester::Receivers; + +use super::edit_section::EditSection; turf::style_sheet!("src/components/chats_window/chats_window.scss"); -pub fn ChatsWindow(cx: Scope) -> Element { +pub struct ChatsWindowProps { + pub receivers: Receivers, +} + +pub fn ChatsWindow(cx: Scope) -> Element { debug!("ChatsWindow rendering"); - let room_names = vec![ - "MON POTE", - "Second room", - "Third room", - "AAAAAAAAAAAA", - "BBBBBBBBBBBBBBBBBBB", - "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC", - ]; + let receivers = &cx.props.receivers; - let rendered_room_tabs = room_names.into_iter().map(|room_name| { + use_init_atom_root(cx); + + let rooms_ref = use_atom_ref(cx, &ROOMS); + + let sync_rooms_coro = use_coroutine(cx, |rx| { + to_owned![receivers]; + + sync_rooms(rx, receivers, rooms_ref.clone()) + }); + sync_rooms_coro.send(true); + + let rooms = rooms_ref.read(); + let rendered_room_tabs = rooms.values().map(|room| { + let room = room.borrow(); + let room_name = room.name().unwrap_or(room.id().to_string()); rsx!( div { class: ClassName::TAB, diff --git a/src/components/contacts_window/user_infos.rs b/src/components/contacts_window/user_infos.rs index aad1cc1..3597ff0 100644 --- a/src/components/contacts_window/user_infos.rs +++ b/src/components/contacts_window/user_infos.rs @@ -1,8 +1,8 @@ use dioxus::prelude::*; -use fermi::*; +// use fermi::*; use tracing::debug; -use crate::base::APP_SETTINGS; +// use crate::base::APP_SETTINGS; use crate::components::avatar_selector::AvatarSelector; use crate::components::icons::DownArrowIcon; @@ -13,28 +13,28 @@ static MESSAGE_PLACEHOLDER: &str = ""; pub fn UserInfos(cx: Scope) -> Element { debug!("UserInfos rendering"); - let app_settings = use_atom_ref(cx, &APP_SETTINGS); - let store = &app_settings.read().store; + // let app_settings = use_atom_ref(cx, &APP_SETTINGS); + // let store = &app_settings.read().store; - println!("----------------------------------"); - println!("UserInfos rendering"); - // println!("store={:?}", &store); - dbg!(&store.user_id); - println!("----------------------------------"); + // println!("----------------------------------"); + // println!("UserInfos rendering"); + // // println!("store={:?}", &store); + // dbg!(&store.user_id); + // println!("----------------------------------"); // let user_id = store.user_id..as_ref().unwrap(); // let mut user_info_option = None; - let mut user_display_name_option = None; + let user_display_name_option: Option = None; - let user_id_option = &store.user_id; - if user_id_option.is_some() { - let user_id = user_id_option.as_ref().unwrap(); - let user_info_option = store.user_infos.get(user_id); - if user_info_option.is_some() { - user_display_name_option = user_info_option.unwrap().display_name.as_ref(); - } - } + // let user_id_option = &store.user_id; + // if user_id_option.is_some() { + // let user_id = user_id_option.as_ref().unwrap(); + // let user_info_option = store.user_infos.get(user_id); + // if user_info_option.is_some() { + // user_display_name_option = user_info_option.unwrap().display_name.as_ref(); + // } + // } cx.render(rsx! { style { STYLE_SHEET }, diff --git a/src/components/login.rs b/src/components/login.rs index 4f9dad2..777da91 100644 --- a/src/components/login.rs +++ b/src/components/login.rs @@ -2,14 +2,11 @@ use std::str::FromStr; use dioxus::prelude::*; use fermi::*; -use matrix_sdk::ruma::user_id; -use tracing::{debug, error}; +use tracing::debug; -use crate::base::{APP_SETTINGS, SESSION}; +use crate::base::SESSION; use crate::components::avatar_selector::AvatarSelector; use crate::components::header::Header; -use crate::matrix_interface::client::Client; -use crate::matrix_interface::worker_tasks::LoginStyle; turf::style_sheet!("src/components/login.scss"); diff --git a/src/main.rs b/src/main.rs index 5b547ba..2516ad3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ pub mod components; pub mod matrix_interface; use crate::base::{login, sync_rooms, APP_SETTINGS, ROOMS, SESSION}; -use crate::components::chats_window::chats_window::ChatsWindow; +use crate::components::chats_window::chats_window::{ChatsWindow, ChatsWindowProps}; use crate::components::main_window::MainWindow; mod base; @@ -23,47 +23,63 @@ fn App(cx: Scope) -> Element { let session_ref = use_atom_ref(cx, &SESSION); let rooms_ref = use_atom_ref(cx, &ROOMS); + let chats_win_state = use_state(cx, || None); + let login_coro = use_coroutine(cx, |rx| { login(rx, app_settings_ref.clone(), session_ref.clone()) }); - let sync_rooms_coro = use_coroutine(cx, |rx| { - sync_rooms(rx, app_settings_ref.clone(), rooms_ref.clone()) - }); + let mut sync_rooms_coro = None; - let is_logged = session_ref.read().is_logged; - - login_coro.send(is_logged); - - if is_logged { - sync_rooms_coro.send(is_logged); + if let Some(requester) = &app_settings_ref.read().requester { + sync_rooms_coro = Some(use_coroutine(cx, |rx| { + sync_rooms(rx, requester.borrow().receivers.clone(), rooms_ref.clone()) + })); } - let chats_win_state = use_state(cx, || None); - - if is_logged && chats_win_state.is_none() { - let chats_window = dioxus_desktop::use_window(cx); - - let chats_dom = VirtualDom::new(ChatsWindow); - let window_cfg = Config::default().with_custom_head( - r#" - + "# + .to_owned(), + ); + + let chats_window_desktop_service = chats_window.new_window(chats_dom, window_cfg); + chats_win_state.set(Some(chats_window_desktop_service)); } - - "# - .to_owned(), - ); - let chats_window_desktop_service = chats_window.new_window(chats_dom, window_cfg); - chats_win_state.set(Some(chats_window_desktop_service)); } cx.render(rsx! { diff --git a/src/matrix_interface/client.rs b/src/matrix_interface/client.rs index bcd205d..c539740 100644 --- a/src/matrix_interface/client.rs +++ b/src/matrix_interface/client.rs @@ -1,12 +1,11 @@ -// TODO: make a choice: mpsc vs flume. use std::cell::RefCell; use std::sync::Arc; use std::time::Duration; use dioxus::prelude::to_owned; -use flume::{bounded, unbounded}; -use flume::{Receiver as FlumeReceiver, Sender}; +use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; +use tokio::sync::{broadcast, oneshot}; use tokio::task::JoinHandle; use tracing::{debug, error, warn}; @@ -39,7 +38,7 @@ use matrix_sdk::{ Client as MatrixClient, }; -use super::requester::Requester; +use super::requester::{Receivers, Requester}; use super::worker_tasks::{LoginStyle, WorkerTask}; use crate::base::Room; @@ -49,19 +48,17 @@ pub enum ClientError { Matrix(#[from] matrix_sdk::Error), } +#[derive(Clone)] pub struct RoomTopicEvent(pub OwnedRoomId, pub String); #[derive(Clone)] struct Senders { - rooms_sender: flume::Sender, - room_topic_sender: flume::Sender, + rooms_sender: Sender, + room_topic_sender: Sender, } impl Senders { - fn new( - rooms_sender: flume::Sender, - room_topic_sender: flume::Sender, - ) -> Self { + fn new(rooms_sender: Sender, room_topic_sender: Sender) -> Self { Self { rooms_sender, room_topic_sender, @@ -80,8 +77,8 @@ pub struct Client { impl Client { pub fn new( client: Arc, - rooms_sender: flume::Sender, - room_topic_sender: flume::Sender, + rooms_sender: Sender, + room_topic_sender: Sender, ) -> Self { Self { initialized: false, @@ -105,7 +102,6 @@ impl Client { async fn on_sync_state_event(_ev: SyncStateEvent, _room: MatrixRoom) { debug!("== on_sync_state_event =="); - dbg!(_ev); } async fn on_room_topic_event( @@ -113,25 +109,20 @@ impl Client { room: MatrixRoom, senders: Ctx, ) { - debug!("== on_room_topic_event =="); - dbg!(&ev); - if let SyncStateEvent::Original(ev) = ev { let room_topic_sender = &senders.room_topic_sender; let room_id = room.room_id(); - if let Err(err) = room_topic_sender - .send_async(RoomTopicEvent(room_id.to_owned(), ev.content.topic)) - .await + if let Err(err) = + room_topic_sender.send(RoomTopicEvent(room_id.to_owned(), ev.content.topic)) { error!("Unable to publish the \"{}\" new topic: {}", room_id, err); } } } - async fn on_room_member_event(ev: SyncStateEvent, _room: MatrixRoom) { + async fn on_room_member_event(_ev: SyncStateEvent, _room: MatrixRoom) { debug!("== on_room_member_event =="); - dbg!(ev); // // dbg!(room); // if room.invited_members_count() > 0 { // dbg!(room); @@ -165,12 +156,11 @@ impl Client { } async fn on_original_sync_room_member_event( - ev: OriginalSyncRoomMemberEvent, + _ev: OriginalSyncRoomMemberEvent, room: MatrixRoom, _client: MatrixClient, ) { debug!("== on_original_sync_room_member_event =="); - dbg!(ev); let room_id = room.room_id(); dbg!(room_id); @@ -243,8 +233,8 @@ impl Client { pub async fn spawn(homeserver_url: String) -> Requester { let (tx, rx) = unbounded_channel::(); - let (rooms_sender, rooms_receiver) = unbounded::(); - let (room_topic_sender, room_topic_receiver) = unbounded::(); + let (rooms_sender, rooms_receiver) = broadcast::channel(32); + let (room_topic_sender, room_topic_receiver) = broadcast::channel(32); let matrix_client = Arc::new( MatrixClient::builder() @@ -265,8 +255,10 @@ impl Client { Requester { matrix_client, tx, - rooms_receiver, - room_topic_receiver, + receivers: Receivers { + rooms_receiver: RefCell::new(rooms_receiver), + room_topic_receiver: RefCell::new(room_topic_receiver), + }, } } @@ -311,7 +303,8 @@ impl Client { topic, matrix_room.is_direct().await.ok(), ); - if let Err(err) = rooms_sender.send_async(room).await { + + if let Err(err) = rooms_sender.send(room) { warn!("Error: {}", err); } } @@ -344,7 +337,7 @@ impl Client { } } - let (synchronized_tx, synchronized_rx) = bounded(1); + let (synchronized_tx, synchronized_rx) = oneshot::channel(); self.sync_handle = tokio::spawn({ async move { @@ -370,7 +363,7 @@ impl Client { Ok(()) } - fn start_background_tasks(&mut self, synchronized_rx: FlumeReceiver) { + fn start_background_tasks(&mut self, synchronized_rx: oneshot::Receiver) { let client = self.client.clone().unwrap(); let rooms_sender_ref = &self.senders.rooms_sender; @@ -378,7 +371,7 @@ impl Client { to_owned![rooms_sender_ref]; async move { - if let Err(err) = synchronized_rx.recv() { + if let Err(err) = synchronized_rx.await { error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})"); } diff --git a/src/matrix_interface/requester.rs b/src/matrix_interface/requester.rs index 2930a96..adf30d0 100644 --- a/src/matrix_interface/requester.rs +++ b/src/matrix_interface/requester.rs @@ -1,18 +1,34 @@ +use std::cell::RefCell; use std::sync::Arc; use matrix_sdk::Client as MatrixClient; +use tokio::sync::broadcast::Receiver; use tokio::sync::mpsc::UnboundedSender; use super::client::RoomTopicEvent; use super::worker_tasks::{oneshot, LoginStyle, WorkerTask}; use crate::base::Room; +#[derive(Debug)] +pub struct Receivers { + pub rooms_receiver: RefCell>, + pub room_topic_receiver: RefCell>, +} + +impl Clone for Receivers { + fn clone(&self) -> Self { + Self { + rooms_receiver: RefCell::new(self.rooms_receiver.borrow().resubscribe()), + room_topic_receiver: RefCell::new(self.room_topic_receiver.borrow().resubscribe()), + } + } +} + #[derive(Debug)] pub struct Requester { pub matrix_client: Arc, pub tx: UnboundedSender, - pub rooms_receiver: flume::Receiver, - pub room_topic_receiver: flume::Receiver, + pub receivers: Receivers, } impl Requester {