Optimize updates subscription in topics.

This commit is contained in:
John Preston 2022-11-02 20:59:12 +04:00
parent 9d4840c0de
commit a21c73facd
5 changed files with 108 additions and 39 deletions

View file

@ -151,7 +151,7 @@ ForumTopic::ForumTopic(not_null<Forum*> forum, MsgId rootId)
: Thread(&forum->history()->owner(), Type::ForumTopic)
, _forum(forum)
, _list(_forum->topicsList())
, _replies(std::make_shared<RepliesList>(history(), rootId))
, _replies(std::make_shared<RepliesList>(history(), rootId, this))
, _sendActionPainter(owner().sendActionManager().repliesPainter(
history(),
rootId))

View file

@ -60,38 +60,24 @@ struct RepliesList::Viewer {
bool scheduled = false;
};
RepliesList::RepliesList(not_null<History*> history, MsgId rootId)
RepliesList::RepliesList(
not_null<History*> history,
MsgId rootId,
ForumTopic *owningTopic)
: _history(history)
, _owningTopic(owningTopic)
, _rootId(rootId)
, _creating(IsCreating(history, rootId))
, _readRequestTimer([=] { sendReadTillRequest(); }) {
_history->owner().repliesReadTillUpdates(
) | rpl::filter([=](const RepliesReadTillUpdate &update) {
return (update.id.msg == _rootId)
&& (update.id.peer == _history->peer->id);
}) | rpl::start_with_next([=](const RepliesReadTillUpdate &update) {
if (update.out) {
setOutboxReadTill(update.readTillId);
} else if (update.readTillId >= _inboxReadTillId) {
setInboxReadTill(
update.readTillId,
computeUnreadCountLocally(update.readTillId));
}
}, _lifetime);
_history->session().changes().messageUpdates(
MessageUpdate::Flag::NewAdded
| MessageUpdate::Flag::NewMaybeAdded
| MessageUpdate::Flag::ReplyToTopAdded
| MessageUpdate::Flag::Destroyed
) | rpl::filter([=](const MessageUpdate &update) {
return applyUpdate(update);
}) | rpl::to_empty | rpl::start_to_stream(_instantChanges, _lifetime);
_history->owner().channelDifferenceTooLong(
) | rpl::filter([=](not_null<ChannelData*> channel) {
return applyDifferenceTooLong(channel);
}) | rpl::to_empty | rpl::start_to_stream(_listChanges, _lifetime);
if (_owningTopic) {
_owningTopic->destroyed(
) | rpl::start_with_next([=] {
_owningTopic = nullptr;
subscribeToUpdates();
}, _lifetime);
} else {
subscribeToUpdates();
}
}
RepliesList::~RepliesList() {
@ -105,6 +91,48 @@ RepliesList::~RepliesList() {
}
}
void RepliesList::subscribeToUpdates() {
_history->owner().repliesReadTillUpdates(
) | rpl::filter([=](const RepliesReadTillUpdate &update) {
return (update.id.msg == _rootId)
&& (update.id.peer == _history->peer->id);
}) | rpl::start_with_next([=](const RepliesReadTillUpdate &update) {
apply(update);
}, _lifetime);
_history->session().changes().messageUpdates(
MessageUpdate::Flag::NewAdded
| MessageUpdate::Flag::NewMaybeAdded
| MessageUpdate::Flag::ReplyToTopAdded
| MessageUpdate::Flag::Destroyed
) | rpl::start_with_next([=](const MessageUpdate &update) {
apply(update);
}, _lifetime);
_history->owner().channelDifferenceTooLong(
) | rpl::start_with_next([=](not_null<ChannelData*> channel) {
if (channel == _history->peer) {
applyDifferenceTooLong();
}
}, _lifetime);
}
void RepliesList::apply(const RepliesReadTillUpdate &update) {
if (update.out) {
setOutboxReadTill(update.readTillId);
} else if (update.readTillId >= _inboxReadTillId) {
setInboxReadTill(
update.readTillId,
computeUnreadCountLocally(update.readTillId));
}
}
void RepliesList::apply(const MessageUpdate &update) {
if (applyUpdate(update)) {
_instantChanges.fire({});
}
}
rpl::producer<MessagesSlice> RepliesList::source(
MessagePosition aroundId,
int limitBefore,
@ -426,14 +454,11 @@ bool RepliesList::applyUpdate(const MessageUpdate &update) {
return true;
}
bool RepliesList::applyDifferenceTooLong(not_null<ChannelData*> channel) {
if (_creating
|| _history->peer != channel
|| !_skippedAfter.has_value()) {
return false;
void RepliesList::applyDifferenceTooLong() {
if (!_creating && _skippedAfter.has_value()) {
_skippedAfter = std::nullopt;
_listChanges.fire({});
}
_skippedAfter = std::nullopt;
return true;
}
void RepliesList::changeUnreadCountByPost(MsgId id, int delta) {

View file

@ -15,16 +15,25 @@ class HistoryService;
namespace Data {
class ForumTopic;
class Histories;
struct MessagePosition;
struct MessagesSlice;
struct MessageUpdate;
struct RepliesReadTillUpdate;
class RepliesList final : public base::has_weak_ptr {
public:
RepliesList(not_null<History*> history, MsgId rootId);
RepliesList(
not_null<History*> history,
MsgId rootId,
ForumTopic *owningTopic = nullptr);
~RepliesList();
void apply(const RepliesReadTillUpdate &update);
void apply(const MessageUpdate &update);
void applyDifferenceTooLong();
[[nodiscard]] rpl::producer<MessagesSlice> source(
MessagePosition aroundId,
int limitBefore,
@ -66,6 +75,7 @@ private:
HistoryItem *lookupRoot();
[[nodiscard]] Histories &histories();
void subscribeToUpdates();
[[nodiscard]] rpl::producer<MessagesSlice> sourceFromServer(
MessagePosition aroundId,
int limitBefore,
@ -77,8 +87,6 @@ private:
not_null<Viewer*> viewer,
not_null<HistoryItem*> item);
[[nodiscard]] bool applyUpdate(const MessageUpdate &update);
[[nodiscard]] bool applyDifferenceTooLong(
not_null<ChannelData*> channel);
void injectRootMessageAndReverse(not_null<Viewer*> viewer);
void injectRootMessage(not_null<Viewer*> viewer);
void injectRootDivider(
@ -97,6 +105,7 @@ private:
void reloadUnreadCountIfNeeded();
const not_null<History*> _history;
ForumTopic *_owningTopic = nullptr;
const MsgId _rootId = 0;
const bool _creating = false;

View file

@ -56,6 +56,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "data/data_wall_paper.h"
#include "data/data_game.h"
#include "data/data_poll.h"
#include "data/data_replies_list.h"
#include "data/data_chat_filters.h"
#include "data/data_scheduled_messages.h"
#include "data/data_send_action.h"
@ -292,6 +293,8 @@ Session::Session(not_null<Main::Session*> session)
}
}, _lifetime);
subscribeForTopicRepliesLists();
crl::on_main(_session, [=] {
AmPremiumValue(
_session
@ -307,6 +310,37 @@ Session::Session(not_null<Main::Session*> session)
});
}
void Session::subscribeForTopicRepliesLists() {
repliesReadTillUpdates(
) | rpl::start_with_next([=](const RepliesReadTillUpdate &update) {
if (const auto peer = peerLoaded(update.id.peer)) {
if (const auto topic = peer->forumTopicFor(update.id.msg)) {
topic->replies()->apply(update);
}
}
}, _lifetime);
session().changes().messageUpdates(
MessageUpdate::Flag::NewAdded
| MessageUpdate::Flag::NewMaybeAdded
| MessageUpdate::Flag::ReplyToTopAdded
| MessageUpdate::Flag::Destroyed
) | rpl::start_with_next([=](const MessageUpdate &update) {
if (const auto topic = update.item->topic()) {
topic->replies()->apply(update);
}
}, _lifetime);
channelDifferenceTooLong(
) | rpl::start_with_next([=](not_null<ChannelData*> channel) {
if (const auto forum = channel->forum()) {
forum->enumerateTopics([](not_null<ForumTopic*> topic) {
topic->replies()->applyDifferenceTooLong();
});
}
}, _lifetime);
}
void Session::clear() {
// Optimization: clear notifications before destroying items.
Core::App().notifications().clearFromSession(_session);

View file

@ -143,6 +143,7 @@ public:
return ++_nonHistoryEntryId;
}
void subscribeForTopicRepliesLists();
void clear();
void keepAlive(std::shared_ptr<PhotoMedia> media);