From bc6b02bc3422deb216f0f6a9e4be5b1a32a0a828 Mon Sep 17 00:00:00 2001 From: Adrien Date: Fri, 10 May 2024 22:32:35 +0200 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=20Rework=20the=20Matrix?= =?UTF-8?q?=20messaging=20Client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/infrastructure/messaging/matrix/client.rs | 594 ++++++++++++------ 1 file changed, 419 insertions(+), 175 deletions(-) diff --git a/src/infrastructure/messaging/matrix/client.rs b/src/infrastructure/messaging/matrix/client.rs index 5b0dc34..295b909 100644 --- a/src/infrastructure/messaging/matrix/client.rs +++ b/src/infrastructure/messaging/matrix/client.rs @@ -1,37 +1,46 @@ -use std::borrow::Borrow; -use std::cell::RefCell; -use std::sync::Arc; -use std::time::Duration; +use std::{ + borrow::Borrow, + collections::HashMap, + sync::{Arc, Mutex}, +}; -use async_std::task; +use async_std::stream::StreamExt; use dioxus::prelude::Task; -use log::{debug, error}; -use tokio::sync::broadcast; -use tokio::sync::broadcast::Sender; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; -use tokio::sync::oneshot; -use tokio::task::JoinHandle; - use matrix_sdk::{ config::SyncSettings, event_handler::Ctx, - room::Room as MatrixRoom, + media::{MediaFormat, MediaThumbnailSize}, + room::{Room, RoomMember}, ruma::{ + api::client::media::get_content_thumbnail::v3::Method, events::{ room::{ - member::{RoomMemberEventContent, StrippedRoomMemberEvent}, + member::{ + OriginalSyncRoomMemberEvent, RoomMemberEventContent, StrippedRoomMemberEvent, + }, topic::RoomTopicEventContent, }, SyncStateEvent, }, - OwnedRoomId, + uint, OwnedRoomId, RoomId, UserId, }, - Client as MatrixClient, RoomState as MatrixRoomState, + Client as MatrixClient, RoomMemberships, RoomState, }; -use super::requester::{Receivers, Requester}; -use super::worker_tasks::{LoginStyle, WorkerTask}; -use crate::domain::model::room::Room; +use tokio::sync::{ + broadcast, + broadcast::{error::SendError, Receiver, Sender}, + mpsc::{unbounded_channel, UnboundedReceiver}, +}; +use tracing::{debug, error, warn}; + +use super::{ + account_event::AccountEvent, + requester::Requester, + room_event::{RoomEvent, RoomEventsReceiver}, + worker_tasks::{LoginStyle, WorkerTask}, +}; +use crate::utils::oneshot; #[derive(thiserror::Error, Debug)] pub enum ClientError { @@ -39,21 +48,45 @@ pub enum ClientError { Matrix(#[from] matrix_sdk::Error), } -#[derive(Clone)] -pub enum RoomEvent { - TopicEvent(OwnedRoomId, String), - MemberEvent(OwnedRoomId, Room), - InviteEvent(OwnedRoomId, Room), -} - #[derive(Clone)] struct Senders { - room_events_sender: Sender, + account_events_sender: Sender, + room_events_senders: Arc>>>, } impl Senders { - fn new(room_events_sender: Sender) -> Self { - Self { room_events_sender } + fn new(account_events_sender: Sender) -> Self { + Self { + account_events_sender, + room_events_senders: Arc::new(Mutex::new(HashMap::new())), + } + } + + fn contains(&self, room_id: &RoomId) -> bool { + let room_senders = self.room_events_senders.lock().unwrap(); + + room_senders.contains_key(room_id) + } + + fn send(&self, room_id: &RoomId, event: RoomEvent) -> Result> { + let room_senders = self.room_events_senders.lock().unwrap(); + if let Some(room_sender) = room_senders.get(room_id) { + room_sender.send(event) + } else { + error!("No sender found for \"{}\" room id", room_id); + Ok(0) + } + } + + fn add_room(&self, room_id: &OwnedRoomId) -> Option { + let mut senders = self.room_events_senders.lock().unwrap(); + if !senders.contains_key(room_id) { + let (room_sender, room_receiver) = broadcast::channel(32); + senders.insert(room_id.clone(), room_sender); + Some(RoomEventsReceiver::new(room_receiver)) + } else { + None + } } } @@ -65,16 +98,173 @@ pub struct Client { } impl Client { - pub fn new(client: Arc, room_events_sender: Sender) -> Self { + pub fn new(client: Arc, account_events_sender: Sender) -> Self { Self { initialized: false, client: Some(client), sync_task: None, - senders: Senders::new(room_events_sender), + senders: Senders::new(account_events_sender), } } - // async fn on_sync_typing_event(_ev: SyncTypingEvent, room: MatrixRoom) { + async fn create_space( + senders: &Ctx, + room_id: OwnedRoomId, + room: Option<&Room>, + ) -> anyhow::Result<(), SendError> { + let mut name = None; + let mut topic = None; + + if let Some(room) = room { + name = room.name(); + topic = room.topic(); + } + + if let Some(receiver) = senders.add_room(&room_id) { + let (reply, mut response) = oneshot::(); + + let event = AccountEvent::NewSpace(room_id.clone(), name, topic, receiver, reply); + + if let Err(err) = senders.account_events_sender.send(event) { + error!( + "Unable to publish the new room with \"{}\" id: {}", + room_id, err + ); + return Err(err); + } + + // We're expecting a response indicating that the client is able to compute the next RoomEvent + response.recv().await; + } else { + let events = vec![RoomEvent::NewTopic(topic), RoomEvent::NewName(name)]; + + for event in events { + if let Err(err) = senders.send(&room_id, event.clone()) { + error!( + "Unable to publish the {:?} event to the \"{}\" room: {}", + event, room_id, err + ); + // return Err(err); + } + } + } + Ok(()) + } + + async fn create_room( + senders: &Ctx, + room: &Room, + ) -> anyhow::Result<(), SendError> { + let room_id = room.room_id().to_owned(); + + if let Some(receiver) = senders.add_room(&room_id) { + let (reply, mut response) = oneshot::(); + + let is_direct = match room.is_direct().await { + Ok(is_direct) => Some(is_direct), + Err(err) => { + error!("Unable to know if the room \"{room_id}\" is direct: {err}"); + None + } + }; + + let mut parents = vec![]; + // TODO: Remove unwrap + let mut spaces = room.parent_spaces().await.unwrap(); + while let Some(parent) = spaces.next().await { + match parent { + Ok(parent) => match parent { + matrix_sdk::room::ParentSpace::Reciprocal(parent) => { + parents.push(parent.room_id().to_owned()); + } + _ => todo!(), + }, + Err(err) => { + error!("{}", err); + } + } + } + + let event = AccountEvent::NewRoom( + room_id.clone(), + parents.clone(), + room.name(), + room.topic(), + is_direct, + room.state(), + receiver, + reply, + ); + + if let Err(err) = senders.account_events_sender.send(event) { + error!( + "Unable to publish the new room with \"{}\" id: {}", + room.room_id(), + err + ); + return Err(err); + } + + // We're expecting a response indicating that the client is able to compute the next RoomEvent + response.recv().await; + } + Ok(()) + } + + async fn add_room( + senders: &Ctx, + room: &Room, + ) -> anyhow::Result<(), SendError> { + let room_id = room.room_id().to_owned(); + + if room.is_space() { + Self::create_space(senders, room_id, Some(room)).await + } else { + let ret = Self::create_room(senders, room).await; + + let mut parents = vec![]; + // TODO: Remove unwrap + let mut spaces = room.parent_spaces().await.unwrap(); + while let Some(parent) = spaces.next().await { + match parent { + Ok(parent) => match parent { + matrix_sdk::room::ParentSpace::Reciprocal(parent) => { + parents.push(parent.room_id().to_owned()); + } + _ => { + warn!( + "Only ParentSpace::Reciprocal taken into account, skip {:?}", + parent + ); + } + }, + Err(err) => { + error!("{}", err); + } + } + } + error!("parents={:?}", &parents); + + for parent in parents { + // Create a minimal space to make the relation consistent... its content will be sync later. + if !senders.contains(&parent) { + let _ = Self::create_space(senders, parent.clone(), None).await; + } + + let event = RoomEvent::NewChild(room_id.clone()); + if let Err(err) = senders.send(parent.as_ref(), event.clone()) { + error!( + "Unable to send the {:?} event to the \"{}\": {:?}", + event, parent, err + ); + } + } + + ret + } + } + + // async fn on_sync_typing_event(_ev: SyncTypingEvent, room: Room) { // debug!("== on_sync_typing_event =="); // let room_id = room.room_id().to_owned(); // dbg!(room_id); @@ -85,7 +275,7 @@ impl Client { // dbg!(_ev); // } - // async fn on_sync_state_event(ev: SyncStateEvent, _room: MatrixRoom) { + // async fn on_sync_state_event(ev: SyncStateEvent, _room: Room) { // error!("== on_sync_state_event =="); // if let SyncStateEvent::Original(ev) = ev { // dbg!(ev); @@ -94,32 +284,33 @@ impl Client { // async fn on_original_sync_room_message_event( // ev: OriginalSyncRoomMessageEvent, - // _matrix_room: MatrixRoom, + // _room: Room, // _senders: Ctx, // ) { // error!("== on_original_sync_room_message_event =="); // error!("ev={:?}", ev.content); - // } async fn on_stripped_room_member_event( ev: StrippedRoomMemberEvent, matrix_client: MatrixClient, - matrix_room: MatrixRoom, + room: Room, senders: Ctx, ) { - if ev.state_key == matrix_client.user_id().unwrap() - && matrix_room.state() == MatrixRoomState::Invited - { - let room_id = matrix_room.room_id(); - let room = Room::from_matrix_room(&matrix_room).await; + error!("*** on_stripped_room_member_event ***"); + // error!("ev={:?}", ev); - if let Err(err) = senders - .room_events_sender - .send(RoomEvent::InviteEvent(room_id.to_owned(), room)) - { + if ev.state_key == matrix_client.user_id().unwrap() + && room.state() == RoomState::Invited + && Self::add_room(&senders, &room).await.is_ok() + { + let room_id = room.room_id(); + + let event = RoomEvent::Invitation(); + if let Err(err) = senders.send(room_id, event) { error!( - "Unable to publish the new room with \"{}\" id: {}", - room_id, err + "Unable to publish the room \"{}\" invitation: {}", + room.room_id(), + err ); } } @@ -127,45 +318,56 @@ impl Client { async fn on_room_topic_event( ev: SyncStateEvent, - matrix_room: MatrixRoom, + room: Room, senders: Ctx, ) { - if let SyncStateEvent::Original(ev) = ev { - let room_id = matrix_room.room_id(); + error!("*** on_room_topic_event ***"); + // error!("ev={:?}", ev); - if let Err(err) = senders - .room_events_sender - .send(RoomEvent::TopicEvent(room_id.to_owned(), ev.content.topic)) - { - error!("Unable to publish the \"{}\" new topic: {}", room_id, err); + if let SyncStateEvent::Original(ev) = ev { + let _ = Self::add_room(&senders, &room).await; + + let room_id = room.room_id(); + let event = RoomEvent::NewTopic(Some(ev.content.topic)); + if let Err(err) = senders.send(room_id, event) { + error!( + "Unable to publish the room \"{}\" topic: {}", + room.room_id(), + err + ); } } } async fn on_room_member_event( ev: SyncStateEvent, - matrix_room: MatrixRoom, + room: Room, senders: Ctx, ) { - if let SyncStateEvent::Original(_ev) = ev { - let room_id = matrix_room.room_id(); - let room = Room::from_matrix_room(&matrix_room).await; + error!("*** on_room_member_event ***"); + // error!("ev={:?}", ev); - if let Err(err) = senders - .room_events_sender - .send(RoomEvent::MemberEvent(room_id.to_owned(), room)) - { - error!( - "Unable to publish the new room with \"{}\" id: {}", - room_id, err - ); + if let SyncStateEvent::Original(_ev) = ev { + if Self::add_room(&senders, &room).await.is_ok() { + // let room_id = room.room_id(); + // // TODO: Client shall only manage Matrix object... not BG92 ones. + // let event = RoomEvent::Membership(RoomMember::new(ev.sender, room_id)); + // if let Some(result) = senders.send(room_id, event) { + // if let Err(err) = result { + // error!( + // "Unable to publish the room \"{}\" membership: {}", + // room.room_id(), + // err + // ); + // } + // } } } } // async fn on_sync_message_like_room_message_event( // ev: SyncMessageLikeEvent, - // _room: MatrixRoom, + // _room: Room, // _client: MatrixClient, // ) { // debug!("== on_sync_message_like_room_message_event =="); @@ -174,7 +376,7 @@ impl Client { // async fn on_sync_message_like_reaction_event( // ev: SyncMessageLikeEvent, - // _room: MatrixRoom, + // _room: Room, // ) { // debug!("== on_sync_message_like_reaction_event =="); // dbg!(ev); @@ -182,28 +384,28 @@ impl Client { // async fn on_original_sync_room_redaction_event( // ev: OriginalSyncRoomRedactionEvent, - // _room: MatrixRoom, + // _room: Room, // ) { // debug!("== on_original_sync_room_redaction_event =="); // dbg!(ev); // } - // async fn on_original_sync_room_member_event( - // _ev: OriginalSyncRoomMemberEvent, - // _room: MatrixRoom, - // _client: MatrixClient, - // ) { - // debug!("== on_original_sync_room_member_event =="); - - // let mut store = store_ctx.read().unwrap().to_owned(); - // dbg!(store.rooms.keys()); - // let is_direct = room.is_direct().await.ok(); - // store.rooms.insert( - // OwnedRoomId::from(room_id), - // Arc::new(RwLock::new(Room::new(Arc::new(room), None, is_direct))), - // ); - // let _ = store_ctx.write(store); - // } + async fn on_original_sync_room_member_event( + _ev: OriginalSyncRoomMemberEvent, + _room: Room, + _client: MatrixClient, + ) { + // debug!("== on_original_sync_room_member_event =="); + // error!("room={:?}", room); + // let mut store = store_ctx.read().unwrap().to_owned(); + // dbg!(store.rooms.keys()); + // let is_direct = room.is_direct().await.ok(); + // store.rooms.insert( + // OwnedRoomId::from(room_id), + // Arc::new(RwLock::new(Room::new(Arc::new(room), None, is_direct))), + // ); + // let _ = store_ctx.write(store); + } // async fn on_original_sync_key_verif_start_event( // ev: OriginalSyncKeyVerificationStartEvent, @@ -265,11 +467,7 @@ impl Client { // debug!("== on_room_event({}) ==", ev.) // } - pub async fn spawn(homeserver_url: String) -> Requester { - let (tx, rx) = unbounded_channel::(); - - let (room_sender, room_receiver) = broadcast::channel(32); - + pub async fn spawn(homeserver_url: String) -> (Requester, Receiver) { let matrix_client = Arc::new( MatrixClient::builder() .homeserver_url(&homeserver_url) @@ -278,23 +476,22 @@ impl Client { .unwrap(), ); - let mut client = Client::new(matrix_client.clone(), room_sender); + let (worker_tasks_sender, worker_tasks_receiver) = unbounded_channel::(); + let (account_events_sender, account_events_receiver) = + broadcast::channel::(32); + + let mut client = Client::new(matrix_client, account_events_sender); dioxus::prelude::spawn(async move { - client.work(rx).await; + client.work(worker_tasks_receiver).await; }); - Requester { - matrix_client, - tx, - receivers: Receivers { - room_receiver: RefCell::new(room_receiver), - }, - } + (Requester::new(worker_tasks_sender), account_events_receiver) } fn init(&mut self) { if let Some(client) = self.client.borrow() { + // TODO: Remove clone? client.add_event_handler_context(self.senders.clone()); let _ = client.add_event_handler(Client::on_stripped_room_member_event); @@ -309,7 +506,9 @@ impl Client { // let _ = client.add_event_handler(Client::on_sync_message_like_room_message_event); // let _ = client.add_event_handler(Client::on_sync_message_like_reaction_event); // let _ = client.add_event_handler(Client::on_original_sync_room_redaction_event); - // let _ = client.add_event_handler(Client::on_original_sync_room_member_event); + + let _ = client.add_event_handler(Client::on_original_sync_room_member_event); + // let _ = client.add_event_handler(Client::on_original_sync_key_verif_start_event); // let _ = client.add_event_handler(Client::on_original_sync_key_verif_key_event); // let _ = client.add_event_handler(Client::on_original_sync_key_verif_done_event); @@ -322,12 +521,12 @@ impl Client { } } - async fn login_and_sync(&mut self, style: LoginStyle) -> anyhow::Result<()> { - let client = self.client.clone().unwrap(); + async fn login(&mut self, style: LoginStyle) -> anyhow::Result<()> { + let client = self.client.as_ref().unwrap(); match style { LoginStyle::Password(username, password) => { - let _resp = client + client .matrix_auth() .login_username(&username, &password) .initial_device_display_name("TODO") @@ -337,7 +536,11 @@ impl Client { } } - let (synchronized_tx, synchronized_rx) = oneshot::channel::(); + Ok(()) + } + + async fn run_forever(&mut self) { + let client = self.client.clone().unwrap(); let task = dioxus::prelude::spawn(async move { // Sync once so we receive the client state and old messages @@ -350,87 +553,107 @@ impl Client { }; if let Some(sync_token) = sync_token_option { - let settings = SyncSettings::default().token(sync_token); - debug!("User connected to the homeserver, start syncing"); - if let Err(err) = synchronized_tx.send(true) { - error!("Unable to notify that the Matrix client is now synchronized ({err})"); - } - + let settings = SyncSettings::default().token(sync_token); let _ = client.sync(settings).await; } }); self.sync_task = Some(task); - - // self.start_background_tasks(synchronized_rx); - - Ok(()) } - // async fn register_room_events(&self, room_id: OwnedRoomId) { - // let client = self.client.unwrap(); + async fn get_display_name(&mut self) -> anyhow::Result> { + let client = self.client.as_ref().unwrap(); - // client.add_room_event_handler(&room_id, Client::on_room_event); - // } + match client.account().get_display_name().await { + Ok(display_name) => Ok(display_name), + Err(err) => Err(err.into()), + } + } - // async fn refresh_rooms( - // matrix_client: &Arc, - // room_events_sender: &Sender, - // ) { - // let joined_matrix_rooms_ref = &matrix_client.joined_rooms(); - // let invited_matrix_rooms_ref = &matrix_client.invited_rooms(); + async fn get_avatar(&mut self) -> anyhow::Result>> { + let client = self.client.as_ref().unwrap(); - // for matrix_rooms in [joined_matrix_rooms_ref, invited_matrix_rooms_ref] { - // for matrix_room in matrix_rooms.iter() { - // let room = Room::from_matrix_room(matrix_room).await; - // let event = RoomEvent::MemberEvent(room.id().clone(), room); + match client + .account() + .get_avatar(MediaFormat::Thumbnail(MediaThumbnailSize { + method: Method::Scale, + width: uint!(256), + height: uint!(256), + })) + .await + { + Ok(avatar) => Ok(avatar), + Err(err) => Err(err.into()), + } + } - // if let Err(err) = room_events_sender.send(event) { - // error!("Error: {}", err); - // } - // } - // } - // } + async fn get_room_avatar(&mut self, room_id: &OwnedRoomId) -> anyhow::Result>> { + let client = self.client.as_ref().unwrap(); - // async fn refresh_rooms_forever( - // matrix_client: Arc, - // room_events_sender: &Sender, - // ) { - // // TODO: Add interval to config - // let period_sec = Duration::from_secs(5); + if let Some(room) = client.get_room(room_id) { + match room + .avatar(MediaFormat::Thumbnail(MediaThumbnailSize { + method: Method::Scale, + width: uint!(256), + height: uint!(256), + })) + .await + { + Ok(avatar) => Ok(avatar), + Err(err) => Err(err.into()), + } + } else { + error!("No room found with the \"{}\" id", room_id.as_str()); + // TODO: Return an error if the room has not been found + Ok(None) + } + } - // loop { - // Self::refresh_rooms(&matrix_client, room_events_sender).await; + async fn get_room_members(&mut self, room_id: &OwnedRoomId) -> anyhow::Result> { + let client = self.client.as_ref().unwrap(); - // task::sleep(period_sec).await; - // } - // } + if let Some(room) = client.get_room(room_id) { + match room.members(RoomMemberships::ACTIVE).await { + Ok(room_members) => Ok(room_members), + Err(err) => Err(err.into()), + } + } else { + error!("No room found with the \"{}\" id", room_id.as_str()); + // TODO: Return an error if the room has not been found + Ok(vec![]) + } + } - // fn start_background_tasks(&mut self, synchronized_rx: oneshot::Receiver) { - // let client = self.client.clone().unwrap(); - // let room_events_sender = self.senders.room_events_sender.clone(); + async fn get_room_member_avatar( + &self, + room_id: &RoomId, + user_id: &UserId, + ) -> anyhow::Result>> { + let client = self.client.as_ref().unwrap(); - // let task = dioxus::prelude::spawn(async move { - // if let Err(err) = synchronized_rx.await { - // error!("Unable to setup the rx channel notifying that the Matrix client is now synchronized ({err})"); - // } - - // debug!("Start room refreshing forever"); - - // let _ = Self::refresh_rooms_forever(client, &room_events_sender).await; - // }); - // self.background_task = Some(task); - // } + if let Some(room) = client.get_room(room_id) { + if let Ok(Some(room_member)) = room.get_member(user_id).await { + let res = match room_member + .avatar(MediaFormat::Thumbnail(MediaThumbnailSize { + method: Method::Scale, + width: uint!(256), + height: uint!(256), + })) + .await + { + Ok(avatar) => Ok(avatar), + Err(err) => Err(err.into()), + }; + return res; + } + } + Ok(None) + } async fn work(&mut self, mut rx: UnboundedReceiver) { - loop { - match rx.recv().await { - Some(task) => self.run(task).await, - None => { - break; - } - } + while let Some(task) = rx.recv().await { + self.run(task).await; } if let Some(task) = self.sync_task.take() { @@ -441,17 +664,38 @@ impl Client { async fn run(&mut self, task: WorkerTask) { match task { WorkerTask::Init(reply) => { - assert!(!self.initialized); self.init(); - reply.send(()).await; + reply.send(Ok(())).await; + } + WorkerTask::RunForever(reply) => { + { + self.run_forever().await; + reply.send(()) + } + .await } WorkerTask::Login(style, reply) => { - assert!(self.initialized); - reply.send(self.login_and_sync(style).await).await; - } // WorkerTask::registerRoomEvents(room_id, reply) => { - // assert!(self.initialized); - // reply.send(self.register_room_events(room_id).await).await; - // } + reply.send(self.login(style).await).await; + } + WorkerTask::GetDisplayName(reply) => { + reply.send(self.get_display_name().await).await; + } + WorkerTask::GetAvatar(reply) => { + reply.send(self.get_avatar().await).await; + } + + WorkerTask::GetRoomAvatar(id, reply) => { + reply.send(self.get_room_avatar(&id).await).await; + } + WorkerTask::GetRoomMembers(id, reply) => { + reply.send(self.get_room_members(&id).await).await; + } + + WorkerTask::GetRoomMemberAvatar(room_id, user_id, reply) => { + reply + .send(self.get_room_member_avatar(&room_id, &user_id).await) + .await; + } } } }