♻️ Rework the Matrix messaging Requester

This commit is contained in:
2024-05-10 22:20:32 +02:00
parent ef41c0bd48
commit 0a936dd12b

View File

@@ -1,58 +1,65 @@
use std::cell::RefCell;
use std::sync::Arc;
use std::{collections::HashMap, rc::Rc};
use matrix_sdk::Client as MatrixClient;
use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::UnboundedSender;
use async_trait::async_trait;
use futures::future::join_all;
use tokio::{
select,
sync::{broadcast::Receiver, mpsc::UnboundedSender},
};
use tokio_stream::{wrappers::BroadcastStream, StreamExt, StreamMap};
use tracing::error;
use super::client::RoomEvent;
use super::worker_tasks::{LoginStyle, WorkerTask};
use crate::utils::oneshot;
pub struct Receivers {
pub room_receiver: RefCell<Receiver<RoomEvent>>,
}
impl Clone for Receivers {
fn clone(&self) -> Self {
Self {
room_receiver: RefCell::new(self.room_receiver.borrow().resubscribe()),
}
}
}
impl PartialEq for Receivers {
fn eq(&self, other: &Self) -> bool {
self.room_receiver
.borrow()
.same_channel(&other.room_receiver.borrow())
}
}
use super::{
account_event::AccountEvent,
room_event::RoomEvent,
worker_tasks::{LoginStyle, WorkerTask},
};
use crate::{
domain::model::{
common::{Avatar, UserId},
messaging_interface::{
AccountMessagingConsumerInterface, AccountMessagingProviderInterface,
MemberMessagingProviderInterface, RoomMessagingConsumerInterface,
RoomMessagingProviderInterface, SpaceMessagingConsumerInterface,
SpaceMessagingProviderInterface,
},
room::{Room, RoomId},
room_member::RoomMember,
space::Space,
},
utils::oneshot,
};
pub struct Requester {
pub matrix_client: Arc<MatrixClient>,
pub tx: UnboundedSender<WorkerTask>,
pub receivers: Receivers,
worker_tasks_sender: UnboundedSender<WorkerTask>,
}
impl Clone for Requester {
fn clone(&self) -> Self {
Self {
worker_tasks_sender: self.worker_tasks_sender.clone(),
}
}
}
impl Requester {
pub async fn init(&self) -> anyhow::Result<()> {
pub fn new(worker_tasks_sender: UnboundedSender<WorkerTask>) -> Self {
Self {
worker_tasks_sender,
}
}
}
// TODO: Is there a way to avoid this duplication?
macro_rules! request_to_worker {
($self:ident, $task:expr) => {
{
let (reply, mut response) = oneshot();
if let Err(err) = self.tx.send(WorkerTask::Init(reply)) {
let msg = format!("Unable to request the init of the Matrix client: {err}");
return Err(anyhow::Error::msg(msg));
}
let task = $task(reply);
match response.recv().await {
Some(result) => Ok(result),
None => Err(anyhow::Error::msg("TBD")),
}
}
pub async fn login(&self, style: LoginStyle) -> anyhow::Result<()> {
let (reply, mut response) = oneshot();
if let Err(err) = self.tx.send(WorkerTask::Login(style, reply)) {
let msg = format!("Unable to request login to the Matrix client: {err}");
if let Err(err) = $self.worker_tasks_sender.send(task) {
let msg = format!("Unable to request to the Matrix client: {err}");
return Err(anyhow::Error::msg(msg));
}
@@ -61,4 +68,196 @@ impl Requester {
None => Err(anyhow::Error::msg("TBD")),
}
}
};
($self:ident, $task:expr $(, $arg:expr)+) => {
{
let (reply, mut response) = oneshot();
let task = $task($($arg),*, reply);
if let Err(err) = $self.worker_tasks_sender.send(task) {
let msg = format!("Unable to request to the Matrix client: {err}");
return Err(anyhow::Error::msg(msg));
}
match response.recv().await {
Some(result) => result,
None => Err(anyhow::Error::msg("TBD")),
}
}
};
}
impl Requester {
pub async fn init(&self) -> anyhow::Result<()> {
request_to_worker!(self, WorkerTask::Init)
}
pub async fn login(&self, style: LoginStyle) -> anyhow::Result<()> {
request_to_worker!(self, WorkerTask::Login, style)
}
}
#[async_trait(?Send)]
impl AccountMessagingProviderInterface for Requester {
async fn get_display_name(&self) -> anyhow::Result<Option<String>> {
request_to_worker!(self, WorkerTask::GetDisplayName)
}
async fn get_avatar(&self) -> anyhow::Result<Option<Avatar>> {
request_to_worker!(self, WorkerTask::GetAvatar)
}
async fn run_forever(
&self,
account_events_consumer: &dyn AccountMessagingConsumerInterface,
mut account_events_receiver: Receiver<AccountEvent>,
) -> anyhow::Result<()> {
// TODO: manage the result provided by response
let (run_forever_tx, _run_forever_rx) = oneshot();
if let Err(err) = self
.worker_tasks_sender
.send(WorkerTask::RunForever(run_forever_tx))
{
let msg = format!("Unable to request login to the Matrix client: {err}");
return Err(anyhow::Error::msg(msg));
}
let mut rooms_events_streams = StreamMap::new();
let mut spaces_events_streams = StreamMap::new();
let mut room_events_consumers =
HashMap::<RoomId, Rc<dyn RoomMessagingConsumerInterface>>::new();
let mut space_events_consumers =
HashMap::<RoomId, Rc<dyn SpaceMessagingConsumerInterface>>::new();
// TODO: Fix this...
let client = Rc::new(self.clone());
loop {
select! {
res = account_events_receiver.recv() => {
if let Ok(account_event) = res {
match account_event {
AccountEvent::NewRoom(id, spaces, name, topic, is_direct, state, receiver, new_room_tx) => {
let mut room = Room::new(id, spaces, name, topic, is_direct, Some(state));
let room_id = room.id().clone();
room.set_messaging_provider(client.clone());
let stream = BroadcastStream::new(receiver.into());
rooms_events_streams.insert(room_id.clone(), stream);
let room_events_consumer = account_events_consumer.on_new_room(room).await;
room_events_consumers.insert(room_id, room_events_consumer);
// We're now ready to recv and compute RoomEvent.
new_room_tx.send(true).await;
},
AccountEvent::NewSpace(id, name, topic, receiver, new_space_tx) => {
let mut space = Space::new(id, name, topic);
let space_id = space.id().clone();
space.set_messaging_provider(client.clone());
let stream = BroadcastStream::new(receiver.into());
spaces_events_streams.insert(space_id.clone(), stream);
let space_events_consumer = account_events_consumer.on_new_space(space).await;
space_events_consumers.insert(space_id, space_events_consumer);
// We're now ready to recv and compute SpaceEvent.
new_space_tx.send(true).await;
}
};
}
},
Some((room_id, room_event)) = rooms_events_streams.next() => {
if let Ok(room_event) = room_event {
if let Some(consumer) = room_events_consumers.get(&room_id) {
match room_event {
RoomEvent::Invitation() => {
consumer.on_invitation().await;
},
// RoomEvent::Membership(user_id, is_account_user) => {
// let member = RoomMember::new(UserId::from(user_id), room_id, is_account_user);
// consumer.on_membership(member).await;
// },
RoomEvent::NewTopic(topic) => {
consumer.on_new_topic(topic).await;
},
RoomEvent::NewName(name) => {
consumer.on_new_name(name).await;
},
_ => {}
}
} else {
error!("No consumer found for \"{}\" room", room_id);
}
}
},
Some((space_id, room_event)) = spaces_events_streams.next() => {
if let Ok(room_event) = room_event {
if let Some(consumer) = space_events_consumers.get(&space_id) {
match room_event {
RoomEvent::NewTopic(topic) => {
consumer.on_new_topic(topic).await;
},
RoomEvent::NewName(name) => {
consumer.on_new_name(name).await;
},
RoomEvent::NewChild(child_id) => {
consumer.on_child(child_id).await;
},
_ => {}
}
} else {
error!("No consumer found for \"{}\" space", space_id);
}
}
}
}
}
}
}
#[async_trait(?Send)]
impl RoomMessagingProviderInterface for Requester {
async fn get_avatar(&self, room_id: &RoomId) -> anyhow::Result<Option<Avatar>> {
request_to_worker!(self, WorkerTask::GetRoomAvatar, room_id.clone())
}
// TODO: Fix return code
async fn get_members(&self, room_id: &RoomId) -> anyhow::Result<Vec<RoomMember>> {
match request_to_worker!(self, WorkerTask::GetRoomMembers, room_id.clone()) {
Ok(matrix_room_members) => {
Ok(join_all(matrix_room_members.iter().map(|member| async {
RoomMember::from_matrix(member, room_id, Rc::new(self.clone())).await
}))
.await)
}
Err(err) => Err(err),
}
}
}
#[async_trait(?Send)]
impl SpaceMessagingProviderInterface for Requester {}
#[async_trait(?Send)]
impl MemberMessagingProviderInterface for Requester {
async fn get_avatar(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> anyhow::Result<Option<Avatar>> {
request_to_worker!(
self,
WorkerTask::GetRoomMemberAvatar,
room_id.clone(),
user_id.clone()
)
}
}