From 04628ae10d15875dbe5df92040ee9ac8b54323f2 Mon Sep 17 00:00:00 2001 From: Adrien Date: Sun, 31 Dec 2023 15:04:18 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20Remove=20the=20periodic=20?= =?UTF-8?q?pooling=20to=20get=20the=20rooms=20(joined=20or=20not)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/base.rs | 45 +++--- src/components/chats_window/mod.rs | 3 +- src/matrix_interface/client.rs | 224 +++++++++++++++++------------ src/matrix_interface/requester.rs | 11 +- src/utils.rs | 1 - 5 files changed, 170 insertions(+), 114 deletions(-) diff --git a/src/base.rs b/src/base.rs index b14c2b4..47c32c5 100644 --- a/src/base.rs +++ b/src/base.rs @@ -16,7 +16,7 @@ use tokio::select; use tracing::{debug, error, warn}; use crate::components::chats_window::interface::Interface as ChatsWinInterface; -use crate::matrix_interface::client::{Client, RoomTopicEvent}; +use crate::matrix_interface::client::{Client, RoomEvent}; use crate::matrix_interface::requester::{Receivers, Requester}; use crate::matrix_interface::worker_tasks::LoginStyle; @@ -133,21 +133,30 @@ impl AppSettings { pub static APP_SETTINGS: AtomRef = AtomRef(|_| AppSettings::new()); -async fn on_room(room: Room, rooms_ref: &UseAtomRef) { - let room_id = room.id(); - +async fn on_room(room_id: OwnedRoomId, room: Room, rooms_ref: &UseAtomRef) { // TODO: Update rooms rooms_ref .write() .insert(room_id, RefCell::::new(room)); } -pub async fn on_room_topic(room_topic_event: RoomTopicEvent, rooms_ref: &UseAtomRef) { - let room_id = room_topic_event.0; +async fn on_joining_invitation( + room_id: OwnedRoomId, + room: Room, + rooms_ref: &UseAtomRef, +) { + debug!( + "You're invited to join the \"{}\" room", + room.name().unwrap() + ); + // TODO: Update rooms + rooms_ref + .write() + .insert(room_id, RefCell::::new(room)); +} +pub async fn on_room_topic(room_id: OwnedRoomId, topic: String, rooms_ref: &UseAtomRef) { if let Some(room_ref) = rooms_ref.read().get(&room_id) { - let topic = room_topic_event.1; - let mut room = room_ref.borrow_mut(); room.topic = Some(RefCell::new(topic)); } else { @@ -161,21 +170,20 @@ pub async fn sync_rooms( rooms_ref: UseAtomRef, ) { if let Some(_is_logged) = rx.next().await { - let mut rooms_receiver = receivers.rooms_receiver.borrow_mut(); - let mut room_topic_receiver = receivers.room_topic_receiver.borrow_mut(); + let mut rooms_receiver = receivers.room_receiver.borrow_mut(); loop { + // TODO: Remove select if no more receivers will be used. select! { res = rooms_receiver.recv() => { - if let Ok(room) = res { - on_room(room, &rooms_ref).await; + if let Ok(room_event) = res { + match room_event { + RoomEvent::MemberEvent(room_id, room) => on_room(room_id, room, &rooms_ref).await, + RoomEvent::InviteEvent(room_id, room) => on_joining_invitation(room_id, room, &rooms_ref).await, + RoomEvent::TopicEvent(room_id, topic) => on_room_topic(room_id, topic, &rooms_ref).await, + }; } }, - res = room_topic_receiver.recv() => { - if let Ok(room_topic_event) = res { - on_room_topic(room_topic_event, &rooms_ref).await; - } - } } } } @@ -195,7 +203,8 @@ pub async fn login( if homeserver_url.is_some() && username.is_some() && password.is_some() { let client = Client::spawn(homeserver_url.unwrap()).await; - client.init().await; + // TODO: Handle error case. + let _ = client.init().await; match client .login(LoginStyle::Password(username.unwrap(), password.unwrap())) diff --git a/src/components/chats_window/mod.rs b/src/components/chats_window/mod.rs index d0dfb07..4f4d68d 100644 --- a/src/components/chats_window/mod.rs +++ b/src/components/chats_window/mod.rs @@ -67,9 +67,10 @@ async fn handle_controls<'a>( let mut displayed_room_ids = displayed_room_ids_ref.write(); match displayed_room_ids.take(&room_id) { Some(_) => { - error!("Toggle {} already dispayed... close it", room_id); + error!("{} room already dispayed... close it", room_id); } None => { + error!("{} room isn't dispayed... open it", room_id); displayed_room_ids.insert(room_id); } } diff --git a/src/matrix_interface/client.rs b/src/matrix_interface/client.rs index ca24a19..b93562a 100644 --- a/src/matrix_interface/client.rs +++ b/src/matrix_interface/client.rs @@ -1,12 +1,11 @@ use std::sync::Arc; -use std::time::Duration; use dioxus::prelude::*; +use tokio::sync::broadcast; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; -use tokio::sync::{broadcast, oneshot}; use tokio::task::JoinHandle; -use tracing::{debug, error, warn}; +use tracing::{debug, error}; use matrix_sdk::{ config::SyncSettings, @@ -23,7 +22,9 @@ use matrix_sdk::{ presence::PresenceEvent, reaction::ReactionEventContent, room::{ - member::{OriginalSyncRoomMemberEvent, RoomMemberEventContent}, + member::{ + OriginalSyncRoomMemberEvent, RoomMemberEventContent, StrippedRoomMemberEvent, + }, message::RoomMessageEventContent, name::RoomNameEventContent, redaction::OriginalSyncRoomRedactionEvent, @@ -34,7 +35,7 @@ use matrix_sdk::{ }, OwnedRoomId, }, - Client as MatrixClient, + Client as MatrixClient, RoomState as MatrixRoomState, }; use super::requester::{Receivers, Requester}; @@ -48,43 +49,37 @@ pub enum ClientError { } #[derive(Clone)] -pub struct RoomTopicEvent(pub OwnedRoomId, pub String); +pub enum RoomEvent { + TopicEvent(OwnedRoomId, String), + MemberEvent(OwnedRoomId, Room), + InviteEvent(OwnedRoomId, Room), +} #[derive(Clone)] struct Senders { - rooms_sender: Sender, - room_topic_sender: Sender, + room_sender: Sender, } impl Senders { - fn new(rooms_sender: Sender, room_topic_sender: Sender) -> Self { - Self { - rooms_sender, - room_topic_sender, - } + fn new(room_sender: Sender) -> Self { + Self { room_sender } } } pub struct Client { initialized: bool, client: Option>, - load_handle: Option>, sync_handle: Option>, senders: Senders, } impl Client { - pub fn new( - client: Arc, - rooms_sender: Sender, - room_topic_sender: Sender, - ) -> Self { + pub fn new(client: Arc, room_sender: Sender) -> Self { Self { initialized: false, client: Some(client), - load_handle: None, sync_handle: None, - senders: Senders::new(rooms_sender, room_topic_sender), + senders: Senders::new(room_sender), } } @@ -99,34 +94,88 @@ impl Client { dbg!(_ev); } - async fn on_sync_state_event(_ev: SyncStateEvent, _room: MatrixRoom) { - debug!("== on_sync_state_event =="); + async fn on_sync_state_event(ev: SyncStateEvent, _room: MatrixRoom) { + error!("== on_sync_state_event =="); + if let SyncStateEvent::Original(ev) = ev { + dbg!(ev); + } + } + + async fn on_stripped_room_member_event( + ev: StrippedRoomMemberEvent, + matrix_client: MatrixClient, + matrix_room: MatrixRoom, + senders: Ctx, + ) { + if ev.state_key == matrix_client.user_id().unwrap() { + if matrix_room.state() == MatrixRoomState::Invited { + let room_id = matrix_room.room_id(); + let room_topic = matrix_room.topic().map(RefCell::new); + let room = Room::new( + Arc::new(matrix_room.to_owned()), + room_topic, + matrix_room.is_direct().await.ok(), + ); + + if let Err(err) = senders + .room_sender + .send(RoomEvent::InviteEvent(room_id.to_owned(), room)) + { + error!( + "Unable to publish the new room with \"{}\" id: {}", + room_id, err + ); + } + } + } } async fn on_room_topic_event( ev: SyncStateEvent, - room: MatrixRoom, + matrix_room: MatrixRoom, senders: Ctx, ) { if let SyncStateEvent::Original(ev) = ev { - let room_topic_sender = &senders.room_topic_sender; - let room_id = room.room_id(); + let room_id = matrix_room.room_id(); - if let Err(err) = - room_topic_sender.send(RoomTopicEvent(room_id.to_owned(), ev.content.topic)) + if let Err(err) = senders + .room_sender + .send(RoomEvent::TopicEvent(room_id.to_owned(), ev.content.topic)) { error!("Unable to publish the \"{}\" new topic: {}", room_id, err); } } } - async fn on_room_member_event(_ev: SyncStateEvent, _room: MatrixRoom) { - debug!("== on_room_member_event =="); - // // dbg!(room); - // if room.invited_members_count() > 0 { - // dbg!(room); - // } - // if let SyncStateEvent::Original(ev) = ev {} + async fn on_room_member_event( + ev: SyncStateEvent, + matrix_room: MatrixRoom, + senders: Ctx, + ) { + error!("== on_room_member_event =="); + // dbg!(&matrix_room); + dbg!(matrix_room.room_id()); + + dbg!(ev.membership()); + + if let SyncStateEvent::Original(_ev) = ev { + let room_sender = &senders.room_sender; + + let room_id = matrix_room.room_id(); + let room_topic = matrix_room.topic().map(RefCell::new); + let room = Room::new( + Arc::new(matrix_room.to_owned()), + room_topic, + matrix_room.is_direct().await.ok(), + ); + + if let Err(err) = room_sender.send(RoomEvent::MemberEvent(room_id.to_owned(), room)) { + error!( + "Unable to publish the new room with \"{}\" id: {}", + room_id, err + ); + } + } } async fn on_sync_message_like_room_message_event( @@ -230,8 +279,7 @@ impl Client { pub async fn spawn(homeserver_url: String) -> Requester { let (tx, rx) = unbounded_channel::(); - let (rooms_sender, rooms_receiver) = broadcast::channel(32); - let (room_topic_sender, room_topic_receiver) = broadcast::channel(32); + let (room_sender, room_receiver) = broadcast::channel(32); let matrix_client = Arc::new( MatrixClient::builder() @@ -241,7 +289,7 @@ impl Client { .unwrap(), ); - let mut client = Client::new(matrix_client.clone(), rooms_sender, room_topic_sender); + let mut client = Client::new(matrix_client.clone(), room_sender); tokio::spawn({ async move { @@ -253,8 +301,7 @@ impl Client { matrix_client, tx, receivers: Receivers { - rooms_receiver: RefCell::new(rooms_receiver), - room_topic_receiver: RefCell::new(room_topic_receiver), + room_receiver: RefCell::new(room_receiver), }, } } @@ -267,6 +314,7 @@ impl Client { let _ = client.add_event_handler(Client::on_sync_typing_event); let _ = client.add_event_handler(Client::on_presence_event); let _ = client.add_event_handler(Client::on_sync_state_event); + let _ = client.add_event_handler(Client::on_stripped_room_member_event); let _ = client.add_event_handler(Client::on_sync_message_like_room_message_event); let _ = client.add_event_handler(Client::on_sync_message_like_reaction_event); let _ = client.add_event_handler(Client::on_original_sync_room_redaction_event); @@ -284,36 +332,36 @@ impl Client { self.initialized = true; } - async fn refresh_rooms(matrix_client: &MatrixClient, rooms_sender: &Sender) { - let joined_matrix_rooms_ref = &matrix_client.joined_rooms(); - let invited_matrix_rooms_ref = &matrix_client.invited_rooms(); + // async fn refresh_rooms(matrix_client: &MatrixClient, room_sender: &Sender) { + // let joined_matrix_rooms_ref = &matrix_client.joined_rooms(); + // let invited_matrix_rooms_ref = &matrix_client.invited_rooms(); - for matrix_rooms in [joined_matrix_rooms_ref, invited_matrix_rooms_ref] { - for matrix_room in matrix_rooms.iter() { - let topic = matrix_room.topic().map(RefCell::new); - let room = Room::new( - Arc::new(matrix_room.to_owned()), - topic, - matrix_room.is_direct().await.ok(), - ); + // for matrix_rooms in [joined_matrix_rooms_ref, invited_matrix_rooms_ref] { + // for matrix_room in matrix_rooms.iter() { + // let topic = matrix_room.topic().map(RefCell::new); + // let room = Room::new( + // Arc::new(matrix_room.to_owned()), + // topic, + // matrix_room.is_direct().await.ok(), + // ); - if let Err(err) = rooms_sender.send(room) { - warn!("Error: {}", err); - } - } - } - } + // if let Err(err) = room_sender.send(room) { + // warn!("Error: {}", err); + // } + // } + // } + // } - async fn refresh_rooms_forever(matrix_client: &MatrixClient, rooms_channel: &Sender) { - // TODO: Add interval to config - let mut interval = tokio::time::interval(Duration::from_secs(5)); + // async fn refresh_rooms_forever(matrix_client: &MatrixClient, room_channel: &Sender) { + // // TODO: Add interval to config + // let mut interval = tokio::time::interval(Duration::from_secs(5)); - loop { - Self::refresh_rooms(matrix_client, rooms_channel).await; + // loop { + // // Self::refresh_rooms(matrix_client, room_channel).await; - interval.tick().await; - } - } + // interval.tick().await; + // } + // } async fn login_and_sync(&mut self, style: LoginStyle) -> anyhow::Result<()> { let client = self.client.clone().unwrap(); @@ -330,7 +378,7 @@ impl Client { } } - let (synchronized_tx, synchronized_rx) = oneshot::channel(); + // let (synchronized_tx, synchronized_rx) = oneshot::channel(); self.sync_handle = tokio::spawn({ async move { @@ -339,9 +387,9 @@ impl Client { debug!("User connected to the homeserver"); - if let Err(err) = synchronized_tx.send(true) { - warn!("Unable to notify that the Matrix client is now synchronized ({err})"); - } + // if let Err(err) = synchronized_tx.send(true) { + // warn!("Unable to notify that the Matrix client is now synchronized ({err})"); + // } loop { let settings = SyncSettings::default(); @@ -351,32 +399,32 @@ impl Client { }) .into(); - self.start_background_tasks(synchronized_rx); + // self.start_background_tasks(synchronized_rx); Ok(()) } - fn start_background_tasks(&mut self, synchronized_rx: oneshot::Receiver) { - let client = self.client.clone().unwrap(); - let rooms_sender_ref = &self.senders.rooms_sender; + // fn start_background_tasks(&mut self, synchronized_rx: oneshot::Receiver) { + // let client = self.client.clone().unwrap(); + // let room_sender_ref = &self.senders.room_sender; - self.load_handle = tokio::spawn({ - to_owned![rooms_sender_ref]; + // self.load_handle = tokio::spawn({ + // to_owned![room_sender_ref]; - async move { - if let Err(err) = synchronized_rx.await { - error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})"); - } + // async move { + // if let Err(err) = synchronized_rx.await { + // error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})"); + // } - let rooms_refresh = Self::refresh_rooms_forever( - client.as_ref(), - &rooms_sender_ref - ); - let ((),) = tokio::join!(rooms_refresh); - } - }) - .into(); - } + // let rooms_refresh = Self::refresh_rooms_forever( + // client.as_ref(), + // &room_sender_ref + // ); + // let ((),) = tokio::join!(rooms_refresh); + // } + // }) + // .into(); + // } async fn work(&mut self, mut rx: UnboundedReceiver) { loop { diff --git a/src/matrix_interface/requester.rs b/src/matrix_interface/requester.rs index 9174f79..2be1977 100644 --- a/src/matrix_interface/requester.rs +++ b/src/matrix_interface/requester.rs @@ -5,21 +5,18 @@ use matrix_sdk::Client as MatrixClient; use tokio::sync::broadcast::Receiver; use tokio::sync::mpsc::UnboundedSender; -use super::client::RoomTopicEvent; +use super::client::RoomEvent; use super::worker_tasks::{LoginStyle, WorkerTask}; -use crate::base::Room; use crate::utils::oneshot; pub struct Receivers { - pub rooms_receiver: RefCell>, - pub room_topic_receiver: RefCell>, + pub room_receiver: RefCell>, } impl Clone for Receivers { fn clone(&self) -> Self { Self { - rooms_receiver: RefCell::new(self.rooms_receiver.borrow().resubscribe()), - room_topic_receiver: RefCell::new(self.room_topic_receiver.borrow().resubscribe()), + room_receiver: RefCell::new(self.room_receiver.borrow().resubscribe()), } } } @@ -33,6 +30,7 @@ pub struct Requester { impl Requester { pub async fn init(&self) -> anyhow::Result<()> { let (reply, mut response) = oneshot(); + // TODO: Handle error case. self.tx.send(WorkerTask::Init(reply)).unwrap(); match response.recv().await { @@ -43,6 +41,7 @@ impl Requester { pub async fn login(&self, style: LoginStyle) -> anyhow::Result<()> { let (reply, mut response) = oneshot(); + // TODO: Handle error case. self.tx.send(WorkerTask::Login(style, reply)).unwrap(); match response.recv().await { diff --git a/src/utils.rs b/src/utils.rs index f55a1d4..64baf06 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -13,7 +13,6 @@ pub struct Sender(_Sender); // TODO: Handle error impl Sender { pub(super) async fn send(self, t: T) { - // self.0.send(t).unwrap(); let _ = self.0.send(t).await; } }