diff options
Diffstat (limited to 'lib/connectiondata.cpp')
-rw-r--r-- | lib/connectiondata.cpp | 112 |
1 files changed, 92 insertions, 20 deletions
diff --git a/lib/connectiondata.cpp b/lib/connectiondata.cpp index 91cda09f..d57363d0 100644 --- a/lib/connectiondata.cpp +++ b/lib/connectiondata.cpp @@ -13,45 +13,109 @@ * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include "connectiondata.h" -#include "networkaccessmanager.h" #include "logging.h" +#include "networkaccessmanager.h" +#include "jobs/basejob.h" -using namespace QMatrixClient; +#include <QtCore/QTimer> +#include <QtCore/QPointer> -struct ConnectionData::Private -{ - explicit Private(QUrl url) : baseUrl(std::move(url)) { } +#include <array> +#include <queue> + +using namespace Quotient; + +class ConnectionData::Private { +public: + explicit Private(QUrl url) : baseUrl(std::move(url)) + { + rateLimiter.setSingleShot(true); + } QUrl baseUrl; QByteArray accessToken; QString lastEvent; + QString userId; QString deviceId; + std::vector<QString> needToken; mutable unsigned int txnCounter = 0; - const qint64 id = QDateTime::currentMSecsSinceEpoch(); + const qint64 txnBase = QDateTime::currentMSecsSinceEpoch(); + + QString id() const { return userId + '/' + deviceId; } + + using job_queue_t = std::queue<QPointer<BaseJob>>; + std::array<job_queue_t, 2> jobs; // 0 - foreground, 1 - background + QTimer rateLimiter; }; ConnectionData::ConnectionData(QUrl baseUrl) : d(std::make_unique<Private>(std::move(baseUrl))) -{ } +{ + // Each lambda invocation below takes no more than one job from the + // queues (first foreground, then background) and resumes it; then + // restarts the rate limiter timer with duration 0, effectively yielding + // to the event loop and then resuming until both queues are empty. + QObject::connect(&d->rateLimiter, &QTimer::timeout, [this] { + // TODO: Consider moving out all job->sendRequest() invocations to + // a dedicated thread + d->rateLimiter.setInterval(0); + for (auto& q : d->jobs) + while (!q.empty()) { + const auto job = q.front(); + q.pop(); + if (!job || job->error() == BaseJob::Abandoned) + continue; + if (job->error() != BaseJob::Pending) { + qCCritical(MAIN) + << "Job" << job + << "is in the wrong status:" << job->status(); + Q_ASSERT(false); + job->setStatus(BaseJob::Pending); + } + job->sendRequest(); + d->rateLimiter.start(); + return; + } + qCDebug(MAIN) << d->id() << "job queues are empty"; + }); +} -ConnectionData::~ConnectionData() = default; +ConnectionData::~ConnectionData() +{ + d->rateLimiter.disconnect(); + d->rateLimiter.stop(); +} -QByteArray ConnectionData::accessToken() const +void ConnectionData::submit(BaseJob* job) { - return d->accessToken; + job->setStatus(BaseJob::Pending); + if (!d->rateLimiter.isActive()) { + QTimer::singleShot(0, job, &BaseJob::sendRequest); + return; + } + d->jobs[size_t(job->isBackground())].emplace(job); + qCDebug(MAIN) << job << "queued," << d->jobs.front().size() << "+" + << d->jobs.back().size() << "total jobs in" << d->id() + << "queues"; } -QUrl ConnectionData::baseUrl() const +void ConnectionData::limitRate(std::chrono::milliseconds nextCallAfter) { - return d->baseUrl; + qCDebug(MAIN) << "Jobs for" << (d->userId + "/" + d->deviceId) + << "suspended for" << nextCallAfter.count() << "ms"; + d->rateLimiter.start(nextCallAfter); } +QByteArray ConnectionData::accessToken() const { return d->accessToken; } + +QUrl ConnectionData::baseUrl() const { return d->baseUrl; } + QNetworkAccessManager* ConnectionData::nam() const { return NetworkAccessManager::instance(); @@ -80,22 +144,30 @@ void ConnectionData::setPort(int port) qCDebug(MAIN) << "updated baseUrl to" << d->baseUrl; } -const QString& ConnectionData::deviceId() const +const QString& ConnectionData::deviceId() const { return d->deviceId; } + +const QString& ConnectionData::userId() const { return d->userId; } + +bool ConnectionData::needsToken(const QString& requestName) const { - return d->deviceId; + return std::find(d->needToken.cbegin(), d->needToken.cend(), requestName) + != d->needToken.cend(); } void ConnectionData::setDeviceId(const QString& deviceId) { d->deviceId = deviceId; - qCDebug(MAIN) << "updated deviceId to" << d->deviceId; } -QString ConnectionData::lastEvent() const +void ConnectionData::setUserId(const QString& userId) { d->userId = userId; } + +void ConnectionData::setNeedsToken(const QString& requestName) { - return d->lastEvent; + d->needToken.push_back(requestName); } +QString ConnectionData::lastEvent() const { return d->lastEvent; } + void ConnectionData::setLastEvent(QString identifier) { d->lastEvent = std::move(identifier); @@ -103,6 +175,6 @@ void ConnectionData::setLastEvent(QString identifier) QByteArray ConnectionData::generateTxnId() const { - return QByteArray::number(d->id) + 'q' + - QByteArray::number(++d->txnCounter); + return d->deviceId.toLatin1() + QByteArray::number(d->txnBase) + + QByteArray::number(++d->txnCounter); } |