From ae8fb14f9e4e1eb360822700aa897d59689be79b Mon Sep 17 00:00:00 2001 From: John Preston Date: Fri, 6 Dec 2019 10:05:38 +0300 Subject: [PATCH] Use 8 seconds timeout for request duration. --- .../storage/download_manager_mtproto.cpp | 157 +++++++++++++----- .../storage/download_manager_mtproto.h | 16 +- 2 files changed, 125 insertions(+), 48 deletions(-) diff --git a/Telegram/SourceFiles/storage/download_manager_mtproto.cpp b/Telegram/SourceFiles/storage/download_manager_mtproto.cpp index cf66cfb4a..44e716a94 100644 --- a/Telegram/SourceFiles/storage/download_manager_mtproto.cpp +++ b/Telegram/SourceFiles/storage/download_manager_mtproto.cpp @@ -27,11 +27,9 @@ constexpr auto kRetryAddSessionTimeout = 8 * crl::time(1000); constexpr auto kRetryAddSessionSuccesses = 3; constexpr auto kMaxTrackedSuccesses = kRetryAddSessionSuccesses * kMaxTrackedSessionRemoves; -constexpr auto kRemoveSessionAfterTimeouts = 2; +constexpr auto kRemoveSessionAfterTimeouts = 4; constexpr auto kResetDownloadPrioritiesTimeout = crl::time(200); -constexpr auto kGrowMaxWaitedDurationThreshold = crl::time(500); -constexpr auto kGrowSessionsDurationThreshold = crl::time(500); -constexpr auto kBadRequestDurationThreshold = crl::time(2000); +constexpr auto kBadRequestDurationThreshold = 8 * crl::time(1000); // Each (session remove by timeouts) we wait for time: // kRetryAddSessionTimeout * max(removesCount, kMaxTrackedSessionRemoves) @@ -80,6 +78,13 @@ auto DownloadManagerMtproto::Queue::nextTask() const -> Task* { return (i != all.end()) ? i->get() : nullptr; } +void DownloadManagerMtproto::Queue::removeSession(int index) { + auto &&all = ranges::view::concat(_tasks, _previousGeneration); + for (const auto task : all) { + task->removeSession(index); + } +} + DownloadManagerMtproto::DcSessionBalanceData::DcSessionBalanceData() : maxWaitedAmount(kStartWaitedInSession) { } @@ -166,14 +171,14 @@ bool DownloadManagerMtproto::trySendNextPart(MTP::DcId dcId, Queue &queue) { return false; } -void DownloadManagerMtproto::changeRequestedAmount( +int DownloadManagerMtproto::changeRequestedAmount( MTP::DcId dcId, int index, int delta) { const auto i = _balanceData.find(dcId); Assert(i != _balanceData.end()); Assert(index < i->second.sessions.size()); - i->second.sessions[index].requested += delta; + const auto result = (i->second.sessions[index].requested += delta); const auto findNonEmptySession = [](const DcBalanceData &data) { using namespace rpl::mappers; return ranges::find_if( @@ -186,44 +191,53 @@ void DownloadManagerMtproto::changeRequestedAmount( } else if (findNonEmptySession(i->second) == end(i->second.sessions)) { killSessionsSchedule(dcId); } + return result; } void DownloadManagerMtproto::requestSucceeded( MTP::DcId dcId, int index, - crl::time duration) { + int amountAtRequestStart, + crl::time timeAtRequestStart) { using namespace rpl::mappers; - DEBUG_LOG(("Download (%1,%2) request done, duration %3." - ).arg(dcId - ).arg(index - ).arg(duration)); + const auto guard = gsl::finally([&] { + checkSendNext(dcId, _queues[dcId]); + }); + const auto i = _balanceData.find(dcId); Assert(i != end(_balanceData)); auto &dc = i->second; Assert(index < dc.sessions.size()); auto &data = dc.sessions[index]; - const auto guard = gsl::finally([&] { - checkSendNext(dcId, _queues[dcId]); - }); - const auto parts = data.maxWaitedAmount / kDownloadPartSize; - if (duration < kGrowMaxWaitedDurationThreshold * parts) { - if (data.maxWaitedAmount < kMaxWaitedInSession) { - data.maxWaitedAmount = std::min( - data.maxWaitedAmount + kDownloadPartSize, - kMaxWaitedInSession); - DEBUG_LOG(("Download (%1,%2) increased max waited amount %3." - ).arg(dcId - ).arg(index - ).arg(data.maxWaitedAmount)); - } + const auto overloaded = (timeAtRequestStart <= dc.lastSessionRemove) + || (amountAtRequestStart > data.maxWaitedAmount); + const auto parts = amountAtRequestStart / kDownloadPartSize; + const auto duration = (crl::now() - timeAtRequestStart); + DEBUG_LOG(("Download (%1,%2) request done, duration: %3, parts: %4%5" + ).arg(dcId + ).arg(index + ).arg(duration + ).arg(parts + ).arg(overloaded ? " (overloaded)" : "")); + if (overloaded) { + return; } - if (duration >= kBadRequestDurationThreshold * parts) { + + if (duration >= kBadRequestDurationThreshold) { DEBUG_LOG(("Duration too large, signaling time out.")); sessionTimedOut(dcId, index); return; - } else if (duration >= kGrowSessionsDurationThreshold * parts) { - return; + } + if (amountAtRequestStart == data.maxWaitedAmount + && data.maxWaitedAmount < kMaxWaitedInSession) { + data.maxWaitedAmount = std::min( + data.maxWaitedAmount + kDownloadPartSize, + kMaxWaitedInSession); + DEBUG_LOG(("Download (%1,%2) increased max waited amount %3." + ).arg(dcId + ).arg(index + ).arg(data.maxWaitedAmount)); } data.successes = std::min(data.successes + 1, kMaxTrackedSuccesses); const auto notEnough = ranges::find_if( @@ -247,10 +261,22 @@ void DownloadManagerMtproto::requestSucceeded( if (dc.lastSessionRemove && now < dc.lastSessionRemove + delay) { return; } - DEBUG_LOG(("Download (%1,%2) added session." - ).arg(dcId - ).arg(dc.sessions.size())); dc.sessions.emplace_back(); + DEBUG_LOG(("Download (%1,%2) adding, now sessions: %3" + ).arg(dcId + ).arg(dc.sessions.size() - 1 + ).arg(dc.sessions.size())); +} + +int DownloadManagerMtproto::chooseSessionIndex(MTP::DcId dcId) const { + const auto i = _balanceData.find(dcId); + Assert(i != end(_balanceData)); + const auto &sessions = i->second.sessions; + const auto j = ranges::min_element( + sessions, + ranges::less(), + &DcSessionBalanceData::requested); + return (j - begin(sessions)); } void DownloadManagerMtproto::sessionTimedOut(MTP::DcId dcId, int index) { @@ -278,7 +304,10 @@ void DownloadManagerMtproto::removeSession(MTP::DcId dcId) { auto &dc = _balanceData[dcId]; Assert(dc.sessions.size() > kStartSessionsCount); const auto index = int(dc.sessions.size() - 1); - DEBUG_LOG(("Download (%1,%2) removing session.").arg(dcId).arg(index)); + DEBUG_LOG(("Download (%1,%2) removing, now sessions: %3" + ).arg(dcId + ).arg(index + ).arg(index)); auto &queue = _queues[dcId]; if (dc.sessionRemoveIndex == index) { dc.sessionRemoveTimes = std::min( @@ -288,8 +317,17 @@ void DownloadManagerMtproto::removeSession(MTP::DcId dcId) { dc.sessionRemoveIndex = index; dc.sessionRemoveTimes = 1; } + auto &session = dc.sessions.back(); + + // Make sure we don't send anything to that session while redirecting. + session.requested += kMaxWaitedInSession * kMaxSessionsCount; + _queues[dcId].removeSession(index); + Assert(session.requested == kMaxWaitedInSession * kMaxSessionsCount); + + dc.sessions.pop_back(); + MTP::killSession(MTP::downloadDcId(dcId, index)); + dc.lastSessionRemove = crl::now(); -// dc.sessions.pop_back(); } void DownloadManagerMtproto::killSessionsSchedule(MTP::DcId dcId) { @@ -406,16 +444,37 @@ void DownloadMtprotoTask::refreshFileReferenceFrom( } } -void DownloadMtprotoTask::loadPart(int dcIndex) { - makeRequest({ takeNextRequestOffset(), dcIndex }); +void DownloadMtprotoTask::loadPart(int sessionIndex) { + makeRequest({ takeNextRequestOffset(), sessionIndex }); } -mtpRequestId DownloadMtprotoTask::sendRequest(const RequestData &requestData) { +void DownloadMtprotoTask::removeSession(int sessionIndex) { + struct Redirect { + mtpRequestId requestId = 0; + int offset = 0; + }; + auto redirect = std::vector(); + for (const auto &[requestId, requestData] : _sentRequests) { + if (requestData.sessionIndex == sessionIndex) { + redirect.reserve(_sentRequests.size()); + redirect.push_back({ requestId, requestData.offset }); + } + } + for (const auto &[requestId, offset] : redirect) { + cancelRequest(requestId); + const auto newIndex = _owner->chooseSessionIndex(dcId()); + Assert(newIndex < sessionIndex); + makeRequest({ offset, newIndex }); + } +} + +mtpRequestId DownloadMtprotoTask::sendRequest( + const RequestData &requestData) { const auto offset = requestData.offset; const auto limit = Storage::kDownloadPartSize; const auto shiftedDcId = MTP::downloadDcId( _cdnDcId ? _cdnDcId : dcId(), - requestData.dcIndex); + requestData.sessionIndex); if (_cdnDcId) { return api().request(MTPupload_GetCdnFile( MTP_bytes(_cdnToken), @@ -488,7 +547,7 @@ void DownloadMtprotoTask::requestMoreCdnFileHashes() { const auto requestData = _cdnUncheckedParts.cbegin()->first; const auto shiftedDcId = MTP::downloadDcId( dcId(), - requestData.dcIndex); + requestData.sessionIndex); _cdnHashesRequestId = api().request(MTPupload_GetCdnFileHashes( MTP_bytes(_cdnToken), MTP_int(requestData.offset) @@ -533,7 +592,7 @@ void DownloadMtprotoTask::cdnPartLoaded(const MTPupload_CdnFile &result, mtpRequ FinishRequestReason::Redirect); const auto shiftedDcId = MTP::downloadDcId( dcId(), - requestData.dcIndex); + requestData.sessionIndex); const auto requestId = api().request(MTPupload_ReuploadCdnFile( MTP_bytes(_cdnToken), data.vrequest_token() @@ -665,15 +724,16 @@ void DownloadMtprotoTask::getCdnFileHashesDone( void DownloadMtprotoTask::placeSentRequest( mtpRequestId requestId, const RequestData &requestData) { - _owner->changeRequestedAmount( + const auto amount = _owner->changeRequestedAmount( dcId(), - requestData.dcIndex, + requestData.sessionIndex, Storage::kDownloadPartSize); const auto [i, ok1] = _sentRequests.emplace(requestId, requestData); const auto [j, ok2] = _requestByOffset.emplace( requestData.offset, requestId); + i->second.requestedInSession = amount; i->second.sent = crl::now(); Ensures(ok1 && ok2); @@ -692,14 +752,17 @@ auto DownloadMtprotoTask::finishSentRequest( const auto result = it->second; _owner->changeRequestedAmount( dcId(), - result.dcIndex, + result.sessionIndex, -Storage::kDownloadPartSize); _sentRequests.erase(it); const auto ok = _requestByOffset.remove(result.offset); if (reason == FinishRequestReason::Success) { - const auto duration = crl::now() - result.sent; - _owner->requestSucceeded(dcId(), result.dcIndex, duration); + _owner->requestSucceeded( + dcId(), + result.sessionIndex, + result.requestedInSession, + result.sent); } Ensures(ok); @@ -731,10 +794,16 @@ void DownloadMtprotoTask::cancelRequestForOffset(int offset) { } void DownloadMtprotoTask::cancelRequest(mtpRequestId requestId) { + const auto hashes = (_cdnHashesRequestId == requestId); api().request(requestId).cancel(); [[maybe_unused]] const auto data = finishSentRequest( requestId, FinishRequestReason::Cancel); + if (hashes && !_cdnUncheckedParts.empty()) { + crl::on_main(this, [=] { + requestMoreCdnFileHashes(); + }); + } } void DownloadMtprotoTask::addToQueue() { diff --git a/Telegram/SourceFiles/storage/download_manager_mtproto.h b/Telegram/SourceFiles/storage/download_manager_mtproto.h index 01629a1f1..2a7a0bff8 100644 --- a/Telegram/SourceFiles/storage/download_manager_mtproto.h +++ b/Telegram/SourceFiles/storage/download_manager_mtproto.h @@ -42,8 +42,13 @@ public: return _taskFinishedObservable; } - void changeRequestedAmount(MTP::DcId dcId, int index, int delta); - void requestSucceeded(MTP::DcId dcId, int index, crl::time duration); + int changeRequestedAmount(MTP::DcId dcId, int index, int delta); + void requestSucceeded( + MTP::DcId dcId, + int index, + int amountAtRequestStart, + crl::time timeAtRequestStart); + [[nodiscard]] int chooseSessionIndex(MTP::DcId dcId) const; private: class Queue final { @@ -53,6 +58,7 @@ private: void resetGeneration(); [[nodiscard]] bool empty() const; [[nodiscard]] Task *nextTask() const; + void removeSession(int index); private: std::vector> _tasks; @@ -129,7 +135,8 @@ public: [[nodiscard]] const Location &location() const; [[nodiscard]] virtual bool readyToRequest() const = 0; - void loadPart(int dcIndex); + void loadPart(int sessionIndex); + void removeSession(int sessionIndex); void refreshFileReferenceFrom( const Data::UpdatedFileReferences &updates, @@ -152,7 +159,8 @@ protected: private: struct RequestData { int offset = 0; - int dcIndex = 0; + int sessionIndex = 0; + int requestedInSession = 0; crl::time sent = 0; inline bool operator<(const RequestData &other) const {