diff options
-rw-r--r-- | lib/connection.cpp | 2 | ||||
-rw-r--r-- | lib/connectiondata.cpp | 70 | ||||
-rw-r--r-- | lib/connectiondata.h | 6 | ||||
-rw-r--r-- | lib/jobs/basejob.cpp | 159 | ||||
-rw-r--r-- | lib/jobs/basejob.h | 63 | ||||
-rw-r--r-- | lib/jobs/downloadfilejob.cpp | 4 | ||||
-rw-r--r-- | lib/jobs/downloadfilejob.h | 4 | ||||
-rw-r--r-- | lib/room.cpp | 2 |
8 files changed, 212 insertions, 98 deletions
diff --git a/lib/connection.cpp b/lib/connection.cpp index 5f90ed55..5ebdcf6c 100644 --- a/lib/connection.cpp +++ b/lib/connection.cpp @@ -1358,7 +1358,7 @@ void Connection::setLazyLoading(bool newValue) void Connection::run(BaseJob* job, RunningPolicy runningPolicy) const { connect(job, &BaseJob::failure, this, &Connection::requestFailed); - job->start(d->data.get(), runningPolicy & BackgroundRequest); + job->prepare(d->data.get(), runningPolicy & BackgroundRequest); d->data->submit(job); } diff --git a/lib/connectiondata.cpp b/lib/connectiondata.cpp index 41d97b87..e241e376 100644 --- a/lib/connectiondata.cpp +++ b/lib/connectiondata.cpp @@ -20,11 +20,21 @@ #include "logging.h" #include "networkaccessmanager.h" +#include "jobs/basejob.h" + +#include <QtCore/QTimer> +#include <QtCore/QPointer> + +#include <queue> using namespace Quotient; -struct ConnectionData::Private { - explicit Private(QUrl url) : baseUrl(std::move(url)) {} +class ConnectionData::Private { +public: + explicit Private(QUrl url) : baseUrl(std::move(url)) + { + rateLimiter.setSingleShot(true); + } QUrl baseUrl; QByteArray accessToken; @@ -34,14 +44,68 @@ struct ConnectionData::Private { mutable unsigned int txnCounter = 0; const qint64 txnBase = QDateTime::currentMSecsSinceEpoch(); + + QString id() const { return userId + '/' + deviceId; } + + using job_queue_t = std::queue<QPointer<BaseJob>>; + std::array<job_queue_t, 2> jobs; // 0 - foreground, 1 - background + QTimer rateLimiter; }; ConnectionData::ConnectionData(QUrl baseUrl) : d(std::make_unique<Private>(std::move(baseUrl))) -{} +{ + // Each lambda invocation below takes no more than one job from the + // queues (first foreground, then background) and resumes it; then + // restarts the rate limiter timer with duration 0, effectively yielding + // to the event loop and then resuming until both queues are empty. + QObject::connect(&d->rateLimiter, &QTimer::timeout, [this] { + // TODO: Consider moving out all job->sendRequest() invocations to + // a dedicated thread + d->rateLimiter.setInterval(0); + for (auto& q : d->jobs) + while (!q.empty()) { + auto& job = q.front(); + q.pop(); + if (!job || job->error() == BaseJob::Abandoned) + continue; + if (job->error() != BaseJob::Pending) { + qCCritical(MAIN) + << "Job" << job + << "is in the wrong status:" << job->status(); + Q_ASSERT(false); + job->setStatus(BaseJob::Pending); + } + job->sendRequest(); + d->rateLimiter.start(); + return; + } + qCDebug(MAIN) << d->id() << "job queues are empty"; + }); +} ConnectionData::~ConnectionData() = default; +void ConnectionData::submit(BaseJob* job) +{ + Q_ASSERT(job->error() == BaseJob::Pending); + if (!d->rateLimiter.isActive()) { + job->sendRequest(); + return; + } + d->jobs[size_t(job->isBackground())].emplace(job); + qCDebug(MAIN) << job << "queued," << d->jobs.front().size() << "+" + << d->jobs.back().size() << "total jobs in" << d->id() + << "queues"; +} + +void ConnectionData::limitRate(std::chrono::milliseconds nextCallAfter) +{ + qCDebug(MAIN) << "Jobs for" << (d->userId + "/" + d->deviceId) + << "suspended for" << nextCallAfter.count() << "ms"; + d->rateLimiter.start(nextCallAfter); +} + QByteArray ConnectionData::accessToken() const { return d->accessToken; } QUrl ConnectionData::baseUrl() const { return d->baseUrl; } diff --git a/lib/connectiondata.h b/lib/connectiondata.h index 561893df..5cd7c3c7 100644 --- a/lib/connectiondata.h +++ b/lib/connectiondata.h @@ -21,15 +21,21 @@ #include <QtCore/QUrl> #include <memory> +#include <chrono> class QNetworkAccessManager; namespace Quotient { +class BaseJob; + class ConnectionData { public: explicit ConnectionData(QUrl baseUrl); virtual ~ConnectionData(); + void submit(BaseJob* job); + void limitRate(std::chrono::milliseconds nextCallAfter); + QByteArray accessToken() const; QUrl baseUrl() const; const QString& deviceId() const; diff --git a/lib/jobs/basejob.cpp b/lib/jobs/basejob.cpp index 621762be..0a17431c 100644 --- a/lib/jobs/basejob.cpp +++ b/lib/jobs/basejob.cpp @@ -31,6 +31,8 @@ #include <array> using namespace Quotient; +using std::chrono::seconds, std::chrono::milliseconds; +using std::chrono_literals::operator""s; struct NetworkReplyDeleter : public QScopedPointerDeleteLater { static inline void cleanup(QNetworkReply* reply) @@ -43,6 +45,11 @@ struct NetworkReplyDeleter : public QScopedPointerDeleteLater { class BaseJob::Private { public: + struct JobTimeoutConfig { + seconds jobTimeout; + seconds nextRetryInterval; + }; + // Using an idiom from clang-tidy: // http://clang.llvm.org/extra/clang-tidy/checks/modernize-pass-by-value.html Private(HttpVerb v, QString endpoint, const QUrlQuery& q, Data&& data, @@ -52,12 +59,14 @@ public: , requestQuery(q) , requestData(std::move(data)) , needsToken(nt) - {} + { + timer.setSingleShot(true); + retryTimer.setSingleShot(true); + } - void sendRequest(bool inBackground); - const JobTimeoutConfig& getCurrentTimeoutConfig() const; + void sendRequest(); - const ConnectionData* connection = nullptr; + ConnectionData* connection = nullptr; // Contents for the network request HttpVerb verb; @@ -67,13 +76,15 @@ public: Data requestData; bool needsToken; + bool inBackground = false; + // There's no use of QMimeType here because we don't want to match // content types against the known MIME type hierarchy; and at the same // type QMimeType is of little help with MIME type globs (`text/*` etc.) - QByteArrayList expectedContentTypes; + QByteArrayList expectedContentTypes { "application/json" }; QScopedPointer<QNetworkReply, NetworkReplyDeleter> reply; - Status status = Pending; + Status status = Unprepared; QByteArray rawResponse; QUrl errorUrl; //< May contain a URL to help with some errors @@ -82,12 +93,18 @@ public: QTimer timer; QTimer retryTimer; - QVector<JobTimeoutConfig> errorStrategy = { { 90, 5 }, - { 90, 10 }, - { 120, 30 } }; - int maxRetries = errorStrategy.size(); + static constexpr std::array<const JobTimeoutConfig, 3> errorStrategy { + { { 90s, 5s }, { 90s, 10s }, { 120s, 30s } } + }; + int maxRetries = int(errorStrategy.size()); int retriesTaken = 0; + const JobTimeoutConfig& getCurrentTimeoutConfig() const + { + return errorStrategy[std::min(size_t(retriesTaken), + errorStrategy.size() - 1)]; + } + QString urlForLog() const { return reply @@ -106,9 +123,11 @@ BaseJob::BaseJob(HttpVerb verb, const QString& name, const QString& endpoint, : d(new Private(verb, endpoint, query, std::move(data), needsToken)) { setObjectName(name); - setExpectedContentTypes({ "application/json" }); - d->timer.setSingleShot(true); connect(&d->timer, &QTimer::timeout, this, &BaseJob::timeout); + connect(&d->retryTimer, &QTimer::timeout, this, [this] { + setStatus(Pending); + sendRequest(); + }); } BaseJob::~BaseJob() @@ -124,10 +143,7 @@ QUrl BaseJob::requestUrl() const bool BaseJob::isBackground() const { - return d->reply - && d->reply->request() - .attribute(QNetworkRequest::BackgroundRequestAttribute) - .toBool(); + return d->inBackground; } const QString& BaseJob::apiEndpoint() const { return d->apiEndpoint; } @@ -191,7 +207,7 @@ QUrl BaseJob::makeRequestUrl(QUrl baseUrl, const QString& path, return baseUrl; } -void BaseJob::Private::sendRequest(bool inBackground) +void BaseJob::Private::sendRequest() { QNetworkRequest req { makeRequestUrl(connection->baseUrl(), apiEndpoint, requestQuery) }; @@ -223,36 +239,31 @@ void BaseJob::Private::sendRequest(bool inBackground) } } -void BaseJob::beforeStart(const ConnectionData*) {} +void BaseJob::doPrepare() {} -void BaseJob::afterStart(const ConnectionData*, QNetworkReply*) {} +void BaseJob::onSentRequest(QNetworkReply*) {} void BaseJob::beforeAbandon(QNetworkReply*) {} -void BaseJob::start(const ConnectionData* connData, bool inBackground) +void BaseJob::prepare(ConnectionData* connData, bool inBackground) { + d->inBackground = inBackground; d->connection = connData; - d->retryTimer.setSingleShot(true); - connect(&d->retryTimer, &QTimer::timeout, this, - [this, inBackground] { sendRequest(inBackground); }); - - beforeStart(connData); - if (status().good()) - sendRequest(inBackground); - if (status().good()) - afterStart(connData, d->reply.data()); - if (!status().good()) + doPrepare(); + if (status().code != Unprepared && status().code != Pending) QTimer::singleShot(0, this, &BaseJob::finishJob); + setStatus(Pending); } -void BaseJob::sendRequest(bool inBackground) +void BaseJob::sendRequest() { - emit aboutToStart(); - d->retryTimer.stop(); // In case we were counting down at the moment - qCDebug(d->logCat) << this << "sending request to" << d->apiEndpoint; - if (!d->requestQuery.isEmpty()) - qCDebug(d->logCat) << " query:" << d->requestQuery.toString(); - d->sendRequest(inBackground); + if (status().code == Abandoned) + return; + Q_ASSERT(d->connection && status().code == Pending); + qCDebug(d->logCat) << "Making request to" << d->urlForLog(); + emit aboutToSendRequest(); + d->sendRequest(); + Q_ASSERT(d->reply); connect(d->reply.data(), &QNetworkReply::finished, this, &BaseJob::gotReply); if (d->reply->isRunning()) { connect(d->reply.data(), &QNetworkReply::metaDataChanged, this, @@ -262,10 +273,12 @@ void BaseJob::sendRequest(bool inBackground) connect(d->reply.data(), &QNetworkReply::downloadProgress, this, &BaseJob::downloadProgress); d->timer.start(getCurrentTimeout()); - qCDebug(d->logCat) << this << "request has been sent"; - emit started(); + qCInfo(d->logCat).noquote() << "Request sent to" << d->urlForLog(); + onSentRequest(d->reply.data()); + emit sentRequest(); } else - qCWarning(d->logCat) << this << "request could not start"; + qCWarning(d->logCat).noquote() + << "Request could not start:" << d->urlForLog(); } void BaseJob::checkReply() { setStatus(doCheckReply(d->reply.data())); } @@ -286,13 +299,7 @@ void BaseJob::gotReply() parseError(d->reply.data(), QJsonDocument::fromJson(d->rawResponse).object())); } - - if (status().code != TooManyRequestsError) - finishJob(); - else { - stop(); - emit retryScheduled(d->retriesTaken, d->retryTimer.interval()); - } + finishJob(); } bool checkContentType(const QByteArray& type, const QByteArrayList& patterns) @@ -416,14 +423,13 @@ BaseJob::Status BaseJob::parseError(QNetworkReply* /*reply*/, const auto errCode = errorJson.value("errcode"_ls).toString(); if (error() == TooManyRequestsError || errCode == "M_LIMIT_EXCEEDED") { QString msg = tr("Too many requests"); - auto retryInterval = errorJson.value("retry_after_ms"_ls).toInt(-1); - if (retryInterval != -1) - msg += tr(", next retry advised after %1 ms").arg(retryInterval); + int64_t retryAfterMs = errorJson.value("retry_after_ms"_ls).toInt(-1); + if (retryAfterMs >= 0) + msg += tr(", next retry advised after %1 ms").arg(retryAfterMs); else // We still have to figure some reasonable interval - retryInterval = getNextRetryInterval(); + retryAfterMs = getNextRetryMs(); - qCWarning(d->logCat) << this << "will retry in" << retryInterval << "ms"; - d->retryTimer.start(retryInterval); + d->connection->limitRate(milliseconds(retryAfterMs)); return { TooManyRequestsError, msg }; } @@ -470,19 +476,23 @@ void BaseJob::stop() void BaseJob::finishJob() { stop(); - if ((error() == NetworkError || error() == TimeoutError) + if (error() == TooManyRequests) { + emit rateLimited(); + setStatus(Pending); + d->connection->submit(this); + return; + } + if ((error() == NetworkError || error() == Timeout) && d->retriesTaken < d->maxRetries) { - // TODO: The whole retrying thing should be put to ConnectionManager + // TODO: The whole retrying thing should be put to Connection(Manager) // otherwise independently retrying jobs make a bit of notification // storm towards the UI. - const auto retryInterval = error() == TimeoutError - ? 0 - : getNextRetryInterval(); + const seconds retryIn = error() == Timeout ? 0s : getNextRetryInterval(); ++d->retriesTaken; qCWarning(d->logCat).nospace() << this << ": retry #" << d->retriesTaken - << " in " << retryInterval / 1000 << " s"; - d->retryTimer.start(retryInterval); - emit retryScheduled(d->retriesTaken, retryInterval); + << " in " << retryIn.count() << " s"; + d->retryTimer.start(retryIn); + emit retryScheduled(d->retriesTaken, milliseconds(retryIn).count()); return; } @@ -498,24 +508,35 @@ void BaseJob::finishJob() deleteLater(); } -const JobTimeoutConfig& BaseJob::Private::getCurrentTimeoutConfig() const +seconds BaseJob::getCurrentTimeout() const +{ + return d->getCurrentTimeoutConfig().jobTimeout; +} + +BaseJob::duration_ms_t BaseJob::getCurrentTimeoutMs() const +{ + return milliseconds(getCurrentTimeout()).count(); +} + +seconds BaseJob::getNextRetryInterval() const { - return errorStrategy[std::min(retriesTaken, errorStrategy.size() - 1)]; + return d->getCurrentTimeoutConfig().nextRetryInterval; } -BaseJob::duration_t BaseJob::getCurrentTimeout() const +BaseJob::duration_ms_t BaseJob::getNextRetryMs() const { - return d->getCurrentTimeoutConfig().jobTimeout * 1000; + return milliseconds(getNextRetryInterval()).count(); } -BaseJob::duration_t BaseJob::getNextRetryInterval() const +milliseconds BaseJob::timeToRetry() const { - return d->getCurrentTimeoutConfig().nextRetryInterval * 1000; + return d->retryTimer.isActive() ? d->retryTimer.remainingTimeAsDuration() + : 0s; } -BaseJob::duration_t BaseJob::millisToRetry() const +BaseJob::duration_ms_t BaseJob::millisToRetry() const { - return d->retryTimer.isActive() ? d->retryTimer.remainingTime() : 0; + return timeToRetry().count(); } int BaseJob::maxRetries() const { return d->maxRetries; } diff --git a/lib/jobs/basejob.h b/lib/jobs/basejob.h index 9de7b49d..4dc287f8 100644 --- a/lib/jobs/basejob.h +++ b/lib/jobs/basejob.h @@ -34,11 +34,6 @@ class ConnectionData; enum class HttpVerb { Get, Put, Post, Delete }; -struct JobTimeoutConfig { - int jobTimeout; - int nextRetryInterval; -}; - class BaseJob : public QObject { Q_OBJECT Q_PROPERTY(QUrl requestUrl READ requestUrl CONSTANT) @@ -59,6 +54,7 @@ public: WarningLevel = 20, //< Warnings have codes starting from this UnexpectedResponseType = 21, UnexpectedResponseTypeWarning = UnexpectedResponseType, + Unprepared = 25, //< Initial job state is incomplete, hence warning level Abandoned = 50, //< A tiny period between abandoning and object deletion ErrorLevel = 100, //< Errors have codes starting from this NetworkError = 100, @@ -140,8 +136,6 @@ public: QString message; }; - using duration_t = int; // milliseconds - public: BaseJob(HttpVerb verb, const QString& name, const QString& endpoint, bool needsToken = true); @@ -153,13 +147,16 @@ public: /** Current status of the job */ Status status() const; + /** Short human-friendly message on the job status */ QString statusCaption() const; + /** Get raw response body as received from the server * \param bytesAtMost return this number of leftmost bytes, or -1 * to return the entire response */ QByteArray rawData(int bytesAtMost = -1) const; + /** Get UI-friendly sample of raw data * * This is almost the same as rawData but appends the "truncated" @@ -175,17 +172,24 @@ public: * \sa status */ int error() const; + /** Error-specific message, as returned by the server */ virtual QString errorString() const; + /** A URL to help/clarify the error, if provided by the server */ QUrl errorUrl() const; int maxRetries() const; void setMaxRetries(int newMaxRetries); - Q_INVOKABLE duration_t getCurrentTimeout() const; - Q_INVOKABLE duration_t getNextRetryInterval() const; - Q_INVOKABLE duration_t millisToRetry() const; + using duration_ms_t = std::chrono::milliseconds::rep; // normally int64_t + + std::chrono::seconds getCurrentTimeout() const; + Q_INVOKABLE duration_ms_t getCurrentTimeoutMs() const; + std::chrono::seconds getNextRetryInterval() const; + Q_INVOKABLE duration_ms_t getNextRetryMs() const; + std::chrono::milliseconds timeToRetry() const; + Q_INVOKABLE duration_ms_t millisToRetry() const; friend QDebug operator<<(QDebug dbg, const BaseJob* j) { @@ -193,7 +197,7 @@ public: } public slots: - void start(const ConnectionData* connData, bool inBackground = false); + void prepare(ConnectionData* connData, bool inBackground); /** * Abandons the result of this job, arrived or unarrived. @@ -206,10 +210,10 @@ public slots: signals: /** The job is about to send a network request */ - void aboutToStart(); + void aboutToSendRequest(); /** The job has sent a network request */ - void started(); + void sentRequest(); /** The job has changed its status */ void statusChanged(Status newStatus); @@ -222,7 +226,14 @@ signals: * @param inMilliseconds the interval after which the next attempt will be * taken */ - void retryScheduled(int nextAttempt, int inMilliseconds); + void retryScheduled(int nextAttempt, duration_ms_t inMilliseconds); + + /** + * The previous network request has been rate-limited; the next attempt + * will be queued and run sometime later. Since other jobs may already + * wait in the queue, it's not possible to predict the wait time. + */ + void rateLimited(); /** * Emitted when the job is finished, in any case. It is used to notify @@ -297,9 +308,20 @@ protected: static QUrl makeRequestUrl(QUrl baseUrl, const QString& path, const QUrlQuery& query = {}); - virtual void beforeStart(const ConnectionData* connData); - virtual void afterStart(const ConnectionData* connData, - QNetworkReply* reply); + /*! Prepares the job for execution + * + * This method is called no more than once per job lifecycle, + * when it's first scheduled for execution; in particular, it is not called + * on retries. + */ + virtual void doPrepare(); + /*! Postprocessing after the network request has been sent + * + * This method is called every time the job receives a running + * QNetworkReply object from NetworkAccessManager - basically, after + * successfully sending a network request (including retries). + */ + virtual void onSentRequest(QNetworkReply*); virtual void beforeAbandon(QNetworkReply*); /** @@ -341,8 +363,7 @@ protected: * @param reply the HTTP reply from the server * @param errorJson the JSON payload describing the error */ - virtual Status parseError(QNetworkReply* reply, - const QJsonObject& errorJson); + virtual Status parseError(QNetworkReply*, const QJsonObject& errorJson); void setStatus(Status s); void setStatus(int code, QString message); @@ -359,10 +380,12 @@ protected slots: void timeout(); private slots: - void sendRequest(bool inBackground); + void sendRequest(); void checkReply(); void gotReply(); + friend class ConnectionData; // to provide access to sendRequest() + private: void stop(); void finishJob(); diff --git a/lib/jobs/downloadfilejob.cpp b/lib/jobs/downloadfilejob.cpp index 3a03efde..4e997326 100644 --- a/lib/jobs/downloadfilejob.cpp +++ b/lib/jobs/downloadfilejob.cpp @@ -39,7 +39,7 @@ QString DownloadFileJob::targetFileName() const return (d->targetFile ? d->targetFile : d->tempFile)->fileName(); } -void DownloadFileJob::beforeStart(const ConnectionData*) +void DownloadFileJob::doPrepare() { if (d->targetFile && !d->targetFile->isReadable() && !d->targetFile->open(QIODevice::WriteOnly)) { @@ -57,7 +57,7 @@ void DownloadFileJob::beforeStart(const ConnectionData*) qCDebug(JOBS) << "Downloading to" << d->tempFile->fileName(); } -void DownloadFileJob::afterStart(const ConnectionData*, QNetworkReply* reply) +void DownloadFileJob::onSentRequest(QNetworkReply* reply) { connect(reply, &QNetworkReply::metaDataChanged, this, [this, reply] { if (!status().good()) diff --git a/lib/jobs/downloadfilejob.h b/lib/jobs/downloadfilejob.h index fa697219..b7d2d75b 100644 --- a/lib/jobs/downloadfilejob.h +++ b/lib/jobs/downloadfilejob.h @@ -19,8 +19,8 @@ private: class Private; QScopedPointer<Private> d; - void beforeStart(const ConnectionData*) override; - void afterStart(const ConnectionData*, QNetworkReply* reply) override; + void doPrepare() override; + void onSentRequest(QNetworkReply* reply) override; void beforeAbandon(QNetworkReply*) override; Status parseReply(QNetworkReply*) override; }; diff --git a/lib/room.cpp b/lib/room.cpp index 2e2f73c4..b0829252 100644 --- a/lib/room.cpp +++ b/lib/room.cpp @@ -1440,7 +1440,7 @@ QString Room::Private::doSendEvent(const RoomEvent* pEvent) connection->callApi<SendMessageJob>(BackgroundRequest, id, pEvent->matrixType(), txnId, pEvent->contentJson())) { - Room::connect(call, &BaseJob::started, q, [this, txnId] { + Room::connect(call, &BaseJob::sentRequest, q, [this, txnId] { auto it = q->findPendingEvent(txnId); if (it == unsyncedEvents.end()) { qWarning(EVENTS) << "Pending event for transaction" << txnId |