[Merge] lp:~morphis/aethercast/rework-and-add-tests into lp:aethercast
Konrad Zapałowicz
konrad.zapalowicz at canonical.com
Wed Mar 23 00:21:43 UTC 2016
Review: Needs Fixing
Looks good, my comments mostly address the indentation, order on initialization list and the most important suggests using C++11 style random number generator.
Diff comments:
>
> === added file 'src/mcs/common/executorpool.cpp'
> --- src/mcs/common/executorpool.cpp 1970-01-01 00:00:00 +0000
> +++ src/mcs/common/executorpool.cpp 2016-03-22 15:08:39 +0000
> @@ -0,0 +1,84 @@
> +/*
> + * Copyright (C) 2016 Canonical, Ltd.
> + *
> + * This program is free software: you can redistribute it and/or modify it
> + * under the terms of the GNU General Public License version 3, as published
> + * by the Free Software Foundation.
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranties of
> + * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
> + * PURPOSE. See the GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License along
> + * with this program. If not, see <http://www.gnu.org/licenses/>.
> + *
> + */
> +
> +#include "mcs/common/executorpool.h"
> +
> +namespace mcs {
> +namespace common {
> +
> +ExecutorPool::ExecutorPool(const ExecutorFactory::Ptr &factory, const std::uint32_t &size) :
> + size_(size),
> + running_(false),
> + factory_(factory) {
> +}
> +
> +ExecutorPool::~ExecutorPool() {
> + Stop();
> +}
> +
> +bool ExecutorPool::Add(const Executable::Ptr &executable) {
> + if (items_.size() == size_ || running_)
> + return false;
> +
> + auto executor = factory_->Create(executable);
> + items_.push_back(Item{executable, executor});
> +
> + return true;
> +}
> +
> +bool ExecutorPool::Start() {
> + if (running_)
> + return false;
> +
> + bool result = true;
> + for (Item &item : items_)
> + result &= item.executor->Start();
shouldn't we break at 1st failure here?
> +
> + // If we failed to start all we stop those we already
> + // started to come back into a well known state.
> + if (!result) {
> + for (auto &item : items_) {
> + if (item.executor->Running())
> + item.executor->Stop();
> + }
> + }
> +
> + running_ = result;
> +
> + return result;
> +}
> +
> +bool ExecutorPool::Stop() {
> + if (!running_)
> + return false;
> +
> + bool result = true;
> + for (auto &item : items_)
> + result &= item.executor->Stop();
> +
> + if (result)
> + running_ = false;
> +
> + return result;
> +}
> +
> +bool ExecutorPool::Running() const {
> + return running_;
> +}
> +
> +} // namespace common
> +} // namespace mcs
>
> === modified file 'src/mcs/mir/sourcemediamanager.cpp'
> --- src/mcs/mir/sourcemediamanager.cpp 2016-03-08 09:07:11 +0000
> +++ src/mcs/mir/sourcemediamanager.cpp 2016-03-22 15:08:39 +0000
> @@ -33,42 +37,45 @@
> namespace mcs {
> namespace mir {
>
> -SourceMediaManager::Ptr SourceMediaManager::Create(const std::string &remote_address) {
> - return std::shared_ptr<SourceMediaManager>(new SourceMediaManager(remote_address));
> -}
> -
> -SourceMediaManager::SourceMediaManager(const std::string &remote_address) :
> +SourceMediaManager::SourceMediaManager(const std::string &remote_address,
> + const mcs::common::ExecutorFactory::Ptr &executor_factory,
> + const mcs::video::BufferProducer::Ptr &producer,
> + const mcs::video::BaseEncoder::Ptr &encoder,
> + const mcs::network::Stream::Ptr &output_stream,
> + const mcs::report::ReportFactory::Ptr &report_factory) :
> remote_address_(remote_address),
> - state_(State::Stopped) {
> + producer_(producer),
> + encoder_(encoder),
> + output_stream_(output_stream),
> + report_factory_(report_factory),
> + state_(State::Stopped),
order on the initialization list does not match the order in which the member variables are declared.
> + pipeline_(executor_factory, 4) {
> }
>
> SourceMediaManager::~SourceMediaManager() {
> if (state_ != State::Stopped)
> - StopPipeline();
> + pipeline_.Stop();
> }
>
> bool SourceMediaManager::Configure() {
> - auto report_factory = report::ReportFactory::Create();
> -
> auto rr = mcs::video::ExtractRateAndResolution(format_);
>
> + if (!output_stream_->Connect(remote_address_, sink_port1_))
> + return false;
> +
> MCS_DEBUG("dimensions: %dx%d@%d", rr.width, rr.height, rr.framerate);
>
> - // FIXME we don't support any other mode than extend for now as that means some
> - // additional work from mir to still give us properly sized frames we can hand
> - // to the encoder.
> - Screencast::DisplayOutput output{Screencast::DisplayMode::kExtend, rr.width, rr.height};
> + video::DisplayOutput output{video::DisplayOutput::Mode::kExtend, rr.width, rr.height, rr.framerate};
>
> - connector_ = std::make_shared<mcs::mir::Screencast>(output);
> - if (!connector_->IsValid())
> + if (!producer_->Setup(output)) {
> + MCS_ERROR("Failed to setup buffer producer");
> return false;
> -
> - encoder_ = mcs::android::H264Encoder::Create(report_factory->CreateEncoderReport());
> + }
>
> int profile = 0, level = 0, constraint = 0;
> mcs::video::ExtractProfileLevel(format_, &profile, &level, &constraint);
>
> - auto config = mcs::android::H264Encoder::DefaultConfiguration();
> + auto config = encoder_->DefaultConfiguration();
> config.width = rr.width;
> config.height = rr.height;
> config.framerate = rr.framerate;
> @@ -81,63 +88,62 @@
> return false;
> }
>
> - encoder_executor_ = mcs::common::ThreadedExecutor::Create(encoder_, "Encoder");
> -
> - renderer_ = mcs::mir::StreamRenderer::Create(connector_, encoder_,
> - report_factory->CreateRendererReport());
> - renderer_->SetDimensions(rr.width, rr.height);
> -
> - auto rtp_sender = mcs::streaming::RTPSender::Create(remote_address_, sink_port1_,
> - report_factory->CreateSenderReport());
> - auto mpegts_packetizer = mcs::streaming::MPEGTSPacketizer::Create(report_factory->CreatePacketizerReport());
> -
> - sender_ = mcs::streaming::MediaSender::Create(mpegts_packetizer, rtp_sender, config);
> + renderer_ = std::make_shared<mcs::mir::StreamRenderer>(
> + producer_,
do not like the indentation here and line below. why break line after ( as 'producer_' and 'encoder_' would fit
> + encoder_,
> + report_factory_->CreateRendererReport());
> +
> + auto rtp_sender = std::make_shared<mcs::streaming::RTPSender>(
> + output_stream_,
> + report_factory_->CreateSenderReport());
> +
> + auto mpegts_packetizer = mcs::streaming::MPEGTSPacketizer::Create(
> + report_factory_->CreatePacketizerReport());
> +
> + sender_ = std::make_shared<mcs::streaming::MediaSender>(
> + mpegts_packetizer,
> + rtp_sender,
> + config);
> +
> encoder_->SetDelegate(sender_);
>
> + pipeline_.Add(encoder_);
> + pipeline_.Add(renderer_);
> + pipeline_.Add(rtp_sender);
> + pipeline_.Add(sender_);
> +
> return true;
> }
>
> -void SourceMediaManager::StartPipeline() {
> - sender_->Start();
> - encoder_executor_->Start();
> - renderer_->StartThreaded();
> -}
> -
> -void SourceMediaManager::StopPipeline() {
> - renderer_->Stop();
> - encoder_executor_->Stop();
> - sender_->Stop();
> -}
> -
> void SourceMediaManager::Play() {
> - if (!IsPaused() || !renderer_)
> + if (!IsPaused())
> return;
>
> MCS_DEBUG("");
>
> - StartPipeline();
> + pipeline_.Start();
>
> state_ = State::Playing;
> }
>
> void SourceMediaManager::Pause() {
> - if (IsPaused()|| !renderer_)
> + if (IsPaused())
> return;
>
> MCS_DEBUG("");
>
> - StopPipeline();
> + pipeline_.Stop();
>
> state_ = State::Paused;
> }
>
> void SourceMediaManager::Teardown() {
> - if (state_ == State::Stopped || !renderer_)
> + if (state_ == State::Stopped)
> return;
>
> MCS_DEBUG("");
>
> - StopPipeline();
> + pipeline_.Stop();
>
> state_ = State::Stopped;
> }
>
> === added file 'src/mcs/network/udpstream.cpp'
> --- src/mcs/network/udpstream.cpp 1970-01-01 00:00:00 +0000
> +++ src/mcs/network/udpstream.cpp 2016-03-22 15:08:39 +0000
> @@ -0,0 +1,178 @@
> +/*
> + * Copyright (C) 2016 Canonical, Ltd.
> + *
> + * This program is free software: you can redistribute it and/or modify it
> + * under the terms of the GNU General Public License version 3, as published
> + * by the Free Software Foundation.
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranties of
> + * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
> + * PURPOSE. See the GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License along
> + * with this program. If not, see <http://www.gnu.org/licenses/>.
> + *
> + */
> +
> +#include <arpa/inet.h>
> +#include <fcntl.h>
> +#include <net/if.h>
> +#include <netdb.h>
> +#include <netinet/in.h>
> +#include <sys/ioctl.h>
> +#include <sys/socket.h>
> +#include <memory.h>
> +#include <errno.h>
> +#include <error.h>
> +#include <stdlib.h>
> +
> +#include <system_error>
> +
> +#include "mcs/logger.h"
> +
> +#include "mcs/network/udpstream.h"
> +
> +namespace {
> +static constexpr unsigned int kUdpTxBufferSize = 256 * 1024;
> +/* Value below configured MTU so that we don't require any further splits */
> +static constexpr unsigned int kMaxUDPPacketSize = 1472;
> +
> +static mcs::network::Port PickRandomRTPPort() {
> + // Pick an even integer in range [1024, 65534)
> + static const size_t range = (65534 - 1024) / 2;
At the first glance, given the comment above, in order to change the lower bound I would just change the "- 1024" to sth different like "- 2048". There is a high chance that I would miss the necessary change in the line below.
With all that C++11 around couldn't this just be rewritten using std::uniform_int_distribution?
> + return (mcs::network::Port)(((float)(range + 1) * rand()) / RAND_MAX) * 2 + 1024;
why c-style cast?
> +}
> +
> +static int MakeSocketNonBlocking(int socket) {
> + int flags = fcntl(socket, F_GETFL, 0);
> + if (flags < 0)
> + flags = 0;
> + int res = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
> + if (res < 0)
> + return -errno;
> + return 0;
> +}
> +
> +}
> +
> +namespace mcs {
> +namespace network {
> +
> +UdpStream::UdpStream() :
> + socket_(0),
> + local_port_(PickRandomRTPPort()) {
> +}
> +
> +UdpStream::~UdpStream() {
> + if (socket_ > 0)
> + ::close(socket_);
> +}
> +
> +bool UdpStream::Connect(const std::string &address, const Port &port) {
> + MCS_DEBUG("Connected with remote on %s:%d", address, port);
> +
> + socket_ = ::socket(AF_INET, SOCK_DGRAM, 0);
> + if (socket_ < 0) {
> + MCS_ERROR("Failed to create socket: %s (%d)", ::strerror(errno), errno);
> + return false;
> + }
> +
> + int value = kUdpTxBufferSize;
> + if (::setsockopt(socket_, SOL_SOCKET, SO_SNDBUF, &value, sizeof(value)) < 0) {
> + MCS_ERROR("Failed to set socket transmit buffer size: %s (%d)", ::strerror(errno), errno);
> + return false;
> + }
> +
> + if (::MakeSocketNonBlocking(socket_) < 0) {
> + MCS_ERROR("Failed to make socket non blocking");
> + return false;
> + }
> +
> + struct sockaddr_in addr;
> + memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
> + addr.sin_family = AF_INET;
> + addr.sin_addr.s_addr = htonl(INADDR_ANY);
> + addr.sin_port = htons(local_port_);
> +
> + if (::bind(socket_, (const struct sockaddr *) &addr, sizeof(addr)) < 0) {
> + MCS_ERROR("Failed to bind socket to address: %s (%d)", ::strerror(errno), errno);
> + return false;
> + }
> +
> + struct sockaddr_in remote_addr;
> + memset(remote_addr.sin_zero, 0, sizeof(remote_addr.sin_zero));
> + remote_addr.sin_family = AF_INET;
> + remote_addr.sin_port = htons(port);
> +
> + struct hostent *ent = gethostbyname(address.c_str());
> + if (!ent) {
> + MCS_ERROR("Failed to resolve remote address");
> + return false;
> + }
> +
> + remote_addr.sin_addr.s_addr = *(in_addr_t*) ent->h_addr;
> +
> + if (::connect(socket_, (const struct sockaddr*) &remote_addr, sizeof(remote_addr)) < 0) {
> + MCS_ERROR("Failed to connect to remote: %s (%d)", ::strerror(errno), errno);
> + return false;
> + }
> +
> + return true;
> +}
> +
> +bool UdpStream::WaitUntilReady() {
> + fd_set fds;
> + FD_SET(socket_, &fds);
> +
> + int ret = ::select(socket_ + 1, nullptr, &fds, nullptr, nullptr);
> + if (!FD_ISSET(socket_, &fds))
> + return false;
> +
> + return true;
> +}
> +
> +Stream::Error UdpStream::Write(const uint8_t *data, unsigned int size) {
> + auto bytes_sent = ::send(socket_, data, size, 0);
> +
> + // If we get an error back which relates to a possible congested
> + // socket we try to resend one time and then fall into our actual
> + // error handling.
> + if (bytes_sent < 0) {
> + switch (errno) {
> + case ECONNREFUSED:
> + case ENOPROTOOPT:
> + case EPROTO:
> + case EHOSTUNREACH:
> + case ENETUNREACH:
> + case ENETDOWN:
> + MCS_DEBUG("Trying to resend due to a possible congested socket");
> + bytes_sent = ::send(socket_, data, size, 0);
> + break;
> + default:
> + break;
> + }
> + }
> +
> + if (bytes_sent < 0) {
> + MCS_ERROR("Failed to send packet to remote: %s (%d)", ::strerror(-errno), errno);
> + return Error::kFailed;
> + }
> + else if (bytes_sent == 0) {
> + MCS_ERROR("Remote has closed connection: %s (%d)", ::strerror(-errno), errno);
> + return Error::kRemoteClosedConnection;
> + }
> +
> + return Error::kNone;
> +}
> +
> +Port UdpStream::LocalPort() const {
> + return local_port_;
> +}
> +
> +std::uint32_t UdpStream::MaxUnitSize() const {
> + return kMaxUDPPacketSize;
> +}
> +
> +} // namespace network
> +} // namespace mcs
--
https://code.launchpad.net/~morphis/aethercast/rework-and-add-tests/+merge/289804
Your team Ubuntu Phablet Team is requested to review the proposed merge of lp:~morphis/aethercast/rework-and-add-tests into lp:aethercast.
More information about the Ubuntu-reviews
mailing list