Abstract out tcp socket from tcp connection.

This commit is contained in:
John Preston 2019-07-05 09:54:53 +02:00
parent ffba901620
commit 3cda267787
13 changed files with 306 additions and 100 deletions

View file

@ -118,6 +118,15 @@ ipPortSecret#37982646 ipv4:int port:int secret:bytes = IpPort;
accessPointRule#4679b65f phone_prefix_rules:string dc_id:int ips:vector<IpPort> = AccessPointRule;
help.configSimple#5a592a6c date:int expires:int rules:vector<AccessPointRule> = help.ConfigSimple;
tlsClientHello blocks:vector<TlsBlock> = TlsClientHello;
tlsBlockString data:string = TlsBlock;
tlsBlockRandom length:int = TlsBlock;
tlsBlockZero length:int = TlsBlock;
tlsBlockDomain = TlsBlock;
tlsBlockGrease seed:int = TlsBlock;
tlsBlockScope entries:Vector<TlsBlock> = TlsBlock;
---functions---
rpc_drop_answer#58e4a740 req_msg_id:long = RpcDropAnswer;

View file

@ -999,6 +999,7 @@ void ProxiesBoxController::refreshChecker(Item &item) {
mtproto,
type,
QThread::currentThread(),
item.data.secretFromMtprotoPassword(),
item.data);
setupChecker(item.id, checker);
};

View file

@ -305,11 +305,12 @@ bool ProxyData::operator!=(const ProxyData &other) const {
}
bool ProxyData::ValidMtprotoPassword(const QString &secret) {
using Expression = QRegularExpression;
if (secret.size() == 32) {
static const auto check = QRegularExpression("^[a-fA-F0-9]{32}$");
static const auto check = Expression("^[a-fA-F0-9]{32}$");
return check.match(secret).hasMatch();
} else if (secret.size() == 34) {
static const auto check = QRegularExpression("^dd[a-fA-F0-9]{32}$");
static const auto check = Expression("^(dd|ee)[a-fA-F0-9]{32}$");
return check.match(secret).hasMatch();
}
return false;

View file

@ -356,6 +356,7 @@ void ConnectionPrivate::appendTestConnection(
_instance,
protocol,
thread(),
protocolSecret,
_connectionOptions->proxy),
priority
});

View file

@ -161,6 +161,7 @@ ConnectionPointer AbstractConnection::Create(
not_null<Instance*> instance,
DcOptions::Variants::Protocol protocol,
QThread *thread,
const bytes::vector &secret,
const ProxyData &proxy) {
auto result = [&] {
if (protocol == DcOptions::Variants::Tcp) {

View file

@ -51,9 +51,7 @@ class AbstractConnection : public QObject {
Q_OBJECT
public:
AbstractConnection(
QThread *thread,
const ProxyData &proxy);
AbstractConnection(QThread *thread, const ProxyData &proxy);
AbstractConnection(const AbstractConnection &other) = delete;
AbstractConnection &operator=(const AbstractConnection &other) = delete;
virtual ~AbstractConnection() = default;
@ -63,6 +61,7 @@ public:
not_null<Instance*> instance,
DcOptions::Variants::Protocol protocol,
QThread *thread,
const bytes::vector &secret,
const ProxyData &proxy);
virtual ConnectionPointer clone(const ProxyData &proxy) = 0;

View file

@ -7,6 +7,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "mtproto/connection_tcp.h"
#include "mtproto/mtp_abstract_socket.h"
#include "base/bytes.h"
#include "base/openssl_help.h"
#include "base/qthelp_url.h"
@ -24,9 +25,6 @@ constexpr auto kFullConnectionTimeout = 8 * crl::time(1000);
constexpr auto kSmallBufferSize = 256 * 1024;
constexpr auto kMinPacketBuffer = 256;
using ErrorSignal = void(QTcpSocket::*)(QAbstractSocket::SocketError);
const auto QTcpSocket_error = ErrorSignal(&QAbstractSocket::error);
} // namespace
class TcpConnection::Protocol {
@ -240,28 +238,6 @@ auto TcpConnection::Protocol::Create(bytes::vector &&secret)
TcpConnection::TcpConnection(QThread *thread, const ProxyData &proxy)
: AbstractConnection(thread, proxy)
, _checkNonce(rand_value<MTPint128>()) {
_socket.moveToThread(thread);
_socket.setProxy(ToNetworkProxy(proxy));
connect(
&_socket,
&QTcpSocket::connected,
this,
&TcpConnection::socketConnected);
connect(
&_socket,
&QTcpSocket::disconnected,
this,
&TcpConnection::socketDisconnected);
connect(
&_socket,
&QTcpSocket::readyRead,
this,
&TcpConnection::socketRead);
connect(
&_socket,
QTcpSocket_error,
this,
&TcpConnection::socketError);
}
ConnectionPointer TcpConnection::clone(const ProxyData &proxy) {
@ -299,10 +275,8 @@ void TcpConnection::ensureAvailableInBuffer(int amount) {
void TcpConnection::socketRead() {
Expects(_leftBytes > 0 || !_usingLargeBuffer);
if (_socket.state() != QAbstractSocket::ConnectedState) {
LOG(("MTP error: "
"socket not connected in socketRead(), state: %1"
).arg(_socket.state()));
if (_socket->isConnected()) {
LOG(("MTP Error: Socket not connected in socketRead()"));
emit error(kErrorCodeOther);
return;
}
@ -321,7 +295,7 @@ void TcpConnection::socketRead() {
const auto free = full.subspan(_readBytes);
Assert(free.size() >= readLimit);
const auto readCount = _socket.read(
const auto readCount = _socket->read(
reinterpret_cast<char*>(free.data()),
readLimit);
if (readCount > 0) {
@ -389,7 +363,7 @@ void TcpConnection::socketRead() {
TCP_LOG(("TCP Info: no bytes read, but bytes available was true..."));
break;
}
} while (_socket.state() == QAbstractSocket::ConnectedState && _socket.bytesAvailable());
} while (_socket->isConnected() && _socket->bytesAvailable());
}
mtpBuffer TcpConnection::parsePacket(bytes::const_span bytes) {
@ -418,45 +392,6 @@ mtpBuffer TcpConnection::parsePacket(bytes::const_span bytes) {
return result;
}
void TcpConnection::handleError(QAbstractSocket::SocketError e, QTcpSocket &socket) {
switch (e) {
case QAbstractSocket::ConnectionRefusedError:
LOG(("TCP Error: socket connection refused - %1").arg(socket.errorString()));
break;
case QAbstractSocket::RemoteHostClosedError:
TCP_LOG(("TCP Info: remote host closed socket connection - %1").arg(socket.errorString()));
break;
case QAbstractSocket::HostNotFoundError:
LOG(("TCP Error: host not found - %1").arg(socket.errorString()));
break;
case QAbstractSocket::SocketTimeoutError:
LOG(("TCP Error: socket timeout - %1").arg(socket.errorString()));
break;
case QAbstractSocket::NetworkError:
LOG(("TCP Error: network - %1").arg(socket.errorString()));
break;
case QAbstractSocket::ProxyAuthenticationRequiredError:
case QAbstractSocket::ProxyConnectionRefusedError:
case QAbstractSocket::ProxyConnectionClosedError:
case QAbstractSocket::ProxyConnectionTimeoutError:
case QAbstractSocket::ProxyNotFoundError:
case QAbstractSocket::ProxyProtocolError:
LOG(("TCP Error: proxy (%1) - %2").arg(e).arg(socket.errorString()));
break;
default:
LOG(("TCP Error: other (%1) - %2").arg(e).arg(socket.errorString()));
break;
}
TCP_LOG(("TCP Error %1, restarting! - %2").arg(e).arg(socket.errorString()));
}
void TcpConnection::socketConnected() {
Expects(_status == Status::Waiting);
@ -544,9 +479,11 @@ void TcpConnection::writeConnectionStart() {
const auto dcId = reinterpret_cast<int16*>(nonce.data() + 60);
*dcId = _protocolDcId;
_socket.write(reinterpret_cast<const char*>(nonce.data()), 56);
_socket->write(reinterpret_cast<const char*>(nonce.data()), 56);
aesCtrEncrypt(nonce, _sendKey, &_sendState);
_socket.write(reinterpret_cast<const char*>(nonce.subspan(56).data()), 8);
_socket->write(
reinterpret_cast<const char*>(nonce.subspan(56).data()),
8);
}
void TcpConnection::sendBuffer(mtpBuffer &&buffer) {
@ -559,7 +496,7 @@ void TcpConnection::sendBuffer(mtpBuffer &&buffer) {
const auto bytes = _protocol->finalizePacket(buffer);
TCP_LOG(("TCP Info: write packet %1 bytes").arg(bytes.size()));
aesCtrEncrypt(bytes, _sendKey, &_sendState);
_socket.write(
_socket->write(
reinterpret_cast<const char*>(bytes.data()),
bytes.size());
}
@ -568,12 +505,9 @@ void TcpConnection::sendBuffer(mtpBuffer &&buffer) {
void TcpConnection::disconnectFromServer() {
if (_status == Status::Finished) return;
_status = Status::Finished;
disconnect(&_socket, &QTcpSocket::connected, nullptr, nullptr);
disconnect(&_socket, &QTcpSocket::disconnected, nullptr, nullptr);
disconnect(&_socket, &QTcpSocket::readyRead, nullptr, nullptr);
disconnect(&_socket, QTcpSocket_error, nullptr, nullptr);
_socket.close();
_connectedLifetime.destroy();
_lifetime.destroy();
_socket = nullptr;
}
void TcpConnection::connectToServer(
@ -605,9 +539,30 @@ void TcpConnection::connectToServer(
).arg(protocolDcId
).arg(_address + ':' + QString::number(_port)));
}
_socket = AbstractSocket::Create(thread(), protocolSecret, _proxy);
_protocolDcId = protocolDcId;
_socket.connectToHost(_address, _port);
_socket->connected(
) | rpl::start_with_next([=] {
socketConnected();
}, _connectedLifetime);
_socket->disconnected(
) | rpl::start_with_next([=] {
socketDisconnected();
}, _lifetime);
_socket->readyRead(
) | rpl::start_with_next([=] {
socketRead();
}, _lifetime);
_socket->error(
) | rpl::start_with_next([=] {
socketError();
}, _lifetime);
_socket->connectToHost(_address, _port);
}
crl::time TcpConnection::pingTime() const {
@ -641,11 +596,7 @@ void TcpConnection::socketPacket(bytes::const_span bytes) {
if (data.vnonce() == _checkNonce) {
DEBUG_LOG(("Connection Info: Valid pq response by TCP."));
_status = Status::Ready;
disconnect(
&_socket,
&QTcpSocket::connected,
nullptr,
nullptr);
_connectedLifetime.destroy();
_pingTime = (crl::now() - _pingTime);
emit connected();
} else {
@ -667,7 +618,7 @@ bool TcpConnection::isConnected() const {
}
int32 TcpConnection::debugState() const {
return _socket.state();
return _socket->debugState();
}
QString TcpConnection::transport() const {
@ -691,10 +642,9 @@ QString TcpConnection::tag() const {
return result;
}
void TcpConnection::socketError(QAbstractSocket::SocketError e) {
void TcpConnection::socketError() {
if (_status == Status::Finished) return;
handleError(e, _socket);
emit error(kErrorCodeOther);
}

View file

@ -7,18 +7,17 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
#include "mtproto/auth_key.h"
#include "mtproto/connection_abstract.h"
#include "base/timer.h"
#include "mtproto/auth_key.h"
namespace MTP {
namespace internal {
class AbstractSocket;
class TcpConnection : public AbstractConnection {
public:
TcpConnection(
QThread *thread,
const ProxyData &proxy);
TcpConnection(QThread *thread, const ProxyData &proxy);
ConnectionPointer clone(const ProxyData &proxy) override;
@ -55,7 +54,7 @@ private:
void socketConnected();
void socketDisconnected();
void socketError(QAbstractSocket::SocketError e);
void socketError();
mtpBuffer parsePacket(bytes::const_span bytes);
void ensureAvailableInBuffer(int amount);
@ -67,7 +66,7 @@ private:
void sendBuffer(mtpBuffer &&buffer);
QTcpSocket _socket;
std::unique_ptr<AbstractSocket> _socket;
bool _connectionStarted = false;
int _offsetBytes = 0;
@ -92,6 +91,9 @@ private:
int32 _port = 0;
crl::time _pingTime = 0;
rpl::lifetime _connectedLifetime;
rpl::lifetime _lifetime;
};
} // namespace internal

View file

@ -0,0 +1,23 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "mtproto/mtp_abstract_socket.h"
#include "mtproto/mtp_tcp_socket.h"
namespace MTP {
namespace internal {
std::unique_ptr<AbstractSocket> AbstractSocket::Create(
not_null<QThread*> thread,
const bytes::vector &secret,
const ProxyData &proxy) {
return std::make_unique<TcpSocket>(thread, proxy);
}
} // namespace internal
} // namespace MTP

View file

@ -0,0 +1,55 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
namespace MTP {
namespace internal {
class AbstractSocket : protected QObject {
public:
static std::unique_ptr<AbstractSocket> Create(
not_null<QThread*> thread,
const bytes::vector &secret,
const ProxyData &proxy);
explicit AbstractSocket(not_null<QThread*> thread) {
moveToThread(thread);
}
virtual ~AbstractSocket() = default;
[[nodiscard]] rpl::producer<> connected() const {
return _connected.events();
}
[[nodiscard]] rpl::producer<> disconnected() const {
return _disconnected.events();
}
[[nodiscard]] rpl::producer<> readyRead() const {
return _readyRead.events();
}
[[nodiscard]] rpl::producer<> error() const {
return _error.events();
}
virtual void connectToHost(const QString &address, int port) = 0;
[[nodiscard]] virtual bool isConnected() = 0;
[[nodiscard]] virtual int bytesAvailable() = 0;
[[nodiscard]] virtual int64 read(char *buffer, int64 maxLength) = 0;
virtual int64 write(const char *buffer, int64 length) = 0;
virtual int32 debugState() = 0;
protected:
rpl::event_stream<> _connected;
rpl::event_stream<> _disconnected;
rpl::event_stream<> _readyRead;
rpl::event_stream<> _error;
};
} // namespace internal
} // namespace MTP

View file

@ -0,0 +1,122 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "mtproto/mtp_tcp_socket.h"
namespace MTP {
namespace internal {
namespace {
using ErrorSignal = void(QTcpSocket::*)(QAbstractSocket::SocketError);
const auto QTcpSocket_error = ErrorSignal(&QAbstractSocket::error);
} // namespace
TcpSocket::TcpSocket(not_null<QThread*> thread, const ProxyData &proxy)
: AbstractSocket(thread) {
_socket.moveToThread(thread);
_socket.setProxy(ToNetworkProxy(proxy));
const auto wrap = [&](auto handler) {
return [=](auto &&...args) {
InvokeQueued(this, [=] { handler(args...); });
};
};
using Error = QAbstractSocket::SocketError;
connect(
&_socket,
&QTcpSocket::connected,
wrap([=] { _connected.fire({}); }));
connect(
&_socket,
&QTcpSocket::disconnected,
wrap([=] { _disconnected.fire({}); }));
connect(
&_socket,
&QTcpSocket::readyRead,
wrap([=] { _readyRead.fire({}); }));
connect(
&_socket,
QTcpSocket_error,
wrap([=](Error e) { logError(e); _error.fire({}); }));
}
TcpSocket::~TcpSocket() {
_socket.close();
}
void TcpSocket::connectToHost(const QString &address, int port) {
_socket.connectToHost(address, port);
}
bool TcpSocket::isConnected() {
return (_socket.state() == QAbstractSocket::ConnectedState);
}
int TcpSocket::bytesAvailable() {
return _socket.bytesAvailable();
}
int64 TcpSocket::read(char *buffer, int64 maxLength) {
return _socket.read(buffer, maxLength);
}
int64 TcpSocket::write(const char *buffer, int64 length) {
return _socket.write(buffer, length);
}
int32 TcpSocket::debugState() {
return _socket.state();
}
void TcpSocket::LogError(int errorCode, const QString &errorText) {
switch (errorCode) {
case QAbstractSocket::ConnectionRefusedError:
LOG(("TCP Error: socket connection refused - %1").arg(errorText));
break;
case QAbstractSocket::RemoteHostClosedError:
TCP_LOG(("TCP Info: remote host closed socket connection - %1"
).arg(errorText));
break;
case QAbstractSocket::HostNotFoundError:
LOG(("TCP Error: host not found - %1").arg(errorText));
break;
case QAbstractSocket::SocketTimeoutError:
LOG(("TCP Error: socket timeout - %1").arg(errorText));
break;
case QAbstractSocket::NetworkError:
LOG(("TCP Error: network - %1").arg(errorText));
break;
case QAbstractSocket::ProxyAuthenticationRequiredError:
case QAbstractSocket::ProxyConnectionRefusedError:
case QAbstractSocket::ProxyConnectionClosedError:
case QAbstractSocket::ProxyConnectionTimeoutError:
case QAbstractSocket::ProxyNotFoundError:
case QAbstractSocket::ProxyProtocolError:
LOG(("TCP Error: proxy (%1) - %2").arg(errorCode).arg(errorText));
break;
default:
LOG(("TCP Error: other (%1) - %2").arg(errorCode).arg(errorText));
break;
}
TCP_LOG(("TCP Error %1, restarting! - %2"
).arg(errorCode
).arg(errorText));
}
void TcpSocket::logError(int errorCode) {
LogError(errorCode, _socket.errorString());
}
} // namespace internal
} // namespace MTP

View file

@ -0,0 +1,38 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
#include "mtproto/mtp_abstract_socket.h"
namespace MTP {
namespace internal {
class TcpSocket : public AbstractSocket {
public:
TcpSocket(not_null<QThread*> thread, const ProxyData &proxy);
~TcpSocket();
void connectToHost(const QString &address, int port) override;
bool isConnected() override;
int bytesAvailable() override;
int64 read(char *buffer, int64 maxLength) override;
int64 write(const char *buffer, int64 length) override;
int32 debugState() override;
static void LogError(int errorCode, const QString &errorText);
private:
void logError(int errorCode);
QTcpSocket _socket;
};
} // namespace internal
} // namespace MTP

View file

@ -531,6 +531,10 @@
<(src_loc)/mtproto/dedicated_file_loader.h
<(src_loc)/mtproto/facade.cpp
<(src_loc)/mtproto/facade.h
<(src_loc)/mtproto/mtp_abstract_socket.cpp
<(src_loc)/mtproto/mtp_abstract_socket.h
<(src_loc)/mtproto/mtp_tcp_socket.cpp
<(src_loc)/mtproto/mtp_tcp_socket.h
<(src_loc)/mtproto/mtp_instance.cpp
<(src_loc)/mtproto/mtp_instance.h
<(src_loc)/mtproto/rsa_public_key.cpp