diff --git a/Telegram/SourceFiles/mtproto/session_private.cpp b/Telegram/SourceFiles/mtproto/session_private.cpp index 05e968b7f..09f078f87 100644 --- a/Telegram/SourceFiles/mtproto/session_private.cpp +++ b/Telegram/SourceFiles/mtproto/session_private.cpp @@ -60,6 +60,8 @@ constexpr auto kSendStateRequestWaiting = crl::time(1000); // How much time to wait for some more requests, when sending msg acks. constexpr auto kAckSendWaiting = 10 * crl::time(1000); +constexpr auto kCutContainerOnSize = 16 * 1024; + auto SyncTimeRequestDuration = kFastRequestDuration; using namespace details; @@ -696,7 +698,8 @@ void SessionPrivate::tryToSend() { initSize = initSizeInInts * sizeof(mtpPrime); } - bool needAnyResponse = false; + auto needAnyResponse = false; + auto someSkipped = false; SerializedRequest toSendRequest; { QWriteLocker locker1(_sessionData->toSendMutex()); @@ -711,15 +714,33 @@ void SessionPrivate::tryToSend() { locker1.unlock(); } - uint32 toSendCount = toSend.size(); - if (pingRequest) ++toSendCount; - if (ackRequest) ++toSendCount; - if (resendRequest) ++toSendCount; - if (stateRequest) ++toSendCount; - if (httpWaitRequest) ++toSendCount; - if (bindDcKeyRequest) ++toSendCount; + auto totalSending = int(toSend.size()); + auto sendingFrom = begin(toSend); + auto sendingTill = end(toSend); + auto combinedLength = 0; + for (auto i = sendingFrom; i != sendingTill; ++i) { + combinedLength += i->second->size(); + if (combinedLength >= kCutContainerOnSize) { + ++i; + if (const auto skipping = int(sendingTill - i)) { + sendingTill = i; + totalSending -= skipping; + Assert(totalSending > 0); + someSkipped = true; + } + break; + } + } + auto sendingRange = ranges::make_subrange(sendingFrom, sendingTill); + const auto sendingCount = totalSending; + if (pingRequest) ++totalSending; + if (ackRequest) ++totalSending; + if (resendRequest) ++totalSending; + if (stateRequest) ++totalSending; + if (httpWaitRequest) ++totalSending; + if (bindDcKeyRequest) ++totalSending; - if (!toSendCount) { + if (!totalSending) { return; // nothing to send } @@ -735,11 +756,11 @@ void SessionPrivate::tryToSend() { ? httpWaitRequest : bindDcKeyRequest ? bindDcKeyRequest - : toSend.begin()->second; - if (toSendCount == 1 && !first->forceSendInContainer) { + : sendingRange.begin()->second; + if (totalSending == 1 && !first->forceSendInContainer) { toSendRequest = first; if (sendAll) { - toSend.clear(); + toSend.erase(sendingFrom, sendingTill); locker1.unlock(); } @@ -808,7 +829,7 @@ void SessionPrivate::tryToSend() { if (stateRequest) containerSize += stateRequest.messageSize(); if (httpWaitRequest) containerSize += httpWaitRequest.messageSize(); if (bindDcKeyRequest) containerSize += bindDcKeyRequest.messageSize(); - for (const auto &[requestId, request] : toSend) { + for (const auto &[requestId, request] : sendingRange) { containerSize += request.messageSize(); if (needsLayer && request->needsLayer) { containerSize += initSizeInInts; @@ -825,9 +846,9 @@ void SessionPrivate::tryToSend() { // prepare container + each in invoke after toSendRequest = SerializedRequest::Prepare( containerSize, - containerSize + 3 * toSend.size()); + containerSize + 3 * sendingCount); toSendRequest->push_back(mtpc_msg_container); - toSendRequest->push_back(toSendCount); + toSendRequest->push_back(totalSending); // check for a valid container auto bigMsgId = base::unixtime::mtproto_msg_id(); @@ -839,7 +860,7 @@ void SessionPrivate::tryToSend() { // prepare sent container auto sentIdsWrap = SentContainer(); sentIdsWrap.sent = crl::now(); - sentIdsWrap.messages.reserve(toSendCount); + sentIdsWrap.messages.reserve(totalSending); if (bindDcKeyRequest) { _bindMsgId = placeToContainer( @@ -859,7 +880,7 @@ void SessionPrivate::tryToSend() { needAnyResponse = true; } - for (auto &[requestId, request] : toSend) { + for (auto &[requestId, request] : sendingRange) { const auto msgId = prepareToSend( request, bigMsgId, @@ -904,7 +925,7 @@ void SessionPrivate::tryToSend() { memcpy(toSendRequest->data() + from, request->constData() + 4, len * sizeof(mtpPrime)); } } - toSend.clear(); + toSend.erase(sendingFrom, sendingTill); if (stateRequest) { const auto msgId = placeToContainer( @@ -951,6 +972,11 @@ void SessionPrivate::tryToSend() { } } sendSecureRequest(std::move(toSendRequest), needAnyResponse); + if (someSkipped) { + InvokeQueued(this, [=] { + tryToSend(); + }); + } } void SessionPrivate::retryByTimer() { diff --git a/Telegram/SourceFiles/storage/file_upload.cpp b/Telegram/SourceFiles/storage/file_upload.cpp index 3028b3e6d..95c4b0a1b 100644 --- a/Telegram/SourceFiles/storage/file_upload.cpp +++ b/Telegram/SourceFiles/storage/file_upload.cpp @@ -26,8 +26,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL namespace Storage { namespace { -// max 512kb uploaded at the same time in each session -constexpr auto kMaxUploadPerSession = 512 * 1024; +// max 1mb uploaded at the same time in each session +constexpr auto kMaxUploadPerSession = 1024 * 1024; constexpr auto kDocumentMaxPartsCountDefault = 4000; @@ -47,7 +47,7 @@ constexpr auto kDocumentUploadPartSize3 = 256 * 1024; constexpr auto kDocumentUploadPartSize4 = 512 * 1024; // One part each half second, if not uploaded faster. -constexpr auto kUploadRequestInterval = crl::time(200); +constexpr auto kUploadRequestInterval = crl::time(250); // How much time without upload causes additional session kill. constexpr auto kKillSessionTimeout = 15 * crl::time(1000); @@ -59,6 +59,10 @@ constexpr auto kMaxSessionsCount = 8; constexpr auto kFastRequestThreshold = 1 * crl::time(1000); constexpr auto kSlowRequestThreshold = 8 * crl::time(1000); +// Request is 'fast' if it was done in less than 1s and +// (it-s size + queued before size) >= 512kb. +constexpr auto kAcceptAsFastIfTotalAtLeast = 512 * 1024; + [[nodiscard]] const char *ThumbnailFormat(const QString &mime) { return Core::IsMimeSticker(mime) ? "WEBP" : "JPG"; } @@ -97,6 +101,7 @@ struct Uploader::Request { FullMsgId itemId; crl::time sent = 0; QByteArray bytes; + int queued = 0; ushort part = 0; uchar dcIndex = 0; bool docPart = false; @@ -467,7 +472,9 @@ auto Uploader::sendPart(not_null entry, uchar dcIndex) template void Uploader::sendPreparedRequest(Prepared &&prepared, Request &&request) { - _sentPerDcIndex[request.dcIndex] += int(request.bytes.size()); + auto &sentInSession = _sentPerDcIndex[request.dcIndex]; + const auto queued = sentInSession; + sentInSession += int(request.bytes.size()); const auto requestId = _api->request( std::move(prepared) @@ -478,6 +485,7 @@ void Uploader::sendPreparedRequest(Prepared &&prepared, Request &&request) { }).toDC(MTP::uploadDcId(request.dcIndex)).send(); request.sent = crl::now(); + request.queued = queued; _requests.emplace(requestId, std::move(request)); } @@ -515,7 +523,7 @@ auto Uploader::sendDocPart(not_null entry, uchar dcIndex) const auto itemId = entry->itemId; const auto alreadySent = _sentPerDcIndex[dcIndex]; const auto willProbablyBeSent = entry->docPartSize; - if (alreadySent + willProbablyBeSent >= kMaxUploadPerSession) { + if (alreadySent + willProbablyBeSent > kMaxUploadPerSession) { return SendResult::DcIndexFull; } @@ -614,9 +622,13 @@ void Uploader::maybeSend() { } // If this entry failed, we try the next one. } - usedDcIndices.emplace(dcIndex); + if (_sentPerDcIndex[dcIndex] >= kAcceptAsFastIfTotalAtLeast) { + usedDcIndices.emplace(dcIndex); + } } - if (!usedDcIndices.empty()) { + if (usedDcIndices.empty()) { + _nextTimer.cancel(); + } else { _nextTimer.callOnce(kUploadRequestInterval); } } @@ -718,7 +730,8 @@ void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) { } else { DEBUG_LOG(("Uploader: Slow-ish request, clear fast records.")); } - } else if (request.sent > _latestDcIndexAdded) { + } else if (request.sent > _latestDcIndexAdded + && (request.queued + bytes >= kAcceptAsFastIfTotalAtLeast)) { if (_dcIndicesWithFastRequests.emplace(request.dcIndex).second) { DEBUG_LOG(("Uploader: Mark %1 of %2 as fast." ).arg(request.dcIndex