diff --git a/Telegram/SourceFiles/mtproto/concurrent_sender.cpp b/Telegram/SourceFiles/mtproto/concurrent_sender.cpp index b2ffb0028..3d484296a 100644 --- a/Telegram/SourceFiles/mtproto/concurrent_sender.cpp +++ b/Telegram/SourceFiles/mtproto/concurrent_sender.cpp @@ -111,7 +111,7 @@ auto ConcurrentSender::with_instance(Method &&method) ConcurrentSender::RequestBuilder::RequestBuilder( not_null sender, - SecureRequest &&serialized) noexcept + details::SerializedRequest &&serialized) noexcept : _sender(sender) , _serialized(std::move(serialized)) { } diff --git a/Telegram/SourceFiles/mtproto/concurrent_sender.h b/Telegram/SourceFiles/mtproto/concurrent_sender.h index 5929f800c..68fb22781 100644 --- a/Telegram/SourceFiles/mtproto/concurrent_sender.h +++ b/Telegram/SourceFiles/mtproto/concurrent_sender.h @@ -12,6 +12,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "base/weak_ptr.h" #include "base/flat_map.h" #include "mtproto/core_types.h" +#include "mtproto/details/mtproto_serialized_request.h" #ifndef _DEBUG #define MTP_SENDER_USE_GENERIC_HANDLERS @@ -61,7 +62,7 @@ class ConcurrentSender : public base::has_weak_ptr { protected: RequestBuilder( not_null sender, - SecureRequest &&serialized) noexcept; + details::SerializedRequest &&serialized) noexcept; void setToDC(ShiftedDcId dcId) noexcept; void setCanWait(crl::time ms) noexcept; @@ -74,7 +75,7 @@ class ConcurrentSender : public base::has_weak_ptr { private: not_null _sender; - SecureRequest _serialized; + details::SerializedRequest _serialized; ShiftedDcId _dcId = 0; crl::time _canWait = 0; @@ -224,8 +225,8 @@ void ConcurrentSender::RequestBuilder::setFailHandler( template ConcurrentSender::SpecificRequestBuilder::SpecificRequestBuilder( not_null sender, - Request &&request -) noexcept : RequestBuilder(sender, SecureRequest::Serialize(request)) { + Request &&request) noexcept +: RequestBuilder(sender, details::SerializedRequest::Serialize(request)) { } template diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index e5d5be76a..80f4e478e 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -85,11 +85,15 @@ using namespace details; : TemporaryKeyType::Regular; } -void wrapInvokeAfter(SecureRequest &to, const SecureRequest &from, const RequestMap &haveSent, int32 skipBeforeRequest = 0) { +void WrapInvokeAfter( + SerializedRequest &to, + const SerializedRequest &from, + const base::flat_map &haveSent, + int32 skipBeforeRequest = 0) { const auto afterId = *(mtpMsgId*)(from->after->data() + 4); - const auto i = afterId ? haveSent.constFind(afterId) : haveSent.cend(); + const auto i = afterId ? haveSent.find(afterId) : haveSent.end(); int32 size = to->size(), lenInInts = (tl::count_length(from) >> 2), headlen = 4, fulllen = headlen + lenInInts; - if (i == haveSent.constEnd()) { // no invoke after or such msg was not sent or was completed recently + if (i == haveSent.end()) { // no invoke after or such msg was not sent or was completed recently to->resize(size + fulllen + skipBeforeRequest); if (skipBeforeRequest) { memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime)); @@ -229,41 +233,41 @@ int16 ConnectionPrivate::getProtocolDcId() const { } void ConnectionPrivate::checkSentRequests() { - QVector removingIds; // remove very old (10 minutes) containers and resend requests + // Remove very old (10 minutes) containers and resend requests. + auto removingIds = std::vector(); auto requesting = false; { QReadLocker locker(_sessionData->haveSentMutex()); auto &haveSent = _sessionData->haveSentMap(); const auto haveSentCount = haveSent.size(); - auto ms = crl::now(); - for (auto i = haveSent.begin(), e = haveSent.end(); i != e; ++i) { - auto &req = i.value(); - if (req->msDate > 0) { - if (req->msDate + kCheckSentRequestTimeout < ms) { - // Need to check state. - req->msDate = ms; - if (_stateRequestData.emplace(i.key()).second) { - requesting = true; - } + auto now = crl::now(); + for (const auto &[msgId, request] : haveSent) { + if (request.isStateRequest()) { + continue; + } else if (request.isSentContainer()) { + if (base::unixtime::now() + > int32(msgId >> 32) + kContainerLives) { + removingIds.push_back(msgId); + } + } else if (request->lastSentTime + kCheckSentRequestTimeout + < now) { + // Need to check state. + request->lastSentTime = now; + if (_stateRequestData.emplace(msgId).second) { + requesting = true; } - } else if (base::unixtime::now() - > int32(i.key() >> 32) + kContainerLives) { - removingIds.reserve(haveSentCount); - removingIds.push_back(i.key()); } } } if (requesting) { _sessionData->queueSendAnything(kSendStateRequestWaiting); } - if (!removingIds.isEmpty()) { + if (!removingIds.empty()) { QWriteLocker locker(_sessionData->haveSentMutex()); auto &haveSent = _sessionData->haveSentMap(); - for (uint32 i = 0, l = removingIds.size(); i < l; ++i) { - auto j = haveSent.find(removingIds[i]); - if (j != haveSent.cend()) { - Assert(!j.value()->requestId); - haveSent.erase(j); + for (const auto msgId : removingIds) { + if (const auto removed = haveSent.take(msgId)) { + Assert(!(*removed)->requestId); } } } @@ -436,7 +440,7 @@ bool ConnectionPrivate::markSessionAsStarted() { } mtpMsgId ConnectionPrivate::prepareToSend( - SecureRequest &request, + SerializedRequest &request, mtpMsgId currentLastId, bool forceNewMsgId) { Expects(request->size() > 8); @@ -460,7 +464,7 @@ mtpMsgId ConnectionPrivate::prepareToSend( return currentLastId; } -mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId) { +mtpMsgId ConnectionPrivate::replaceMsgId(SerializedRequest &request, mtpMsgId newId) { Expects(request->size() > 8); const auto oldMsgId = request.getMsgId(); @@ -472,11 +476,14 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId) while (_resendingIds.contains(newId) || _ackedIds.contains(newId) - || haveSent.constFind(newId) != haveSent.cend()) { + || haveSent.contains(newId)) { newId = base::unixtime::mtproto_msg_id(); } - MTP_LOG(_shiftedDcId, ("[r%1] msg_id %2 -> %3").arg(request->requestId).arg(oldMsgId).arg(newId)); + MTP_LOG(_shiftedDcId, ("[r%1] msg_id %2 -> %3" + ).arg(request->requestId + ).arg(oldMsgId + ).arg(newId)); const auto i = _resendingIds.find(oldMsgId); if (i != _resendingIds.end()) { @@ -486,24 +493,23 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId) } const auto j = _ackedIds.find(oldMsgId); - if (j != _ackedIds.cend()) { + if (j != _ackedIds.end()) { const auto requestId = j->second; _ackedIds.erase(j); _ackedIds.emplace(newId, requestId); } const auto k = haveSent.find(oldMsgId); - if (k != haveSent.cend()) { - const auto req = k.value(); + if (k != haveSent.end()) { + const auto request = k->second; haveSent.erase(k); - haveSent.insert(newId, req); + haveSent.emplace(newId, request); } - for (auto l = haveSent.begin(); l != haveSent.cend(); ++l) { - const auto req = l.value(); - if (req.isSentContainer()) { - const auto ids = (mtpMsgId *)(req->data() + 8); - for (uint32 i = 0, l = (req->size() - 8) >> 1; i < l; ++i) { + for (const auto &[requestId, sent] : haveSent) { + if (sent.isSentContainer()) { + const auto ids = (mtpMsgId *)(sent->data() + 8); + for (uint32 i = 0, l = (sent->size() - 8) >> 1; i < l; ++i) { if (ids[i] == oldMsgId) { ids[i] = newId; } @@ -517,11 +523,11 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId) } mtpMsgId ConnectionPrivate::placeToContainer( - SecureRequest &toSendRequest, + SerializedRequest &toSendRequest, mtpMsgId &bigMsgId, bool forceNewMsgId, mtpMsgId *&haveSentArr, - SecureRequest &req) { + SerializedRequest &req) { const auto msgId = prepareToSend(req, bigMsgId, forceNewMsgId); if (msgId >= bigMsgId) { bigMsgId = base::unixtime::mtproto_msg_id(); @@ -560,28 +566,28 @@ void ConnectionPrivate::tryToSend() { _keyCreator->restartBinder(); } - auto pingRequest = SecureRequest(); - auto ackRequest = SecureRequest(); - auto resendRequest = SecureRequest(); - auto stateRequest = SecureRequest(); - auto httpWaitRequest = SecureRequest(); - auto bindDcKeyRequest = SecureRequest(); + auto pingRequest = SerializedRequest(); + auto ackRequest = SerializedRequest(); + auto resendRequest = SerializedRequest(); + auto stateRequest = SerializedRequest(); + auto httpWaitRequest = SerializedRequest(); + auto bindDcKeyRequest = SerializedRequest(); if (_pingIdToSend) { if (sendOnlyFirstPing || !isMainSession) { DEBUG_LOG(("MTP Info: sending ping, ping_id: %1" ).arg(_pingIdToSend)); - pingRequest = SecureRequest::Serialize(MTPPing( + pingRequest = SerializedRequest::Serialize(MTPPing( MTP_long(_pingIdToSend) )); } else { DEBUG_LOG(("MTP Info: sending ping_delay_disconnect, " "ping_id: %1").arg(_pingIdToSend)); - pingRequest = SecureRequest::Serialize(MTPPing_delay_disconnect( + pingRequest = SerializedRequest::Serialize(MTPPing_delay_disconnect( MTP_long(_pingIdToSend), MTP_int(kPingDelayDisconnect))); _pingSender.callOnce(kPingSendAfterForce); } - _pingSendAt = pingRequest->msDate + kPingSendAfter; + _pingSendAt = pingRequest->lastSentTime + kPingSendAfter; _pingId = base::take(_pingIdToSend); } else if (!sendAll) { DEBUG_LOG(("MTP Info: dc %1 sending only service or bind." @@ -594,12 +600,12 @@ void ConnectionPrivate::tryToSend() { if (!sendOnlyFirstPing) { if (!_ackRequestData.isEmpty()) { - ackRequest = SecureRequest::Serialize(MTPMsgsAck( + ackRequest = SerializedRequest::Serialize(MTPMsgsAck( MTP_msgs_ack(MTP_vector( base::take(_ackRequestData))))); } if (!_resendRequestData.isEmpty()) { - resendRequest = SecureRequest::Serialize(MTPMsgResendReq( + resendRequest = SerializedRequest::Serialize(MTPMsgResendReq( MTP_msg_resend_req(MTP_vector( base::take(_resendRequestData))))); } @@ -609,13 +615,13 @@ void ConnectionPrivate::tryToSend() { for (const auto id : base::take(_stateRequestData)) { ids.push_back(MTP_long(id)); } - stateRequest = SecureRequest::Serialize(MTPMsgsStateReq( + stateRequest = SerializedRequest::Serialize(MTPMsgsStateReq( MTP_msgs_state_req(MTP_vector(ids)))); // Add to haveSent / _ackedIds, but don't add to requestMap. stateRequest->requestId = GetNextRequestId(); } if (_connection->usingHttpWait()) { - httpWaitRequest = SecureRequest::Serialize(MTPHttpWait( + httpWaitRequest = SerializedRequest::Serialize(MTPHttpWait( MTP_http_wait(MTP_int(100), MTP_int(30), MTP_int(25000)))); } if (!_bindMsgId && _keyCreator && _keyCreator->readyToBind()) { @@ -631,7 +637,7 @@ void ConnectionPrivate::tryToSend() { } } - MTPInitConnection initWrapper; + MTPInitConnection initWrapper; int32 initSize = 0, initSizeInInts = 0; if (needsLayer) { Assert(_connectionOptions != nullptr); @@ -660,8 +666,8 @@ void ConnectionPrivate::tryToSend() { MTP_string(_connectionOptions->proxy.host), MTP_int(_connectionOptions->proxy.port)) : MTPInputClientProxy(); - using Flag = MTPInitConnection::Flag; - initWrapper = MTPInitConnection( + using Flag = MTPInitConnection::Flag; + initWrapper = MTPInitConnection( MTP_flags(mtprotoProxy ? Flag::f_proxy : Flag(0)), MTP_int(ApiId), MTP_string(deviceModel), @@ -671,25 +677,22 @@ void ConnectionPrivate::tryToSend() { MTP_string(langPackName), MTP_string(cloudLangCode), clientProxyFields, - SecureRequest()); + SerializedRequest()); initSizeInInts = (tl::count_length(initWrapper) >> 2) + 2; initSize = initSizeInInts * sizeof(mtpPrime); } bool needAnyResponse = false; - SecureRequest toSendRequest; + SerializedRequest toSendRequest; { QWriteLocker locker1(_sessionData->toSendMutex()); - auto toSendDummy = PreRequestMap(); + auto toSendDummy = base::flat_map(); auto &toSend = sendAll ? _sessionData->toSendMap() : toSendDummy; if (!sendAll) { locker1.unlock(); - } else { - int time = crl::now(); - int now = crl::now(); } uint32 toSendCount = toSend.size(); @@ -716,8 +719,8 @@ void ConnectionPrivate::tryToSend() { ? httpWaitRequest : bindDcKeyRequest ? bindDcKeyRequest - : toSend.cbegin().value(); - if (toSendCount == 1 && first->msDate > 0) { // if can send without container + : toSend.begin()->second; + if (toSendCount == 1 && !first->forceSendInContainer) { toSendRequest = first; if (sendAll) { toSend.clear(); @@ -740,27 +743,27 @@ void ConnectionPrivate::tryToSend() { if (toSendRequest->requestId) { if (toSendRequest.needAck()) { - toSendRequest->msDate = toSendRequest.isStateRequest() ? 0 : crl::now(); + toSendRequest->lastSentTime = crl::now(); QWriteLocker locker2(_sessionData->haveSentMutex()); auto &haveSent = _sessionData->haveSentMap(); - haveSent.insert(msgId, toSendRequest); + haveSent.emplace(msgId, toSendRequest); const auto wrapLayer = needsLayer && toSendRequest->needsLayer; if (toSendRequest->after) { const auto toSendSize = tl::count_length(toSendRequest) >> 2; - auto wrappedRequest = SecureRequest::Prepare( + auto wrappedRequest = SerializedRequest::Prepare( toSendSize, toSendSize + 3); wrappedRequest->resize(4); memcpy(wrappedRequest->data(), toSendRequest->constData(), 4 * sizeof(mtpPrime)); - wrapInvokeAfter(wrappedRequest, toSendRequest, haveSent); + WrapInvokeAfter(wrappedRequest, toSendRequest, haveSent); toSendRequest = std::move(wrappedRequest); } if (wrapLayer) { const auto noWrapSize = (tl::count_length(toSendRequest) >> 2); const auto toSendSize = noWrapSize + initSizeInInts; - auto wrappedRequest = SecureRequest::Prepare(toSendSize); + auto wrappedRequest = SerializedRequest::Prepare(toSendSize); memcpy(wrappedRequest->data(), toSendRequest->constData(), 7 * sizeof(mtpPrime)); // all except length wrappedRequest->push_back(mtpc_invokeWithLayer); wrappedRequest->push_back(internal::CurrentLayer); @@ -784,9 +787,9 @@ void ConnectionPrivate::tryToSend() { if (stateRequest) containerSize += stateRequest.messageSize(); if (httpWaitRequest) containerSize += httpWaitRequest.messageSize(); if (bindDcKeyRequest) containerSize += bindDcKeyRequest.messageSize(); - for (auto i = toSend.begin(), e = toSend.end(); i != e; ++i) { - containerSize += i.value().messageSize(); - if (needsLayer && i.value()->needsLayer) { + for (const auto &[requestId, request] : toSend) { + containerSize += request.messageSize(); + if (needsLayer && request->needsLayer) { containerSize += initSizeInInts; willNeedInit = true; } @@ -799,7 +802,7 @@ void ConnectionPrivate::tryToSend() { initWrapper.write(initSerialized); } // prepare container + each in invoke after - toSendRequest = SecureRequest::Prepare( + toSendRequest = SerializedRequest::Prepare( containerSize, containerSize + 3 * toSend.size()); toSendRequest->push_back(mtpc_msg_container); @@ -813,9 +816,8 @@ void ConnectionPrivate::tryToSend() { auto &haveSent = _sessionData->haveSentMap(); // prepare "request-like" wrap for msgId vector - auto haveSentIdsWrap = SecureRequest::Prepare(idsWrapSize); - haveSentIdsWrap->msDate = 0; // Container: msDate = 0, seqNo = 0. - haveSentIdsWrap->requestId = 0; + auto haveSentIdsWrap = SerializedRequest::Prepare(idsWrapSize); + haveSentIdsWrap->isContainerIdsWrap = true; haveSentIdsWrap->resize(haveSentIdsWrap->size() + idsWrapSize); auto haveSentArr = (mtpMsgId*)(haveSentIdsWrap->data() + 8); @@ -840,10 +842,9 @@ void ConnectionPrivate::tryToSend() { if (resendRequest || stateRequest) { needAnyResponse = true; } - for (auto i = toSend.begin(), e = toSend.end(); i != e; ++i) { - auto &req = i.value(); + for (auto &[requestId, request] : toSend) { const auto msgId = prepareToSend( - req, + request, bigMsgId, forceNewMsgId); if (msgId >= bigMsgId) { @@ -851,44 +852,43 @@ void ConnectionPrivate::tryToSend() { } *(haveSentArr++) = msgId; bool added = false; - if (req->requestId) { - if (req.needAck()) { - req->msDate = req.isStateRequest() ? 0 : crl::now(); - int32 reqNeedsLayer = (needsLayer && req->needsLayer) ? toSendRequest->size() : 0; - if (req->after) { - wrapInvokeAfter(toSendRequest, req, haveSent, reqNeedsLayer ? initSizeInInts : 0); + if (request->requestId) { + if (request.needAck()) { + request->lastSentTime = crl::now(); + int32 reqNeedsLayer = (needsLayer && request->needsLayer) ? toSendRequest->size() : 0; + if (request->after) { + WrapInvokeAfter(toSendRequest, request, haveSent, reqNeedsLayer ? initSizeInInts : 0); if (reqNeedsLayer) { memcpy(toSendRequest->data() + reqNeedsLayer + 4, initSerialized.constData(), initSize); *(toSendRequest->data() + reqNeedsLayer + 3) += initSize; } added = true; } else if (reqNeedsLayer) { - toSendRequest->resize(reqNeedsLayer + initSizeInInts + req.messageSize()); - memcpy(toSendRequest->data() + reqNeedsLayer, req->constData() + 4, 4 * sizeof(mtpPrime)); + toSendRequest->resize(reqNeedsLayer + initSizeInInts + request.messageSize()); + memcpy(toSendRequest->data() + reqNeedsLayer, request->constData() + 4, 4 * sizeof(mtpPrime)); memcpy(toSendRequest->data() + reqNeedsLayer + 4, initSerialized.constData(), initSize); - memcpy(toSendRequest->data() + reqNeedsLayer + 4 + initSizeInInts, req->constData() + 8, tl::count_length(req)); + memcpy(toSendRequest->data() + reqNeedsLayer + 4 + initSizeInInts, request->constData() + 8, tl::count_length(request)); *(toSendRequest->data() + reqNeedsLayer + 3) += initSize; added = true; } Assert(!haveSent.contains(msgId)); - haveSent.insert(msgId, req); + haveSent.emplace(msgId, request); needAnyResponse = true; } else { - _ackedIds.emplace(msgId, req->requestId); + _ackedIds.emplace(msgId, request->requestId); } } if (!added) { - uint32 from = toSendRequest->size(), len = req.messageSize(); + uint32 from = toSendRequest->size(), len = request.messageSize(); toSendRequest->resize(from + len); - memcpy(toSendRequest->data() + from, req->constData() + 4, len * sizeof(mtpPrime)); + memcpy(toSendRequest->data() + from, request->constData() + 4, len * sizeof(mtpPrime)); } } if (stateRequest) { - mtpMsgId msgId = placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, stateRequest); - stateRequest->msDate = 0; // 0 for state request, do not request state of it + const auto msgId = placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, stateRequest); Assert(!haveSent.contains(msgId)); - haveSent.insert(msgId, stateRequest); + haveSent.emplace(msgId, stateRequest); } if (resendRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, resendRequest); if (ackRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, ackRequest); @@ -899,9 +899,8 @@ void ConnectionPrivate::tryToSend() { bigMsgId, forceNewMsgId); *(mtpMsgId*)(haveSentIdsWrap->data() + 4) = containerMsgId; - (*haveSentIdsWrap)[6] = 0; // for container, msDate = 0, seqNo = 0 Assert(!haveSent.contains(containerMsgId)); - haveSent.insert(containerMsgId, haveSentIdsWrap); + haveSent.emplace(containerMsgId, haveSentIdsWrap); toSend.clear(); } } @@ -1370,7 +1369,8 @@ void ConnectionPrivate::handleReceived() { } auto lock = QReadLocker(_sessionData->haveReceivedMutex()); - const auto tryToReceive = !_sessionData->haveReceivedResponses().isEmpty() || !_sessionData->haveReceivedUpdates().isEmpty(); + const auto tryToReceive = !_sessionData->haveReceivedResponses().empty() + || !_sessionData->haveReceivedUpdates().empty(); lock.unlock(); if (tryToReceive) { @@ -1521,16 +1521,16 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( || (errorCode == 64); // bad container if (errorCode == 64) { // bad container! if (Logs::DebugEnabled()) { - SecureRequest request; + SerializedRequest request; { QWriteLocker locker(_sessionData->haveSentMutex()); auto &haveSent = _sessionData->haveSentMap(); - const auto i = haveSent.constFind(resendId); - if (i == haveSent.cend()) { + const auto i = haveSent.find(resendId); + if (i == haveSent.end()) { LOG(("Message Error: Container not found!")); } else { - request = i.value(); + request = i->second; } } if (request) { @@ -1626,53 +1626,6 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( resend(resendId); } return HandleResult::Success; - case mtpc_msgs_state_req: { - if (badTime) { - DEBUG_LOG(("Message Info: skipping with bad time...")); - return HandleResult::Ignored; - } - MTPMsgsStateReq msg; - if (!msg.read(from, end)) { - return HandleResult::ParseError; - } - auto &ids = msg.c_msgs_state_req().vmsg_ids().v; - auto idsCount = ids.size(); - DEBUG_LOG(("Message Info: msgs_state_req received, ids: %1").arg(LogIdsVector(ids))); - if (!idsCount) return HandleResult::Success; - - QByteArray info(idsCount, Qt::Uninitialized); - { - const auto minRecv = _receivedMessageIds.min(); - const auto maxRecv = _receivedMessageIds.max(); - for (uint32 i = 0, l = idsCount; i < l; ++i) { - char state = 0; - uint64 reqMsgId = ids[i].v; - if (reqMsgId < minRecv) { - state |= 0x01; - } else if (reqMsgId > maxRecv) { - state |= 0x03; - } else { - auto msgIdState = _receivedMessageIds.lookup(reqMsgId); - if (msgIdState == ReceivedIdsManager::State::NotFound) { - state |= 0x02; - } else { - state |= 0x04; - if (_ackedIds.contains(reqMsgId)) { - state |= 0x80; // we know, that server knows, that we received request - } - if (msgIdState == ReceivedIdsManager::State::NeedsAck) { // need ack, so we sent ack - state |= 0x08; - } else { - state |= 0x10; - } - } - } - info[i] = state; - } - } - _sessionData->queueSendMsgsStateInfo(msgId, info); - } return HandleResult::Success; - case mtpc_msgs_state_info: { MTPMsgsStateInfo msg; if (!msg.read(from, end)) { @@ -1684,12 +1637,12 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( auto &states = data.vinfo().v; DEBUG_LOG(("Message Info: msg state received, msgId %1, reqMsgId: %2, HEX states %3").arg(msgId).arg(reqMsgId).arg(Logs::mb(states.data(), states.length()).str())); - SecureRequest requestBuffer; + SerializedRequest requestBuffer; { // find this request in session-shared sent requests map QReadLocker locker(_sessionData->haveSentMutex()); const auto &haveSent = _sessionData->haveSentMap(); - const auto replyTo = haveSent.constFind(reqMsgId); - if (replyTo == haveSent.cend()) { // do not look in toResend, because we do not resend msgs_state_req requests + const auto replyTo = haveSent.find(reqMsgId); + if (replyTo == haveSent.end()) { // do not look in toResend, because we do not resend msgs_state_req requests DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(reqMsgId)); return (badTime ? HandleResult::Ignored : HandleResult::Success); } @@ -1703,7 +1656,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( badTime = false; } - requestBuffer = replyTo.value(); + requestBuffer = replyTo->second; } QVector toAckReq(1, MTP_long(reqMsgId)), toAck; requestsAcked(toAck, true); @@ -1809,7 +1762,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( if (from + 3 > end) { return HandleResult::ParseError; } - auto response = SerializedMessage(); + auto response = mtpBuffer(); MTPlong reqMsgId; if (!reqMsgId.read(++from, end)) { @@ -1861,7 +1814,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( if (requestId && requestId != mtpRequestId(0xFFFFFFFF)) { // Save rpc_result for processing in the main thread. QWriteLocker locker(_sessionData->haveReceivedMutex()); - _sessionData->haveReceivedResponses().insert(requestId, response); + _sessionData->haveReceivedResponses().emplace(requestId, response); } else { DEBUG_LOG(("RPC Info: requestId not found for msgId %1").arg(requestMsgId)); } @@ -1893,11 +1846,11 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( QReadLocker locker(_sessionData->haveSentMutex()); const auto &haveSent = _sessionData->haveSentMap(); toResend.reserve(haveSent.size()); - for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) { - if (i.key() >= firstMsgId) { + for (const auto &[msgId, request] : haveSent) { + if (msgId >= firstMsgId) { break; - } else if (i.value()->requestId) { - toResend.push_back(i.key()); + } else if (request->requestId) { + toResend.push_back(msgId); } } } @@ -1910,7 +1863,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( // Notify main process about new session - need to get difference. QWriteLocker locker(_sessionData->haveReceivedMutex()); - _sessionData->haveReceivedUpdates().push_back(SerializedMessage(update)); + _sessionData->haveReceivedUpdates().push_back(mtpBuffer(update)); } return HandleResult::Success; case mtpc_pong: { @@ -1957,7 +1910,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( // Notify main process about the new updates. QWriteLocker locker(_sessionData->haveReceivedMutex()); - _sessionData->haveReceivedUpdates().push_back(SerializedMessage(update)); + _sessionData->haveReceivedUpdates().push_back(mtpBuffer(update)); } else { LOG(("Message Error: unexpected updates in dcType: %1" ).arg(static_cast(_currentDcType))); @@ -2075,27 +2028,27 @@ void ConnectionPrivate::requestsAcked(const QVector &ids, bool byRespon for (uint32 i = 0; i < idsCount; ++i) { mtpMsgId msgId = ids[i].v; const auto req = haveSent.find(msgId); - if (req != haveSent.cend()) { - if (!req.value()->msDate) { + if (req != haveSent.end()) { + if (req->second.isSentContainer()) { DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(ids[i].v)); - uint32 inContCount = ((*req)->size() - 8) / 2; - const mtpMsgId *inContId = (const mtpMsgId *)(req.value()->constData() + 8); + uint32 inContCount = (req->second->size() - 8) / 2; + const mtpMsgId *inContId = (const mtpMsgId *)(req->second->constData() + 8); toAckMore.reserve(toAckMore.size() + inContCount); for (uint32 j = 0; j < inContCount; ++j) { toAckMore.push_back(MTP_long(*(inContId++))); } haveSent.erase(req); } else { - mtpRequestId reqId = req.value()->requestId; + const auto requestId = req->second->requestId; bool moveToAcked = byResponse; if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler) - moveToAcked = !_instance->hasCallbacks(reqId); + moveToAcked = !_instance->hasCallbacks(requestId); } if (moveToAcked) { - _ackedIds.emplace(msgId, reqId); + _ackedIds.emplace(msgId, requestId); haveSent.erase(req); } else { - DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId)); + DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId)); } } } else { @@ -2112,9 +2065,9 @@ void ConnectionPrivate::requestsAcked(const QVector &ids, bool byRespon auto &toSend = _sessionData->toSendMap(); const auto req = toSend.find(reqId); if (req != toSend.cend()) { - _ackedIds.emplace(msgId, req.value()->requestId); - if (req.value()->requestId != reqId) { - DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(reqId).arg(req.value()->requestId)); + _ackedIds.emplace(msgId, req->second->requestId); + if (req->second->requestId != reqId) { + DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(reqId).arg(req->second->requestId)); } else { DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(reqId)); } @@ -2172,9 +2125,7 @@ void ConnectionPrivate::handleMsgsStates(const QVector &ids, const QByt uint64 requestMsgId = ids[i].v; { QReadLocker locker(_sessionData->haveSentMutex()); - const auto &haveSent = _sessionData->haveSentMap(); - const auto haveSentEnd = haveSent.cend(); - if (haveSent.find(requestMsgId) == haveSentEnd) { + if (!_sessionData->haveSentMap().contains(requestMsgId)) { DEBUG_LOG(("Message Info: state was received for msgId %1, but request is not found, looking in resent requests...").arg(requestMsgId)); const auto reqIt = _resendingIds.find(requestMsgId); if (reqIt != _resendingIds.cend()) { @@ -2226,7 +2177,7 @@ void ConnectionPrivate::resend( if (i == haveSent.end()) { return; } - auto request = i.value(); + auto request = i->second; haveSent.erase(i); lock.unlock(); @@ -2238,11 +2189,12 @@ void ConnectionPrivate::resend( resend(ids[i], -1, true); } } else if (!request.isStateRequest()) { - request->msDate = forceContainer ? 0 : crl::now(); + request->lastSentTime = crl::now(); + request->forceSendInContainer = forceContainer; _resendingIds.emplace(msgId, request->requestId); { QWriteLocker locker(_sessionData->toSendMutex()); - _sessionData->toSendMap().insert(request->requestId, request); + _sessionData->toSendMap().emplace(request->requestId, request); } } } @@ -2253,9 +2205,9 @@ void ConnectionPrivate::resendAll() { auto lock = QReadLocker(_sessionData->haveSentMutex()); const auto &haveSent = _sessionData->haveSentMap(); toResend.reserve(haveSent.size()); - for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) { - if (!i.value().isSentContainer()) { - toResend.push_back(i.key()); + for (const auto &[msgId, request] : haveSent) { + if (!request.isSentContainer() && !request.isStateRequest()) { + toResend.push_back(msgId); } } lock.unlock(); @@ -2581,7 +2533,7 @@ void ConnectionPrivate::destroyTemporaryKey() { } bool ConnectionPrivate::sendSecureRequest( - SecureRequest &&request, + SerializedRequest &&request, bool needAnyResponse) { #ifdef TDESKTOP_MTPROTO_OLD const auto oldPadding = true; @@ -2671,10 +2623,10 @@ mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const { { QReadLocker locker(_sessionData->haveSentMutex()); const auto &haveSent = _sessionData->haveSentMap(); - const auto i = haveSent.constFind(msgId); - if (i != haveSent.cend()) { - return i.value()->requestId - ? i.value()->requestId + const auto i = haveSent.find(msgId); + if (i != haveSent.end()) { + return i->second->requestId + ? i->second->requestId : mtpRequestId(0xFFFFFFFF); } } diff --git a/Telegram/SourceFiles/mtproto/connection.h b/Telegram/SourceFiles/mtproto/connection.h index 36c5706d3..2715bb640 100644 --- a/Telegram/SourceFiles/mtproto/connection.h +++ b/Telegram/SourceFiles/mtproto/connection.h @@ -8,6 +8,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #pragma once #include "mtproto/details/mtproto_received_ids_manager.h" +#include "mtproto/details/mtproto_serialized_request.h" #include "mtproto/mtproto_auth_key.h" #include "mtproto/dc_options.h" #include "mtproto/connection_abstract.h" @@ -139,18 +140,22 @@ private: void checkSentRequests(); mtpMsgId placeToContainer( - SecureRequest &toSendRequest, + details::SerializedRequest &toSendRequest, mtpMsgId &bigMsgId, bool forceNewMsgId, mtpMsgId *&haveSentArr, - SecureRequest &req); + details::SerializedRequest &req); mtpMsgId prepareToSend( - SecureRequest &request, + details::SerializedRequest &request, mtpMsgId currentLastId, bool forceNewMsgId); - mtpMsgId replaceMsgId(SecureRequest &request, mtpMsgId newId); + mtpMsgId replaceMsgId( + details::SerializedRequest &request, + mtpMsgId newId); - bool sendSecureRequest(SecureRequest &&request, bool needAnyResponse); + bool sendSecureRequest( + details::SerializedRequest &&request, + bool needAnyResponse); mtpRequestId wasSent(mtpMsgId msgId) const; [[nodiscard]] HandleResult handleOneReceived(const mtpPrime *from, const mtpPrime *end, uint64 msgId, int32 serverTime, uint64 serverSalt, bool badTime); diff --git a/Telegram/SourceFiles/mtproto/core_types.h b/Telegram/SourceFiles/mtproto/core_types.h index 37c84a16d..93724f611 100644 --- a/Telegram/SourceFiles/mtproto/core_types.h +++ b/Telegram/SourceFiles/mtproto/core_types.h @@ -122,102 +122,6 @@ static const mtpTypeId mtpLayers[] = { }; static const uint32 mtpLayerMaxSingle = sizeof(mtpLayers) / sizeof(mtpLayers[0]); -namespace MTP { -namespace details { - -struct SecureRequestCreateTag { -}; - -} // namespace details - -class SecureRequestData; -class SecureRequest { -public: - SecureRequest() = default; - - static constexpr auto kSaltInts = 2; - static constexpr auto kSessionIdInts = 2; - static constexpr auto kMessageIdPosition = kSaltInts + kSessionIdInts; - static constexpr auto kMessageIdInts = 2; - static constexpr auto kSeqNoPosition = kMessageIdPosition - + kMessageIdInts; - static constexpr auto kSeqNoInts = 1; - static constexpr auto kMessageLengthPosition = kSeqNoPosition - + kSeqNoInts; - static constexpr auto kMessageLengthInts = 1; - static constexpr auto kMessageBodyPosition = kMessageLengthPosition - + kMessageLengthInts; - - static SecureRequest Prepare(uint32 size, uint32 reserveSize = 0); - - template < - typename Request, - typename = std::enable_if_t>> - static SecureRequest Serialize(const Request &request); - - // For template MTP requests and MTPBoxed instanciation. - template - void write(Accumulator &to) const { - if (const auto size = sizeInBytes()) { - tl::Writer::PutBytes(to, dataInBytes(), size); - } - } - - SecureRequestData *operator->() const; - SecureRequestData &operator*() const; - explicit operator bool() const; - - void setMsgId(mtpMsgId msgId); - [[nodiscard]] mtpMsgId getMsgId() const; - - void setSeqNo(uint32 seqNo); - [[nodiscard]] uint32 getSeqNo() const; - - void addPadding(bool extended, bool old); - [[nodiscard]] uint32 messageSize() const; - - // "request-like" wrap for msgIds vector - [[nodiscard]] bool isSentContainer() const; - [[nodiscard]] bool isStateRequest() const; - [[nodiscard]] bool needAck() const; - - using ResponseType = void; // don't know real response type =( - -private: - explicit SecureRequest(const details::SecureRequestCreateTag &); - - [[nodiscard]] size_t sizeInBytes() const; - [[nodiscard]] const void *dataInBytes() const; - - std::shared_ptr _data; - -}; - -class SecureRequestData : public mtpBuffer { -public: - explicit SecureRequestData(const details::SecureRequestCreateTag &) { - } - - // in toSend: = 0 - must send in container, > 0 - can send without container - // in haveSent: = 0 - container with msgIds, > 0 - when was sent - int64 msDate = 0; - - mtpRequestId requestId = 0; - SecureRequest after; - bool needsLayer = false; - -}; - -template -SecureRequest SecureRequest::Serialize(const Request &request) { - const auto requestSize = tl::count_length(request) >> 2; - auto serialized = Prepare(requestSize); - request.template write(*serialized); - return serialized; -} - -} // namespace MTP - using MTPint = tl::int_type; inline MTPint MTP_int(int32 v) { diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.cpp b/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.cpp index ae68422b5..c84a40bbd 100644 --- a/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.cpp +++ b/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.cpp @@ -7,6 +7,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL */ #include "mtproto/details/mtproto_bound_key_creator.h" +#include "mtproto/details/mtproto_serialized_request.h" + namespace MTP::details { BoundKeyCreator::BoundKeyCreator(DcKeyRequest request, Delegate delegate) @@ -54,7 +56,7 @@ bool BoundKeyCreator::readyToBind() const { return _binder.has_value(); } -SecureRequest BoundKeyCreator::prepareBindRequest( +SerializedRequest BoundKeyCreator::prepareBindRequest( const AuthKeyPtr &temporaryKey, uint64 sessionId) { Expects(_binder.has_value()); diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.h b/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.h index e8a7bf100..a1ca4cb83 100644 --- a/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.h +++ b/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.h @@ -12,6 +12,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL namespace MTP::details { +class SerializedRequest; + class BoundKeyCreator final { public: struct Delegate { @@ -32,7 +34,7 @@ public: void bind(AuthKeyPtr &&persistentKey); void restartBinder(); [[nodiscard]] bool readyToBind() const; - [[nodiscard]] SecureRequest prepareBindRequest( + [[nodiscard]] SerializedRequest prepareBindRequest( const AuthKeyPtr &temporaryKey, uint64 sessionId); [[nodiscard]] DcKeyBindState handleBindResponse( diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.cpp b/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.cpp index 736a6f4fa..0f6b0b56c 100644 --- a/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.cpp +++ b/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.cpp @@ -7,6 +7,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL */ #include "mtproto/details/mtproto_dc_key_binder.h" +#include "mtproto/details/mtproto_serialized_request.h" #include "mtproto/mtp_instance.h" #include "base/unixtime.h" #include "base/openssl_help.h" @@ -21,12 +22,12 @@ namespace { const AuthKeyPtr &persistentKey, mtpMsgId realMsgId, const MTPBindAuthKeyInner &data) { - auto serialized = SecureRequest::Serialize(data); + auto serialized = SerializedRequest::Serialize(data); serialized.setMsgId(realMsgId); serialized.setSeqNo(0); serialized.addPadding(false, true); - constexpr auto kMsgIdPosition = SecureRequest::kMessageIdPosition; + constexpr auto kMsgIdPosition = SerializedRequest::kMessageIdPosition; constexpr auto kMinMessageSize = 5; const auto sizeInPrimes = serialized->size(); @@ -77,7 +78,7 @@ DcKeyBinder::DcKeyBinder(AuthKeyPtr &&persistentKey) Expects(_persistentKey != nullptr); } -SecureRequest DcKeyBinder::prepareRequest( +SerializedRequest DcKeyBinder::prepareRequest( const AuthKeyPtr &temporaryKey, uint64 sessionId) { Expects(temporaryKey != nullptr); @@ -85,7 +86,7 @@ SecureRequest DcKeyBinder::prepareRequest( const auto nonce = openssl::RandomValue(); const auto msgId = base::unixtime::mtproto_msg_id(); - auto result = SecureRequest::Serialize(MTPauth_BindTempAuthKey( + auto result = SerializedRequest::Serialize(MTPauth_BindTempAuthKey( MTP_long(_persistentKey->keyId()), MTP_long(nonce), MTP_int(temporaryKey->expiresAt()), diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.h b/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.h index ba18176e4..882e0f88d 100644 --- a/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.h +++ b/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.h @@ -16,6 +16,8 @@ class Instance; namespace MTP::details { +class SerializedRequest; + enum class DcKeyBindState { Success, Failed, @@ -26,7 +28,7 @@ class DcKeyBinder final { public: explicit DcKeyBinder(AuthKeyPtr &&persistentKey); - [[nodiscard]] SecureRequest prepareRequest( + [[nodiscard]] SerializedRequest prepareRequest( const AuthKeyPtr &temporaryKey, uint64 sessionId); [[nodiscard]] DcKeyBindState handleResponse(const mtpBuffer &response); diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_dump_to_text.h b/Telegram/SourceFiles/mtproto/details/mtproto_dump_to_text.h index f24361655..61c47665e 100644 --- a/Telegram/SourceFiles/mtproto/details/mtproto_dump_to_text.h +++ b/Telegram/SourceFiles/mtproto/details/mtproto_dump_to_text.h @@ -8,6 +8,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #pragma once #include "mtproto/core_types.h" +#include "mtproto/details/mtproto_serialized_request.h" namespace MTP::details { diff --git a/Telegram/SourceFiles/mtproto/core_types.cpp b/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.cpp similarity index 55% rename from Telegram/SourceFiles/mtproto/core_types.cpp rename to Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.cpp index 50e62f95e..032957d40 100644 --- a/Telegram/SourceFiles/mtproto/core_types.cpp +++ b/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.cpp @@ -5,9 +5,11 @@ the official desktop application for the Telegram messaging service. For license and copyright information please follow this link: https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL */ -#include "mtproto/core_types.h" +#include "mtproto/details/mtproto_serialized_request.h" -namespace MTP { +#include "base/openssl_help.h" + +namespace MTP::details { namespace { uint32 CountPaddingPrimesCount(uint32 requestSize, bool extended, bool old) { @@ -27,7 +29,7 @@ uint32 CountPaddingPrimesCount(uint32 requestSize, bool extended, bool old) { if (extended) { // Some more random padding. - result += ((rand_value() & 0x0F) << 2); + result += ((openssl::RandomValue() & 0x0F) << 2); } return result; @@ -35,67 +37,72 @@ uint32 CountPaddingPrimesCount(uint32 requestSize, bool extended, bool old) { } // namespace -SecureRequest::SecureRequest(const details::SecureRequestCreateTag &tag) -: _data(std::make_shared(tag)) { +SerializedRequest::SerializedRequest(const RequestConstructHider::Tag &tag) +: _data(std::make_shared(tag)) { } -SecureRequest SecureRequest::Prepare(uint32 size, uint32 reserveSize) { +SerializedRequest SerializedRequest::Prepare( + uint32 size, + uint32 reserveSize) { + Expects(size > 0); + const auto finalSize = std::max(size, reserveSize); - auto result = SecureRequest(details::SecureRequestCreateTag{}); + auto result = SerializedRequest(RequestConstructHider::Tag{}); result->reserve(kMessageBodyPosition + finalSize); result->resize(kMessageBodyPosition); result->back() = (size << 2); - result->msDate = crl::now(); // > 0 - can send without container + result->lastSentTime = crl::now(); return result; } -SecureRequestData *SecureRequest::operator->() const { +RequestData *SerializedRequest::operator->() const { Expects(_data != nullptr); return _data.get(); } -SecureRequestData &SecureRequest::operator*() const { +RequestData &SerializedRequest::operator*() const { Expects(_data != nullptr); return *_data; } -SecureRequest::operator bool() const { +SerializedRequest::operator bool() const { return (_data != nullptr); } -void SecureRequest::setMsgId(mtpMsgId msgId) { +void SerializedRequest::setMsgId(mtpMsgId msgId) { Expects(_data != nullptr); + Expects(_data->size() > kMessageBodyPosition); memcpy(_data->data() + kMessageIdPosition, &msgId, sizeof(mtpMsgId)); } -mtpMsgId SecureRequest::getMsgId() const { +mtpMsgId SerializedRequest::getMsgId() const { Expects(_data != nullptr); + Expects(_data->size() > kMessageBodyPosition); return *(mtpMsgId*)(_data->constData() + kMessageIdPosition); } -void SecureRequest::setSeqNo(uint32 seqNo) { +void SerializedRequest::setSeqNo(uint32 seqNo) { Expects(_data != nullptr); + Expects(_data->size() > kMessageBodyPosition); (*_data)[kSeqNoPosition] = mtpPrime(seqNo); } -uint32 SecureRequest::getSeqNo() const { +uint32 SerializedRequest::getSeqNo() const { Expects(_data != nullptr); + Expects(_data->size() > kMessageBodyPosition); return uint32((*_data)[kSeqNoPosition]); } -void SecureRequest::addPadding(bool extended, bool old) { +void SerializedRequest::addPadding(bool extended, bool old) { Expects(_data != nullptr); - - if (_data->size() <= kMessageBodyPosition) { - return; - } + Expects(_data->size() > kMessageBodyPosition); const auto requestSize = (tl::count_length(*this) >> 2); const auto padding = CountPaddingPrimesCount(requestSize, extended, old); @@ -103,48 +110,38 @@ void SecureRequest::addPadding(bool extended, bool old) { if (uint32(_data->size()) != fullSize) { _data->resize(fullSize); if (padding > 0) { - memset_rand( - _data->data() + (fullSize - padding), - padding * sizeof(mtpPrime)); + bytes::set_random(bytes::make_span(*_data).subspan( + (fullSize - padding) * sizeof(mtpPrime))); } } } -uint32 SecureRequest::messageSize() const { +uint32 SerializedRequest::messageSize() const { Expects(_data != nullptr); + Expects(_data->size() > kMessageBodyPosition); - if (_data->size() <= kMessageBodyPosition) { - return 0; - } const auto ints = (tl::count_length(*this) >> 2); return kMessageIdInts + kSeqNoInts + kMessageLengthInts + ints; } -bool SecureRequest::isSentContainer() const { +bool SerializedRequest::isSentContainer() const { Expects(_data != nullptr); - if (_data->size() <= kMessageBodyPosition) { - return false; - } - return (!_data->msDate && !getSeqNo()); // msDate = 0, seqNo = 0 + return _data->isContainerIdsWrap; } -bool SecureRequest::isStateRequest() const { +bool SerializedRequest::isStateRequest() const { Expects(_data != nullptr); + Expects(_data->size() > kMessageBodyPosition); - if (_data->size() <= kMessageBodyPosition) { - return false; - } const auto type = mtpTypeId((*_data)[kMessageBodyPosition]); return (type == mtpc_msgs_state_req); } -bool SecureRequest::needAck() const { +bool SerializedRequest::needAck() const { Expects(_data != nullptr); + Expects(_data->size() > kMessageBodyPosition); - if (_data->size() <= kMessageBodyPosition) { - return false; - } const auto type = mtpTypeId((*_data)[kMessageBodyPosition]); switch (type) { case mtpc_msg_container: @@ -160,16 +157,14 @@ bool SecureRequest::needAck() const { return true; } -size_t SecureRequest::sizeInBytes() const { - return (_data && _data->size() > kMessageBodyPosition) - ? (*_data)[kMessageLengthPosition] - : 0; +size_t SerializedRequest::sizeInBytes() const { + Expects(!_data || _data->size() > kMessageBodyPosition); + return _data ? (*_data)[kMessageLengthPosition] : 0; } -const void *SecureRequest::dataInBytes() const { - return (_data && _data->size() > kMessageBodyPosition) - ? (_data->constData() + kMessageBodyPosition) - : nullptr; +const void *SerializedRequest::dataInBytes() const { + Expects(!_data || _data->size() > kMessageBodyPosition); + return _data ? (_data->constData() + kMessageBodyPosition) : nullptr; } } // namespace MTP diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.h b/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.h new file mode 100644 index 000000000..08e13380e --- /dev/null +++ b/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.h @@ -0,0 +1,109 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +#include "mtproto/core_types.h" + +#include + +namespace MTP::details { + +class RequestData; +class SerializedRequest; + +class RequestConstructHider { + struct Tag {}; + friend class RequestData; + friend class SerializedRequest; +}; + +class SerializedRequest { +public: + SerializedRequest() = default; + + static constexpr auto kSaltInts = 2; + static constexpr auto kSessionIdInts = 2; + static constexpr auto kMessageIdPosition = kSaltInts + kSessionIdInts; + static constexpr auto kMessageIdInts = 2; + static constexpr auto kSeqNoPosition = kMessageIdPosition + + kMessageIdInts; + static constexpr auto kSeqNoInts = 1; + static constexpr auto kMessageLengthPosition = kSeqNoPosition + + kSeqNoInts; + static constexpr auto kMessageLengthInts = 1; + static constexpr auto kMessageBodyPosition = kMessageLengthPosition + + kMessageLengthInts; + + static SerializedRequest Prepare(uint32 size, uint32 reserveSize = 0); + + template < + typename Request, + typename = std::enable_if_t>> + static SerializedRequest Serialize(const Request &request); + + // For template MTP requests and MTPBoxed instantiation. + template + void write(Accumulator &to) const { + if (const auto size = sizeInBytes()) { + tl::Writer::PutBytes(to, dataInBytes(), size); + } + } + + RequestData *operator->() const; + RequestData &operator*() const; + explicit operator bool() const; + + void setMsgId(mtpMsgId msgId); + [[nodiscard]] mtpMsgId getMsgId() const; + + void setSeqNo(uint32 seqNo); + [[nodiscard]] uint32 getSeqNo() const; + + void addPadding(bool extended, bool old); + [[nodiscard]] uint32 messageSize() const; + + // "request-like" wrap for msgIds vector + [[nodiscard]] bool isSentContainer() const; + [[nodiscard]] bool isStateRequest() const; + [[nodiscard]] bool needAck() const; + + using ResponseType = void; // don't know real response type =( + +private: + explicit SerializedRequest(const RequestConstructHider::Tag &); + + [[nodiscard]] size_t sizeInBytes() const; + [[nodiscard]] const void *dataInBytes() const; + + std::shared_ptr _data; + +}; + +class RequestData : public mtpBuffer { +public: + explicit RequestData(const RequestConstructHider::Tag &) { + } + + SerializedRequest after; + crl::time lastSentTime = 0; + mtpRequestId requestId = 0; + bool needsLayer = false; + bool forceSendInContainer = false; + bool isContainerIdsWrap = false; + +}; + +template +SerializedRequest SerializedRequest::Serialize(const Request &request) { + const auto requestSize = tl::count_length(request) >> 2; + auto serialized = Prepare(requestSize); + request.template write(*serialized); + return serialized; +} + +} // namespace MTP::details diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.cpp b/Telegram/SourceFiles/mtproto/mtp_instance.cpp index 20fa3e1a5..888728b9d 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.cpp +++ b/Telegram/SourceFiles/mtproto/mtp_instance.cpp @@ -36,6 +36,7 @@ constexpr auto kConfigBecomesOldForBlockedIn = 8 * crl::time(1000); constexpr auto kCheckKeyEach = 60 * crl::time(1000); using namespace internal; +using namespace details; std::atomic GlobalAtomicRequestId = 0; @@ -111,7 +112,7 @@ public: void sendRequest( mtpRequestId requestId, - SecureRequest &&request, + SerializedRequest &&request, RPCResponseHandler &&callbacks, ShiftedDcId shiftedDcId, crl::time msCanWait, @@ -121,9 +122,9 @@ public: void unregisterRequest(mtpRequestId requestId); void storeRequest( mtpRequestId requestId, - const SecureRequest &request, + const SerializedRequest &request, RPCResponseHandler &&callbacks); - SecureRequest getRequest(mtpRequestId requestId); + SerializedRequest getRequest(mtpRequestId requestId); void clearCallbacksDelayed(std::vector &&ids); void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end); bool hasCallbacks(mtpRequestId requestId); @@ -238,7 +239,7 @@ private: std::map _parserMap; QMutex _parserMapLock; - std::map _requestMap; + std::map _requestMap; QReadWriteLock _requestMapLock; std::deque> _delayedRequests; @@ -911,7 +912,7 @@ void Instance::Private::checkDelayedRequests() { continue; } - auto request = SecureRequest(); + auto request = SerializedRequest(); { QReadLocker locker(&_requestMapLock); auto it = _requestMap.find(requestId); @@ -932,7 +933,7 @@ void Instance::Private::checkDelayedRequests() { void Instance::Private::sendRequest( mtpRequestId requestId, - SecureRequest &&request, + SerializedRequest &&request, RPCResponseHandler &&callbacks, ShiftedDcId shiftedDcId, crl::time msCanWait, @@ -951,7 +952,7 @@ void Instance::Private::sendRequest( if (afterRequestId) { request->after = getRequest(afterRequestId); } - request->msDate = crl::now(); // > 0 - can send without container + request->lastSentTime = crl::now(); request->needsLayer = needsLayer; session->sendPrepared(request, msCanWait); @@ -980,7 +981,7 @@ void Instance::Private::unregisterRequest(mtpRequestId requestId) { void Instance::Private::storeRequest( mtpRequestId requestId, - const SecureRequest &request, + const SerializedRequest &request, RPCResponseHandler &&callbacks) { if (callbacks.onDone || callbacks.onFail) { QMutexLocker locker(&_parserMapLock); @@ -992,8 +993,8 @@ void Instance::Private::storeRequest( } } -SecureRequest Instance::Private::getRequest(mtpRequestId requestId) { - auto result = SecureRequest(); +SerializedRequest Instance::Private::getRequest(mtpRequestId requestId) { + auto result = SerializedRequest(); { QReadLocker locker(&_requestMapLock); auto it = _requestMap.find(requestId); @@ -1319,7 +1320,7 @@ bool Instance::Private::onErrorDefault(mtpRequestId requestId, const RPCError &e newdcWithShift = ShiftDcId(newdcWithShift, GetDcIdShift(dcWithShift)); } - auto request = SecureRequest(); + auto request = SerializedRequest(); { QReadLocker locker(&_requestMapLock); auto it = _requestMap.find(requestId); @@ -1391,7 +1392,7 @@ bool Instance::Private::onErrorDefault(mtpRequestId requestId, const RPCError &e if (badGuestDc) _badGuestDcRequests.insert(requestId); return true; } else if (err == qstr("CONNECTION_NOT_INITED") || err == qstr("CONNECTION_LAYER_INVALID")) { - SecureRequest request; + SerializedRequest request; { QReadLocker locker(&_requestMapLock); auto it = _requestMap.find(requestId); @@ -1416,7 +1417,7 @@ bool Instance::Private::onErrorDefault(mtpRequestId requestId, const RPCError &e } else if (err == qstr("CONNECTION_LANG_CODE_INVALID")) { Lang::CurrentCloudManager().resetToDefault(); } else if (err == qstr("MSG_WAIT_FAILED")) { - SecureRequest request; + SerializedRequest request; { QReadLocker locker(&_requestMapLock); auto it = _requestMap.find(requestId); @@ -1435,7 +1436,7 @@ bool Instance::Private::onErrorDefault(mtpRequestId requestId, const RPCError &e if (const auto afterDcId = queryRequestByDc(request->after->requestId)) { dcWithShift = *shiftedDcId; if (*shiftedDcId != *afterDcId) { - request->after = SecureRequest(); + request->after = SerializedRequest(); } } else { LOG(("MTP Error: could not find dependent request %1 by dc").arg(request->after->requestId)); @@ -1852,7 +1853,7 @@ void Instance::keyDestroyedOnServer(ShiftedDcId shiftedDcId, uint64 keyId) { void Instance::sendRequest( mtpRequestId requestId, - SecureRequest &&request, + SerializedRequest &&request, RPCResponseHandler &&callbacks, ShiftedDcId shiftedDcId, crl::time msCanWait, diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.h b/Telegram/SourceFiles/mtproto/mtp_instance.h index a59a96a9b..17e2f5889 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.h +++ b/Telegram/SourceFiles/mtproto/mtp_instance.h @@ -8,6 +8,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #pragma once #include "mtproto/mtproto_rpc_sender.h" +#include "mtproto/details/mtproto_serialized_request.h" namespace MTP { namespace internal { @@ -139,7 +140,7 @@ public: const auto requestId = internal::GetNextRequestId(); sendSerialized( requestId, - SecureRequest::Serialize(request), + details::SerializedRequest::Serialize(request), std::move(callbacks), shiftedDcId, msCanWait, @@ -170,7 +171,7 @@ public: const auto requestId = internal::GetNextRequestId(); sendRequest( requestId, - SecureRequest::Serialize(request), + details::SerializedRequest::Serialize(request), {}, shiftedDcId, 0, @@ -181,7 +182,7 @@ public: void sendSerialized( mtpRequestId requestId, - SecureRequest &&request, + details::SerializedRequest &&request, RPCResponseHandler &&callbacks, ShiftedDcId shiftedDcId, crl::time msCanWait, @@ -209,7 +210,7 @@ signals: private: void sendRequest( mtpRequestId requestId, - SecureRequest &&request, + details::SerializedRequest &&request, RPCResponseHandler &&callbacks, ShiftedDcId shiftedDcId, crl::time msCanWait, diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp index ba2ce7fd4..66cf7f31e 100644 --- a/Telegram/SourceFiles/mtproto/session.cpp +++ b/Telegram/SourceFiles/mtproto/session.cpp @@ -94,12 +94,6 @@ void SessionData::queueSendAnything(crl::time msCanWait) { }); } -void SessionData::queueSendMsgsStateInfo(quint64 msgId, QByteArray data) { - withSession([=](not_null session) { - session->sendMsgsStateInfo(msgId, data); - }); -} - bool SessionData::connectionInited() const { QMutexLocker lock(&_ownerMutex); return _owner ? _owner->connectionInited() : false; @@ -300,18 +294,6 @@ void Session::needToResumeAndSend() { } } -void Session::sendMsgsStateInfo(quint64 msgId, QByteArray data) { - auto info = bytes::vector(); - if (!data.isEmpty()) { - info.resize(data.size()); - bytes::copy(info, bytes::make_span(data)); - } - _instance->sendProtocolMessage( - _shiftedDcId, - MTPMsgsStateInfo( - MTP_msgs_state_info(MTP_long(msgId), MTP_bytes(data)))); -} - void Session::connectionStateChange(int newState) { _instance->onStateChange(_shiftedDcId, newState); } @@ -356,17 +338,14 @@ int32 Session::requestState(mtpRequestId requestId) const { } if (!connected) { return result; - } - if (!requestId) return MTP::RequestSent; - - QWriteLocker locker(_data->toSendMutex()); - const auto &toSend = _data->toSendMap(); - const auto i = toSend.constFind(requestId); - if (i != toSend.cend()) { - return MTP::RequestSending; - } else { + } else if (!requestId) { return MTP::RequestSent; } + + QWriteLocker locker(_data->toSendMutex()); + return _data->toSendMap().contains(requestId) + ? MTP::RequestSending + : MTP::RequestSent; } int32 Session::getState() const { @@ -397,13 +376,13 @@ QString Session::transport() const { } void Session::sendPrepared( - const SecureRequest &request, + const details::SerializedRequest &request, crl::time msCanWait) { DEBUG_LOG(("MTP Info: adding request to toSendMap, msCanWait %1" ).arg(msCanWait)); { QWriteLocker locker(_data->toSendMutex()); - _data->toSendMap().insert(request->requestId, request); + _data->toSendMap().emplace(request->requestId, request); *(mtpMsgId*)(request->data() + 4) = 0; *(request->data() + 6) = 0; } @@ -510,35 +489,27 @@ void Session::tryToReceive() { return; } while (true) { - auto requestId = mtpRequestId(0); - auto isUpdate = false; - auto message = SerializedMessage(); - { - QWriteLocker locker(_data->haveReceivedMutex()); - auto &responses = _data->haveReceivedResponses(); - auto response = responses.begin(); - if (response == responses.cend()) { - auto &updates = _data->haveReceivedUpdates(); - auto update = updates.begin(); - if (update == updates.cend()) { - return; - } else { - message = std::move(*update); - isUpdate = true; - updates.pop_front(); - } - } else { - requestId = response.key(); - message = std::move(response.value()); - responses.erase(response); - } + auto lock = QWriteLocker(_data->haveReceivedMutex()); + const auto responses = base::take(_data->haveReceivedResponses()); + const auto updates = base::take(_data->haveReceivedUpdates()); + lock.unlock(); + if (responses.empty() && updates.empty()) { + break; } - if (isUpdate) { - if (_shiftedDcId == BareDcId(_shiftedDcId)) { // call globalCallback only in main session - _instance->globalCallback(message.constData(), message.constData() + message.size()); + for (const auto &[requestId, response] : responses) { + _instance->execCallback( + requestId, + response.constData(), + response.constData() + response.size()); + } + + // Call globalCallback only in main session. + if (_shiftedDcId == BareDcId(_shiftedDcId)) { + for (const auto &update : updates) { + _instance->globalCallback( + update.constData(), + update.constData() + update.size()); } - } else { - _instance->execCallback(requestId, message.constData(), message.constData() + message.size()); } } } diff --git a/Telegram/SourceFiles/mtproto/session.h b/Telegram/SourceFiles/mtproto/session.h index 896dafe4b..ce4ef83a3 100644 --- a/Telegram/SourceFiles/mtproto/session.h +++ b/Telegram/SourceFiles/mtproto/session.h @@ -10,6 +10,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "base/timer.h" #include "mtproto/mtproto_rpc_sender.h" #include "mtproto/mtproto_proxy_data.h" +#include "mtproto/details/mtproto_serialized_request.h" #include @@ -27,18 +28,6 @@ class Connection; enum class TemporaryKeyType; enum class CreatingKeyType; -using PreRequestMap = QMap; -using RequestMap = QMap; -using SerializedMessage = mtpBuffer; - -inline bool ResponseNeedsAck(const SerializedMessage &response) { - if (response.size() < 8) { - return false; - } - auto seqNo = *(uint32*)(response.constData() + 6); - return (seqNo & 0x01) ? true : false; -} - struct ConnectionOptions { ConnectionOptions() = default; ConnectionOptions( @@ -80,38 +69,26 @@ public: return _options; } - not_null toSendMutex() const { + not_null toSendMutex() { return &_toSendLock; } - not_null haveSentMutex() const { + not_null haveSentMutex() { return &_haveSentLock; } - not_null haveReceivedMutex() const { + not_null haveReceivedMutex() { return &_haveReceivedLock; } - PreRequestMap &toSendMap() { + base::flat_map &toSendMap() { return _toSend; } - const PreRequestMap &toSendMap() const { - return _toSend; - } - RequestMap &haveSentMap() { + base::flat_map &haveSentMap() { return _haveSent; } - const RequestMap &haveSentMap() const { - return _haveSent; - } - QMap &haveReceivedResponses() { + base::flat_map &haveReceivedResponses() { return _receivedResponses; } - const QMap &haveReceivedResponses() const { - return _receivedResponses; - } - QList &haveReceivedUpdates() { - return _receivedUpdates; - } - const QList &haveReceivedUpdates() const { + std::vector &haveReceivedUpdates() { return _receivedUpdates; } @@ -126,7 +103,6 @@ public: void queueConnectionStateChange(int newState); void queueResetDone(); void queueSendAnything(crl::time msCanWait = 0); - void queueSendMsgsStateInfo(quint64 msgId, QByteArray data); [[nodiscard]] bool connectionInited() const; [[nodiscard]] AuthKeyPtr getPersistentKey() const; @@ -148,18 +124,17 @@ private: mutable QMutex _ownerMutex; ConnectionOptions _options; - - PreRequestMap _toSend; // map of request_id -> request, that is waiting to be sent - RequestMap _haveSent; // map of msg_id -> request, that was sent, msDate = 0 for msgs_state_req (no resend / state req), msDate = 0, seqNo = 0 for containers - - QMap _receivedResponses; // map of request_id -> response that should be processed in the main thread - QList _receivedUpdates; // list of updates that should be processed in the main thread - - // mutexes mutable QReadWriteLock _optionsLock; - mutable QReadWriteLock _toSendLock; - mutable QReadWriteLock _haveSentLock; - mutable QReadWriteLock _haveReceivedLock; + + base::flat_map _toSend; // map of request_id -> request, that is waiting to be sent + QReadWriteLock _toSendLock; + + base::flat_map _haveSent; // map of msg_id -> request, that was sent + QReadWriteLock _haveSentLock; + + base::flat_map _receivedResponses; // map of request_id -> response that should be processed in the main thread + std::vector _receivedUpdates; // list of updates that should be processed in the main thread + QReadWriteLock _haveReceivedLock; }; @@ -189,7 +164,9 @@ public: [[nodiscard]] AuthKeyPtr getPersistentKey() const; [[nodiscard]] AuthKeyPtr getTemporaryKey(TemporaryKeyType type) const; [[nodiscard]] bool connectionInited() const; - void sendPrepared(const SecureRequest &request, crl::time msCanWait = 0); + void sendPrepared( + const details::SerializedRequest &request, + crl::time msCanWait = 0); // Connection thread. [[nodiscard]] CreatingKeyType acquireKeyCreation(TemporaryKeyType type); @@ -212,7 +189,6 @@ public: void connectionStateChange(int newState); void resetDone(); void sendAnything(crl::time msCanWait = 0); - void sendMsgsStateInfo(quint64 msgId, QByteArray data); signals: void authKeyChanged(); diff --git a/Telegram/gyp/lib_mtproto.gyp b/Telegram/gyp/lib_mtproto.gyp index 4b01766a2..f95b4310d 100644 --- a/Telegram/gyp/lib_mtproto.gyp +++ b/Telegram/gyp/lib_mtproto.gyp @@ -44,6 +44,8 @@ '<(src_loc)/mtproto/details/mtproto_dump_to_text.h', '<(src_loc)/mtproto/details/mtproto_received_ids_manager.cpp', '<(src_loc)/mtproto/details/mtproto_received_ids_manager.h', + '<(src_loc)/mtproto/details/mtproto_serialized_request.cpp', + '<(src_loc)/mtproto/details/mtproto_serialized_request.h', '<(src_loc)/mtproto/mtproto_abstract_socket.cpp', '<(src_loc)/mtproto/mtproto_abstract_socket.h', '<(src_loc)/mtproto/mtproto_auth_key.cpp', diff --git a/Telegram/gyp/telegram/sources.txt b/Telegram/gyp/telegram/sources.txt index e7ccceed3..0bd944970 100644 --- a/Telegram/gyp/telegram/sources.txt +++ b/Telegram/gyp/telegram/sources.txt @@ -547,7 +547,6 @@ <(src_loc)/mtproto/connection_resolving.h <(src_loc)/mtproto/connection_tcp.cpp <(src_loc)/mtproto/connection_tcp.h -<(src_loc)/mtproto/core_types.cpp <(src_loc)/mtproto/core_types.h <(src_loc)/mtproto/dcenter.cpp <(src_loc)/mtproto/dcenter.h diff --git a/Telegram/lib_tl b/Telegram/lib_tl index 2bf101114..13918d5b5 160000 --- a/Telegram/lib_tl +++ b/Telegram/lib_tl @@ -1 +1 @@ -Subproject commit 2bf101114f0a54c868a2703a9a80c06c0f2d2c41 +Subproject commit 13918d5b5c4611ef1563a13ac1daf4d2c77c5f49