Basic code for media streaming + testing on music.

This commit is contained in:
John Preston 2019-02-13 21:10:18 +03:00
parent dc95756ec9
commit 473e30e594
25 changed files with 1573 additions and 26 deletions

View file

@ -28,6 +28,10 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "mainwindow.h"
#include "core/application.h"
// #TODO streaming
#include "media/streaming/media_streaming_file.h"
#include "media/streaming/media_streaming_loader_mtproto.h"
namespace {
constexpr auto kMemoryForCache = 32 * 1024 * 1024;
@ -303,6 +307,24 @@ void DocumentOpenClickHandler::Open(
return;
}
}
if (playMusic || playVideo) {
AssertIsDebug();
if (auto loader = data->createStreamingLoader(origin)) {
static auto file = std::unique_ptr<Media::Streaming::File>();
file = std::make_unique<Media::Streaming::File>(
&data->owner(),
std::move(loader));
data->session().lifetime().add([] {
file = nullptr;
});
file->start(
(playMusic
? Media::Streaming::Mode::Audio
: Media::Streaming::Mode::Video),
0);
}
return;
}
if (!location.isEmpty() || (!data->data().isEmpty() && (playVoice || playMusic || playVideo || playAnimation))) {
using State = Media::Player::State;
if (playVoice) {
@ -1286,6 +1308,21 @@ bool DocumentData::hasRemoteLocation() const {
return (_dc != 0 && _access != 0);
}
auto DocumentData::createStreamingLoader(Data::FileOrigin origin) const
-> std::unique_ptr<Media::Streaming::Loader> {
return hasRemoteLocation()
? std::make_unique<Media::Streaming::LoaderMtproto>(
&session().api(),
_dc,
MTP_inputDocumentFileLocation(
MTP_long(id),
MTP_long(_access),
MTP_bytes(_fileReference)),
size,
origin)
: nullptr;
}
bool DocumentData::hasWebLocation() const {
return _urlLocation.dc() != 0 && _urlLocation.accessHash() != 0;
}

View file

@ -20,6 +20,12 @@ struct Key;
} // namespace Cache
} // namespace Storage
namespace Media {
namespace Streaming {
class Loader;
} // namespace Streaming
} // namespace Media
namespace Data {
class Session;
} // namespace Data
@ -217,6 +223,9 @@ public:
const QString &songPerformer);
[[nodiscard]] QString composeNameString() const;
[[nodiscard]] auto createStreamingLoader(Data::FileOrigin origin) const
-> std::unique_ptr<Media::Streaming::Loader>;
~DocumentData();
DocumentId id = 0;

View file

@ -173,6 +173,13 @@ bool ReplyPreview::empty() const {
} // namespace Data
AudioMsgId AudioMsgId::ForVideo() {
auto result = AudioMsgId();
result._playId = rand_value<uint32>();
result._type = Type::Video;
return result;
}
void AudioMsgId::setTypeFromAudio() {
if (_audio->isVoiceMessage() || _audio->isVideoMessage()) {
_type = Type::Voice;

View file

@ -369,6 +369,7 @@ public:
, _playId(playId) {
setTypeFromAudio();
}
[[nodiscard]] static AudioMsgId ForVideo();
Type type() const {
return _type;

View file

@ -772,8 +772,8 @@ void Mixer::play(
}
}
void Mixer::feedFromVideo(VideoSoundPart &&part) {
_loader->feedFromVideo(std::move(part));
void Mixer::feedFromVideo(const VideoSoundPart &part) {
_loader->feedFromVideo(part);
}
crl::time Mixer::getVideoCorrectedTime(const AudioMsgId &audio, crl::time frameMs, crl::time systemMs) {

View file

@ -120,7 +120,7 @@ public:
void stop(const AudioMsgId &audio, State state);
// Video player audio stream interface.
void feedFromVideo(VideoSoundPart &&part);
void feedFromVideo(const VideoSoundPart &part);
crl::time getVideoCorrectedTime(
const AudioMsgId &id,
crl::time frameMs,

View file

@ -21,7 +21,7 @@ Loaders::Loaders(QThread *thread) : _fromVideoNotify([this] { videoSoundAdded();
connect(thread, SIGNAL(finished()), this, SLOT(deleteLater()));
}
void Loaders::feedFromVideo(VideoSoundPart &&part) {
void Loaders::feedFromVideo(const VideoSoundPart &part) {
auto invoke = false;
{
QMutexLocker lock(&_fromVideoMutex);

View file

@ -21,7 +21,7 @@ class Loaders : public QObject {
public:
Loaders(QThread *thread);
void feedFromVideo(VideoSoundPart &&part);
void feedFromVideo(const VideoSoundPart &part);
~Loaders();
signals:

View file

@ -17,15 +17,14 @@ struct VideoSoundData {
};
struct VideoSoundPart {
AVPacket *packet = nullptr;
const AVPacket *packet = nullptr;
AudioMsgId audio;
uint32 playId = 0;
};
namespace FFMpeg {
// AVPacket has a deprecated field, so when you copy an AVPacket
// variable (e.g. inside QQueue), a compile warning is emited.
// variable (e.g. inside QQueue), a compile warning is emitted.
// We wrap full AVPacket data in a new AVPacketDataWrap struct.
// All other fields are copied from AVPacket without modifications.
struct AVPacketDataWrap {

View file

@ -47,9 +47,6 @@ bool isAlignedImage(const QImage &image) {
FFMpegReaderImplementation::FFMpegReaderImplementation(FileLocation *location, QByteArray *data, const AudioMsgId &audio) : ReaderImplementation(location, data)
, _audioMsgId(audio) {
_frame = av_frame_alloc();
av_init_packet(&_packetNull);
_packetNull.data = nullptr;
_packetNull.size = 0;
}
ReaderImplementation::ReadResult FFMpegReaderImplementation::readNextFrame() {
@ -317,7 +314,6 @@ bool FFMpegReaderImplementation::start(Mode mode, crl::time &positionMs) {
LOG(("Gif Error: Unable to av_find_best_stream %1, error %2, %3").arg(logData()).arg(_streamId).arg(av_make_error_string(err, sizeof(err), _streamId)));
return false;
}
_packetNull.stream_index = _streamId;
auto rotateTag = av_dict_get(_fmtContext->streams[_streamId]->metadata, "rotate", NULL, 0);
if (rotateTag && *rotateTag->value) {
@ -341,7 +337,7 @@ bool FFMpegReaderImplementation::start(Mode mode, crl::time &positionMs) {
av_codec_set_pkt_timebase(_codecContext, _fmtContext->streams[_streamId]->time_base);
av_opt_set_int(_codecContext, "refcounted_frames", 1, 0);
_codec = avcodec_find_decoder(_codecContext->codec_id);
const auto codec = avcodec_find_decoder(_codecContext->codec_id);
_audioStreamId = av_find_best_stream(_fmtContext, AVMEDIA_TYPE_AUDIO, -1, -1, 0, 0);
if (_mode == Mode::Inspecting) {
@ -351,7 +347,7 @@ bool FFMpegReaderImplementation::start(Mode mode, crl::time &positionMs) {
_audioStreamId = -1;
}
if ((res = avcodec_open2(_codecContext, _codec, 0)) < 0) {
if ((res = avcodec_open2(_codecContext, codec, 0)) < 0) {
LOG(("Gif Error: Unable to avcodec_open2 %1, error %2, %3").arg(logData()).arg(res).arg(av_make_error_string(err, sizeof(err), res)));
return false;
}
@ -370,7 +366,7 @@ bool FFMpegReaderImplementation::start(Mode mode, crl::time &positionMs) {
av_codec_set_pkt_timebase(audioContext, _fmtContext->streams[_audioStreamId]->time_base);
av_opt_set_int(audioContext, "refcounted_frames", 1, 0);
auto audioCodec = avcodec_find_decoder(audioContext->codec_id);
const auto audioCodec = avcodec_find_decoder(audioContext->codec_id);
if ((res = avcodec_open2(audioContext, audioCodec, 0)) < 0) {
avcodec_free_context(&audioContext);
LOG(("Gif Error: Unable to avcodec_open2 %1, error %2, %3").arg(logData()).arg(res).arg(av_make_error_string(err, sizeof(err), res)));
@ -490,10 +486,11 @@ FFMpegReaderImplementation::PacketResult FFMpegReaderImplementation::readPacket(
if (res == AVERROR_EOF) {
if (_audioStreamId >= 0) {
// queue terminating packet to audio player
VideoSoundPart part;
part.packet = &_packetNull;
part.audio = _audioMsgId;
Player::mixer()->feedFromVideo(std::move(part));
auto drain = AVPacket();
av_init_packet(&drain);
drain.data = nullptr;
drain.size = 0;
Player::mixer()->feedFromVideo({ &drain, _audioMsgId });
}
return PacketResult::EndOfFile;
}
@ -516,10 +513,7 @@ void FFMpegReaderImplementation::processPacket(AVPacket *packet) {
_lastReadAudioMs = countPacketMs(packet);
// queue packet to audio player
VideoSoundPart part;
part.packet = packet;
part.audio = _audioMsgId;
Player::mixer()->feedFromVideo(std::move(part));
Player::mixer()->feedFromVideo({ packet, _audioMsgId });
}
} else {
av_packet_unref(packet);

View file

@ -84,7 +84,6 @@ private:
uchar *_ioBuffer = nullptr;
AVIOContext *_ioContext = nullptr;
AVFormatContext *_fmtContext = nullptr;
AVCodec *_codec = nullptr;
AVCodecContext *_codecContext = nullptr;
int _streamId = 0;
AVFrame *_frame = nullptr;
@ -100,7 +99,6 @@ private:
crl::time _lastReadAudioMs = 0;
QQueue<FFMpeg::AVPacketDataWrap> _packetQueue;
AVPacket _packetNull; // for final decoding
int _packetStartedSize = 0;
uint8_t *_packetStartedData = nullptr;
bool _packetStarted = false;

View file

@ -0,0 +1,175 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "media/streaming/media_streaming_common.h"
extern "C" {
#include <libavutil/opt.h>
} // extern "C"
namespace Media {
namespace Streaming {
namespace {
constexpr int kSkipInvalidDataPackets = 10;
} // namespace
void LogError(QLatin1String method) {
LOG(("Streaming Error: Error in %1.").arg(method));
}
void LogError(QLatin1String method, AvErrorWrap error) {
LOG(("Streaming Error: Error in %1 (code: %2, text: %3)."
).arg(method
).arg(error.code()
).arg(error.text()));
}
crl::time PtsToTime(int64_t pts, const AVRational &timeBase) {
return (pts == AV_NOPTS_VALUE)
? Information::kDurationUnknown
: ((pts * 1000LL * timeBase.num) / timeBase.den);
}
std::optional<AvErrorWrap> ReadNextFrame(Stream &stream) {
Expects(stream.frame != nullptr);
auto error = AvErrorWrap();
if (stream.frame->data) {
av_frame_unref(stream.frame.get());
}
do {
error = avcodec_receive_frame(stream.codec, stream.frame.get());
if (!error) {
//processReadFrame(); // #TODO streaming
return std::nullopt;
}
if (error.code() != AVERROR(EAGAIN) || stream.queue.empty()) {
return error;
}
const auto packet = &stream.queue.front().fields();
const auto guard = gsl::finally([
&,
size = packet->size,
data = packet->data
] {
packet->size = size;
packet->data = data;
stream.queue.pop_front();
});
error = avcodec_send_packet(
stream.codec,
packet->data ? packet : nullptr); // Drain on eof.
if (!error) {
continue;
}
LogError(qstr("avcodec_send_packet"), error);
if (error.code() == AVERROR_INVALIDDATA
// There is a sample voice message where skipping such packet
// results in a crash (read_access to nullptr) in swr_convert().
&& stream.codec->codec_id != AV_CODEC_ID_OPUS) {
if (++stream.invalidDataPackets < kSkipInvalidDataPackets) {
continue; // Try to skip a bad packet.
}
}
return error;
} while (true);
[[unreachable]];
}
CodecPointer::CodecPointer(std::nullptr_t) {
}
CodecPointer::CodecPointer(CodecPointer &&other)
: _context(base::take(other._context)) {
}
CodecPointer &CodecPointer::operator=(CodecPointer &&other) {
if (this != &other) {
destroy();
_context = base::take(other._context);
}
return *this;
}
CodecPointer &CodecPointer::operator=(std::nullptr_t) {
destroy();
return *this;
}
void CodecPointer::destroy() {
if (_context) {
avcodec_free_context(&_context);
}
}
CodecPointer CodecPointer::FromStream(not_null<AVStream*> stream) {
auto error = AvErrorWrap();
auto result = CodecPointer();
const auto context = result._context = avcodec_alloc_context3(nullptr);
if (!context) {
LogError(qstr("avcodec_alloc_context3"));
return {};
}
error = avcodec_parameters_to_context(context, stream->codecpar);
if (error) {
LogError(qstr("avcodec_parameters_to_context"), error);
return {};
}
av_codec_set_pkt_timebase(context, stream->time_base);
av_opt_set_int(context, "refcounted_frames", 1, 0);
const auto codec = avcodec_find_decoder(context->codec_id);
if (!codec) {
LogError(qstr("avcodec_find_decoder"), context->codec_id);
return {};
} else if ((error = avcodec_open2(context, codec, nullptr))) {
LogError(qstr("avcodec_open2"), error);
return {};
}
return result;
}
AVCodecContext *CodecPointer::get() const {
return _context;
}
AVCodecContext *CodecPointer::operator->() const {
Expects(_context != nullptr);
return get();
}
CodecPointer::operator AVCodecContext*() const {
return get();
}
AVCodecContext* CodecPointer::release() {
return base::take(_context);
}
CodecPointer::~CodecPointer() {
destroy();
}
FrameDeleter::pointer FrameDeleter::create() {
return av_frame_alloc();
}
void FrameDeleter::operator()(pointer value) {
av_frame_free(&value);
}
} // namespace Streaming
} // namespace Media

View file

@ -0,0 +1,162 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
} // extern "C"
namespace Media {
namespace Streaming {
[[nodiscard]] crl::time PtsToTime(int64_t pts, const AVRational &timeBase);
enum class Mode {
Both,
Audio,
Video,
Inspection
};
struct Information {
static constexpr auto kDurationUnknown = crl::time(-1);
QSize video;
bool audio = false;
crl::time duration = kDurationUnknown;
crl::time started = 0;
QImage cover;
};
class AvErrorWrap {
public:
AvErrorWrap(int code = 0) : _code(code) {
}
[[nodiscard]] explicit operator bool() const {
return (_code < 0);
}
[[nodiscard]] int code() const {
return _code;
}
[[nodiscard]] QString text() const {
char string[AV_ERROR_MAX_STRING_SIZE] = { 0 };
return QString::fromUtf8(av_make_error_string(
string,
sizeof(string),
_code));
}
private:
int _code = 0;
};
void LogError(QLatin1String method);
void LogError(QLatin1String method, AvErrorWrap error);
class Packet {
public:
Packet() {
setEmpty();
}
Packet(const AVPacket &data) {
bytes::copy(_data, bytes::object_as_span(&data));
}
Packet(Packet &&other) {
bytes::copy(_data, other._data);
if (!other.empty()) {
other.release();
}
}
Packet &operator=(Packet &&other) {
if (this != &other) {
av_packet_unref(&fields());
bytes::copy(_data, other._data);
if (!other.empty()) {
other.release();
}
}
return *this;
}
~Packet() {
av_packet_unref(&fields());
}
[[nodiscard]] AVPacket &fields() {
return *reinterpret_cast<AVPacket*>(_data);
}
[[nodiscard]] const AVPacket &fields() const {
return *reinterpret_cast<const AVPacket*>(_data);
}
[[nodiscard]] bool empty() const {
return !fields().data;
}
void release() {
setEmpty();
}
private:
void setEmpty() {
auto &native = fields();
av_init_packet(&native);
native.data = nullptr;
native.size = 0;
}
alignas(alignof(AVPacket)) bytes::type _data[sizeof(AVPacket)];
};
class CodecPointer {
public:
CodecPointer(std::nullptr_t = nullptr);
CodecPointer(CodecPointer &&other);
CodecPointer &operator=(CodecPointer &&other);
CodecPointer &operator=(std::nullptr_t);
~CodecPointer();
[[nodiscard]] static CodecPointer FromStream(
not_null<AVStream*> stream);
[[nodiscard]] AVCodecContext *get() const;
[[nodiscard]] AVCodecContext *operator->() const;
[[nodiscard]] operator AVCodecContext*() const;
[[nodiscard]] AVCodecContext* release();
private:
void destroy();
AVCodecContext *_context = nullptr;
};
struct FrameDeleter {
using pointer = AVFrame*;
[[nodiscard]] static pointer create();
void operator()(pointer value);
};
using FramePointer = std::unique_ptr<FrameDeleter::pointer, FrameDeleter>;
struct Stream {
CodecPointer codec;
FramePointer frame;
std::deque<Packet> queue;
crl::time lastReadPositionTime = 0;
int invalidDataPackets = 0;
};
[[nodiscard]] std::optional<AvErrorWrap> ReadNextFrame(Stream &stream);
} // namespace Streaming
} // namespace Media

View file

@ -0,0 +1,536 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "media/streaming/media_streaming_file.h"
#include "media/streaming/media_streaming_loader.h"
#include "media/audio/media_audio.h" // #TODO streaming
#include "media/audio/media_child_ffmpeg_loader.h"
#include "ui/toast/toast.h"
namespace Media {
namespace Streaming {
File::Context::Context(not_null<Reader*> reader)
: _reader(reader)
, _size(reader->size())
, _audioMsgId(AudioMsgId::ForVideo()) {
}
int File::Context::Read(void *opaque, uint8_t *buffer, int bufferSize) {
return static_cast<Context*>(opaque)->read(
bytes::make_span(buffer, bufferSize));
}
int64_t File::Context::Seek(void *opaque, int64_t offset, int whence) {
return static_cast<Context*>(opaque)->seek(offset, whence);
}
int File::Context::read(bytes::span buffer) {
const auto amount = std::min(size_type(_size - _offset), buffer.size());
if (unroll() || amount < 0) {
return -1;
} else if (!amount) {
return amount;
}
buffer = buffer.subspan(0, amount);
while (!_reader->fill(buffer, _offset, &_semaphore)) {
_semaphore.acquire();
if (_interrupted) {
return -1;
} else if (_reader->failed()) {
_failed = true;
return -1;
}
}
_offset += amount;
return amount;
}
int64_t File::Context::seek(int64_t offset, int whence) {
const auto checkedSeek = [&](int64_t offset) {
if (_failed || offset < 0 || offset > _size) {
return -1;
}
return (_offset = offset);
};
switch (whence) {
case SEEK_SET: return checkedSeek(offset);
case SEEK_CUR: return checkedSeek(_offset + offset);
case SEEK_END: return checkedSeek(_size + offset);
case AVSEEK_SIZE: return _size;
}
return -1;
}
void File::Context::logError(QLatin1String method) {
if (!unroll()) {
LogError(method);
}
}
void File::Context::logError(QLatin1String method, AvErrorWrap error) {
if (!unroll()) {
LogError(method, error);
}
}
void File::Context::logFatal(QLatin1String method) {
if (!unroll()) {
LogError(method);
_failed = true;
}
}
void File::Context::logFatal(QLatin1String method, AvErrorWrap error) {
if (!unroll()) {
LogError(method, error);
_failed = true;
}
}
void File::Context::initStream(StreamWrap &wrap, AVMediaType type) {
wrap.id = av_find_best_stream(
_formatContext,
type,
-1,
-1,
0,
0);
if (wrap.id < 0) {
return;
}
wrap.info = _formatContext->streams[wrap.id];
if (type == AVMEDIA_TYPE_VIDEO) {
const auto rotateTag = av_dict_get(
wrap.info->metadata,
"rotate",
nullptr,
0);
if (rotateTag && *rotateTag->value) {
const auto stringRotateTag = QString::fromUtf8(rotateTag->value);
auto toIntSucceeded = false;
const auto rotateDegrees = stringRotateTag.toInt(&toIntSucceeded);
if (toIntSucceeded) {
//_rotation = rotationFromDegrees(rotateDegrees); // #TODO streaming
}
}
}
wrap.stream.codec = CodecPointer::FromStream(wrap.info);
if (!wrap.stream.codec) {
ClearStream(wrap);
return;
}
wrap.stream.frame.reset(FrameDeleter::create());
if (!wrap.stream.frame) {
ClearStream(wrap);
return;
}
}
void File::Context::seekToPosition(crl::time positionTime) {
auto error = AvErrorWrap();
if (!positionTime) {
return;
}
const auto &main = mainStream();
Assert(main.info != nullptr);
const auto timeBase = main.info->time_base;
const auto timeStamp = (positionTime * timeBase.den)
/ (1000LL * timeBase.num);
error = av_seek_frame(
_formatContext,
main.id,
timeStamp,
0);
if (!error) {
return;
}
error = av_seek_frame(
_formatContext,
main.id,
timeStamp,
AVSEEK_FLAG_BACKWARD);
if (!error) {
return;
}
return logFatal(qstr("av_seek_frame"), error);
}
base::variant<Packet, AvErrorWrap> File::Context::readPacket() {
auto error = AvErrorWrap();
auto result = Packet();
error = av_read_frame(_formatContext, &result.fields());
if (unroll()) {
return AvErrorWrap();
} else if (!error) {
return std::move(result);
} else if (error.code() != AVERROR_EOF) {
logFatal(qstr("av_read_frame"), error);
}
return error;
}
void File::Context::start(Mode mode, crl::time positionTime) {
auto error = AvErrorWrap();
_mode = mode;
if (unroll()) {
return;
}
_ioBuffer = reinterpret_cast<uchar*>(av_malloc(AVBlockSize));
_ioContext = avio_alloc_context(
_ioBuffer,
AVBlockSize,
0,
static_cast<void*>(this),
&Context::Read,
nullptr,
&Context::Seek);
_formatContext = avformat_alloc_context();
if (!_formatContext) {
return logFatal(qstr("avformat_alloc_context"));
}
_formatContext->pb = _ioContext;
error = avformat_open_input(&_formatContext, nullptr, nullptr, nullptr);
if (error) {
_ioBuffer = nullptr;
return logFatal(qstr("avformat_open_input"), error);
}
_opened = true;
if ((error = avformat_find_stream_info(_formatContext, nullptr))) {
return logFatal(qstr("avformat_find_stream_info"), error);
}
initStream(_video, AVMEDIA_TYPE_VIDEO);
initStream(_audio, AVMEDIA_TYPE_AUDIO);
if (!mainStreamUnchecked().info) {
return logFatal(qstr("RequiredStreamAbsent"));
}
readInformation(positionTime);
if (_audio.info
&& (_mode == Mode::Audio || _mode == Mode::Both)) { // #TODO streaming
Player::mixer()->resume(_audioMsgId, true);
}
}
auto File::Context::mainStreamUnchecked() const -> const StreamWrap & {
return (_mode == Mode::Video || (_video.info && _mode != Mode::Audio))
? _video
: _audio;
}
auto File::Context::mainStream() const -> const StreamWrap & {
const auto &result = mainStreamUnchecked();
Ensures(result.info != nullptr);
return result;
}
auto File::Context::mainStream() -> StreamWrap & {
return const_cast<StreamWrap&>(((const Context*)this)->mainStream());
}
void File::Context::readInformation(crl::time positionTime) {
const auto &main = mainStream();
const auto info = main.info;
auto information = Information();
information.duration = PtsToTime(info->duration, info->time_base);
auto result = readPacket();
const auto packet = base::get_if<Packet>(&result);
if (unroll()) {
return;
} else if (packet) {
if (positionTime > 0) {
const auto time = CountPacketPositionTime(
_formatContext->streams[packet->fields().stream_index],
*packet);
information.started = (time == Information::kDurationUnknown)
? positionTime
: time;
}
} else {
information.started = positionTime;
}
if (_audio.info
&& (_mode == Mode::Audio || _mode == Mode::Both)) { // #TODO streaming
auto soundData = std::make_unique<VideoSoundData>();
soundData->context = _audio.stream.codec.release();
soundData->frequency = _audio.info->codecpar->sample_rate;
if (_audio.info->duration == AV_NOPTS_VALUE) {
soundData->length = (_formatContext->duration * soundData->frequency) / AV_TIME_BASE;
} else {
soundData->length = (_audio.info->duration * soundData->frequency * _audio.info->time_base.num) / _audio.info->time_base.den;
}
Player::mixer()->play(_audioMsgId, std::move(soundData), information.started);
}
if (packet) {
processPacket(std::move(*packet));
} else {
enqueueEofPackets();
}
information.cover = readFirstVideoFrame();
if (unroll()) {
return;
}
information.audio = (_audio.info != nullptr);
_information = std::move(information);
}
QImage File::Context::readFirstVideoFrame() {
auto result = QImage();
while (_video.info && result.isNull()) {
auto frame = tryReadFirstVideoFrame();
if (unroll()) {
return QImage();
}
frame.match([&](QImage &image) {
if (!image.isNull()) {
result = std::move(image);
} else {
ClearStream(_video);
}
}, [&](const AvErrorWrap &error) {
if (error.code() == AVERROR(EAGAIN)) {
readNextPacket();
} else {
ClearStream(_video);
}
});
}
if (!_video.info && _mode == Mode::Video) {
logFatal(qstr("RequiredStreamEmpty"));
return QImage();
}
return result;
}
base::variant<QImage, AvErrorWrap> File::Context::tryReadFirstVideoFrame() {
Expects(_video.info != nullptr);
if (unroll()) {
return AvErrorWrap();
}
const auto error = ReadNextFrame(_video.stream);
if (error) {
if (error->code() == AVERROR_EOF) {
// No valid video stream.
if (_mode == Mode::Video) {
logFatal(qstr("RequiredStreamEmpty"));
}
return QImage();
} else if (error->code() != AVERROR(EAGAIN)) {
_failed = true;
}
return *error;
}
return QImage(); // #TODO streaming decode frame
}
void File::Context::enqueueEofPackets() {
if (_audio.info) {
Enqueue(_audio, Packet());
}
if (_video.info) {
Enqueue(_video, Packet());
}
_readTillEnd = true;
}
void File::Context::processPacket(Packet &&packet) {
const auto &native = packet.fields();
const auto streamId = native.stream_index;
const auto check = [&](StreamWrap &wrap) {
if ((native.stream_index == wrap.id) && wrap.info) {
// #TODO streaming queue packet to audio player
if ((_mode == Mode::Audio || _mode == Mode::Both)
&& (wrap.info == _audio.info)) {
Player::mixer()->feedFromVideo({ &native, _audioMsgId });
packet.release();
} else {
Enqueue(wrap, std::move(packet));
}
return true;
}
return false;
};
check(_audio) && check(_video);
}
void File::Context::readNextPacket() {
auto result = readPacket();
const auto packet = base::get_if<Packet>(&result);
if (unroll()) {
return;
} else if (packet) {
processPacket(std::move(*packet));
} else {
// Still trying to read by drain.
Assert(result.is<AvErrorWrap>());
Assert(result.get<AvErrorWrap>().code() == AVERROR_EOF);
enqueueEofPackets();
}
}
crl::time File::Context::CountPacketPositionTime(
not_null<const AVStream*> info,
const Packet &packet) {
const auto &native = packet.fields();
const auto packetPts = (native.pts == AV_NOPTS_VALUE)
? native.dts
: native.pts;
const auto &timeBase = info->time_base;
return PtsToTime(packetPts, info->time_base);
}
void File::Context::ClearStream(StreamWrap &wrap) {
wrap.id = -1;
wrap.stream = Stream();
wrap.info = nullptr;
}
crl::time File::Context::CountPacketPositionTime(
const StreamWrap &wrap,
const Packet &packet) {
return CountPacketPositionTime(wrap.info, packet);
}
void File::Context::Enqueue(StreamWrap &wrap, Packet &&packet) {
const auto time = CountPacketPositionTime(wrap, packet);
if (time != Information::kDurationUnknown) {
wrap.stream.lastReadPositionTime = time;
}
QMutexLocker lock(&wrap.mutex);
wrap.stream.queue.push_back(std::move(packet));
}
void File::Context::interrupt() {
_interrupted = true;
_semaphore.release();
}
bool File::Context::interrupted() const {
return _interrupted;
}
bool File::Context::failed() const {
return _failed;
}
bool File::Context::unroll() const {
return failed() || interrupted();
}
File::Context::~Context() {
ClearStream(_audio);
ClearStream(_video);
//if (_swsContext) {
// sws_freeContext(_swsContext);
//}
if (_opened) {
avformat_close_input(&_formatContext);
}
if (_ioContext) {
av_freep(&_ioContext->buffer);
av_freep(&_ioContext);
} else if (_ioBuffer) {
av_freep(&_ioBuffer);
}
if (_formatContext) {
avformat_free_context(_formatContext);
}
}
bool File::Context::started() const {
return _information.has_value();
}
bool File::Context::finished() const {
return unroll() || _readTillEnd;
}
const Media::Streaming::Information & File::Context::information() const {
Expects(_information.has_value());
return *_information;
}
File::File(
not_null<Data::Session*> owner,
std::unique_ptr<Loader> loader)
: _reader(owner, std::move(loader)) {
}
void File::start(Mode mode, crl::time positionTime) {
finish();
_context = std::make_unique<Context>(&_reader);
_thread = std::thread([=, context = _context.get()] {
context->start(mode, positionTime);
if (context->interrupted()) {
return;
} else if (context->failed()) {
crl::on_main(context, [=] {
// #TODO streaming failed
});
} else {
crl::on_main(context, [=, info = context->information()] {
// #TODO streaming started
});
while (!context->finished()) {
context->readNextPacket();
}
crl::on_main(context, [] { AssertIsDebug();
Ui::Toast::Show("Finished loading.");
});
}
});
}
//rpl::producer<Information> File::information() const {
//
//}
//
//rpl::producer<Packet> File::video() const {
//
//}
//
//rpl::producer<Packet> File::audio() const {
//
//}
void File::finish() {
if (_thread.joinable()) {
_context->interrupt();
_thread.join();
}
_context = nullptr;
}
File::~File() {
finish();
}
} // namespace Streaming
} // namespace Media

View file

@ -0,0 +1,136 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
#include "media/streaming/media_streaming_common.h"
#include "media/streaming/media_streaming_reader.h"
#include "base/bytes.h"
#include "base/weak_ptr.h"
#include <thread>
namespace Data {
class Session;
} // namespace Data
namespace Media {
namespace Streaming {
class Loader;
class File final {
public:
File(not_null<Data::Session*> owner, std::unique_ptr<Loader> loader);
File(const File &other) = delete;
File &operator=(const File &other) = delete;
void start(Mode mode, crl::time positionTime);
//rpl::producer<Information> information() const;
//rpl::producer<Packet> video() const;
//rpl::producer<Packet> audio() const;
~File();
private:
void finish();
class Context final : public base::has_weak_ptr {
public:
Context(not_null<Reader*> reader);
void start(Mode mode, crl::time positionTime);
void readNextPacket();
void interrupt();
[[nodiscard]] bool interrupted() const;
[[nodiscard]] bool failed() const;
[[nodiscard]] bool started() const;
[[nodiscard]] bool finished() const;
[[nodiscard]] const Information &information() const;
~Context();
private:
struct StreamWrap {
int id = -1;
AVStream *info = nullptr;
Stream stream;
QMutex mutex;
};
static int Read(void *opaque, uint8_t *buffer, int bufferSize);
static int64_t Seek(void *opaque, int64_t offset, int whence);
[[nodiscard]] int read(bytes::span buffer);
[[nodiscard]] int64_t seek(int64_t offset, int whence);
[[nodiscard]] bool unroll() const;
void logError(QLatin1String method);
void logError(QLatin1String method, AvErrorWrap error);
void logFatal(QLatin1String method);
void logFatal(QLatin1String method, AvErrorWrap error);
void initStream(StreamWrap &wrap, AVMediaType type);
void seekToPosition(crl::time positionTime);
// #TODO base::expected.
[[nodiscard]] base::variant<Packet, AvErrorWrap> readPacket();
void processPacket(Packet &&packet);
[[nodiscard]] const StreamWrap &mainStreamUnchecked() const;
[[nodiscard]] const StreamWrap &mainStream() const;
[[nodiscard]] StreamWrap &mainStream();
void readInformation(crl::time positionTime);
[[nodiscard]] QImage readFirstVideoFrame();
[[nodiscard]] auto tryReadFirstVideoFrame()
-> base::variant<QImage, AvErrorWrap>;
void enqueueEofPackets();
static void ClearStream(StreamWrap &wrap);
[[nodiscard]] static crl::time CountPacketPositionTime(
not_null<const AVStream*> info,
const Packet &packet);
[[nodiscard]] static crl::time CountPacketPositionTime(
const StreamWrap &wrap,
const Packet &packet);
static void Enqueue(StreamWrap &wrap, Packet &&packet);
not_null<Reader*> _reader;
Mode _mode = Mode::Both;
int _offset = 0;
int _size = 0;
bool _failed = false;
bool _opened = false;
bool _readTillEnd = false;
crl::semaphore _semaphore;
std::atomic<bool> _interrupted = false;
std::optional<Information> _information;
uchar *_ioBuffer = nullptr;
AVIOContext *_ioContext = nullptr;
AVFormatContext *_formatContext = nullptr;
StreamWrap _video;
StreamWrap _audio;
AudioMsgId _audioMsgId;
};
std::thread _thread;
Reader _reader;
std::unique_ptr<Context> _context;
};
} // namespace Streaming
} // namespace Media

View file

@ -0,0 +1,14 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "media/streaming/media_streaming_loader.h"
namespace Media {
namespace Streaming {
} // namespace Streaming
} // namespace Media

View file

@ -0,0 +1,38 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
namespace Media {
namespace Streaming {
struct LoadedPart {
int offset = 0;
QByteArray bytes;
static constexpr auto kFailedOffset = -1;
};
class Loader {
public:
static constexpr auto kPartSize = 128 * 1024;
//[[nodiscard]] virtual Storage::Cache::Key baseCacheKey() const = 0;
[[nodiscard]] virtual int size() const = 0;
virtual void load(int offset, int till = -1) = 0;
virtual void stop() = 0;
// Parts will be sent from the main thread.
[[nodiscard]] virtual rpl::producer<LoadedPart> parts() const = 0;
virtual ~Loader() = default;
};
} // namespace Streaming
} // namespace Media

View file

@ -0,0 +1,144 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "media/streaming/media_streaming_loader_mtproto.h"
#include "apiwrap.h"
namespace Media {
namespace Streaming {
namespace {
constexpr auto kMaxConcurrentRequests = 1; // #TODO streaming
} // namespace
LoaderMtproto::LoaderMtproto(
not_null<ApiWrap*> api,
MTP::DcId dcId,
const MTPInputFileLocation &location,
int size,
Data::FileOrigin origin)
: _api(api)
, _dcId(dcId)
, _location(location)
, _size(size)
, _origin(origin) {
}
int LoaderMtproto::size() const {
return _size;
}
void LoaderMtproto::load(int offset, int till) {
crl::on_main(this, [=] {
cancelRequestsBefore(offset);
_till = till;
sendNext(offset);
});
}
void LoaderMtproto::sendNext(int possibleOffset) {
Expects((possibleOffset % kPartSize) == 0);
const auto offset = _requests.empty()
? possibleOffset
: _requests.back().first + kPartSize;
if ((_till >= 0 && offset >= _till) || (_size > 0 && offset >= _size)) {
return;
} else if (_requests.size() >= kMaxConcurrentRequests) {
return;
}
static auto DcIndex = 0;
const auto id = _sender.request(MTPupload_GetFile(
_location,
MTP_int(offset),
MTP_int(kPartSize)
)).done([=](const MTPupload_File &result) {
requestDone(offset, result);
}).fail([=](const RPCError &error) {
requestFailed(offset, error);
}).toDC(
MTP::downloadDcId(_dcId, (++DcIndex) % MTP::kDownloadSessionsCount)
).send();
_requests.emplace(offset, id);
sendNext(offset + kPartSize);
}
void LoaderMtproto::requestDone(int offset, const MTPupload_File &result) {
result.match([&](const MTPDupload_file &data) {
_requests.erase(offset);
if (data.vbytes.v.size() == kPartSize) {
sendNext(offset + kPartSize);
}
_parts.fire({ offset, data.vbytes.v });
}, [&](const MTPDupload_fileCdnRedirect &data) {
changeCdnParams(
offset,
data.vdc_id.v,
data.vfile_token.v,
data.vencryption_key.v,
data.vencryption_iv.v,
data.vfile_hashes.v);
});
}
void LoaderMtproto::changeCdnParams(
int offset,
MTP::DcId dcId,
const QByteArray &token,
const QByteArray &encryptionKey,
const QByteArray &encryptionIV,
const QVector<MTPFileHash> &hashes) {
// #TODO streaming
}
void LoaderMtproto::requestFailed(int offset, const RPCError &error) {
const auto &type = error.type();
if (error.code() != 400 || !type.startsWith(qstr("FILE_REFERENCE_"))) {
_parts.fire({ LoadedPart::kFailedOffset });
return;
}
const auto callback = [=](const Data::UpdatedFileReferences &updated) {
// #TODO streaming
};
_api->refreshFileReference(_origin, crl::guard(this, callback));
}
void LoaderMtproto::stop() {
crl::on_main(this, [=] {
for (const auto [offset, requestId] : base::take(_requests)) {
_sender.request(requestId).cancel();
}
});
}
rpl::producer<LoadedPart> LoaderMtproto::parts() const {
return _parts.events();
}
LoaderMtproto::~LoaderMtproto() = default;
void LoaderMtproto::cancelRequestsBefore(int offset) {
const auto from = begin(_requests);
const auto till = ranges::lower_bound(
_requests,
offset,
ranges::less(),
[](auto pair) { return pair.first; });
ranges::for_each(
from,
till,
_sender.requestCanceller(),
&base::flat_map<int, mtpRequestId>::value_type::second);
_requests.erase(from, till);
}
} // namespace Streaming
} // namespace Media

View file

@ -0,0 +1,68 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
#include "media/streaming/media_streaming_loader.h"
#include "mtproto/sender.h"
#include "data/data_file_origin.h"
class ApiWrap;
namespace Media {
namespace Streaming {
class LoaderMtproto : public Loader, public base::has_weak_ptr {
public:
LoaderMtproto(
not_null<ApiWrap*> api,
MTP::DcId dcId,
const MTPInputFileLocation &location,
int size,
Data::FileOrigin origin);
//[[nodiscard]] Storage::Cache::Key baseCacheKey() const override;
[[nodiscard]] int size() const override;
void load(int offset, int till = -1) override;
void stop() override;
// Parts will be sent from the main thread.
[[nodiscard]] rpl::producer<LoadedPart> parts() const override;
~LoaderMtproto();
private:
void cancelRequestsBefore(int offset);
void sendNext(int possibleOffset);
void requestDone(int offset, const MTPupload_File &result);
void requestFailed(int offset, const RPCError &error);
void changeCdnParams(
int offset,
MTP::DcId dcId,
const QByteArray &token,
const QByteArray &encryptionKey,
const QByteArray &encryptionIV,
const QVector<MTPFileHash> &hashes);
const not_null<ApiWrap*> _api;
const MTP::DcId _dcId = 0;
const MTPInputFileLocation _location;
const int _size = 0;
const Data::FileOrigin _origin;
int _till = -1;
MTP::Sender _sender;
base::flat_map<int, mtpRequestId> _requests;
rpl::event_stream<LoadedPart> _parts;
};
} // namespace Streaming
} // namespace Media

View file

@ -0,0 +1,161 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "media/streaming/media_streaming_reader.h"
#include "media/streaming/media_streaming_loader.h"
#include "storage/cache/storage_cache_database.h"
#include "data/data_session.h"
namespace Media {
namespace Streaming {
namespace {
template <typename Range> // Range::value_type is Pair<int, QByteArray>
int FindNotLoadedStart(Range &&parts, int offset) {
auto result = offset;
for (const auto &part : parts) {
const auto partStart = part.first;
const auto partEnd = partStart + part.second.size();
if (partStart <= result && partEnd >= result) {
result = partEnd;
} else {
break;
}
}
return result;
}
template <typename Range> // Range::value_type is Pair<int, QByteArray>
void CopyLoaded(bytes::span buffer, Range &&parts, int offset, int till) {
auto filled = offset;
for (const auto &part : parts) {
const auto bytes = bytes::make_span(part.second);
const auto partStart = part.first;
const auto partEnd = int(partStart + bytes.size());
const auto copyTill = std::min(partEnd, till);
Assert(partStart <= filled && filled < copyTill);
const auto from = filled - partStart;
const auto copy = copyTill - filled;
bytes::copy(buffer, bytes.subspan(from, copy));
buffer = buffer.subspan(copy);
filled += copy;
}
}
} // namespace
Reader::Reader(
not_null<Data::Session*> owner,
std::unique_ptr<Loader> loader)
: _owner(owner)
, _loader(std::move(loader)) {
_loader->parts(
) | rpl::start_with_next([=](LoadedPart &&part) {
QMutexLocker lock(&_loadedPartsMutex);
_loadedParts.push_back(std::move(part));
lock.unlock();
if (const auto waiting = _waiting.load()) {
_waiting = nullptr;
waiting->release();
}
}, _lifetime);
}
int Reader::size() const {
return _loader->size();
}
bool Reader::failed() const {
return _failed;
}
bool Reader::fill(
bytes::span buffer,
int offset,
crl::semaphore *notify) {
Expects(offset + buffer.size() <= size());
const auto wait = [&](int offset) {
_waiting = notify;
loadFor(offset);
return false;
};
const auto done = [&] {
_waiting = nullptr;
return true;
};
const auto failed = [&] {
_waiting = nullptr;
if (notify) {
notify->release();
}
return false;
};
processLoadedParts();
if (_failed) {
return failed();
}
const auto after = ranges::upper_bound(
_data,
offset,
ranges::less(),
&base::flat_map<int, QByteArray>::value_type::first);
if (after == begin(_data)) {
return wait(offset);
}
const auto till = int(offset + buffer.size());
const auto start = after - 1;
const auto finish = ranges::lower_bound(
start,
end(_data),
till,
ranges::less(),
&base::flat_map<int, QByteArray>::value_type::first);
const auto parts = ranges::make_iterator_range(start, finish);
const auto haveTill = FindNotLoadedStart(parts, offset);
if (haveTill < till) {
return wait(haveTill);
}
CopyLoaded(buffer, parts, offset, till);
return done();
}
void Reader::processLoadedParts() {
QMutexLocker lock(&_loadedPartsMutex);
auto loaded = std::move(_loadedParts);
lock.unlock();
if (_failed) {
return;
}
for (auto &part : loaded) {
if (part.offset == LoadedPart::kFailedOffset
|| (part.bytes.size() != Loader::kPartSize
&& part.offset + part.bytes.size() != size())) {
_failed = true;
return;
}
_data.emplace(part.offset, std::move(part.bytes));
}
}
void Reader::loadFor(int offset) {
const auto part = offset / Loader::kPartSize;
_loader->load(part * Loader::kPartSize);
}
Reader::~Reader() = default;
} // namespace Streaming
} // namespace Media

View file

@ -0,0 +1,56 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
#include "base/bytes.h"
namespace Data {
class Session;
} // namespace Data
namespace Media {
namespace Streaming {
class Loader;
struct LoadedPart;
class Reader final {
public:
Reader(not_null<Data::Session*> owner, std::unique_ptr<Loader> loader);
static constexpr auto kPartSize = 128 * 1024;
int size() const;
bool fill(
bytes::span buffer,
int offset,
crl::semaphore *notify = nullptr);
bool failed() const;
~Reader();
private:
void processLoadedParts();
void loadFor(int offset);
const not_null<Data::Session*> _owner;
const std::unique_ptr<Loader> _loader;
QMutex _loadedPartsMutex;
std::vector<LoadedPart> _loadedParts;
std::atomic<crl::semaphore*> _waiting = nullptr;
// #TODO streaming optimize
base::flat_map<int, QByteArray> _data;
bool _failed = false;
rpl::lifetime _lifetime;
};
} // namespace Streaming
} // namespace Media

@ -1 +1 @@
Subproject commit 74ddc4d1ac3a6a2cfc82aa963f7779010c8b8a78
Subproject commit 84072fba75f14620935e5e91788ce603daeb1988

View file

@ -22,6 +22,7 @@
'-Wno-unused-but-set-variable',
'-Wno-missing-field-initializers',
'-Wno-sign-compare',
'-Wno-attributes',
],
},
'conditions': [

View file

@ -43,6 +43,7 @@
'-Wno-comment',
'-Wno-missing-field-initializers',
'-Wno-sign-compare',
'-Wno-unknown-attributes',
],
},
'xcode_settings': {

View file

@ -454,6 +454,16 @@
<(src_loc)/media/player/media_player_volume_controller.h
<(src_loc)/media/player/media_player_widget.cpp
<(src_loc)/media/player/media_player_widget.h
<(src_loc)/media/streaming/media_streaming_common.cpp
<(src_loc)/media/streaming/media_streaming_common.h
<(src_loc)/media/streaming/media_streaming_file.cpp
<(src_loc)/media/streaming/media_streaming_file.h
<(src_loc)/media/streaming/media_streaming_loader.cpp
<(src_loc)/media/streaming/media_streaming_loader.h
<(src_loc)/media/streaming/media_streaming_loader_mtproto.cpp
<(src_loc)/media/streaming/media_streaming_loader_mtproto.h
<(src_loc)/media/streaming/media_streaming_reader.cpp
<(src_loc)/media/streaming/media_streaming_reader.h
<(src_loc)/media/view/media_clip_controller.cpp
<(src_loc)/media/view/media_clip_controller.h
<(src_loc)/media/view/media_clip_playback.cpp