diff --git a/Telegram/SourceFiles/data/data_document.cpp b/Telegram/SourceFiles/data/data_document.cpp index 9b691f9d9..526d940b5 100644 --- a/Telegram/SourceFiles/data/data_document.cpp +++ b/Telegram/SourceFiles/data/data_document.cpp @@ -434,7 +434,7 @@ VoiceData::~VoiceData() { if (!waveform.isEmpty() && waveform[0] == -1 && waveform.size() > int32(sizeof(TaskId))) { - TaskId taskId = 0; + auto taskId = TaskId(); memcpy(&taskId, waveform.constData() + 1, sizeof(taskId)); Local::cancelTask(taskId); } @@ -866,7 +866,8 @@ void DocumentData::save( } } else { status = FileReady; - if (auto reader = owner().documentStreamedReader(this, origin)) { + auto reader = owner().documentStreamedReader(this, origin, true); + if (reader) { _loader = new Storage::StreamedFileDownloader( id, _dc, @@ -1043,26 +1044,24 @@ QString DocumentData::filepath(FilePathResolve resolve) const { bool DocumentData::isStickerSetInstalled() const { Expects(sticker() != nullptr); - const auto &set = sticker()->set; const auto &sets = _owner->stickerSets(); - switch (set.type()) { - case mtpc_inputStickerSetID: { - auto it = sets.constFind(set.c_inputStickerSetID().vid.v); - return (it != sets.cend()) - && !(it->flags & MTPDstickerSet::Flag::f_archived) - && (it->flags & MTPDstickerSet::Flag::f_installed_date); - } break; - case mtpc_inputStickerSetShortName: { - auto name = qs(set.c_inputStickerSetShortName().vshort_name).toLower(); - for (auto it = sets.cbegin(), e = sets.cend(); it != e; ++it) { - if (it->shortName.toLower() == name) { - return !(it->flags & MTPDstickerSet::Flag::f_archived) - && (it->flags & MTPDstickerSet::Flag::f_installed_date); + return sticker()->set.match([&](const MTPDinputStickerSetID &data) { + const auto i = sets.constFind(data.vid.v); + return (i != sets.cend()) + && !(i->flags & MTPDstickerSet::Flag::f_archived) + && (i->flags & MTPDstickerSet::Flag::f_installed_date); + }, [&](const MTPDinputStickerSetShortName &data) { + const auto name = qs(data.vshort_name).toLower(); + for (const auto &set : sets) { + if (set.shortName.toLower() == name) { + return !(set.flags & MTPDstickerSet::Flag::f_archived) + && (set.flags & MTPDstickerSet::Flag::f_installed_date); } } - } break; - } - return false; + return false; + }, [&](const MTPDinputStickerSetEmpty &) { + return false; + }); } Image *DocumentData::getReplyPreview(Data::FileOrigin origin) { @@ -1222,18 +1221,22 @@ bool DocumentData::inappPlaybackFailed() const { return _inappPlaybackFailed; } -auto DocumentData::createStreamingLoader(Data::FileOrigin origin) const +auto DocumentData::createStreamingLoader( + Data::FileOrigin origin, + bool forceRemoteLoader) const -> std::unique_ptr { if (!useStreamingLoader()) { return nullptr; } - const auto &location = this->location(true); - if (!data().isEmpty()) { - return Media::Streaming::MakeBytesLoader(data()); - } else if (!location.isEmpty() && location.accessEnable()) { - auto result = Media::Streaming::MakeFileLoader(location.name()); - location.accessDisable(); - return result; + if (!forceRemoteLoader) { + const auto &location = this->location(true); + if (!data().isEmpty()) { + return Media::Streaming::MakeBytesLoader(data()); + } else if (!location.isEmpty() && location.accessEnable()) { + auto result = Media::Streaming::MakeFileLoader(location.name()); + location.accessDisable(); + return result; + } } return hasRemoteLocation() ? std::make_unique( diff --git a/Telegram/SourceFiles/data/data_document.h b/Telegram/SourceFiles/data/data_document.h index 6d91f126b..bc35aff1b 100644 --- a/Telegram/SourceFiles/data/data_document.h +++ b/Telegram/SourceFiles/data/data_document.h @@ -215,8 +215,10 @@ public: [[nodiscard]] bool canBePlayed() const; [[nodiscard]] bool canBeStreamed() const; - [[nodiscard]] auto createStreamingLoader(Data::FileOrigin origin) const - -> std::unique_ptr; + [[nodiscard]] auto createStreamingLoader( + Data::FileOrigin origin, + bool forceRemoteLoader) const + -> std::unique_ptr; void setInappPlaybackFailed(); [[nodiscard]] bool inappPlaybackFailed() const; diff --git a/Telegram/SourceFiles/data/data_session.cpp b/Telegram/SourceFiles/data/data_session.cpp index f212d375d..debf2bdc4 100644 --- a/Telegram/SourceFiles/data/data_session.cpp +++ b/Telegram/SourceFiles/data/data_session.cpp @@ -1091,14 +1091,17 @@ void Session::requestDocumentViewRepaint( std::shared_ptr<::Media::Streaming::Reader> Session::documentStreamedReader( not_null document, - FileOrigin origin) { + FileOrigin origin, + bool forceRemoteLoader) { const auto i = _streamedReaders.find(document); if (i != end(_streamedReaders)) { if (auto result = i->second.lock()) { - return result; + if (!forceRemoteLoader || result->isRemoteLoader()) { + return result; + } } } - auto loader = document->createStreamingLoader(origin); + auto loader = document->createStreamingLoader(origin, forceRemoteLoader); if (!loader) { return nullptr; } @@ -1106,7 +1109,7 @@ std::shared_ptr<::Media::Streaming::Reader> Session::documentStreamedReader( this, std::move(loader)); if (!PruneDestroyedAndSet(_streamedReaders, document, result)) { - _streamedReaders.emplace(document, result); + _streamedReaders.emplace_or_assign(document, result); } return result; } diff --git a/Telegram/SourceFiles/data/data_session.h b/Telegram/SourceFiles/data/data_session.h index 23dd8d002..b84e66ea0 100644 --- a/Telegram/SourceFiles/data/data_session.h +++ b/Telegram/SourceFiles/data/data_session.h @@ -402,7 +402,8 @@ public: std::shared_ptr<::Media::Streaming::Reader> documentStreamedReader( not_null document, - FileOrigin origin); + FileOrigin origin, + bool forceRemoteLoader = false); HistoryItem *addNewMessage(const MTPMessage &data, NewMessageType type); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h index b9c0eb675..df476872b 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h @@ -7,6 +7,10 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL */ #pragma once +namespace Storage { +class StreamedFileDownloader; +} // namespace Storage + namespace Media { namespace Streaming { @@ -35,6 +39,10 @@ public: // Parts will be sent from the main thread. [[nodiscard]] virtual rpl::producer parts() const = 0; + virtual void attachDownloader( + Storage::StreamedFileDownloader *downloader) = 0; + virtual void clearAttachedDownloader() = 0; + virtual ~Loader() = default; }; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp index a4319d7dd..951de9830 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp @@ -76,6 +76,15 @@ rpl::producer LoaderLocal::parts() const { return _parts.events(); } +void LoaderLocal::attachDownloader( + Storage::StreamedFileDownloader *downloader) { + Unexpected("Downloader attached to a local streaming loader."); +} + +void LoaderLocal::clearAttachedDownloader() { + Unexpected("Downloader detached from a local streaming loader."); +} + std::unique_ptr MakeFileLoader(const QString &path) { return std::make_unique(std::make_unique(path)); } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h index 04be7271d..d741b18fc 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h @@ -32,6 +32,10 @@ public: // Parts will be sent from the main thread. [[nodiscard]] rpl::producer parts() const override; + void attachDownloader( + Storage::StreamedFileDownloader *downloader) override; + void clearAttachedDownloader() override; + private: void fail(); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp index 060217235..4b1fe4673 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp @@ -9,6 +9,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "apiwrap.h" #include "auth_session.h" +#include "storage/streamed_file_downloader.h" #include "storage/cache/storage_cache_types.h" namespace Media { @@ -40,6 +41,14 @@ int LoaderMtproto::size() const { void LoaderMtproto::load(int offset) { crl::on_main(this, [=] { + if (_downloader) { + auto bytes = _downloader->readLoadedPart(offset); + if (!bytes.isEmpty()) { + cancelForOffset(offset); + _parts.fire({ offset, std::move(bytes) }); + return; + } + } if (_requests.contains(offset)) { return; } else if (_requested.add(offset)) { @@ -60,15 +69,28 @@ void LoaderMtproto::stop() { void LoaderMtproto::cancel(int offset) { crl::on_main(this, [=] { - if (const auto requestId = _requests.take(offset)) { - _sender.request(*requestId).cancel(); - sendNext(); - } else { - _requested.remove(offset); - } + cancelForOffset(offset); }); } +void LoaderMtproto::cancelForOffset(int offset) { + if (const auto requestId = _requests.take(offset)) { + _sender.request(*requestId).cancel(); + sendNext(); + } else { + _requested.remove(offset); + } +} + +void LoaderMtproto::attachDownloader( + Storage::StreamedFileDownloader *downloader) { + _downloader = downloader; +} + +void LoaderMtproto::clearAttachedDownloader() { + _downloader = nullptr; +} + void LoaderMtproto::increasePriority() { crl::on_main(this, [=] { _requested.increasePriority(); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h index f52a0bb98..5e21d3dcb 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h @@ -36,6 +36,10 @@ public: // Parts will be sent from the main thread. [[nodiscard]] rpl::producer parts() const override; + void attachDownloader( + Storage::StreamedFileDownloader *downloader) override; + void clearAttachedDownloader() override; + ~LoaderMtproto(); private: @@ -53,6 +57,7 @@ private: const QByteArray &encryptionKey, const QByteArray &encryptionIV, const QVector &hashes); + void cancelForOffset(int offset); const not_null _api; @@ -68,6 +73,8 @@ private: base::flat_map _requests; rpl::event_stream _parts; + Storage::StreamedFileDownloader *_downloader = nullptr; + }; } // namespace Streaming diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index 50507d444..d97e7e385 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -768,12 +768,12 @@ Reader::Reader( , _slices(_loader->size(), _cacheHelper != nullptr) { _loader->parts( ) | rpl::start_with_next([=](LoadedPart &&part) { - if (_downloaderAttached.load(std::memory_order_acquire)) { + if (_attachedDownloader) { _partsForDownloader.fire_copy(part); } - - _loadedParts.emplace(std::move(part)); - + if (_streamingActive) { + _loadedParts.emplace(std::move(part)); + } if (const auto waiting = _waiting.load(std::memory_order_acquire)) { _waiting.store(nullptr, std::memory_order_release); waiting->release(); @@ -811,6 +811,7 @@ void Reader::stopStreaming(bool stillActive) { _waiting.store(nullptr, std::memory_order_release); if (!stillActive) { _streamingActive = false; + _loadingOffsets.clear(); processDownloaderRequests(); } } @@ -819,8 +820,16 @@ rpl::producer Reader::partsForDownloader() const { return _partsForDownloader.events(); } -void Reader::loadForDownloader(int offset) { - _downloaderAttached.store(true, std::memory_order_release); +void Reader::loadForDownloader( + Storage::StreamedFileDownloader *downloader, + int offset) { + if (_attachedDownloader != downloader) { + if (_attachedDownloader) { + cancelForDownloader(_attachedDownloader); + } + _attachedDownloader = downloader; + _loader->attachDownloader(downloader); + } _downloaderOffsetRequests.emplace(offset); if (_streamingActive) { wakeFromSleep(); @@ -830,15 +839,18 @@ void Reader::loadForDownloader(int offset) { } void Reader::doneForDownloader(int offset) { - if (_downloaderOffsetsRequested.remove(offset) && !_streamingActive) { + _downloaderOffsetAcks.emplace(offset); + if (!_streamingActive) { processDownloaderRequests(); } } -void Reader::cancelForDownloader() { - if (_downloaderAttached.load(std::memory_order_acquire)) { +void Reader::cancelForDownloader( + Storage::StreamedFileDownloader *downloader) { + if (_attachedDownloader == downloader) { _downloaderOffsetRequests.take(); - _downloaderAttached.store(false, std::memory_order_release); + _attachedDownloader = nullptr; + _loader->clearAttachedDownloader(); } } @@ -920,6 +932,10 @@ void Reader::processDownloaderRequests() { return; } + for (const auto done : _downloaderOffsetAcks.take()) { + _downloaderOffsetsRequested.remove(done); + } + _offsetsForDownloader.pop_front(); if (_downloaderOffsetsRequested.emplace(offset).second) { _loader->load(offset); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index 0918b8eb5..bb720c34b 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -12,6 +12,10 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "base/weak_ptr.h" #include "base/thread_safe_wrap.h" +namespace Storage { +class StreamedFileDownloader; +} // namespace Storage + namespace Storage { namespace Cache { struct Key; @@ -55,9 +59,11 @@ public: void startStreaming(); void stopStreaming(bool stillActive = false); [[nodiscard]] rpl::producer partsForDownloader() const; - void loadForDownloader(int offset); + void loadForDownloader( + Storage::StreamedFileDownloader *downloader, + int offset); void doneForDownloader(int offset); - void cancelForDownloader(); + void cancelForDownloader(Storage::StreamedFileDownloader *downloader); ~Reader(); @@ -215,8 +221,9 @@ private: // Even if streaming had failed, the Reader can work for the downloader. std::optional _streamingError; - std::atomic _downloaderAttached = false; + Storage::StreamedFileDownloader *_attachedDownloader = nullptr; base::thread_safe_queue _downloaderOffsetRequests; + base::thread_safe_queue _downloaderOffsetAcks; std::deque _offsetsForDownloader; base::flat_set _downloaderOffsetsRequested; int _downloaderSliceNumber = 0; // > 0 means we want it from cache. diff --git a/Telegram/SourceFiles/storage/file_download.cpp b/Telegram/SourceFiles/storage/file_download.cpp index 951b076d6..c874fb474 100644 --- a/Telegram/SourceFiles/storage/file_download.cpp +++ b/Telegram/SourceFiles/storage/file_download.cpp @@ -179,6 +179,7 @@ void FileLoader::finishWithBytes(const QByteArray &data) { cancel(true); return; } + _file.seek(0); if (_file.write(_data) != qint64(_data.size())) { cancel(true); return; @@ -534,42 +535,66 @@ int FileLoader::currentOffset() const { bool FileLoader::writeResultPart(int offset, bytes::const_span buffer) { Expects(!_finished); - if (!buffer.empty()) { - if (_fileIsOpen) { - auto fsize = _file.size(); - if (offset < fsize) { - _skippedBytes -= buffer.size(); - } else if (offset > fsize) { - _skippedBytes += offset - fsize; - } - _file.seek(offset); - if (_file.write(reinterpret_cast(buffer.data()), buffer.size()) != qint64(buffer.size())) { - cancel(true); - return false; - } - } else { - _data.reserve(offset + buffer.size()); - if (offset > _data.size()) { - _skippedBytes += offset - _data.size(); - _data.resize(offset); - } - if (offset == _data.size()) { - _data.append(reinterpret_cast(buffer.data()), buffer.size()); - } else { - _skippedBytes -= buffer.size(); - if (int64(offset + buffer.size()) > _data.size()) { - _data.resize(offset + buffer.size()); - } - const auto dst = bytes::make_detached_span(_data).subspan( - offset, - buffer.size()); - bytes::copy(dst, buffer); - } + if (buffer.empty()) { + return true; + } + if (_fileIsOpen) { + auto fsize = _file.size(); + if (offset < fsize) { + _skippedBytes -= buffer.size(); + } else if (offset > fsize) { + _skippedBytes += offset - fsize; } + _file.seek(offset); + if (_file.write(reinterpret_cast(buffer.data()), buffer.size()) != qint64(buffer.size())) { + cancel(true); + return false; + } + return true; + } + _data.reserve(offset + buffer.size()); + if (offset > _data.size()) { + _skippedBytes += offset - _data.size(); + _data.resize(offset); + } + if (offset == _data.size()) { + _data.append(reinterpret_cast(buffer.data()), buffer.size()); + } else { + _skippedBytes -= buffer.size(); + if (int64(offset + buffer.size()) > _data.size()) { + _data.resize(offset + buffer.size()); + } + const auto dst = bytes::make_detached_span(_data).subspan( + offset, + buffer.size()); + bytes::copy(dst, buffer); } return true; } +QByteArray FileLoader::readLoadedPartBack(int offset, int size) { + Expects(offset >= 0 && size > 0); + + if (_fileIsOpen) { + if (_file.openMode() == QIODevice::WriteOnly) { + _file.close(); + _fileIsOpen = _file.open(QIODevice::ReadWrite); + if (!_fileIsOpen) { + cancel(true); + return QByteArray(); + } + } + if (!_file.seek(offset)) { + return QByteArray(); + } + auto result = _file.read(size); + return (result.size() == size) ? result : QByteArray(); + } + return (offset + size <= _data.size()) + ? _data.mid(offset, size) + : QByteArray(); +} + bool FileLoader::finalizeResult() { Expects(!_finished); @@ -577,6 +602,7 @@ bool FileLoader::finalizeResult() { if (!_fileIsOpen) { _fileIsOpen = _file.open(QIODevice::WriteOnly); } + _file.seek(0); if (!_fileIsOpen || _file.write(_data) != qint64(_data.size())) { cancel(true); return false; diff --git a/Telegram/SourceFiles/storage/file_download.h b/Telegram/SourceFiles/storage/file_download.h index 9ffbce4a0..2727e87b5 100644 --- a/Telegram/SourceFiles/storage/file_download.h +++ b/Telegram/SourceFiles/storage/file_download.h @@ -186,6 +186,7 @@ protected: bool writeResultPart(int offset, bytes::const_span buffer); bool finalizeResult(); + [[nodiscard]] QByteArray readLoadedPartBack(int offset, int size); not_null _downloader; FileLoader *_prev = nullptr; diff --git a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp index 0c9e2d4ea..8edd0baa0 100644 --- a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp +++ b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp @@ -79,6 +79,16 @@ void StreamedFileDownloader::stop() { cancelRequests(); } +QByteArray StreamedFileDownloader::readLoadedPart(int offset) { + Expects(offset >= 0 && offset < _size); + Expects(!(offset % kPartSize)); + + const auto index = (offset / kPartSize); + return _partIsSaved[index] + ? readLoadedPartBack(offset, kPartSize) + : QByteArray(); +} + Storage::Cache::Key StreamedFileDownloader::cacheKey() const { return _cacheKey; } @@ -96,7 +106,7 @@ void StreamedFileDownloader::cancelRequests() { _partsRequested = 0; _nextPartIndex = 0; - _reader->cancelForDownloader(); + _reader->cancelForDownloader(this); } bool StreamedFileDownloader::loadPart() { @@ -113,7 +123,7 @@ bool StreamedFileDownloader::loadPart() { return false; } _nextPartIndex = index + 1; - _reader->loadForDownloader(index * kPartSize); + _reader->loadForDownloader(this, index * kPartSize); AssertIsDebug(); //_downloader->requestedAmountIncrement( diff --git a/Telegram/SourceFiles/storage/streamed_file_downloader.h b/Telegram/SourceFiles/storage/streamed_file_downloader.h index 14e899b6c..a223b7f5f 100644 --- a/Telegram/SourceFiles/storage/streamed_file_downloader.h +++ b/Telegram/SourceFiles/storage/streamed_file_downloader.h @@ -43,6 +43,8 @@ public: Data::FileOrigin fileOrigin() const override; void stop() override; + QByteArray readLoadedPart(int offset); + private: Cache::Key cacheKey() const override; std::optional fileLocationKey() const override;