Variable parameter size function: how to conditionally set some arguments?

To create a boost :: process with output redirection, you must do:

bp::ipstream out;
bp::child c("c++filt", std_out > out);

      

Now, what would be the syntax for conditional redirection control:

bool redirect = true; // or false
bp::ipstream out;
bp::child c("c++filt", (redirect) ? std_out > out : "what should I put here??" );

      

+1


source to share


2 answers


Attractive API boost::process

rotates around of handler

objects that implement on_setup

, on_error

, on_success

(and potentially more, depending on the current operating system) methods that will be executed when called building process in the context of some of the inner panel and start the process to modify the behavior of the starting process installation. std_out > out

is an overloaded operator that will return such a handler. More information can be found in the extension process extension documentation .

So a general way to conditionally control redirects and other parameters would be to write a generic class proxy that takes an optional real handler and call the corresponding methods of the real handler:



#include <boost/process.hpp>
#include <boost/process/extend.hpp>
#include <boost/optional.hpp>

#include <memory>
#include <utility>

template<typename TRealHandler> class
t_OptionalHandler
:    public ::boost::process::extend::handler
{
    private: using t_OptionalRealHandler = ::boost::optional<TRealHandler>;

    private: t_OptionalRealHandler m_real_handler;

    public:
    t_OptionalHandler(void)
    :   m_real_handler{}
    {}

    public: explicit
    t_OptionalHandler(TRealHandler && real_handler)
    :   m_real_handler{::std::move(real_handler)}
    {}

    template<typename TExecutor>
    void on_setup(TExecutor & e) const
    {
        if(m_real_handler)
        {
            m_real_handler.get().on_setup(e);
        }
    }

    template<typename TExecutor>
    void on_error(TExecutor & e, const std::error_code & code) const
    {
        if(m_real_handler)
        {
            m_real_handler.get().on_error(e, code);
        }
    }

    template<typename TExecutor>
    void on_success(TExecutor & e) const
    {
        if(m_real_handler)
        {
            m_real_handler.get().on_success(e);
        }
    }
};

int main()
{
    bool const need_to_redirect{false};
    ::std::unique_ptr<::boost::process::ipstream> const p_stream
    {
        need_to_redirect?
        new ::boost::process::ipstream{}
        :
        nullptr
    };
    using t_OptionalStdOutRedirectionHandler = t_OptionalHandler
    <
        decltype(::boost::process::std_out > *p_stream)
     >;
    ::boost::process::child ch
    (
        "cmd"
    ,   need_to_redirect?
        t_OptionalStdOutRedirectionHandler{::boost::process::std_out > *p_stream}
        :
        t_OptionalStdOutRedirectionHandler{}
    );
    ch.wait();
    return(0);
}

      

+2


source


I was there.

Indeed, startup functions (not exactly factories, but composite procedural wrappers) were what I used.

Introduction

We had a CommandRunner with a legacy implementation that I rewrote to use Boost. I am missing the open interface:

class CommandRunner {
    public: struct IRunnerImpl;
};

      

The implementation was Pimpl-ed and worked with the base implementation while keeping mostly simple implementation-independent options:

struct CommandRunner::IRunnerImpl {
    virtual ~IRunnerImpl() = default;

    virtual void run()                    = 0;
    virtual void ensure_completed()       = 0;
    virtual std::string to_string() const = 0;

    friend class CommandRunner;
  protected:
    std::string            _working_directory;
    mylibrary::optional<mylibrary::io::time::duration> _time_constraint;
    std::string            _stdin_data;
    int                    _redirected_output_fd     = -1;
    std::string            _redirected_output_fname;
    bool                   _discard_output           = false;
    int                    _exit_code;
    std::string            _stdout_str;
    std::string            _stderr_str;
    bool                   _command_timed_out        = false;
    bool                   _sensitive_args           = false;
    string_map_t           _env;
};

      

Boost RunnerImpl



The core is ensure_completed

composed using helper lambdas like this:

try {
    mylibrary::threads::safe_io_service safe_ios;
    boost::asio::io_service& ios = safe_ios;
    mylibrary::io::time::timer deadline(ios);

    bp::group process_group;

    bp::async_pipe input(ios);
    std::future<std::string> output, error;

    if (_working_directory.empty())
        _working_directory = ".";

    auto on_exit = [this, &deadline](int exit, std::error_code ec) {
        if (!_command_timed_out) {
            _exit_code = exit;
        }
        deadline.cancel();
        if (ec) s_logger.log(LOG_WARNING) << "Child process returned " << ec.message();
        else    s_logger.log(LOG_DEBUG)   << "Child process returned";
    };

    auto launcher = [](auto&&... args) { return bp::child(std::forward<decltype(args)>(args).../*, bp::posix::fd.restrict_inherit()*/); };

    auto redirect_out = [&](auto f) {
        return [&,f](auto&&... args) {
            if (_discard_output) {
                if (_redirected_output_fd != -1 && !_redirected_output_fname.empty()) {
                    s_logger.log(LOG_ERR) << "Ignoring output redirection with set_discard_output. This is a bug.";
                }
                return f(std::forward<decltype(args)>(args)..., bp::std_out > bp::null, bp::std_err > bp::null);
            }

            if (_redirected_output_fd != -1 && !_redirected_output_fname.empty()) {
                s_logger.log(LOG_WARNING) << "Conflicting output redirection, ignoring filename with descriptor";
            }

            if (_redirected_output_fd != -1) {
                return f(std::forward<decltype(args)>(args)..., bp::posix::fd.bind(1, _redirected_output_fd), bp::std_err > error);
            }

            return _redirected_output_fname.empty()
                ? f(std::forward<decltype(args)>(args)..., bp::std_out > output,                   bp::std_err > error)
                : f(std::forward<decltype(args)>(args)..., bp::std_out > _redirected_output_fname, bp::std_err > error);
        };
    };

    bp::environment bp_env = boost::this_process::environment();
    for (auto& p : _env)
        bp_env[p.first] = p.second;

    auto c = redirect_out(launcher)(_command_path, _args,
            process_group,
            bp::std_in < input,
            bp::start_dir(_working_directory),
            bp_env,
            ios, bp::on_exit(on_exit)
        );

    if (_time_constraint) {
        deadline.expires_from_now(*_time_constraint);
        deadline.async_wait([&](boost::system::error_code ec) {
            if (ec != boost::asio::error::operation_aborted) {
                if (ec) {
                    s_logger.log(LOG_WARNING) << "Unexpected condition in CommandRunner deadline: " << ec.message();
                }

                _command_timed_out = true;
                _exit_code = 1;
                ::killpg(process_group.native_handle(), SIGTERM);

                deadline.expires_from_now(3s); // grace time
                deadline.async_wait([&](boost::system::error_code ec) { if (!ec) process_group.terminate(); }); // timed out
            }
        });
    }

    boost::asio::async_write(input, bp::buffer(_stdin_data), [&input](auto ec, auto bytes_written){
        if (ec) {
            s_logger.log(LOG_WARNING) << "Standard input rejected: " << ec.message() << " after " << bytes_written << " bytes written";
        }
        may_fail([&] { input.close(); });
    });

    ios.run();

    if (output.valid()) _stdout_str = output.get();
    if (error.valid())  _stderr_str = error.get();

    // make sure no grand children survive
    if (process_group && process_group.joinable() && !process_group.wait_for(1s))
        process_group.terminate();

    // Note: even kills get the child reaped; 'on_exit' handler is
    // actually the 'on wait_pid() complete'). No need for c.wait()
    // in this scenario
    //// may_fail([&] { if (c.running()) c.wait(); }); // to avoid zombies
} catch(bp::process_error const& e) {
    if (e.code().value() != static_cast<int>(std::errc::no_child_process)) throw;
}

      

Full limited listing

This compiler, but has no public interface. Just for demonstration purposes.

Note:

  • this did not go into production as there were cases where the Boost implementation got stuck with asynchronous I / O. The reason for this remains unclear, but I suspect this hairy case epoll

    is fundamentally broken
    , related to our use cases, wildly "multithreaded and forking.
  • this does not include our workarounds for safe_io_service

    (which guarantees sync and fork notifications across all active io_services)
  • this does not include our patch that adds limited FD inheritance:

    auto launcher = [](auto&&... args) { return 
         bp::child(std::forward<decltype(args)>(args)..., 
         bp::posix::fd.restrict_inherit()); };
    
          

  • other important things are not shown here like on-fork handlers for global (library) state (they used pthread_atfork

    and the like)

Coliru

#include <boost/process.hpp>
#include <boost/process/extend.hpp>
#include <boost/process/async.hpp>
#include <boost/process/posix.hpp>
#include <boost/optional.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <signal.h> // ::killpg
#include <iomanip>
#include <map>

///////
// mockups for standalone exposition
//#include "log.h"
enum LOG_LEVELS{LOG_DEBUG, LOG_WARNING, LOG_ERR};

struct DummyLogger {
    struct Tx {
        template <typename...Ts> friend Tx operator<<(Tx const&, Ts&&...) { return {}; }
    };
    Tx log(LOG_LEVELS) { return {}; }
} s_logger;

//#include "safe_io_service.h"
namespace mylibrary { 
    namespace threads {
        //this did manage fork notifications
        using safe_io_service = boost::asio::io_service;
    } 
    using boost::optional;
    namespace io { namespace time {
            using clock = std::chrono::high_resolution_clock;
            using duration = clock::duration;
            using timer = boost::asio::high_resolution_timer;
        }
    }
}
using namespace std::chrono_literals;

using string_vector_t = std::vector<std::string>;
using string_map_t = std::map<std::string, std::string>;

class CommandRunner {
    public: struct IRunnerImpl;
};

struct CommandRunner::IRunnerImpl {
    virtual ~IRunnerImpl() = default;

    virtual void run()                    = 0;
    virtual void ensure_completed()       = 0;
    virtual std::string to_string() const = 0;

    friend class CommandRunner;
  protected:
    std::string            _working_directory;
    mylibrary::optional<mylibrary::io::time::duration> _time_constraint;
    std::string            _stdin_data;
    int                    _redirected_output_fd     = -1;
    std::string            _redirected_output_fname;
    bool                   _discard_output           = false;
    int                    _exit_code;
    std::string            _stdout_str;
    std::string            _stderr_str;
    bool                   _command_timed_out        = false;
    bool                   _sensitive_args           = false;
    string_map_t           _env;
};

//
//////////////////////////////////////

namespace {

    namespace bp = boost::process;

    template <typename F> static auto may_fail(F&& f) {
        try {
            return std::forward<F>(f)();
        } catch(std::exception const& e) {
            s_logger.log(LOG_DEBUG) << "Ignoring non-fatal error (" << e.what() << ")";
        } catch(...) {
            s_logger.log(LOG_DEBUG) << "Ignoring non-fatal, unspecified error";
        }
    }

}

namespace mylibrary { namespace process { namespace with_boost {

    struct BoostRunnerImpl : CommandRunner::IRunnerImpl {

        BoostRunnerImpl(std::string cmd_path, string_vector_t args) :
            _command_path(std::move(cmd_path)),
            _args(std::move(args))
        {
        }

        std::string _command_path;
        string_vector_t _args;

        virtual void run() override {
            if (_completed) {
                s_logger.log(LOG_DEBUG) << "NOT running already completed command: " << *this;
                return;
            }
            ensure_completed();
        }

        ////////////////////////////
        // implementation
        virtual void ensure_completed() override {
            if (_completed) return;

            // Log command and args
            s_logger.log(LOG_DEBUG) << "Running command: " << *this;

            try {
                try {
                    mylibrary::threads::safe_io_service safe_ios;
                    boost::asio::io_service& ios = safe_ios;
                    mylibrary::io::time::timer deadline(ios);

                    bp::group process_group;

                    bp::async_pipe input(ios);
                    std::future<std::string> output, error;

                    if (_working_directory.empty())
                        _working_directory = ".";

                    auto on_exit = [this, &deadline](int exit, std::error_code ec) {
                        if (!_command_timed_out) {
                            _exit_code = exit;
                        }
                        deadline.cancel();
                        if (ec) s_logger.log(LOG_WARNING) << "Child process returned " << ec.message();
                        else    s_logger.log(LOG_DEBUG)   << "Child process returned";
                    };

                    auto launcher = [](auto&&... args) { return bp::child(std::forward<decltype(args)>(args).../*, bp::posix::fd.restrict_inherit()*/); };

                    auto redirect_out = [&](auto f) {
                        return [&,f](auto&&... args) {
                            if (_discard_output) {
                                if (_redirected_output_fd != -1 && !_redirected_output_fname.empty()) {
                                    s_logger.log(LOG_ERR) << "Ignoring output redirection with set_discard_output. This is a bug.";
                                }
                                return f(std::forward<decltype(args)>(args)..., bp::std_out > bp::null, bp::std_err > bp::null);
                            }

                            if (_redirected_output_fd != -1 && !_redirected_output_fname.empty()) {
                                s_logger.log(LOG_WARNING) << "Conflicting output redirection, ignoring filename with descriptor";
                            }

                            if (_redirected_output_fd != -1) {
                                return f(std::forward<decltype(args)>(args)..., bp::posix::fd.bind(1, _redirected_output_fd), bp::std_err > error);
                            }

                            return _redirected_output_fname.empty()
                                ? f(std::forward<decltype(args)>(args)..., bp::std_out > output,                   bp::std_err > error)
                                : f(std::forward<decltype(args)>(args)..., bp::std_out > _redirected_output_fname, bp::std_err > error);
                        };
                    };

                    bp::environment bp_env = boost::this_process::environment();
                    for (auto& p : _env)
                        bp_env[p.first] = p.second;

                    auto c = redirect_out(launcher)(_command_path, _args,
                            process_group,
                            bp::std_in < input,
                            bp::start_dir(_working_directory),
                            bp_env,
                            ios, bp::on_exit(on_exit)
                        );

                    if (_time_constraint) {
                        deadline.expires_from_now(*_time_constraint);
                        deadline.async_wait([&](boost::system::error_code ec) {
                            if (ec != boost::asio::error::operation_aborted) {
                                if (ec) {
                                    s_logger.log(LOG_WARNING) << "Unexpected condition in CommandRunner deadline: " << ec.message();
                                }

                                _command_timed_out = true;
                                _exit_code = 1;
                                ::killpg(process_group.native_handle(), SIGTERM);

                                deadline.expires_from_now(3s); // grace time
                                deadline.async_wait([&](boost::system::error_code ec) { if (!ec) process_group.terminate(); }); // timed out
                            }
                        });
                    }

                    boost::asio::async_write(input, bp::buffer(_stdin_data), [&input](auto ec, auto bytes_written){
                        if (ec) {
                            s_logger.log(LOG_WARNING) << "Standard input rejected: " << ec.message() << " after " << bytes_written << " bytes written";
                        }
                        may_fail([&] { input.close(); });
                    });

                    ios.run();

                    if (output.valid()) _stdout_str = output.get();
                    if (error.valid())  _stderr_str = error.get();

                    // make sure no grand children survive
                    if (process_group && process_group.joinable() && !process_group.wait_for(1s))
                        process_group.terminate();

                    // Note: even kills get the child reaped; 'on_exit' handler is
                    // actually the 'on wait_pid() complete'). No need for c.wait()
                    // in this scenario
                    //// may_fail([&] { if (c.running()) c.wait(); }); // to avoid zombies
                } catch(bp::process_error const& e) {
                    if (e.code().value() != static_cast<int>(std::errc::no_child_process)) throw;
                }
            } catch(std::exception const& e) {
                s_logger.log(LOG_ERR) << "CommandRunner: " << e.what();
                _completed = true;
                _exit_code = 1;
                throw;
            } catch(...) {
                s_logger.log(LOG_ERR) << "CommandRunner: unexpected error";
                _completed = true;
                _exit_code = 1;
                throw;
            }

            _completed = true;
        }

      private:
        bool _completed = false;

        friend std::ostream& operator<<(std::ostream& os, BoostRunnerImpl const& i) {
            os << i._command_path;
            if (i._sensitive_args)
                os << " (" << i._args.size() << " args)";
            else for (auto& arg : i._args)
                os << " " << std::quoted(arg);
            return os;
        }

        virtual std::string to_string() const override {
            std::ostringstream oss;
            oss << *this;
            return oss.str();
        }
    };

} } }

int main(){}

      

0


source







All Articles