Use ready parts from loader in streaming.

This commit is contained in:
John Preston 2019-04-12 13:25:00 +04:00
parent 8704f6efd0
commit cca906d383
15 changed files with 207 additions and 86 deletions

View file

@ -434,7 +434,7 @@ VoiceData::~VoiceData() {
if (!waveform.isEmpty()
&& waveform[0] == -1
&& waveform.size() > int32(sizeof(TaskId))) {
TaskId taskId = 0;
auto taskId = TaskId();
memcpy(&taskId, waveform.constData() + 1, sizeof(taskId));
Local::cancelTask(taskId);
}
@ -866,7 +866,8 @@ void DocumentData::save(
}
} else {
status = FileReady;
if (auto reader = owner().documentStreamedReader(this, origin)) {
auto reader = owner().documentStreamedReader(this, origin, true);
if (reader) {
_loader = new Storage::StreamedFileDownloader(
id,
_dc,
@ -1043,26 +1044,24 @@ QString DocumentData::filepath(FilePathResolve resolve) const {
bool DocumentData::isStickerSetInstalled() const {
Expects(sticker() != nullptr);
const auto &set = sticker()->set;
const auto &sets = _owner->stickerSets();
switch (set.type()) {
case mtpc_inputStickerSetID: {
auto it = sets.constFind(set.c_inputStickerSetID().vid.v);
return (it != sets.cend())
&& !(it->flags & MTPDstickerSet::Flag::f_archived)
&& (it->flags & MTPDstickerSet::Flag::f_installed_date);
} break;
case mtpc_inputStickerSetShortName: {
auto name = qs(set.c_inputStickerSetShortName().vshort_name).toLower();
for (auto it = sets.cbegin(), e = sets.cend(); it != e; ++it) {
if (it->shortName.toLower() == name) {
return !(it->flags & MTPDstickerSet::Flag::f_archived)
&& (it->flags & MTPDstickerSet::Flag::f_installed_date);
return sticker()->set.match([&](const MTPDinputStickerSetID &data) {
const auto i = sets.constFind(data.vid.v);
return (i != sets.cend())
&& !(i->flags & MTPDstickerSet::Flag::f_archived)
&& (i->flags & MTPDstickerSet::Flag::f_installed_date);
}, [&](const MTPDinputStickerSetShortName &data) {
const auto name = qs(data.vshort_name).toLower();
for (const auto &set : sets) {
if (set.shortName.toLower() == name) {
return !(set.flags & MTPDstickerSet::Flag::f_archived)
&& (set.flags & MTPDstickerSet::Flag::f_installed_date);
}
}
} break;
}
return false;
return false;
}, [&](const MTPDinputStickerSetEmpty &) {
return false;
});
}
Image *DocumentData::getReplyPreview(Data::FileOrigin origin) {
@ -1222,18 +1221,22 @@ bool DocumentData::inappPlaybackFailed() const {
return _inappPlaybackFailed;
}
auto DocumentData::createStreamingLoader(Data::FileOrigin origin) const
auto DocumentData::createStreamingLoader(
Data::FileOrigin origin,
bool forceRemoteLoader) const
-> std::unique_ptr<Media::Streaming::Loader> {
if (!useStreamingLoader()) {
return nullptr;
}
const auto &location = this->location(true);
if (!data().isEmpty()) {
return Media::Streaming::MakeBytesLoader(data());
} else if (!location.isEmpty() && location.accessEnable()) {
auto result = Media::Streaming::MakeFileLoader(location.name());
location.accessDisable();
return result;
if (!forceRemoteLoader) {
const auto &location = this->location(true);
if (!data().isEmpty()) {
return Media::Streaming::MakeBytesLoader(data());
} else if (!location.isEmpty() && location.accessEnable()) {
auto result = Media::Streaming::MakeFileLoader(location.name());
location.accessDisable();
return result;
}
}
return hasRemoteLocation()
? std::make_unique<Media::Streaming::LoaderMtproto>(

View file

@ -215,8 +215,10 @@ public:
[[nodiscard]] bool canBePlayed() const;
[[nodiscard]] bool canBeStreamed() const;
[[nodiscard]] auto createStreamingLoader(Data::FileOrigin origin) const
-> std::unique_ptr<Media::Streaming::Loader>;
[[nodiscard]] auto createStreamingLoader(
Data::FileOrigin origin,
bool forceRemoteLoader) const
-> std::unique_ptr<Media::Streaming::Loader>;
void setInappPlaybackFailed();
[[nodiscard]] bool inappPlaybackFailed() const;

View file

@ -1091,14 +1091,17 @@ void Session::requestDocumentViewRepaint(
std::shared_ptr<::Media::Streaming::Reader> Session::documentStreamedReader(
not_null<DocumentData*> document,
FileOrigin origin) {
FileOrigin origin,
bool forceRemoteLoader) {
const auto i = _streamedReaders.find(document);
if (i != end(_streamedReaders)) {
if (auto result = i->second.lock()) {
return result;
if (!forceRemoteLoader || result->isRemoteLoader()) {
return result;
}
}
}
auto loader = document->createStreamingLoader(origin);
auto loader = document->createStreamingLoader(origin, forceRemoteLoader);
if (!loader) {
return nullptr;
}
@ -1106,7 +1109,7 @@ std::shared_ptr<::Media::Streaming::Reader> Session::documentStreamedReader(
this,
std::move(loader));
if (!PruneDestroyedAndSet(_streamedReaders, document, result)) {
_streamedReaders.emplace(document, result);
_streamedReaders.emplace_or_assign(document, result);
}
return result;
}

View file

@ -402,7 +402,8 @@ public:
std::shared_ptr<::Media::Streaming::Reader> documentStreamedReader(
not_null<DocumentData*> document,
FileOrigin origin);
FileOrigin origin,
bool forceRemoteLoader = false);
HistoryItem *addNewMessage(const MTPMessage &data, NewMessageType type);

View file

@ -7,6 +7,10 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
namespace Storage {
class StreamedFileDownloader;
} // namespace Storage
namespace Media {
namespace Streaming {
@ -35,6 +39,10 @@ public:
// Parts will be sent from the main thread.
[[nodiscard]] virtual rpl::producer<LoadedPart> parts() const = 0;
virtual void attachDownloader(
Storage::StreamedFileDownloader *downloader) = 0;
virtual void clearAttachedDownloader() = 0;
virtual ~Loader() = default;
};

View file

@ -76,6 +76,15 @@ rpl::producer<LoadedPart> LoaderLocal::parts() const {
return _parts.events();
}
void LoaderLocal::attachDownloader(
Storage::StreamedFileDownloader *downloader) {
Unexpected("Downloader attached to a local streaming loader.");
}
void LoaderLocal::clearAttachedDownloader() {
Unexpected("Downloader detached from a local streaming loader.");
}
std::unique_ptr<LoaderLocal> MakeFileLoader(const QString &path) {
return std::make_unique<LoaderLocal>(std::make_unique<QFile>(path));
}

View file

@ -32,6 +32,10 @@ public:
// Parts will be sent from the main thread.
[[nodiscard]] rpl::producer<LoadedPart> parts() const override;
void attachDownloader(
Storage::StreamedFileDownloader *downloader) override;
void clearAttachedDownloader() override;
private:
void fail();

View file

@ -9,6 +9,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "apiwrap.h"
#include "auth_session.h"
#include "storage/streamed_file_downloader.h"
#include "storage/cache/storage_cache_types.h"
namespace Media {
@ -40,6 +41,14 @@ int LoaderMtproto::size() const {
void LoaderMtproto::load(int offset) {
crl::on_main(this, [=] {
if (_downloader) {
auto bytes = _downloader->readLoadedPart(offset);
if (!bytes.isEmpty()) {
cancelForOffset(offset);
_parts.fire({ offset, std::move(bytes) });
return;
}
}
if (_requests.contains(offset)) {
return;
} else if (_requested.add(offset)) {
@ -60,15 +69,28 @@ void LoaderMtproto::stop() {
void LoaderMtproto::cancel(int offset) {
crl::on_main(this, [=] {
if (const auto requestId = _requests.take(offset)) {
_sender.request(*requestId).cancel();
sendNext();
} else {
_requested.remove(offset);
}
cancelForOffset(offset);
});
}
void LoaderMtproto::cancelForOffset(int offset) {
if (const auto requestId = _requests.take(offset)) {
_sender.request(*requestId).cancel();
sendNext();
} else {
_requested.remove(offset);
}
}
void LoaderMtproto::attachDownloader(
Storage::StreamedFileDownloader *downloader) {
_downloader = downloader;
}
void LoaderMtproto::clearAttachedDownloader() {
_downloader = nullptr;
}
void LoaderMtproto::increasePriority() {
crl::on_main(this, [=] {
_requested.increasePriority();

View file

@ -36,6 +36,10 @@ public:
// Parts will be sent from the main thread.
[[nodiscard]] rpl::producer<LoadedPart> parts() const override;
void attachDownloader(
Storage::StreamedFileDownloader *downloader) override;
void clearAttachedDownloader() override;
~LoaderMtproto();
private:
@ -53,6 +57,7 @@ private:
const QByteArray &encryptionKey,
const QByteArray &encryptionIV,
const QVector<MTPFileHash> &hashes);
void cancelForOffset(int offset);
const not_null<ApiWrap*> _api;
@ -68,6 +73,8 @@ private:
base::flat_map<int, mtpRequestId> _requests;
rpl::event_stream<LoadedPart> _parts;
Storage::StreamedFileDownloader *_downloader = nullptr;
};
} // namespace Streaming

View file

@ -768,12 +768,12 @@ Reader::Reader(
, _slices(_loader->size(), _cacheHelper != nullptr) {
_loader->parts(
) | rpl::start_with_next([=](LoadedPart &&part) {
if (_downloaderAttached.load(std::memory_order_acquire)) {
if (_attachedDownloader) {
_partsForDownloader.fire_copy(part);
}
_loadedParts.emplace(std::move(part));
if (_streamingActive) {
_loadedParts.emplace(std::move(part));
}
if (const auto waiting = _waiting.load(std::memory_order_acquire)) {
_waiting.store(nullptr, std::memory_order_release);
waiting->release();
@ -811,6 +811,7 @@ void Reader::stopStreaming(bool stillActive) {
_waiting.store(nullptr, std::memory_order_release);
if (!stillActive) {
_streamingActive = false;
_loadingOffsets.clear();
processDownloaderRequests();
}
}
@ -819,8 +820,16 @@ rpl::producer<LoadedPart> Reader::partsForDownloader() const {
return _partsForDownloader.events();
}
void Reader::loadForDownloader(int offset) {
_downloaderAttached.store(true, std::memory_order_release);
void Reader::loadForDownloader(
Storage::StreamedFileDownloader *downloader,
int offset) {
if (_attachedDownloader != downloader) {
if (_attachedDownloader) {
cancelForDownloader(_attachedDownloader);
}
_attachedDownloader = downloader;
_loader->attachDownloader(downloader);
}
_downloaderOffsetRequests.emplace(offset);
if (_streamingActive) {
wakeFromSleep();
@ -830,15 +839,18 @@ void Reader::loadForDownloader(int offset) {
}
void Reader::doneForDownloader(int offset) {
if (_downloaderOffsetsRequested.remove(offset) && !_streamingActive) {
_downloaderOffsetAcks.emplace(offset);
if (!_streamingActive) {
processDownloaderRequests();
}
}
void Reader::cancelForDownloader() {
if (_downloaderAttached.load(std::memory_order_acquire)) {
void Reader::cancelForDownloader(
Storage::StreamedFileDownloader *downloader) {
if (_attachedDownloader == downloader) {
_downloaderOffsetRequests.take();
_downloaderAttached.store(false, std::memory_order_release);
_attachedDownloader = nullptr;
_loader->clearAttachedDownloader();
}
}
@ -920,6 +932,10 @@ void Reader::processDownloaderRequests() {
return;
}
for (const auto done : _downloaderOffsetAcks.take()) {
_downloaderOffsetsRequested.remove(done);
}
_offsetsForDownloader.pop_front();
if (_downloaderOffsetsRequested.emplace(offset).second) {
_loader->load(offset);

View file

@ -12,6 +12,10 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "base/weak_ptr.h"
#include "base/thread_safe_wrap.h"
namespace Storage {
class StreamedFileDownloader;
} // namespace Storage
namespace Storage {
namespace Cache {
struct Key;
@ -55,9 +59,11 @@ public:
void startStreaming();
void stopStreaming(bool stillActive = false);
[[nodiscard]] rpl::producer<LoadedPart> partsForDownloader() const;
void loadForDownloader(int offset);
void loadForDownloader(
Storage::StreamedFileDownloader *downloader,
int offset);
void doneForDownloader(int offset);
void cancelForDownloader();
void cancelForDownloader(Storage::StreamedFileDownloader *downloader);
~Reader();
@ -215,8 +221,9 @@ private:
// Even if streaming had failed, the Reader can work for the downloader.
std::optional<Error> _streamingError;
std::atomic<bool> _downloaderAttached = false;
Storage::StreamedFileDownloader *_attachedDownloader = nullptr;
base::thread_safe_queue<int> _downloaderOffsetRequests;
base::thread_safe_queue<int> _downloaderOffsetAcks;
std::deque<int> _offsetsForDownloader;
base::flat_set<int> _downloaderOffsetsRequested;
int _downloaderSliceNumber = 0; // > 0 means we want it from cache.

View file

@ -179,6 +179,7 @@ void FileLoader::finishWithBytes(const QByteArray &data) {
cancel(true);
return;
}
_file.seek(0);
if (_file.write(_data) != qint64(_data.size())) {
cancel(true);
return;
@ -534,42 +535,66 @@ int FileLoader::currentOffset() const {
bool FileLoader::writeResultPart(int offset, bytes::const_span buffer) {
Expects(!_finished);
if (!buffer.empty()) {
if (_fileIsOpen) {
auto fsize = _file.size();
if (offset < fsize) {
_skippedBytes -= buffer.size();
} else if (offset > fsize) {
_skippedBytes += offset - fsize;
}
_file.seek(offset);
if (_file.write(reinterpret_cast<const char*>(buffer.data()), buffer.size()) != qint64(buffer.size())) {
cancel(true);
return false;
}
} else {
_data.reserve(offset + buffer.size());
if (offset > _data.size()) {
_skippedBytes += offset - _data.size();
_data.resize(offset);
}
if (offset == _data.size()) {
_data.append(reinterpret_cast<const char*>(buffer.data()), buffer.size());
} else {
_skippedBytes -= buffer.size();
if (int64(offset + buffer.size()) > _data.size()) {
_data.resize(offset + buffer.size());
}
const auto dst = bytes::make_detached_span(_data).subspan(
offset,
buffer.size());
bytes::copy(dst, buffer);
}
if (buffer.empty()) {
return true;
}
if (_fileIsOpen) {
auto fsize = _file.size();
if (offset < fsize) {
_skippedBytes -= buffer.size();
} else if (offset > fsize) {
_skippedBytes += offset - fsize;
}
_file.seek(offset);
if (_file.write(reinterpret_cast<const char*>(buffer.data()), buffer.size()) != qint64(buffer.size())) {
cancel(true);
return false;
}
return true;
}
_data.reserve(offset + buffer.size());
if (offset > _data.size()) {
_skippedBytes += offset - _data.size();
_data.resize(offset);
}
if (offset == _data.size()) {
_data.append(reinterpret_cast<const char*>(buffer.data()), buffer.size());
} else {
_skippedBytes -= buffer.size();
if (int64(offset + buffer.size()) > _data.size()) {
_data.resize(offset + buffer.size());
}
const auto dst = bytes::make_detached_span(_data).subspan(
offset,
buffer.size());
bytes::copy(dst, buffer);
}
return true;
}
QByteArray FileLoader::readLoadedPartBack(int offset, int size) {
Expects(offset >= 0 && size > 0);
if (_fileIsOpen) {
if (_file.openMode() == QIODevice::WriteOnly) {
_file.close();
_fileIsOpen = _file.open(QIODevice::ReadWrite);
if (!_fileIsOpen) {
cancel(true);
return QByteArray();
}
}
if (!_file.seek(offset)) {
return QByteArray();
}
auto result = _file.read(size);
return (result.size() == size) ? result : QByteArray();
}
return (offset + size <= _data.size())
? _data.mid(offset, size)
: QByteArray();
}
bool FileLoader::finalizeResult() {
Expects(!_finished);
@ -577,6 +602,7 @@ bool FileLoader::finalizeResult() {
if (!_fileIsOpen) {
_fileIsOpen = _file.open(QIODevice::WriteOnly);
}
_file.seek(0);
if (!_fileIsOpen || _file.write(_data) != qint64(_data.size())) {
cancel(true);
return false;

View file

@ -186,6 +186,7 @@ protected:
bool writeResultPart(int offset, bytes::const_span buffer);
bool finalizeResult();
[[nodiscard]] QByteArray readLoadedPartBack(int offset, int size);
not_null<Storage::Downloader*> _downloader;
FileLoader *_prev = nullptr;

View file

@ -79,6 +79,16 @@ void StreamedFileDownloader::stop() {
cancelRequests();
}
QByteArray StreamedFileDownloader::readLoadedPart(int offset) {
Expects(offset >= 0 && offset < _size);
Expects(!(offset % kPartSize));
const auto index = (offset / kPartSize);
return _partIsSaved[index]
? readLoadedPartBack(offset, kPartSize)
: QByteArray();
}
Storage::Cache::Key StreamedFileDownloader::cacheKey() const {
return _cacheKey;
}
@ -96,7 +106,7 @@ void StreamedFileDownloader::cancelRequests() {
_partsRequested = 0;
_nextPartIndex = 0;
_reader->cancelForDownloader();
_reader->cancelForDownloader(this);
}
bool StreamedFileDownloader::loadPart() {
@ -113,7 +123,7 @@ bool StreamedFileDownloader::loadPart() {
return false;
}
_nextPartIndex = index + 1;
_reader->loadForDownloader(index * kPartSize);
_reader->loadForDownloader(this, index * kPartSize);
AssertIsDebug();
//_downloader->requestedAmountIncrement(

View file

@ -43,6 +43,8 @@ public:
Data::FileOrigin fileOrigin() const override;
void stop() override;
QByteArray readLoadedPart(int offset);
private:
Cache::Key cacheKey() const override;
std::optional<MediaKey> fileLocationKey() const override;