diff options
author | Alexey Rusakov <Kitsune-Ral@users.sf.net> | 2022-07-16 20:31:12 +0200 |
---|---|---|
committer | Alexey Rusakov <Kitsune-Ral@users.sf.net> | 2022-07-16 21:55:15 +0200 |
commit | 4770d303b7141971fa9a25f85874e6bbe71776d9 (patch) | |
tree | 24c45fb29478079ac610502ed6948f50298faac9 /lib/room.cpp | |
parent | cc8753612f2f45b18a9c924920395fb5f4c57078 (diff) | |
download | libquotient-4770d303b7141971fa9a25f85874e6bbe71776d9.tar.gz libquotient-4770d303b7141971fa9a25f85874e6bbe71776d9.zip |
Speed up read receipt updates
Profiling revealed 3 inefficiencies in the read receipts code - and given
that a lot of receipts come in, these inefficiencies quickly add up.
Fixing them slashes read receipt processing time by 60%, and
the total time of updating a room by more than half.
1. Room::lastReadEventChanged() is emitted per receipt. This can be
taxing on initial syncs or in bigger rooms; this commit converts it
to an aggregate signal only emitted once per sync room batch and
carrying the list of all user ids (more on that below) with updated
read receipts.
For that, Room::P::setLastReadReceipt() is split into
Room::P::setLocalLastReadReceipt() that is called whenever the local
read receipt has to be updated, and setLastReadReceipt() proper that is
very fast and only updates the internal data structures, nothing else.
setLocalLastReadReceipt() calls it, as does processEphemeralEvent(); both
take responsibility to emit lastReadEventChanged() depending on the
outcome of setLastReadReceipt() invocation(s).
2. Massively aggravating the above point, the user id from each read receipt
is turned into a User object - and since most of the users are unknown
at early moments, this causes thousands of allocations. Therefore
the new aggregated lastReadEventChanged() only carries user ids, and
clients will have to resolve them to User objects if they need them.
3. Despite fairly tight conditions (note we're talking about thousands
of receipts), Quotient still creates an intermediate C++ structure
(EventsWithReceipts), only for the sake of passing it to
processEphemeralEvent() that immediately disassembles it back again,
converting to a series of calls to set(Local)LastReadReceipt(). To fix
this, processEphemeralEvent() now takes the event content JSON
directly and iterates over it instead.
Aside from that, a few extraneous conditions and logging statements have
been removed and the whole function has been rewritten with switchOnType()
to reduce cognitive complexity.
Diffstat (limited to 'lib/room.cpp')
-rw-r--r-- | lib/room.cpp | 190 |
1 files changed, 109 insertions, 81 deletions
diff --git a/lib/room.cpp b/lib/room.cpp index ba4b1d27..b128d2a7 100644 --- a/lib/room.cpp +++ b/lib/room.cpp @@ -279,9 +279,15 @@ public: void dropDuplicateEvents(RoomEvents& events) const; void decryptIncomingEvents(RoomEvents& events); - Changes setLastReadReceipt(const QString& userId, rev_iter_t newMarker, - ReadReceipt newReceipt = {}, - bool deferStatsUpdate = false); + //! \brief update last receipt record for a given user + //! + //! \return previous event id of the receipt if the new receipt changed + //! it, or `none` if no change took place + Omittable<QString> setLastReadReceipt(const QString& userId, rev_iter_t newMarker, + ReadReceipt newReceipt = {}); + Changes setLocalLastReadReceipt(const rev_iter_t& newMarker, + ReadReceipt newReceipt = {}, + bool deferStatsUpdate = false); Changes setFullyReadMarker(const QString &eventId); Changes updateStats(const rev_iter_t& from, const rev_iter_t& to); bool markMessagesAsRead(const rev_iter_t& upToMarker); @@ -701,10 +707,9 @@ void Room::setJoinState(JoinState state) emit joinStateChanged(oldState, state); } -Room::Changes Room::Private::setLastReadReceipt(const QString& userId, - rev_iter_t newMarker, - ReadReceipt newReceipt, - bool deferStatsUpdate) +Omittable<QString> Room::Private::setLastReadReceipt(const QString& userId, + rev_iter_t newMarker, + ReadReceipt newReceipt) { if (newMarker == historyEdge() && !newReceipt.eventId.isEmpty()) newMarker = q->findInTimeline(newReceipt.eventId); @@ -718,7 +723,7 @@ Room::Changes Room::Private::setLastReadReceipt(const QString& userId, // eagerMarker is now just after the desired event for newMarker if (eagerMarker != newMarker.base()) { newMarker = rev_iter_t(eagerMarker); - qCDebug(EPHEMERAL) << "Auto-promoted read receipt for" << userId + qDebug(EPHEMERAL) << "Auto-promoted read receipt for" << userId << "to" << *newMarker; } // Fill newReceipt with the event (and, if needed, timestamp) from @@ -732,14 +737,19 @@ Room::Changes 
Room::Private::setLastReadReceipt(const QString& userId, const auto prevEventId = storedReceipt.eventId; // Check that either the new marker is actually "newer" than the current one // or, if both markers are at historyEdge(), event ids are different. + // This logic tackles, in particular, the case when the new event is not + // found (most likely, because it's too old and hasn't been fetched from + // the server yet) but there is a previous marker for a user; in that case, + // the previous marker is kept because read receipts are not supposed + // to move backwards. If neither new nor old event is found, the new receipt + // is blindly stored, in a hope it's also "newer" in the timeline. // NB: with reverse iterators, timeline history edge >= sync edge if (prevEventId == newReceipt.eventId || newMarker > q->findInTimeline(prevEventId)) - return Change::None; + return {}; // Finally make the change - Changes changes = Change::Other; auto oldEventReadUsersIt = eventIdReadUsers.find(prevEventId); // clazy:exclude=detaching-member if (oldEventReadUsersIt != eventIdReadUsers.end()) { @@ -751,7 +761,7 @@ Room::Changes Room::Private::setLastReadReceipt(const QString& userId, storedReceipt = move(newReceipt); { - auto dbg = qDebug(EPHEMERAL); // This trick needs qDebug, not qCDebug + auto dbg = qDebug(EPHEMERAL); // NB: qCDebug can't be used like that dbg << "The new read receipt for" << userId << "is now at"; if (newMarker == historyEdge()) dbg << storedReceipt.eventId; @@ -759,25 +769,37 @@ Room::Changes Room::Private::setLastReadReceipt(const QString& userId, dbg << *newMarker; } - // TODO: use Room::member() when it becomes a thing and only emit signals - // for actual members, not just any user - const auto member = q->user(userId); - Q_ASSERT(member != nullptr); - if (isLocalUser(member) && !deferStatsUpdate) { - if (unreadStats.updateOnMarkerMove(q, q->findInTimeline(prevEventId), + // NB: This method, unlike setLocalLastReadReceipt, doesn't emit + // 
lastReadEventChanged() to avoid numerous emissions when many read + // receipts arrive. It can be called thousands of times during an initial + // sync, e.g. + // TODO: remove in 0.8 + if (const auto member = q->user(userId); !isLocalUser(member)) + emit q->readMarkerForUserMoved(member, prevEventId, + storedReceipt.eventId); + return prevEventId; +} + +Room::Changes Room::Private::setLocalLastReadReceipt(const rev_iter_t& newMarker, + ReadReceipt newReceipt, + bool deferStatsUpdate) +{ + auto prevEventId = + setLastReadReceipt(connection->userId(), newMarker, move(newReceipt)); + if (!prevEventId) + return Change::None; + Changes changes = Change::Other; + if (!deferStatsUpdate) { + if (unreadStats.updateOnMarkerMove(q, q->findInTimeline(*prevEventId), newMarker)) { - qCDebug(MESSAGES) + qDebug(MESSAGES) << "Updated unread event statistics in" << q->objectName() << "after moving the local read receipt:" << unreadStats; changes |= Change::UnreadStats; } Q_ASSERT(unreadStats.isValidFor(q, newMarker)); // post-check } - emit q->lastReadEventChanged(member); - // TODO: remove in 0.8 - if (!isLocalUser(member)) - emit q->readMarkerForUserMoved(member, prevEventId, - storedReceipt.eventId); + emit q->lastReadEventChanged({ connection->userId() }); return changes; } @@ -792,7 +814,7 @@ Room::Changes Room::Private::updateStats(const rev_iter_t& from, Changes changes = Change::None; // Correct the read receipt to never be behind the fully read marker if (readReceiptMarker > fullyReadMarker - && setLastReadReceipt(connection->userId(), fullyReadMarker, {}, true)) { + && setLocalLastReadReceipt(fullyReadMarker, {}, true)) { changes |= Change::Other; readReceiptMarker = q->localReadReceiptMarker(); qCInfo(MESSAGES) << "The local m.read receipt was behind m.fully_read " @@ -890,7 +912,7 @@ Room::Changes Room::Private::setFullyReadMarker(const QString& eventId) QT_IGNORE_DEPRECATIONS(Changes changes = Change::ReadMarker|Change::Other;) if (const auto rm = q->fullyReadMarker(); 
rm != historyEdge()) { // Pull read receipt if it's behind, and update statistics - changes |= setLastReadReceipt(connection->userId(), rm); + changes |= setLocalLastReadReceipt(rm); if (partiallyReadStats.updateOnMarkerMove(q, prevReadMarker, rm)) { changes |= Change::PartiallyReadStats; qCDebug(MESSAGES) @@ -908,9 +930,8 @@ Room::Changes Room::Private::setFullyReadMarker(const QString& eventId) void Room::setReadReceipt(const QString& atEventId) { - if (const auto changes = d->setLastReadReceipt(localUser()->id(), - historyEdge(), - { atEventId })) { + if (const auto changes = + d->setLocalLastReadReceipt(historyEdge(), { atEventId })) { connection()->callApi<PostReceiptJob>(BackgroundRequest, id(), QStringLiteral("m.read"), QUrl::toPercentEncoding(atEventId)); @@ -3198,59 +3219,66 @@ Room::Changes Room::processEphemeralEvent(EventPtr&& event) Changes changes {}; QElapsedTimer et; et.start(); - if (auto* evt = eventCast<TypingEvent>(event)) { - const auto& users = evt->users(); - d->usersTyping.clear(); - d->usersTyping.reserve(users.size()); // Assume all are members - for (const auto& userId : users) - if (isMember(userId)) - d->usersTyping.append(user(userId)); - - if (users.size() > 3 || et.nsecsElapsed() >= profilerMinNsecs()) - qCDebug(PROFILER) - << "Processing typing events from" << users.size() - << "user(s) in" << objectName() << "took" << et; - emit typingChanged(); - } - if (auto* evt = eventCast<ReceiptEvent>(event)) { - int totalReceipts = 0; - const auto& eventsWithReceipts = evt->eventsWithReceipts(); - for (const auto& p : eventsWithReceipts) { - totalReceipts += p.receipts.size(); - const auto newMarker = findInTimeline(p.evtId); - if (newMarker == historyEdge()) - qCDebug(EPHEMERAL) - << "Event" << p.evtId - << "is not found; saving read receipt(s) anyway"; - // If the event is not found (most likely, because it's too old and - // hasn't been fetched from the server yet) but there is a previous - // marker for a user, keep the previous marker 
because read receipts - // are not supposed to move backwards. Otherwise, blindly store - // the event id for this user and update the read marker when/if - // the event is fetched later on. - const auto updatedCount = std::count_if( - p.receipts.cbegin(), p.receipts.cend(), - [this, &changes, &newMarker, &evtId = p.evtId](const auto& r) { - const auto change = - d->setLastReadReceipt(r.userId, newMarker, - { evtId, r.timestamp }); - changes |= change; - return change & Change::Any; - }); - - if (p.receipts.size() > 1) - qCDebug(EPHEMERAL) << p.evtId << "marked as read for" - << updatedCount << "user(s)"; - if (updatedCount < p.receipts.size()) - qCDebug(EPHEMERAL) << p.receipts.size() - updatedCount - << "receipts were skipped"; - } - if (eventsWithReceipts.size() > 3 || totalReceipts > 10 - || et.nsecsElapsed() >= profilerMinNsecs()) - qCDebug(PROFILER) << "Processing" << totalReceipts - << "receipt(s) on" << eventsWithReceipts.size() - << "event(s) in" << objectName() << "took" << et; - } + switchOnType(*event, + [this, &et](const TypingEvent& evt) { + const auto& users = evt.users(); + d->usersTyping.clear(); + d->usersTyping.reserve(users.size()); // Assume all are members + for (const auto& userId : users) + if (isMember(userId)) + d->usersTyping.append(user(userId)); + + if (d->usersTyping.size() > 3 + || et.nsecsElapsed() >= profilerMinNsecs()) + qDebug(PROFILER) + << "Processing typing events from" << users.size() + << "user(s) in" << objectName() << "took" << et; + emit typingChanged(); + }, + [this, &changes, &et](const ReceiptEvent& evt) { + const auto& receiptsJson = evt.contentJson(); + QVector<QString> updatedUserIds; + // Most often (especially for bigger batches), receipts are + // scattered across events (an anecdotal evidence showed 1.2-1.3 + // receipts per event on average). 
+ updatedUserIds.reserve(receiptsJson.size() * 2); + for (auto eventIt = receiptsJson.begin(); + eventIt != receiptsJson.end(); ++eventIt) { + const auto evtId = eventIt.key(); + const auto newMarker = findInTimeline(evtId); + if (newMarker == historyEdge()) + qDebug(EPHEMERAL) + << "Event" << evtId + << "is not found; saving read receipt(s) anyway"; + const auto reads = + eventIt.value().toObject().value("m.read"_ls).toObject(); + for (auto userIt = reads.begin(); userIt != reads.end(); + ++userIt) { + ReadReceipt rr{ evtId, + fromJson<QDateTime>( + userIt->toObject().value("ts"_ls)) }; + const auto userId = userIt.key(); + if (userId == connection()->userId()) { + // Local user is special, and will get a signal about + // its read receipt separately from (and before) a + // signal on everybody else. No particular reason, just + // less cumbersome code. + changes |= d->setLocalLastReadReceipt(newMarker, rr); + } else if (d->setLastReadReceipt(userId, newMarker, rr)) { + changes |= Change::Other; + updatedUserIds.push_back(userId); + } + } + } + if (updatedUserIds.size() > 10 + || et.nsecsElapsed() >= profilerMinNsecs()) + qDebug(PROFILER) + << "Processing" << updatedUserIds + << "non-local receipt(s) on" << receiptsJson.size() + << "event(s) in" << objectName() << "took" << et; + if (!updatedUserIds.empty()) + emit lastReadEventChanged(updatedUserIds); + }); return changes; } |