🔊 Trace events from Matrix client callbacks to domain methods
This commit is contained in:
@@ -1,12 +1,13 @@
|
||||
use std::{collections::HashMap, rc::Rc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use matrix_sdk::ruma::{OwnedMxcUri, OwnedRoomId, OwnedUserId};
|
||||
use tokio::{
|
||||
select,
|
||||
sync::{broadcast::Receiver, mpsc::UnboundedSender},
|
||||
};
|
||||
use tokio_stream::{wrappers::BroadcastStream, StreamExt, StreamMap};
|
||||
use tracing::error;
|
||||
use tracing::{error, instrument, Instrument};
|
||||
|
||||
use super::{
|
||||
account_event::AccountEvent,
|
||||
@@ -96,6 +97,96 @@ impl Requester {
|
||||
pub async fn login(&self, style: LoginStyle) -> anyhow::Result<()> {
|
||||
request_to_worker!(self, WorkerTask::Login, style)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn on_room_invitation(
|
||||
consumer: &Rc<dyn RoomMessagingConsumerInterface>,
|
||||
user_id: OwnedUserId,
|
||||
sender_id: OwnedUserId,
|
||||
is_account_user: bool,
|
||||
) {
|
||||
let invitation = Invitation::new(user_id, sender_id, is_account_user);
|
||||
consumer.on_invitation(invitation).await;
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn on_room_join(
|
||||
consumer: &Rc<dyn RoomMessagingConsumerInterface>,
|
||||
room_id: OwnedRoomId,
|
||||
user_id: OwnedUserId,
|
||||
user_name: Option<String>,
|
||||
avatar_url: Option<OwnedMxcUri>,
|
||||
is_account_user: bool,
|
||||
messaging_provider: Rc<dyn MemberMessagingProviderInterface>,
|
||||
) {
|
||||
let member = RoomMember::new(
|
||||
UserId::from(user_id),
|
||||
user_name,
|
||||
avatar_url,
|
||||
room_id,
|
||||
is_account_user,
|
||||
messaging_provider,
|
||||
);
|
||||
consumer.on_membership(member).await;
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn on_room_new_topic(
|
||||
consumer: &Rc<dyn RoomMessagingConsumerInterface>,
|
||||
topic: Option<String>,
|
||||
) {
|
||||
consumer.on_new_topic(topic).await;
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn on_room_new_name(
|
||||
consumer: &Rc<dyn RoomMessagingConsumerInterface>,
|
||||
name: Option<String>,
|
||||
) {
|
||||
consumer.on_new_name(name).await;
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn on_room_new_avatar(
|
||||
consumer: &Rc<dyn RoomMessagingConsumerInterface>,
|
||||
avatar: Option<Avatar>,
|
||||
) {
|
||||
consumer.on_new_avatar(avatar).await;
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn on_space_new_child(
|
||||
consumer: &Rc<dyn SpaceMessagingConsumerInterface>,
|
||||
child_id: RoomId,
|
||||
) {
|
||||
// TODO: Make name consistent
|
||||
consumer.on_child(child_id).await;
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn on_space_new_topic(
|
||||
consumer: &Rc<dyn SpaceMessagingConsumerInterface>,
|
||||
topic: Option<String>,
|
||||
) {
|
||||
consumer.on_new_topic(topic).await;
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn on_space_new_name(
|
||||
consumer: &Rc<dyn SpaceMessagingConsumerInterface>,
|
||||
name: Option<String>,
|
||||
) {
|
||||
consumer.on_new_name(name).await;
|
||||
}
|
||||
|
||||
// #[instrument(name="SpaceAvatar", skip_all, fields(s = %space_id, a = avatar.is_some()))]
|
||||
// async fn on_space_new_avatar(
|
||||
// consumer: &Rc<dyn SpaceMessagingConsumerInterface>,
|
||||
// space_id: OwnedRoomId,
|
||||
// avatar: Option<Avatar>,
|
||||
// ) {
|
||||
// consumer.on_new_avatar(avatar).await;
|
||||
// }
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
@@ -140,7 +231,17 @@ impl AccountMessagingProviderInterface for Requester {
|
||||
res = account_events_receiver.recv() => {
|
||||
if let Ok(account_event) = res {
|
||||
match account_event {
|
||||
AccountEvent::NewRoom(id, spaces, name, topic, is_direct, state, receiver, new_room_tx) => {
|
||||
AccountEvent::NewRoom(
|
||||
id,
|
||||
spaces,
|
||||
name,
|
||||
topic,
|
||||
is_direct,
|
||||
state,
|
||||
receiver,
|
||||
new_room_tx,
|
||||
span
|
||||
) => {
|
||||
let mut room = Room::new(id, spaces, name, topic, is_direct, Some(state));
|
||||
let room_id = room.id().clone();
|
||||
|
||||
@@ -151,13 +252,15 @@ impl AccountMessagingProviderInterface for Requester {
|
||||
let stream = BroadcastStream::new(receiver.into());
|
||||
rooms_events_streams.insert(room_id.clone(), stream);
|
||||
|
||||
let room_events_consumer = account_events_consumer.on_new_room(room).await;
|
||||
let room_events_consumer = account_events_consumer.on_new_room(room)
|
||||
.instrument(span)
|
||||
.await;
|
||||
room_events_consumers.insert(room_id, room_events_consumer);
|
||||
|
||||
// We're now ready to recv and compute RoomEvent.
|
||||
new_room_tx.send(true).await;
|
||||
},
|
||||
AccountEvent::NewSpace(id, name, topic, receiver, new_space_tx) => {
|
||||
AccountEvent::NewSpace(id, name, topic, receiver, new_space_tx, span) => {
|
||||
let mut space = Space::new(id, name, topic);
|
||||
let space_id = space.id().clone();
|
||||
|
||||
@@ -168,12 +271,14 @@ impl AccountMessagingProviderInterface for Requester {
|
||||
let stream = BroadcastStream::new(receiver.into());
|
||||
spaces_events_streams.insert(space_id.clone(), stream);
|
||||
|
||||
let space_events_consumer = account_events_consumer.on_new_space(space).await;
|
||||
let space_events_consumer = account_events_consumer.on_new_space(space)
|
||||
.instrument(span)
|
||||
.await;
|
||||
space_events_consumers.insert(space_id, space_events_consumer);
|
||||
|
||||
// We're now ready to recv and compute SpaceEvent.
|
||||
new_space_tx.send(true).await;
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
},
|
||||
@@ -181,33 +286,43 @@ impl AccountMessagingProviderInterface for Requester {
|
||||
if let Ok(room_event) = room_event {
|
||||
if let Some(consumer) = room_events_consumers.get(&room_id) {
|
||||
match room_event {
|
||||
RoomEvent::Invitation(user_id, sender_id, is_account_user) => {
|
||||
let invitation = Invitation::new(user_id, sender_id, is_account_user);
|
||||
consumer.on_invitation(invitation).await;
|
||||
RoomEvent::Invitation(user_id, sender_id, is_account_user, span) => {
|
||||
Self::on_room_invitation(consumer, user_id, sender_id, is_account_user)
|
||||
.instrument(span)
|
||||
.await;
|
||||
},
|
||||
RoomEvent::Join(user_id, user_name, avatar_url, is_account_user) => {
|
||||
let member = RoomMember::new(
|
||||
UserId::from(user_id),
|
||||
RoomEvent::Join(user_id, user_name, avatar_url, is_account_user, span) => {
|
||||
Self::on_room_join(
|
||||
consumer,
|
||||
room_id,
|
||||
user_id,
|
||||
user_name,
|
||||
avatar_url,
|
||||
room_id,
|
||||
is_account_user,
|
||||
client.clone());
|
||||
consumer.on_membership(member).await;
|
||||
client.clone())
|
||||
.instrument(span)
|
||||
.await;
|
||||
},
|
||||
RoomEvent::NewTopic(topic) => {
|
||||
consumer.on_new_topic(topic).await;
|
||||
RoomEvent::NewTopic(topic, span) => {
|
||||
Self::on_room_new_topic(consumer, topic)
|
||||
.instrument(span)
|
||||
.await;
|
||||
},
|
||||
RoomEvent::NewName(name) => {
|
||||
consumer.on_new_name(name).await;
|
||||
RoomEvent::NewName(name, span) => {
|
||||
Self::on_room_new_name(consumer, name)
|
||||
.instrument(span)
|
||||
.await;
|
||||
},
|
||||
RoomEvent::NewAvatar(avatar) => {
|
||||
consumer.on_new_avatar(avatar).await;
|
||||
RoomEvent::NewAvatar(avatar, span) => {
|
||||
Self::on_room_new_avatar(consumer, avatar)
|
||||
.instrument(span)
|
||||
.await;
|
||||
}
|
||||
_ => {}
|
||||
// RoomEvent::NewAvatar(avatar) => Self::on_room_new_avatar(consumer, avatar).await,
|
||||
_ => error!("TODO: {:?}", &room_event),
|
||||
}
|
||||
} else {
|
||||
error!("No consumer found for \"{}\" room", room_id);
|
||||
error!("No consumer found for {} room", &room_id);
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -215,19 +330,25 @@ impl AccountMessagingProviderInterface for Requester {
|
||||
if let Ok(room_event) = room_event {
|
||||
if let Some(consumer) = space_events_consumers.get(&space_id) {
|
||||
match room_event {
|
||||
RoomEvent::NewTopic(topic) => {
|
||||
consumer.on_new_topic(topic).await;
|
||||
RoomEvent::NewTopic(topic, span) => {
|
||||
Self::on_space_new_topic(consumer, topic)
|
||||
.instrument(span)
|
||||
.await;
|
||||
},
|
||||
RoomEvent::NewName(name) => {
|
||||
consumer.on_new_name(name).await;
|
||||
RoomEvent::NewName(name, span) => {
|
||||
Self::on_space_new_name(consumer, name)
|
||||
.instrument(span)
|
||||
.await;
|
||||
},
|
||||
RoomEvent::NewChild(child_id) => {
|
||||
consumer.on_child(child_id).await;
|
||||
RoomEvent::NewChild(child_id, span) => {
|
||||
Self::on_space_new_child(consumer, child_id)
|
||||
.instrument(span)
|
||||
.await;
|
||||
},
|
||||
_ => {}
|
||||
_ => error!("TODO: {:?}", &room_event),
|
||||
}
|
||||
} else {
|
||||
error!("No consumer found for \"{}\" space", space_id);
|
||||
error!("No consumer found for {} space", &space_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user