[Merge] lp:~morphis/aethercast/rework-and-add-tests into lp:aethercast

Konrad Zapałowicz konrad.zapalowicz at canonical.com
Wed Mar 23 00:21:43 UTC 2016


Review: Needs Fixing

Looks good, my comments mostly address the indentation, order on initialization list and the most important suggests using C++11 style random number generator.

Diff comments:

> 
> === added file 'src/mcs/common/executorpool.cpp'
> --- src/mcs/common/executorpool.cpp	1970-01-01 00:00:00 +0000
> +++ src/mcs/common/executorpool.cpp	2016-03-22 15:08:39 +0000
> @@ -0,0 +1,84 @@
> +/*
> + * Copyright (C) 2016 Canonical, Ltd.
> + *
> + * This program is free software: you can redistribute it and/or modify it
> + * under the terms of the GNU General Public License version 3, as published
> + * by the Free Software Foundation.
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranties of
> + * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
> + * PURPOSE.  See the GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License along
> + * with this program.  If not, see <http://www.gnu.org/licenses/>.
> + *
> + */
> +
> +#include "mcs/common/executorpool.h"
> +
> +namespace mcs {
> +namespace common {
> +
> +ExecutorPool::ExecutorPool(const ExecutorFactory::Ptr &factory, const std::uint32_t &size) :
> +    size_(size),
> +    running_(false),
> +    factory_(factory) {
> +}
> +
> +ExecutorPool::~ExecutorPool() {
> +    Stop();
> +}
> +
> +bool ExecutorPool::Add(const Executable::Ptr &executable) {
> +    if (items_.size() == size_ || running_)
> +        return false;
> +
> +    auto executor = factory_->Create(executable);
> +    items_.push_back(Item{executable, executor});
> +
> +    return true;
> +}
> +
> +bool ExecutorPool::Start() {
> +    if (running_)
> +        return false;
> +
> +    bool result = true;
> +    for (Item &item : items_)
> +        result &= item.executor->Start();

shouldn't we break at 1st failure here?

> +
> +    // If we failed to start all we stop those we already
> +    // started to come back into a well known state.
> +    if (!result) {
> +        for (auto &item : items_) {
> +            if (item.executor->Running())
> +                item.executor->Stop();
> +        }
> +    }
> +
> +    running_ = result;
> +
> +    return result;
> +}
> +
> +bool ExecutorPool::Stop() {
> +    if (!running_)
> +        return false;
> +
> +    bool result = true;
> +    for (auto &item : items_)
> +        result &= item.executor->Stop();
> +
> +    if (result)
> +        running_ = false;
> +
> +    return result;
> +}
> +
> +bool ExecutorPool::Running() const {
> +    return running_;
> +}
> +
> +} // namespace common
> +} // namespace mcs
> 
> === modified file 'src/mcs/mir/sourcemediamanager.cpp'
> --- src/mcs/mir/sourcemediamanager.cpp	2016-03-08 09:07:11 +0000
> +++ src/mcs/mir/sourcemediamanager.cpp	2016-03-22 15:08:39 +0000
> @@ -33,42 +37,45 @@
>  namespace mcs {
>  namespace mir {
>  
> -SourceMediaManager::Ptr SourceMediaManager::Create(const std::string &remote_address) {
> -    return std::shared_ptr<SourceMediaManager>(new SourceMediaManager(remote_address));
> -}
> -
> -SourceMediaManager::SourceMediaManager(const std::string &remote_address) :
> +SourceMediaManager::SourceMediaManager(const std::string &remote_address,
> +                                       const mcs::common::ExecutorFactory::Ptr &executor_factory,
> +                                       const mcs::video::BufferProducer::Ptr &producer,
> +                                       const mcs::video::BaseEncoder::Ptr &encoder,
> +                                       const mcs::network::Stream::Ptr &output_stream,
> +                                       const mcs::report::ReportFactory::Ptr &report_factory) :
>      remote_address_(remote_address),
> -    state_(State::Stopped) {
> +    producer_(producer),
> +    encoder_(encoder),
> +    output_stream_(output_stream),
> +    report_factory_(report_factory),
> +    state_(State::Stopped),

order on the initialization list does not match the order in which the member variables are declared.

> +    pipeline_(executor_factory, 4) {
>  }
>  
>  SourceMediaManager::~SourceMediaManager() {
>      if (state_ != State::Stopped)
> -        StopPipeline();
> +        pipeline_.Stop();
>  }
>  
>  bool SourceMediaManager::Configure() {
> -    auto report_factory = report::ReportFactory::Create();
> -
>      auto rr = mcs::video::ExtractRateAndResolution(format_);
>  
> +    if (!output_stream_->Connect(remote_address_, sink_port1_))
> +        return false;
> +
>      MCS_DEBUG("dimensions: %dx%d@%d", rr.width, rr.height, rr.framerate);
>  
> -    // FIXME we don't support any other mode than extend for now as that means some
> -    // additional work from mir to still give us properly sized frames we can hand
> -    // to the encoder.
> -    Screencast::DisplayOutput output{Screencast::DisplayMode::kExtend, rr.width, rr.height};
> +    video::DisplayOutput output{video::DisplayOutput::Mode::kExtend, rr.width, rr.height, rr.framerate};
>  
> -    connector_ = std::make_shared<mcs::mir::Screencast>(output);
> -    if (!connector_->IsValid())
> +    if (!producer_->Setup(output)) {
> +        MCS_ERROR("Failed to setup buffer producer");
>          return false;
> -
> -    encoder_ = mcs::android::H264Encoder::Create(report_factory->CreateEncoderReport());
> +    }
>  
>      int profile = 0, level = 0, constraint = 0;
>      mcs::video::ExtractProfileLevel(format_, &profile, &level, &constraint);
>  
> -    auto config = mcs::android::H264Encoder::DefaultConfiguration();
> +    auto config = encoder_->DefaultConfiguration();
>      config.width = rr.width;
>      config.height = rr.height;
>      config.framerate = rr.framerate;
> @@ -81,63 +88,62 @@
>          return false;
>      }
>  
> -    encoder_executor_ = mcs::common::ThreadedExecutor::Create(encoder_, "Encoder");
> -
> -    renderer_ = mcs::mir::StreamRenderer::Create(connector_, encoder_,
> -                                                 report_factory->CreateRendererReport());
> -    renderer_->SetDimensions(rr.width, rr.height);
> -
> -    auto rtp_sender = mcs::streaming::RTPSender::Create(remote_address_, sink_port1_,
> -                                                        report_factory->CreateSenderReport());
> -    auto mpegts_packetizer = mcs::streaming::MPEGTSPacketizer::Create(report_factory->CreatePacketizerReport());
> -
> -    sender_ = mcs::streaming::MediaSender::Create(mpegts_packetizer, rtp_sender, config);
> +    renderer_ = std::make_shared<mcs::mir::StreamRenderer>(
> +                producer_,

do not like the indentation here and line below. why break line after ( as 'producer_' and 'encoder_' would fit

> +                encoder_,
> +                report_factory_->CreateRendererReport());
> +
> +    auto rtp_sender = std::make_shared<mcs::streaming::RTPSender>(
> +                output_stream_,
> +                report_factory_->CreateSenderReport());
> +
> +    auto mpegts_packetizer = mcs::streaming::MPEGTSPacketizer::Create(
> +                report_factory_->CreatePacketizerReport());
> +
> +    sender_ = std::make_shared<mcs::streaming::MediaSender>(
> +                mpegts_packetizer,
> +                rtp_sender,
> +                config);
> +
>      encoder_->SetDelegate(sender_);
>  
> +    pipeline_.Add(encoder_);
> +    pipeline_.Add(renderer_);
> +    pipeline_.Add(rtp_sender);
> +    pipeline_.Add(sender_);
> +
>      return true;
>  }
>  
> -void SourceMediaManager::StartPipeline() {
> -    sender_->Start();
> -    encoder_executor_->Start();
> -    renderer_->StartThreaded();
> -}
> -
> -void SourceMediaManager::StopPipeline() {
> -    renderer_->Stop();
> -    encoder_executor_->Stop();
> -    sender_->Stop();
> -}
> -
>  void SourceMediaManager::Play() {
> -    if (!IsPaused() || !renderer_)
> +    if (!IsPaused())
>          return;
>  
>      MCS_DEBUG("");
>  
> -    StartPipeline();
> +    pipeline_.Start();
>  
>      state_ = State::Playing;
>  }
>  
>  void SourceMediaManager::Pause() {
> -    if (IsPaused()|| !renderer_)
> +    if (IsPaused())
>          return;
>  
>      MCS_DEBUG("");
>  
> -    StopPipeline();
> +    pipeline_.Stop();
>  
>      state_ = State::Paused;
>  }
>  
>  void SourceMediaManager::Teardown() {
> -    if (state_ == State::Stopped || !renderer_)
> +    if (state_ == State::Stopped)
>          return;
>  
>      MCS_DEBUG("");
>  
> -    StopPipeline();
> +    pipeline_.Stop();
>  
>      state_ = State::Stopped;
>  }
> 
> === added file 'src/mcs/network/udpstream.cpp'
> --- src/mcs/network/udpstream.cpp	1970-01-01 00:00:00 +0000
> +++ src/mcs/network/udpstream.cpp	2016-03-22 15:08:39 +0000
> @@ -0,0 +1,178 @@
> +/*
> + * Copyright (C) 2016 Canonical, Ltd.
> + *
> + * This program is free software: you can redistribute it and/or modify it
> + * under the terms of the GNU General Public License version 3, as published
> + * by the Free Software Foundation.
> + *
> + * This program is distributed in the hope that it will be useful, but
> + * WITHOUT ANY WARRANTY; without even the implied warranties of
> + * MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
> + * PURPOSE.  See the GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License along
> + * with this program.  If not, see <http://www.gnu.org/licenses/>.
> + *
> + */
> +
> +#include <arpa/inet.h>
> +#include <fcntl.h>
> +#include <net/if.h>
> +#include <netdb.h>
> +#include <netinet/in.h>
> +#include <sys/ioctl.h>
> +#include <sys/socket.h>
> +#include <memory.h>
> +#include <errno.h>
> +#include <error.h>
> +#include <stdlib.h>
> +
> +#include <system_error>
> +
> +#include "mcs/logger.h"
> +
> +#include "mcs/network/udpstream.h"
> +
> +namespace {
> +static constexpr unsigned int kUdpTxBufferSize = 256 * 1024;
> +/* Value below configured MTU so that we don't require any further splits */
> +static constexpr unsigned int kMaxUDPPacketSize = 1472;
> +
> +static mcs::network::Port PickRandomRTPPort() {
> +    // Pick an even integer in range [1024, 65534)
> +    static const size_t range = (65534 - 1024) / 2;

At the first glance, given the comment above, in order to change the lower bound I would just change the "- 1024" to sth different like "- 2048". There is a high chance that I would miss the necessary change in the line below.

With all that C++11 around couldn't this just be rewritten using std::uniform_int_distribution?

> +    return (mcs::network::Port)(((float)(range + 1) * rand()) / RAND_MAX) * 2 + 1024;

why c-style cast?

> +}
> +
> +static int MakeSocketNonBlocking(int socket) {
> +    int flags = fcntl(socket, F_GETFL, 0);
> +    if (flags < 0)
> +        flags = 0;
> +    int res = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
> +    if (res < 0)
> +        return -errno;
> +    return 0;
> +}
> +
> +}
> +
> +namespace mcs {
> +namespace network {
> +
> +UdpStream::UdpStream() :
> +    socket_(0),
> +    local_port_(PickRandomRTPPort()) {
> +}
> +
> +UdpStream::~UdpStream() {
> +    if (socket_ > 0)
> +        ::close(socket_);
> +}
> +
> +bool UdpStream::Connect(const std::string &address, const Port &port) {
> +    MCS_DEBUG("Connected with remote on %s:%d", address, port);
> +
> +    socket_ = ::socket(AF_INET, SOCK_DGRAM, 0);
> +    if (socket_ < 0) {
> +        MCS_ERROR("Failed to create socket: %s (%d)", ::strerror(errno), errno);
> +        return false;
> +    }
> +
> +    int value = kUdpTxBufferSize;
> +    if (::setsockopt(socket_, SOL_SOCKET, SO_SNDBUF, &value, sizeof(value)) < 0) {
> +        MCS_ERROR("Failed to set socket transmit buffer size: %s (%d)", ::strerror(errno), errno);
> +        return false;
> +    }
> +
> +    if (::MakeSocketNonBlocking(socket_) < 0) {
> +        MCS_ERROR("Failed to make socket non blocking");
> +        return false;
> +    }
> +
> +    struct sockaddr_in addr;
> +    memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
> +    addr.sin_family = AF_INET;
> +    addr.sin_addr.s_addr = htonl(INADDR_ANY);
> +    addr.sin_port = htons(local_port_);
> +
> +    if (::bind(socket_, (const struct sockaddr *) &addr, sizeof(addr)) < 0) {
> +        MCS_ERROR("Failed to bind socket to address: %s (%d)", ::strerror(errno), errno);
> +        return false;
> +    }
> +
> +    struct sockaddr_in remote_addr;
> +    memset(remote_addr.sin_zero, 0, sizeof(remote_addr.sin_zero));
> +    remote_addr.sin_family = AF_INET;
> +    remote_addr.sin_port = htons(port);
> +
> +    struct hostent *ent = gethostbyname(address.c_str());
> +    if (!ent) {
> +        MCS_ERROR("Failed to resolve remote address");
> +        return false;
> +    }
> +
> +    remote_addr.sin_addr.s_addr = *(in_addr_t*) ent->h_addr;
> +
> +    if (::connect(socket_, (const struct sockaddr*) &remote_addr, sizeof(remote_addr)) < 0) {
> +        MCS_ERROR("Failed to connect to remote: %s (%d)", ::strerror(errno), errno);
> +        return false;
> +    }
> +
> +    return true;
> +}
> +
> +bool UdpStream::WaitUntilReady() {
> +    fd_set fds;
> +    FD_SET(socket_, &fds);
> +
> +    int ret = ::select(socket_ + 1, nullptr, &fds, nullptr, nullptr);
> +    if (!FD_ISSET(socket_, &fds))
> +        return false;
> +
> +    return true;
> +}
> +
> +Stream::Error UdpStream::Write(const uint8_t *data, unsigned int size) {
> +    auto bytes_sent = ::send(socket_, data, size, 0);
> +
> +    // If we get an error back which relates to a possible congested
> +    // socket we try to resend one time and then fall into our actual
> +    // error handling.
> +    if (bytes_sent < 0) {
> +        switch (errno) {
> +        case ECONNREFUSED:
> +        case ENOPROTOOPT:
> +        case EPROTO:
> +        case EHOSTUNREACH:
> +        case ENETUNREACH:
> +        case ENETDOWN:
> +            MCS_DEBUG("Trying to resend due to a possible congested socket");
> +            bytes_sent = ::send(socket_, data, size, 0);
> +            break;
> +        default:
> +            break;
> +        }
> +    }
> +
> +    if (bytes_sent < 0) {
> +        MCS_ERROR("Failed to send packet to remote: %s (%d)", ::strerror(-errno), errno);
> +        return Error::kFailed;
> +    }
> +    else if (bytes_sent == 0) {
> +        MCS_ERROR("Remote has closed connection: %s (%d)", ::strerror(-errno), errno);
> +        return Error::kRemoteClosedConnection;
> +    }
> +
> +    return Error::kNone;
> +}
> +
> +Port UdpStream::LocalPort() const {
> +    return local_port_;
> +}
> +
> +std::uint32_t UdpStream::MaxUnitSize() const {
> +    return kMaxUDPPacketSize;
> +}
> +
> +} // namespace network
> +} // namespace mcs


-- 
https://code.launchpad.net/~morphis/aethercast/rework-and-add-tests/+merge/289804
Your team Ubuntu Phablet Team is requested to review the proposed merge of lp:~morphis/aethercast/rework-and-add-tests into lp:aethercast.



More information about the Ubuntu-reviews mailing list