diff --git a/src/infrastructure/messaging/matrix/requester.rs b/src/infrastructure/messaging/matrix/requester.rs index c984ad3..6e934de 100644 --- a/src/infrastructure/messaging/matrix/requester.rs +++ b/src/infrastructure/messaging/matrix/requester.rs @@ -1,64 +1,263 @@ -use std::cell::RefCell; -use std::sync::Arc; +use std::{collections::HashMap, rc::Rc}; -use matrix_sdk::Client as MatrixClient; -use tokio::sync::broadcast::Receiver; -use tokio::sync::mpsc::UnboundedSender; +use async_trait::async_trait; +use futures::future::join_all; +use tokio::{ + select, + sync::{broadcast::Receiver, mpsc::UnboundedSender}, +}; +use tokio_stream::{wrappers::BroadcastStream, StreamExt, StreamMap}; +use tracing::error; -use super::client::RoomEvent; -use super::worker_tasks::{LoginStyle, WorkerTask}; -use crate::utils::oneshot; +use super::{ + account_event::AccountEvent, + room_event::RoomEvent, + worker_tasks::{LoginStyle, WorkerTask}, +}; +use crate::{ + domain::model::{ + common::{Avatar, UserId}, + messaging_interface::{ + AccountMessagingConsumerInterface, AccountMessagingProviderInterface, + MemberMessagingProviderInterface, RoomMessagingConsumerInterface, + RoomMessagingProviderInterface, SpaceMessagingConsumerInterface, + SpaceMessagingProviderInterface, + }, + room::{Room, RoomId}, + room_member::RoomMember, + space::Space, + }, + utils::oneshot, +}; -pub struct Receivers { - pub room_receiver: RefCell>, +pub struct Requester { + worker_tasks_sender: UnboundedSender, } -impl Clone for Receivers { + +impl Clone for Requester { fn clone(&self) -> Self { Self { - room_receiver: RefCell::new(self.room_receiver.borrow().resubscribe()), + worker_tasks_sender: self.worker_tasks_sender.clone(), } } } -impl PartialEq for Receivers { - fn eq(&self, other: &Self) -> bool { - self.room_receiver - .borrow() - .same_channel(&other.room_receiver.borrow()) + +impl Requester { + pub fn new(worker_tasks_sender: UnboundedSender) -> Self { + Self { + worker_tasks_sender, + } } } -pub struct Requester { - pub matrix_client: Arc, - pub tx: UnboundedSender, - pub receivers: Receivers, +// TODO: Is there a way to avoid this duplication? +macro_rules! request_to_worker { + ($self:ident, $task:expr) => { + { + let (reply, mut response) = oneshot(); + + let task = $task(reply); + + if let Err(err) = $self.worker_tasks_sender.send(task) { + let msg = format!("Unable to request to the Matrix client: {err}"); + return Err(anyhow::Error::msg(msg)); + } + + match response.recv().await { + Some(result) => result, + None => Err(anyhow::Error::msg("TBD")), + } + } + }; + + ($self:ident, $task:expr $(, $arg:expr)+) => { + { + let (reply, mut response) = oneshot(); + + let task = $task($($arg),*, reply); + + if let Err(err) = $self.worker_tasks_sender.send(task) { + let msg = format!("Unable to request to the Matrix client: {err}"); + return Err(anyhow::Error::msg(msg)); + } + + match response.recv().await { + Some(result) => result, + None => Err(anyhow::Error::msg("TBD")), + } + } + }; } impl Requester { pub async fn init(&self) -> anyhow::Result<()> { - let (reply, mut response) = oneshot(); - - if let Err(err) = self.tx.send(WorkerTask::Init(reply)) { - let msg = format!("Unable to request the init of the Matrix client: {err}"); - return Err(anyhow::Error::msg(msg)); - } - - match response.recv().await { - Some(result) => Ok(result), - None => Err(anyhow::Error::msg("TBD")), - } + request_to_worker!(self, WorkerTask::Init) } pub async fn login(&self, style: LoginStyle) -> anyhow::Result<()> { - let (reply, mut response) = oneshot(); + request_to_worker!(self, WorkerTask::Login, style) + } +} - if let Err(err) = self.tx.send(WorkerTask::Login(style, reply)) { +#[async_trait(?Send)] +impl AccountMessagingProviderInterface for Requester { + async fn get_display_name(&self) -> anyhow::Result> { + request_to_worker!(self, WorkerTask::GetDisplayName) + } + + async fn get_avatar(&self) -> anyhow::Result> { + request_to_worker!(self, WorkerTask::GetAvatar) + } + + async fn run_forever( + &self, + account_events_consumer: &dyn AccountMessagingConsumerInterface, + mut account_events_receiver: Receiver, + ) -> anyhow::Result<()> { + // TODO: manage the result provided by response + let (run_forever_tx, _run_forever_rx) = oneshot(); + + if let Err(err) = self + .worker_tasks_sender + .send(WorkerTask::RunForever(run_forever_tx)) + { let msg = format!("Unable to request login to the Matrix client: {err}"); return Err(anyhow::Error::msg(msg)); } - match response.recv().await { - Some(result) => result, - None => Err(anyhow::Error::msg("TBD")), + let mut rooms_events_streams = StreamMap::new(); + let mut spaces_events_streams = StreamMap::new(); + + let mut room_events_consumers = + HashMap::>::new(); + let mut space_events_consumers = + HashMap::>::new(); + + // TODO: Fix this... + let client = Rc::new(self.clone()); + + loop { + select! { + res = account_events_receiver.recv() => { + if let Ok(account_event) = res { + match account_event { + AccountEvent::NewRoom(id, spaces, name, topic, is_direct, state, receiver, new_room_tx) => { + let mut room = Room::new(id, spaces, name, topic, is_direct, Some(state)); + let room_id = room.id().clone(); + + room.set_messaging_provider(client.clone()); + + let stream = BroadcastStream::new(receiver.into()); + rooms_events_streams.insert(room_id.clone(), stream); + + let room_events_consumer = account_events_consumer.on_new_room(room).await; + room_events_consumers.insert(room_id, room_events_consumer); + + // We're now ready to recv and compute RoomEvent. + new_room_tx.send(true).await; + }, + AccountEvent::NewSpace(id, name, topic, receiver, new_space_tx) => { + let mut space = Space::new(id, name, topic); + let space_id = space.id().clone(); + + space.set_messaging_provider(client.clone()); + + let stream = BroadcastStream::new(receiver.into()); + spaces_events_streams.insert(space_id.clone(), stream); + + let space_events_consumer = account_events_consumer.on_new_space(space).await; + space_events_consumers.insert(space_id, space_events_consumer); + + // We're now ready to recv and compute SpaceEvent. + new_space_tx.send(true).await; + } + }; + } + }, + Some((room_id, room_event)) = rooms_events_streams.next() => { + if let Ok(room_event) = room_event { + if let Some(consumer) = room_events_consumers.get(&room_id) { + match room_event { + RoomEvent::Invitation() => { + consumer.on_invitation().await; + }, + // RoomEvent::Membership(user_id, is_account_user) => { + // let member = RoomMember::new(UserId::from(user_id), room_id, is_account_user); + // consumer.on_membership(member).await; + // }, + RoomEvent::NewTopic(topic) => { + consumer.on_new_topic(topic).await; + }, + RoomEvent::NewName(name) => { + consumer.on_new_name(name).await; + }, + _ => {} + } + } else { + error!("No consumer found for \"{}\" room", room_id); + } + } + }, + Some((space_id, room_event)) = spaces_events_streams.next() => { + if let Ok(room_event) = room_event { + if let Some(consumer) = space_events_consumers.get(&space_id) { + match room_event { + RoomEvent::NewTopic(topic) => { + consumer.on_new_topic(topic).await; + }, + RoomEvent::NewName(name) => { + consumer.on_new_name(name).await; + }, + RoomEvent::NewChild(child_id) => { + consumer.on_child(child_id).await; + }, + _ => {} + } + } else { + error!("No consumer found for \"{}\" space", space_id); + } + } + } + } } } } + +#[async_trait(?Send)] +impl RoomMessagingProviderInterface for Requester { + async fn get_avatar(&self, room_id: &RoomId) -> anyhow::Result> { + request_to_worker!(self, WorkerTask::GetRoomAvatar, room_id.clone()) + } + + // TODO: Fix return code + async fn get_members(&self, room_id: &RoomId) -> anyhow::Result> { + match request_to_worker!(self, WorkerTask::GetRoomMembers, room_id.clone()) { + Ok(matrix_room_members) => { + Ok(join_all(matrix_room_members.iter().map(|member| async { + RoomMember::from_matrix(member, room_id, Rc::new(self.clone())).await + })) + .await) + } + Err(err) => Err(err), + } + } +} + +#[async_trait(?Send)] +impl SpaceMessagingProviderInterface for Requester {} + +#[async_trait(?Send)] +impl MemberMessagingProviderInterface for Requester { + async fn get_avatar( + &self, + room_id: &RoomId, + user_id: &UserId, + ) -> anyhow::Result> { + request_to_worker!( + self, + WorkerTask::GetRoomMemberAvatar, + room_id.clone(), + user_id.clone() + ) + } +}