diff options
author | Kitsune Ral <Kitsune-Ral@users.sf.net> | 2019-08-29 12:36:34 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-08-29 12:36:34 +0900 |
commit | 3b6959ab8ca5cd8d55c2e4627eb9a61dfb2506ff (patch) | |
tree | 90c8f7c0c754e3a20dadc7296374bed940f005c6 /lib | |
parent | c27916e7f96860659c5cfd7d311f6b10db3d592f (diff) | |
parent | af0f7e3ae58c1f28baa9fe1385d70eefbacc0e8a (diff) | |
download | libquotient-3b6959ab8ca5cd8d55c2e4627eb9a61dfb2506ff.tar.gz libquotient-3b6959ab8ca5cd8d55c2e4627eb9a61dfb2506ff.zip |
Merge pull request #348 from quotient-im/kitsune-better-basejob
Better BaseJob
Diffstat (limited to 'lib')
-rw-r--r-- | lib/connection.cpp | 98 | ||||
-rw-r--r-- | lib/connection.h | 20 | ||||
-rw-r--r-- | lib/connectiondata.cpp | 82 | ||||
-rw-r--r-- | lib/connectiondata.h | 14 | ||||
-rw-r--r-- | lib/jobs/basejob.cpp | 294 | ||||
-rw-r--r-- | lib/jobs/basejob.h | 99 | ||||
-rw-r--r-- | lib/jobs/downloadfilejob.cpp | 4 | ||||
-rw-r--r-- | lib/jobs/downloadfilejob.h | 4 | ||||
-rw-r--r-- | lib/room.cpp | 2 |
9 files changed, 385 insertions, 232 deletions
diff --git a/lib/connection.cpp b/lib/connection.cpp index 56613857..5ebdcf6c 100644 --- a/lib/connection.cpp +++ b/lib/connection.cpp @@ -99,7 +99,6 @@ public: DirectChatsMap dcLocalAdditions; DirectChatsMap dcLocalRemovals; std::unordered_map<QString, EventPtr> accountData; - QString userId; int syncLoopTimeout = -1; GetCapabilitiesJob* capabilitiesJob = nullptr; @@ -116,7 +115,7 @@ public: != "json"; bool lazyLoading = false; - void connectWithToken(const QString& user, const QString& accessToken, + void connectWithToken(const QString& userId, const QString& accessToken, const QString& deviceId); void removeRoom(const QString& roomId); @@ -132,7 +131,8 @@ public: void packAndSendAccountData(EventPtr&& event) { const auto eventType = event->matrixType(); - q->callApi<SetAccountDataJob>(userId, eventType, event->contentJson()); + q->callApi<SetAccountDataJob>(data->userId(), eventType, + event->contentJson()); accountData[eventType] = std::move(event); emit q->accountDataChanged(eventType); } @@ -159,7 +159,7 @@ Connection::Connection(QObject* parent) : Connection({}, parent) {} Connection::~Connection() { - qCDebug(MAIN) << "deconstructing connection object for" << d->userId; + qCDebug(MAIN) << "deconstructing connection object for" << userId(); stopSync(); } @@ -234,11 +234,11 @@ void Connection::doConnectToServer(const QString& user, const QString& password, const QString& initialDeviceName, const QString& deviceId) { - auto loginJob = callApi<LoginJob>( - QStringLiteral("m.login.password"), - UserIdentifier { QStringLiteral("m.id.user"), - { { QStringLiteral("user"), user } } }, - password, /*token*/ "", deviceId, initialDeviceName); + auto loginJob = + callApi<LoginJob>(QStringLiteral("m.login.password"), + UserIdentifier { QStringLiteral("m.id.user"), + { { QStringLiteral("user"), user } } }, + password, /*token*/ "", deviceId, initialDeviceName); connect(loginJob, &BaseJob::success, this, [this, loginJob] { d->connectWithToken(loginJob->userId(), loginJob->accessToken(), loginJob->deviceId()); @@ -299,11 +299,11 @@ bool Connection::loadingCapabilities() const return d->capabilities.roomVersions.omitted(); } -void Connection::Private::connectWithToken(const QString& user, +void Connection::Private::connectWithToken(const QString& userId, const QString& accessToken, const QString& deviceId) { - userId = user; + data->setUserId(userId); q->user(); // Creates a User object for the local user data->setToken(accessToken.toLatin1()); data->setDeviceId(deviceId); @@ -354,9 +354,9 @@ void Connection::sync(int timeout) Filter filter; filter.room->timeline->limit = 100; filter.room->state->lazyLoadMembers = d->lazyLoading; - auto job = d->syncJob = callApi<SyncJob>(BackgroundRequest, - d->data->lastEvent(), filter, - timeout); + auto job = d->syncJob = + callApi<SyncJob>(BackgroundRequest, d->data->lastEvent(), filter, + timeout); connect(job, &SyncJob::success, this, [this, job] { onSyncSuccess(job->takeData()); d->syncJob = nullptr; @@ -485,7 +485,7 @@ void Connection::onSyncSuccess(SyncData&& data, bool fromCache) [this, &eventPtr](const Event& accountEvent) { if (is<IgnoredUsersEvent>(accountEvent)) qCDebug(MAIN) - << "Users ignored by" << d->userId << "updated:" + << "Users ignored by" << userId() << "updated:" << QStringList::fromSet(ignoredUsers()).join(','); auto& currentData = d->accountData[accountEvent.matrixType()]; @@ -504,7 +504,7 @@ void Connection::onSyncSuccess(SyncData&& data, bool fromCache) qDebug(MAIN) << "Sending updated direct chats to the server:" << d->dcLocalRemovals.size() << "removal(s)," << d->dcLocalAdditions.size() << "addition(s)"; - callApi<SetAccountDataJob>(d->userId, QStringLiteral("m.direct"), + callApi<SetAccountDataJob>(userId(), QStringLiteral("m.direct"), toJson(d->directChats)); d->dcLocalAdditions.clear(); d->dcLocalRemovals.clear(); @@ -634,8 +634,8 @@ DownloadFileJob* Connection::downloadFile(const QUrl& url, { auto mediaId = url.authority() + url.path(); auto idParts = splitMediaId(mediaId); - auto* job = callApi<DownloadFileJob>(idParts.front(), idParts.back(), - localFilename); + auto* job = + callApi<DownloadFileJob>(idParts.front(), idParts.back(), localFilename); return job; } @@ -648,7 +648,7 @@ Connection::createRoom(RoomVisibility visibility, const QString& alias, const QVector<CreateRoomJob::Invite3pid>& invite3pids, const QJsonObject& creationContent) { - invites.removeOne(d->userId); // The creator is by definition in the room + invites.removeOne(userId()); // The creator is by definition in the room auto job = callApi<CreateRoomJob>(visibility == PublishRoom ? QStringLiteral("public") : QStringLiteral("private"), @@ -672,12 +672,7 @@ Connection::createRoom(RoomVisibility visibility, const QString& alias, void Connection::requestDirectChat(const QString& userId) { - if (auto* u = user(userId)) - requestDirectChat(u); - else - qCCritical(MAIN) << "Connection::requestDirectChat: Couldn't get a " - "user object for" - << userId; + doInDirectChat(userId, [this](Room* r) { emit directChatAvailable(r); }); } void Connection::requestDirectChat(User* u) @@ -700,7 +695,7 @@ void Connection::doInDirectChat(User* u, const std::function<void(Room*)>& operation) { Q_ASSERT(u); - const auto& userId = u->id(); + const auto& otherUserId = u->id(); // There can be more than one DC; find the first valid (existing and // not left), and delete inexistent (forgotten?) ones along the way. DirectChatsMap removals; @@ -710,9 +705,9 @@ void Connection::doInDirectChat(User* u, if (auto r = room(roomId, JoinState::Join)) { Q_ASSERT(r->id() == roomId); // A direct chat with yourself should only involve yourself :) - if (userId == d->userId && r->totalMemberCount() > 1) + if (otherUserId == userId() && r->totalMemberCount() > 1) continue; - qCDebug(MAIN) << "Requested direct chat with" << userId + qCDebug(MAIN) << "Requested direct chat with" << otherUserId << "is already available as" << r->id(); operation(r); return; @@ -721,10 +716,10 @@ void Connection::doInDirectChat(User* u, Q_ASSERT(ir->id() == roomId); auto j = joinRoom(ir->id()); connect(j, &BaseJob::success, this, - [this, roomId, userId, operation] { + [this, roomId, otherUserId, operation] { qCDebug(MAIN) << "Joined the already invited direct chat with" - << userId << "as" << roomId; + << otherUserId << "as" << roomId; operation(room(roomId, JoinState::Join)); }); return; @@ -734,7 +729,7 @@ void Connection::doInDirectChat(User* u, if (room(roomId, JoinState::Leave)) continue; - qCWarning(MAIN) << "Direct chat with" << userId << "known as room" + qCWarning(MAIN) << "Direct chat with" << otherUserId << "known as room" << roomId << "is not valid and will be discarded"; // Postpone actual deletion until we finish iterating d->directChats. removals.insert(it.key(), it.value()); @@ -750,9 +745,9 @@ void Connection::doInDirectChat(User* u, emit directChatsListChanged({}, removals); } - auto j = createDirectChat(userId); - connect(j, &BaseJob::success, this, [this, j, userId, operation] { - qCDebug(MAIN) << "Direct chat with" << userId << "has been created as" + auto j = createDirectChat(otherUserId); + connect(j, &BaseJob::success, this, [this, j, otherUserId, operation] { + qCDebug(MAIN) << "Direct chat with" << otherUserId << "has been created as" << j->roomId(); operation(room(j->roomId(), JoinState::Join)); }); @@ -785,7 +780,7 @@ ForgetRoomJob* Connection::forgetRoom(const QString& id) [this, leaveJob, forgetJob, room] { if (leaveJob->error() == BaseJob::Success || leaveJob->error() == BaseJob::NotFoundError) { - forgetJob->start(connectionData()); + run(forgetJob); // If the matching /sync response hasn't arrived yet, // mark the room for explicit deletion if (room->joinState() != JoinState::Leave) @@ -799,7 +794,7 @@ ForgetRoomJob* Connection::forgetRoom(const QString& id) }); connect(leaveJob, &BaseJob::failure, forgetJob, &BaseJob::abandon); } else - forgetJob->start(connectionData()); + run(forgetJob); connect(forgetJob, &BaseJob::result, this, [this, id, forgetJob] { // Leave room in case of success, or room not known by server if (forgetJob->error() == BaseJob::Success @@ -845,7 +840,7 @@ SendMessageJob* Connection::sendMessage(const QString& roomId, QUrl Connection::homeserver() const { return d->data->baseUrl(); } -QString Connection::domain() const { return d->userId.section(':', 1); } +QString Connection::domain() const { return userId().section(':', 1); } Room* Connection::room(const QString& roomId, JoinStates states) const { @@ -905,30 +900,30 @@ Room* Connection::invitation(const QString& roomId) const return d->roomMap.value({ roomId, true }, nullptr); } -User* Connection::user(const QString& userId) +User* Connection::user(const QString& uId) { - if (userId.isEmpty()) + if (uId.isEmpty()) return nullptr; - if (!userId.startsWith('@') || !userId.contains(':')) { - qCCritical(MAIN) << "Malformed userId:" << userId; + if (!uId.startsWith('@') || !uId.contains(':')) { + qCCritical(MAIN) << "Malformed userId:" << uId; return nullptr; } - if (d->userMap.contains(userId)) - return d->userMap.value(userId); - auto* user = userFactory()(this, userId); - d->userMap.insert(userId, user); + if (d->userMap.contains(uId)) + return d->userMap.value(uId); + auto* user = userFactory()(this, uId); + d->userMap.insert(uId, user); emit newUser(user); return user; } const User* Connection::user() const { - return d->userMap.value(d->userId, nullptr); + return d->userMap.value(userId(), nullptr); } -User* Connection::user() { return user(d->userId); } +User* Connection::user() { return user(userId()); } -QString Connection::userId() const { return d->userId; } +QString Connection::userId() const { return d->data->userId(); } QString Connection::deviceId() const { return d->data->deviceId(); } @@ -1360,6 +1355,13 @@ void Connection::setLazyLoading(bool newValue) } } +void Connection::run(BaseJob* job, RunningPolicy runningPolicy) const +{ + connect(job, &BaseJob::failure, this, &Connection::requestFailed); + job->prepare(d->data.get(), runningPolicy & BackgroundRequest); + d->data->submit(job); +} + void Connection::getTurnServers() { auto job = callApi<GetTurnServerJob>(); diff --git a/lib/connection.h b/lib/connection.h index c807b827..7e32e5c9 100644 --- a/lib/connection.h +++ b/lib/connection.h @@ -37,6 +37,8 @@ class Account; } namespace Quotient { +Q_NAMESPACE + class Room; class User; class ConnectionData; @@ -89,10 +91,12 @@ static inline user_factory_t defaultUserFactory() /** Enumeration with flags defining the network job running policy * So far only background/foreground flags are available. * - * \sa Connection::callApi + * \sa Connection::callApi, Connection::run */ enum RunningPolicy { ForegroundRequest = 0x0, BackgroundRequest = 0x1 }; +Q_ENUM_NS(RunningPolicy) + class Connection : public QObject { Q_OBJECT @@ -352,9 +356,12 @@ public: bool lazyLoading() const; void setLazyLoading(bool newValue); - /** Start a job of a specified type with specified arguments and policy + /*! Start a pre-created job object on this connection */ + void run(BaseJob* job, RunningPolicy runningPolicy = ForegroundRequest) const; + + /*! Start a job of a specified type with specified arguments and policy * - * This is a universal method to start a job of a type passed + * This is a universal method to create and start a job of a type passed * as a template parameter. The policy allows to fine-tune the way * the job is executed - as of this writing it means a choice * between "foreground" and "background". @@ -368,14 +375,13 @@ public: JobT* callApi(RunningPolicy runningPolicy, JobArgTs&&... jobArgs) const { auto job = new JobT(std::forward<JobArgTs>(jobArgs)...); - connect(job, &BaseJob::failure, this, &Connection::requestFailed); - job->start(connectionData(), runningPolicy & BackgroundRequest); + run(job, runningPolicy); return job; } - /** Start a job of a specified type with specified arguments + /*! Start a job of a specified type with specified arguments * - * This is an overload that calls the job with "foreground" policy. + * This is an overload that runs the job with "foreground" policy. */ template <typename JobT, typename... JobArgTs> JobT* callApi(JobArgTs&&... jobArgs) const diff --git a/lib/connectiondata.cpp b/lib/connectiondata.cpp index 486de03d..a3807fc4 100644 --- a/lib/connectiondata.cpp +++ b/lib/connectiondata.cpp @@ -20,27 +20,93 @@ #include "logging.h" #include "networkaccessmanager.h" +#include "jobs/basejob.h" + +#include <QtCore/QTimer> +#include <QtCore/QPointer> + +#include <array> +#include <queue> using namespace Quotient; -struct ConnectionData::Private { - explicit Private(QUrl url) : baseUrl(std::move(url)) {} +class ConnectionData::Private { +public: + explicit Private(QUrl url) : baseUrl(std::move(url)) + { + rateLimiter.setSingleShot(true); + } QUrl baseUrl; QByteArray accessToken; QString lastEvent; + QString userId; QString deviceId; mutable unsigned int txnCounter = 0; - const qint64 id = QDateTime::currentMSecsSinceEpoch(); + const qint64 txnBase = QDateTime::currentMSecsSinceEpoch(); + + QString id() const { return userId + '/' + deviceId; } + + using job_queue_t = std::queue<QPointer<BaseJob>>; + std::array<job_queue_t, 2> jobs; // 0 - foreground, 1 - background + QTimer rateLimiter; }; ConnectionData::ConnectionData(QUrl baseUrl) : d(std::make_unique<Private>(std::move(baseUrl))) -{} +{ + // Each lambda invocation below takes no more than one job from the + // queues (first foreground, then background) and resumes it; then + // restarts the rate limiter timer with duration 0, effectively yielding + // to the event loop and then resuming until both queues are empty. + QObject::connect(&d->rateLimiter, &QTimer::timeout, [this] { + // TODO: Consider moving out all job->sendRequest() invocations to + // a dedicated thread + d->rateLimiter.setInterval(0); + for (auto& q : d->jobs) + while (!q.empty()) { + auto& job = q.front(); + q.pop(); + if (!job || job->error() == BaseJob::Abandoned) + continue; + if (job->error() != BaseJob::Pending) { + qCCritical(MAIN) + << "Job" << job + << "is in the wrong status:" << job->status(); + Q_ASSERT(false); + job->setStatus(BaseJob::Pending); + } + job->sendRequest(); + d->rateLimiter.start(); + return; + } + qCDebug(MAIN) << d->id() << "job queues are empty"; + }); +} ConnectionData::~ConnectionData() = default; +void ConnectionData::submit(BaseJob* job) +{ + Q_ASSERT(job->error() == BaseJob::Pending); + if (!d->rateLimiter.isActive()) { + job->sendRequest(); + return; + } + d->jobs[size_t(job->isBackground())].emplace(job); + qCDebug(MAIN) << job << "queued," << d->jobs.front().size() << "+" + << d->jobs.back().size() << "total jobs in" << d->id() + << "queues"; +} + +void ConnectionData::limitRate(std::chrono::milliseconds nextCallAfter) +{ + qCDebug(MAIN) << "Jobs for" << (d->userId + "/" + d->deviceId) + << "suspended for" << nextCallAfter.count() << "ms"; + d->rateLimiter.start(nextCallAfter); +} + QByteArray ConnectionData::accessToken() const { return d->accessToken; } QUrl ConnectionData::baseUrl() const { return d->baseUrl; } @@ -75,12 +141,15 @@ void ConnectionData::setPort(int port) const QString& ConnectionData::deviceId() const { return d->deviceId; } +const QString& ConnectionData::userId() const { return d->userId; } + void ConnectionData::setDeviceId(const QString& deviceId) { d->deviceId = deviceId; - qCDebug(MAIN) << "updated deviceId to" << d->deviceId; } +void ConnectionData::setUserId(const QString& userId) { d->userId = userId; } + QString ConnectionData::lastEvent() const { return d->lastEvent; } void ConnectionData::setLastEvent(QString identifier) @@ -90,5 +159,6 @@ void ConnectionData::setLastEvent(QString identifier) QByteArray ConnectionData::generateTxnId() const { - return QByteArray::number(d->id) + 'q' + QByteArray::number(++d->txnCounter); + return d->deviceId.toLatin1() + QByteArray::number(d->txnBase) + + QByteArray::number(++d->txnCounter); } diff --git a/lib/connectiondata.h b/lib/connectiondata.h index 80ace08c..b367c977 100644 --- a/lib/connectiondata.h +++ b/lib/connectiondata.h @@ -21,25 +21,35 @@ #include <QtCore/QUrl> #include <memory> +#include <chrono> class QNetworkAccessManager; namespace Quotient { +class BaseJob; + class ConnectionData { public: explicit ConnectionData(QUrl baseUrl); virtual ~ConnectionData(); + void submit(BaseJob* job); + void limitRate(std::chrono::milliseconds nextCallAfter); + QByteArray accessToken() const; QUrl baseUrl() const; const QString& deviceId() const; - + const QString& userId() const; QNetworkAccessManager* nam() const; + void setBaseUrl(QUrl baseUrl); void setToken(QByteArray accessToken); + [[deprecated("Use setBaseUrl() instead")]] void setHost(QString host); + [[deprecated("Use setBaseUrl() instead")]] void setPort(int port); void setDeviceId(const QString& deviceId); + void setUserId(const QString& userId); QString lastEvent() const; void setLastEvent(QString identifier); @@ -47,7 +57,7 @@ public: QByteArray generateTxnId() const; private: - struct Private; + class Private; std::unique_ptr<Private> d; }; } // namespace Quotient diff --git a/lib/jobs/basejob.cpp b/lib/jobs/basejob.cpp index f3ba00b5..54931c83 100644 --- a/lib/jobs/basejob.cpp +++ b/lib/jobs/basejob.cpp @@ -31,6 +31,8 @@ #include <array> using namespace Quotient; +using std::chrono::seconds, std::chrono::milliseconds; +using namespace std::chrono_literals; struct NetworkReplyDeleter : public QScopedPointerDeleteLater { static inline void cleanup(QNetworkReply* reply) @@ -43,6 +45,11 @@ struct NetworkReplyDeleter : public QScopedPointerDeleteLater { class BaseJob::Private { public: + struct JobTimeoutConfig { + seconds jobTimeout; + seconds nextRetryInterval; + }; + // Using an idiom from clang-tidy: // http://clang.llvm.org/extra/clang-tidy/checks/modernize-pass-by-value.html Private(HttpVerb v, QString endpoint, const QUrlQuery& q, Data&& data, @@ -52,12 +59,14 @@ public: , requestQuery(q) , requestData(std::move(data)) , needsToken(nt) - {} + { + timer.setSingleShot(true); + retryTimer.setSingleShot(true); + } - void sendRequest(bool inBackground); - const JobTimeoutConfig& getCurrentTimeoutConfig() const; + void sendRequest(); - const ConnectionData* connection = nullptr; + ConnectionData* connection = nullptr; // Contents for the network request HttpVerb verb; @@ -67,26 +76,41 @@ public: Data requestData; bool needsToken; + bool inBackground = false; + // There's no use of QMimeType here because we don't want to match // content types against the known MIME type hierarchy; and at the same // type QMimeType is of little help with MIME type globs (`text/*` etc.) - QByteArrayList expectedContentTypes; + QByteArrayList expectedContentTypes { "application/json" }; QScopedPointer<QNetworkReply, NetworkReplyDeleter> reply; - Status status = Pending; + Status status = Unprepared; QByteArray rawResponse; QUrl errorUrl; //< May contain a URL to help with some errors + LoggingCategory logCat = JOBS; + QTimer timer; QTimer retryTimer; - QVector<JobTimeoutConfig> errorStrategy = { { 90, 5 }, - { 90, 10 }, - { 120, 30 } }; - int maxRetries = errorStrategy.size(); + static constexpr std::array<const JobTimeoutConfig, 3> errorStrategy { + { { 90s, 5s }, { 90s, 10s }, { 120s, 30s } } + }; + int maxRetries = int(errorStrategy.size()); int retriesTaken = 0; - LoggingCategory logCat = JOBS; + const JobTimeoutConfig& getCurrentTimeoutConfig() const + { + return errorStrategy[std::min(size_t(retriesTaken), + errorStrategy.size() - 1)]; + } + + QString urlForLog() const + { + return reply + ? reply->url().toString(QUrl::RemoveQuery) + : makeRequestUrl(connection->baseUrl(), apiEndpoint).toString(); + } }; BaseJob::BaseJob(HttpVerb verb, const QString& name, const QString& endpoint, @@ -99,9 +123,11 @@ BaseJob::BaseJob(HttpVerb verb, const QString& name, const QString& endpoint, : d(new Private(verb, endpoint, query, std::move(data), needsToken)) { setObjectName(name); - setExpectedContentTypes({ "application/json" }); - d->timer.setSingleShot(true); connect(&d->timer, &QTimer::timeout, this, &BaseJob::timeout); + connect(&d->retryTimer, &QTimer::timeout, this, [this] { + setStatus(Pending); + sendRequest(); + }); } BaseJob::~BaseJob() @@ -112,15 +138,12 @@ BaseJob::~BaseJob() QUrl BaseJob::requestUrl() const { - return d->reply ? d->reply->request().url() : QUrl(); + return d->reply ? d->reply->url() : QUrl(); } bool BaseJob::isBackground() const { - return d->reply - && d->reply->request() - .attribute(QNetworkRequest::BackgroundRequestAttribute) - .toBool(); + return d->inBackground; } const QString& BaseJob::apiEndpoint() const { return d->apiEndpoint; } @@ -184,7 +207,7 @@ QUrl BaseJob::makeRequestUrl(QUrl baseUrl, const QString& path, return baseUrl; } -void BaseJob::Private::sendRequest(bool inBackground) +void BaseJob::Private::sendRequest() { QNetworkRequest req { makeRequestUrl(connection->baseUrl(), apiEndpoint, requestQuery) }; @@ -193,17 +216,13 @@ void BaseJob::Private::sendRequest(bool inBackground) req.setRawHeader("Authorization", QByteArray("Bearer ") + connection->accessToken()); req.setAttribute(QNetworkRequest::BackgroundRequestAttribute, inBackground); -#if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0)) req.setAttribute(QNetworkRequest::FollowRedirectsAttribute, true); req.setMaximumRedirectsAllowed(10); -#endif req.setAttribute(QNetworkRequest::HttpPipeliningAllowedAttribute, true); -#if QT_VERSION >= QT_VERSION_CHECK(5, 9, 0) - // some sources claim that there are issues with QT 5.8 req.setAttribute(QNetworkRequest::HTTP2AllowedAttribute, true); -#endif for (auto it = requestHeaders.cbegin(); it != requestHeaders.cend(); ++it) req.setRawHeader(it.key(), it.value()); + switch (verb) { case HttpVerb::Get: reply.reset(connection->nam()->get(req)); @@ -220,36 +239,31 @@ void BaseJob::Private::sendRequest(bool inBackground) } } -void BaseJob::beforeStart(const ConnectionData*) {} +void BaseJob::doPrepare() {} -void BaseJob::afterStart(const ConnectionData*, QNetworkReply*) {} +void BaseJob::onSentRequest(QNetworkReply*) {} void BaseJob::beforeAbandon(QNetworkReply*) {} -void BaseJob::start(const ConnectionData* connData, bool inBackground) +void BaseJob::prepare(ConnectionData* connData, bool inBackground) { + d->inBackground = inBackground; d->connection = connData; - d->retryTimer.setSingleShot(true); - connect(&d->retryTimer, &QTimer::timeout, this, - [this, inBackground] { sendRequest(inBackground); }); - - beforeStart(connData); - if (status().good()) - sendRequest(inBackground); - if (status().good()) - afterStart(connData, d->reply.data()); - if (!status().good()) + doPrepare(); + if (status().code != Unprepared && status().code != Pending) QTimer::singleShot(0, this, &BaseJob::finishJob); + setStatus(Pending); } -void BaseJob::sendRequest(bool inBackground) +void BaseJob::sendRequest() { - emit aboutToStart(); - d->retryTimer.stop(); // In case we were counting down at the moment - qCDebug(d->logCat) << this << "sending request to" << d->apiEndpoint; - if (!d->requestQuery.isEmpty()) - qCDebug(d->logCat) << " query:" << d->requestQuery.toString(); - d->sendRequest(inBackground); + if (status().code == Abandoned) + return; + Q_ASSERT(d->connection && status().code == Pending); + qCDebug(d->logCat) << "Making request to" << d->urlForLog(); + emit aboutToSendRequest(); + d->sendRequest(); + Q_ASSERT(d->reply); connect(d->reply.data(), &QNetworkReply::finished, this, &BaseJob::gotReply); if (d->reply->isRunning()) { connect(d->reply.data(), &QNetworkReply::metaDataChanged, this, @@ -259,10 +273,12 @@ void BaseJob::sendRequest(bool inBackground) connect(d->reply.data(), &QNetworkReply::downloadProgress, this, &BaseJob::downloadProgress); d->timer.start(getCurrentTimeout()); - qCDebug(d->logCat) << this << "request has been sent"; - emit started(); + qCInfo(d->logCat).noquote() << "Request sent to" << d->urlForLog(); + onSentRequest(d->reply.data()); + emit sentRequest(); } else - qCWarning(d->logCat) << this << "request could not start"; + qCWarning(d->logCat).noquote() + << "Request could not start:" << d->urlForLog(); } void BaseJob::checkReply() { setStatus(doCheckReply(d->reply.data())); } @@ -283,13 +299,7 @@ void BaseJob::gotReply() parseError(d->reply.data(), QJsonDocument::fromJson(d->rawResponse).object())); } - - if (status().code != TooManyRequestsError) - finishJob(); - else { - stop(); - emit retryScheduled(d->retriesTaken, d->retryTimer.interval()); - } + finishJob(); } bool checkContentType(const QByteArray& type, const QByteArrayList& patterns) @@ -318,6 +328,47 @@ bool checkContentType(const QByteArray& type, const QByteArrayList& patterns) return false; } +BaseJob::Status BaseJob::Status::fromHttpCode(int httpCode, QString msg) +{ + // clang-format off + return { [httpCode]() -> StatusCode { + if (httpCode / 10 == 41) // 41x errors + return httpCode == 410 ? IncorrectRequestError : NotFoundError; + switch (httpCode) { + case 401: case 403: case 407: + return ContentAccessError; + case 404: + return NotFoundError; + case 400: case 405: case 406: case 426: case 428: case 505: + case 494: // Unofficial nginx "Request header too large" + case 497: // Unofficial nginx "HTTP request sent to HTTPS port" + return IncorrectRequestError; + case 429: + return TooManyRequestsError; + case 501: case 510: + return RequestNotImplementedError; + case 511: + return NetworkAuthRequiredError; + default: + return NetworkError; + } + }(), std::move(msg) }; + // clang-format on +} + +QDebug BaseJob::Status::dumpToLog(QDebug dbg) const +{ + QDebugStateSaver _s(dbg); + dbg.noquote().nospace(); + if (auto* const k = QMetaEnum::fromType<StatusCode>().valueToKey(code)) { + const QByteArray b = k; + dbg << b.mid(b.lastIndexOf(':')); + } else + dbg << code; + return dbg << ": " << message; + +} + BaseJob::Status BaseJob::doCheckReply(QNetworkReply* reply) const { // QNetworkReply error codes seem to be flawed when it comes to HTTP; @@ -327,62 +378,30 @@ BaseJob::Status BaseJob::doCheckReply(QNetworkReply* reply) const const auto httpCodeHeader = reply->attribute(QNetworkRequest::HttpStatusCodeAttribute); if (!httpCodeHeader.isValid()) { - qCWarning(d->logCat) << this << "didn't get valid HTTP headers"; + qCWarning(d->logCat) << "No valid HTTP headers from" << d->urlForLog(); return { NetworkError, reply->errorString() }; } - const QString replyState = reply->isRunning() - ? QStringLiteral("(tentative)") - : QStringLiteral("(final)"); - const auto urlString = '|' + d->reply->url().toDisplayString(); const auto httpCode = httpCodeHeader.toInt(); - const auto reason = - reply->attribute(QNetworkRequest::HttpReasonPhraseAttribute).toString(); if (httpCode / 100 == 2) // 2xx { - qCDebug(d->logCat).noquote().nospace() << this << urlString; - qCDebug(d->logCat).noquote() << " " << httpCode << reason << replyState; + if (reply->isFinished()) + qCInfo(d->logCat) << httpCode << "<-" << d->urlForLog(); if (!checkContentType(reply->rawHeader("Content-Type"), d->expectedContentTypes)) return { UnexpectedResponseTypeWarning, "Unexpected content type of the response" }; return NoError; } + if (reply->isFinished()) + qCWarning(d->logCat) << httpCode << "<-" << d->urlForLog(); - qCWarning(d->logCat).noquote().nospace() << this << urlString; - qCWarning(d->logCat).noquote() << " " << httpCode << reason << replyState; - return { [httpCode]() -> StatusCode { - if (httpCode / 10 == 41) - return httpCode == 410 ? IncorrectRequestError - : NotFoundError; - switch (httpCode) { - case 401: - case 403: - case 407: - return ContentAccessError; - case 404: - return NotFoundError; - case 400: - case 405: - case 406: - case 426: - case 428: - case 505: - case 494: // Unofficial nginx "Request header too large" - case 497: // Unofficial nginx "HTTP request sent to HTTPS port" - return IncorrectRequestError; - case 429: - return TooManyRequestsError; - case 501: - case 510: - return RequestNotImplementedError; - case 511: - return NetworkAuthRequiredError; - default: - return NetworkError; - } - }(), - reply->errorString() }; + auto message = reply->errorString(); + if (message.isEmpty()) + message = reply->attribute(QNetworkRequest::HttpReasonPhraseAttribute) + .toString(); + + return Status::fromHttpCode(httpCode, message); } BaseJob::Status BaseJob::parseReply(QNetworkReply* reply) @@ -398,20 +417,19 @@ BaseJob::Status BaseJob::parseReply(QNetworkReply* reply) BaseJob::Status BaseJob::parseJson(const QJsonDocument&) { return Success; } -BaseJob::Status BaseJob::parseError(QNetworkReply* reply, +BaseJob::Status BaseJob::parseError(QNetworkReply* /*reply*/, const QJsonObject& errorJson) { const auto errCode = errorJson.value("errcode"_ls).toString(); if (error() == TooManyRequestsError || errCode == "M_LIMIT_EXCEEDED") { QString msg = tr("Too many requests"); - auto retryInterval = errorJson.value("retry_after_ms"_ls).toInt(-1); - if (retryInterval != -1) - msg += tr(", next retry advised after %1 ms").arg(retryInterval); + int64_t retryAfterMs = errorJson.value("retry_after_ms"_ls).toInt(-1); + if (retryAfterMs >= 0) + msg += tr(", next retry advised after %1 ms").arg(retryAfterMs); else // We still have to figure some reasonable interval - retryInterval = getNextRetryInterval(); + retryAfterMs = getNextRetryMs(); - qCWarning(d->logCat) << this << "will retry in" << retryInterval << "ms"; - d->retryTimer.start(retryInterval); + d->connection->limitRate(milliseconds(retryAfterMs)); return { TooManyRequestsError, msg }; } @@ -441,7 +459,7 @@ BaseJob::Status BaseJob::parseError(QNetworkReply* reply, void BaseJob::stop() { - // This method is used to semi-finalise the job before retrying; so + // This method is (also) used to semi-finalise the job before retrying; so // stop the timeout timer but keep the retry timer running. d->timer.stop(); if (d->reply) { @@ -449,7 +467,7 @@ void BaseJob::stop() if (d->reply->isRunning()) { qCWarning(d->logCat) << this << "stopped without ready network reply"; - d->reply->abort(); + d->reply->abort(); // Keep the reply object in case clients need it } } else qCWarning(d->logCat) << this << "stopped with empty network reply"; @@ -458,26 +476,30 @@ void BaseJob::stop() void BaseJob::finishJob() { stop(); - if ((error() == NetworkError || error() == TimeoutError) + if (error() == TooManyRequests) { + emit rateLimited(); + setStatus(Pending); + d->connection->submit(this); + return; + } + if ((error() == NetworkError || error() == Timeout) && d->retriesTaken < d->maxRetries) { - // TODO: The whole retrying thing should be put to ConnectionManager + // TODO: The whole retrying thing should be put to Connection(Manager) // otherwise independently retrying jobs make a bit of notification // storm towards the UI. - const auto retryInterval = error() == TimeoutError - ? 0 - : getNextRetryInterval(); + const seconds retryIn = error() == Timeout ? 0s : getNextRetryInterval(); ++d->retriesTaken; qCWarning(d->logCat).nospace() << this << ": retry #" << d->retriesTaken - << " in " << retryInterval / 1000 << " s"; - d->retryTimer.start(retryInterval); - emit retryScheduled(d->retriesTaken, retryInterval); + << " in " << retryIn.count() << " s"; + d->retryTimer.start(retryIn); + emit retryScheduled(d->retriesTaken, milliseconds(retryIn).count()); return; } - // Notify those interested in any completion of the job (including killing) + // Notify those interested in any completion of the job including abandon() emit finished(this); - emit result(this); + emit result(this); // abandon() doesn't emit this if (error()) emit failure(this); else @@ -486,24 +508,35 @@ void BaseJob::finishJob() deleteLater(); } -const JobTimeoutConfig& BaseJob::Private::getCurrentTimeoutConfig() const +seconds BaseJob::getCurrentTimeout() const { - return errorStrategy[std::min(retriesTaken, errorStrategy.size() - 1)]; + return d->getCurrentTimeoutConfig().jobTimeout; } -BaseJob::duration_t BaseJob::getCurrentTimeout() const +BaseJob::duration_ms_t BaseJob::getCurrentTimeoutMs() const { - return d->getCurrentTimeoutConfig().jobTimeout * 1000; + return milliseconds(getCurrentTimeout()).count(); } -BaseJob::duration_t BaseJob::getNextRetryInterval() const +seconds BaseJob::getNextRetryInterval() const { - return d->getCurrentTimeoutConfig().nextRetryInterval * 1000; + return d->getCurrentTimeoutConfig().nextRetryInterval; } -BaseJob::duration_t BaseJob::millisToRetry() const +BaseJob::duration_ms_t BaseJob::getNextRetryMs() const { - return d->retryTimer.isActive() ? d->retryTimer.remainingTime() : 0; + return milliseconds(getNextRetryInterval()).count(); +} + +milliseconds BaseJob::timeToRetry() const +{ + return d->retryTimer.isActive() ? d->retryTimer.remainingTimeAsDuration() + : 0s; +} + +BaseJob::duration_ms_t BaseJob::millisToRetry() const +{ + return timeToRetry().count(); } int BaseJob::maxRetries() const { return d->maxRetries; } @@ -582,21 +615,22 @@ void BaseJob::setStatus(Status s) { // The crash that led to this code has been reported in // https://github.com/quotient-im/Quaternion/issues/566 - basically, - // when cleaning up childrent of a deleted Connection, there's a chance + // when cleaning up children of a deleted Connection, there's a chance // of pending jobs being abandoned, calling setStatus(Abandoned). // There's nothing wrong with this; however, the safety check for // cleartext access tokens below uses d->connection - which is a dangling // pointer. // To alleviate that, a stricter condition is applied, that for Abandoned // and to-be-Abandoned jobs the status message will be disregarded entirely. - // For 0.6 we might rectify the situation by making d->connection - // a QPointer<> (and derive ConnectionData from QObject, respectively). - if (d->status.code == Abandoned || s.code == Abandoned) - s.message.clear(); - + // We could rectify the situation by making d->connection a QPointer<> + // (and deriving ConnectionData from QObject, respectively) but it's + // a too edge case for the hassle. if (d->status == s) return; + if (d->status.code == Abandoned || s.code == Abandoned) + s.message.clear(); + if (!s.message.isEmpty() && d->connection && !d->connection->accessToken().isEmpty()) s.message.replace(d->connection->accessToken(), "(REDACTED)"); diff --git a/lib/jobs/basejob.h b/lib/jobs/basejob.h index fd7beca0..6c1b802c 100644 --- a/lib/jobs/basejob.h +++ b/lib/jobs/basejob.h @@ -24,6 +24,7 @@ #include <QtCore/QJsonDocument> #include <QtCore/QObject> #include <QtCore/QUrlQuery> +#include <QtCore/QMetaEnum> class QNetworkReply; class QSslError; @@ -33,28 +34,29 @@ class ConnectionData; enum class HttpVerb { Get, Put, Post, Delete }; -struct JobTimeoutConfig { - int jobTimeout; - int nextRetryInterval; -}; - class BaseJob : public QObject { Q_OBJECT Q_PROPERTY(QUrl requestUrl READ requestUrl CONSTANT) Q_PROPERTY(int maxRetries READ maxRetries WRITE setMaxRetries) public: + /*! The status code of a job + * + * Every job is created in Unprepared status; upon calling prepare() + * from Connection (if things are fine) it go to Pending status. After + * that, the next transition comes after the reply arrives and its contents + * are analysed. At any point in time the job can be abandon()ed, causing + * it to switch to status Abandoned for a brief period before deletion. + */ enum StatusCode { - NoError = 0 // To be compatible with Qt conventions - , Success = 0, + NoError = Success, // To be compatible with Qt conventions Pending = 1, - WarningLevel = 20, + WarningLevel = 20, //< Warnings have codes starting from this UnexpectedResponseType = 21, UnexpectedResponseTypeWarning = UnexpectedResponseType, - Abandoned = 50 //< A tiny period between abandoning and object deletion - , - ErrorLevel = 100 //< Errors have codes starting from this - , + Unprepared = 25, //< Initial job state is incomplete, hence warning level + Abandoned = 50, //< A tiny period between abandoning and object deletion + ErrorLevel = 100, //< Errors have codes starting from this NetworkError = 100, Timeout, TimeoutError = Timeout, @@ -64,10 +66,11 @@ public: IncorrectRequestError = IncorrectRequest, IncorrectResponse, IncorrectResponseError = IncorrectResponse, - JsonParseError //< deprecated; Use IncorrectResponse instead + JsonParseError //< \deprecated Use IncorrectResponse instead = IncorrectResponse, TooManyRequests, TooManyRequestsError = TooManyRequests, + RateLimited = TooManyRequests, RequestNotImplemented, RequestNotImplementedError = RequestNotImplemented, UnsupportedRoomVersion, @@ -80,6 +83,7 @@ public: UserDeactivated, UserDefinedError = 256 }; + Q_ENUM(StatusCode) /** * A simple wrapper around QUrlQuery that allows its creation from @@ -97,7 +101,7 @@ public: using Data = RequestData; - /** + /*! * This structure stores the status of a server call job. The status * consists of a code, that is described (but not delimited) by the * respective enum, and a freeform message. @@ -106,16 +110,16 @@ public: * along the lines of StatusCode, with additional values * starting at UserDefinedError */ - class Status { - public: + struct Status { Status(StatusCode c) : code(c) {} Status(int c, QString m) : code(c), message(std::move(m)) {} + static Status fromHttpCode(int httpCode, QString msg = {}); bool good() const { return code < ErrorLevel; } - friend QDebug operator<<(QDebug dbg, const Status& s) + QDebug dumpToLog(QDebug dbg) const; + friend QDebug operator<<(const QDebug& dbg, const Status& s) { - QDebugStateSaver _s(dbg); - return dbg.noquote().nospace() << s.code << ": " << s.message; + return s.dumpToLog(dbg); } bool operator==(const Status& other) const @@ -131,8 +135,6 @@ public: QString message; }; - using duration_t = int; // milliseconds - public: BaseJob(HttpVerb verb, const QString& name, const QString& endpoint, bool needsToken = true); @@ -144,13 +146,16 @@ public: /** Current status of the job */ Status status() const; + /** Short human-friendly message on the job status */ QString statusCaption() const; + /** Get raw response body as received from the server * \param bytesAtMost return this number of leftmost bytes, or -1 * to return the entire response */ QByteArray rawData(int bytesAtMost = -1) const; + /** Get UI-friendly sample of raw data * * This is almost the same as rawData but appends the "truncated" @@ -166,17 +171,24 @@ public: * \sa status */ int error() const; + /** Error-specific message, as returned by the server */ virtual QString errorString() const; + /** A URL to help/clarify the error, if provided by the server */ QUrl errorUrl() const; int maxRetries() const; void setMaxRetries(int newMaxRetries); - Q_INVOKABLE duration_t getCurrentTimeout() const; - Q_INVOKABLE duration_t getNextRetryInterval() const; - Q_INVOKABLE duration_t millisToRetry() const; + using duration_ms_t = std::chrono::milliseconds::rep; // normally int64_t + + std::chrono::seconds getCurrentTimeout() const; + Q_INVOKABLE duration_ms_t getCurrentTimeoutMs() const; + std::chrono::seconds getNextRetryInterval() const; + Q_INVOKABLE duration_ms_t getNextRetryMs() const; + std::chrono::milliseconds timeToRetry() const; + Q_INVOKABLE duration_ms_t millisToRetry() const; friend QDebug operator<<(QDebug dbg, const BaseJob* j) { @@ -184,7 +196,7 @@ public: } public slots: - void start(const ConnectionData* connData, bool inBackground = false); + void prepare(ConnectionData* connData, bool inBackground); /** * Abandons the result of this job, arrived or unarrived. @@ -197,10 +209,10 @@ public slots: signals: /** The job is about to send a network request */ - void aboutToStart(); + void aboutToSendRequest(); /** The job has sent a network request */ - void started(); + void sentRequest(); /** The job has changed its status */ void statusChanged(Status newStatus); @@ -213,7 +225,14 @@ signals: * @param inMilliseconds the interval after which the next attempt will be * taken */ - void retryScheduled(int nextAttempt, int inMilliseconds); + void retryScheduled(int nextAttempt, duration_ms_t inMilliseconds); + + /** + * The previous network request has been rate-limited; the next attempt + * will be queued and run sometime later. Since other jobs may already + * wait in the queue, it's not possible to predict the wait time. + */ + void rateLimited(); /** * Emitted when the job is finished, in any case. It is used to notify @@ -288,9 +307,20 @@ protected: static QUrl makeRequestUrl(QUrl baseUrl, const QString& path, const QUrlQuery& query = {}); - virtual void beforeStart(const ConnectionData* connData); - virtual void afterStart(const ConnectionData* connData, - QNetworkReply* reply); + /*! Prepares the job for execution + * + * This method is called no more than once per job lifecycle, + * when it's first scheduled for execution; in particular, it is not called + * on retries. + */ + virtual void doPrepare(); + /*! Postprocessing after the network request has been sent + * + * This method is called every time the job receives a running + * QNetworkReply object from NetworkAccessManager - basically, after + * successfully sending a network request (including retries). + */ + virtual void onSentRequest(QNetworkReply*); virtual void beforeAbandon(QNetworkReply*); /** @@ -332,8 +362,7 @@ protected: * @param reply the HTTP reply from the server * @param errorJson the JSON payload describing the error */ - virtual Status parseError(QNetworkReply* reply, - const QJsonObject& errorJson); + virtual Status parseError(QNetworkReply*, const QJsonObject& errorJson); void setStatus(Status s); void setStatus(int code, QString message); @@ -350,10 +379,12 @@ protected slots: void timeout(); private slots: - void sendRequest(bool inBackground); + void sendRequest(); void checkReply(); void gotReply(); + friend class ConnectionData; // to provide access to sendRequest() + private: void stop(); void finishJob(); diff --git a/lib/jobs/downloadfilejob.cpp b/lib/jobs/downloadfilejob.cpp index 3a03efde..4e997326 100644 --- a/lib/jobs/downloadfilejob.cpp +++ b/lib/jobs/downloadfilejob.cpp @@ -39,7 +39,7 @@ QString DownloadFileJob::targetFileName() const return (d->targetFile ? d->targetFile : d->tempFile)->fileName(); } -void DownloadFileJob::beforeStart(const ConnectionData*) +void DownloadFileJob::doPrepare() { if (d->targetFile && !d->targetFile->isReadable() && !d->targetFile->open(QIODevice::WriteOnly)) { @@ -57,7 +57,7 @@ void DownloadFileJob::beforeStart(const ConnectionData*) qCDebug(JOBS) << "Downloading to" << d->tempFile->fileName(); } -void DownloadFileJob::afterStart(const ConnectionData*, QNetworkReply* reply) +void DownloadFileJob::onSentRequest(QNetworkReply* reply) { connect(reply, &QNetworkReply::metaDataChanged, this, [this, reply] { if (!status().good()) diff --git a/lib/jobs/downloadfilejob.h b/lib/jobs/downloadfilejob.h index fa697219..b7d2d75b 100644 --- a/lib/jobs/downloadfilejob.h +++ b/lib/jobs/downloadfilejob.h @@ -19,8 +19,8 @@ private: class Private; QScopedPointer<Private> d; - void beforeStart(const ConnectionData*) override; - void afterStart(const ConnectionData*, QNetworkReply* reply) override; + void doPrepare() override; + void onSentRequest(QNetworkReply* reply) override; void beforeAbandon(QNetworkReply*) override; Status parseReply(QNetworkReply*) override; }; diff --git a/lib/room.cpp b/lib/room.cpp index d47af49f..2c9fca63 100644 --- a/lib/room.cpp +++ b/lib/room.cpp @@ -1440,7 +1440,7 @@ QString Room::Private::doSendEvent(const RoomEvent* pEvent) connection->callApi<SendMessageJob>(BackgroundRequest, id, pEvent->matrixType(), txnId, pEvent->contentJson())) { - Room::connect(call, &BaseJob::started, q, [this, txnId] { + Room::connect(call, &BaseJob::sentRequest, q, [this, txnId] { auto it = q->findPendingEvent(txnId); if (it == unsyncedEvents.end()) { qWarning(EVENTS) << "Pending event for transaction" << txnId |