Variable parameter size function: how to conditionally set some arguments?
To create a boost :: process with output redirection, you must do:
bp::ipstream out;
bp::child c("c++filt", std_out > out);
Now, what would be the syntax for conditional redirection control:
bool redirect = true; // or false
bp::ipstream out;
bp::child c("c++filt", (redirect) ? std_out > out : "what should I put here??" );
source to share
Attractive API boost::process
rotates around of handler
objects that implement on_setup
, on_error
, on_success
(and potentially more, depending on the current operating system) methods that will be executed when called building process in the context of some of the inner panel and start the process to modify the behavior of the starting process installation. std_out > out
is an overloaded operator that will return such a handler. More information can be found in the extension process extension documentation .
So a general way to conditionally control redirects and other parameters would be to write a generic class proxy that takes an optional real handler and call the corresponding methods of the real handler:
#include <boost/process.hpp>
#include <boost/process/extend.hpp>
#include <boost/optional.hpp>
#include <memory>
#include <utility>
template<typename TRealHandler> class
t_OptionalHandler
: public ::boost::process::extend::handler
{
private: using t_OptionalRealHandler = ::boost::optional<TRealHandler>;
private: t_OptionalRealHandler m_real_handler;
public:
t_OptionalHandler(void)
: m_real_handler{}
{}
public: explicit
t_OptionalHandler(TRealHandler && real_handler)
: m_real_handler{::std::move(real_handler)}
{}
template<typename TExecutor>
void on_setup(TExecutor & e) const
{
if(m_real_handler)
{
m_real_handler.get().on_setup(e);
}
}
template<typename TExecutor>
void on_error(TExecutor & e, const std::error_code & code) const
{
if(m_real_handler)
{
m_real_handler.get().on_error(e, code);
}
}
template<typename TExecutor>
void on_success(TExecutor & e) const
{
if(m_real_handler)
{
m_real_handler.get().on_success(e);
}
}
};
int main()
{
bool const need_to_redirect{false};
::std::unique_ptr<::boost::process::ipstream> const p_stream
{
need_to_redirect?
new ::boost::process::ipstream{}
:
nullptr
};
using t_OptionalStdOutRedirectionHandler = t_OptionalHandler
<
decltype(::boost::process::std_out > *p_stream)
>;
::boost::process::child ch
(
"cmd"
, need_to_redirect?
t_OptionalStdOutRedirectionHandler{::boost::process::std_out > *p_stream}
:
t_OptionalStdOutRedirectionHandler{}
);
ch.wait();
return(0);
}
source to share
I was there.
Indeed, startup functions (not exactly factories, but composite procedural wrappers) were what I used.
Introduction
We had a CommandRunner with a legacy implementation that I rewrote to use Boost. I am missing the open interface:
class CommandRunner {
public: struct IRunnerImpl;
};
The implementation was Pimpl-ed and worked with the base implementation while keeping mostly simple implementation-independent options:
struct CommandRunner::IRunnerImpl {
virtual ~IRunnerImpl() = default;
virtual void run() = 0;
virtual void ensure_completed() = 0;
virtual std::string to_string() const = 0;
friend class CommandRunner;
protected:
std::string _working_directory;
mylibrary::optional<mylibrary::io::time::duration> _time_constraint;
std::string _stdin_data;
int _redirected_output_fd = -1;
std::string _redirected_output_fname;
bool _discard_output = false;
int _exit_code;
std::string _stdout_str;
std::string _stderr_str;
bool _command_timed_out = false;
bool _sensitive_args = false;
string_map_t _env;
};
Boost RunnerImpl
The core is ensure_completed
composed using helper lambdas like this:
try {
mylibrary::threads::safe_io_service safe_ios;
boost::asio::io_service& ios = safe_ios;
mylibrary::io::time::timer deadline(ios);
bp::group process_group;
bp::async_pipe input(ios);
std::future<std::string> output, error;
if (_working_directory.empty())
_working_directory = ".";
auto on_exit = [this, &deadline](int exit, std::error_code ec) {
if (!_command_timed_out) {
_exit_code = exit;
}
deadline.cancel();
if (ec) s_logger.log(LOG_WARNING) << "Child process returned " << ec.message();
else s_logger.log(LOG_DEBUG) << "Child process returned";
};
auto launcher = [](auto&&... args) { return bp::child(std::forward<decltype(args)>(args).../*, bp::posix::fd.restrict_inherit()*/); };
auto redirect_out = [&](auto f) {
return [&,f](auto&&... args) {
if (_discard_output) {
if (_redirected_output_fd != -1 && !_redirected_output_fname.empty()) {
s_logger.log(LOG_ERR) << "Ignoring output redirection with set_discard_output. This is a bug.";
}
return f(std::forward<decltype(args)>(args)..., bp::std_out > bp::null, bp::std_err > bp::null);
}
if (_redirected_output_fd != -1 && !_redirected_output_fname.empty()) {
s_logger.log(LOG_WARNING) << "Conflicting output redirection, ignoring filename with descriptor";
}
if (_redirected_output_fd != -1) {
return f(std::forward<decltype(args)>(args)..., bp::posix::fd.bind(1, _redirected_output_fd), bp::std_err > error);
}
return _redirected_output_fname.empty()
? f(std::forward<decltype(args)>(args)..., bp::std_out > output, bp::std_err > error)
: f(std::forward<decltype(args)>(args)..., bp::std_out > _redirected_output_fname, bp::std_err > error);
};
};
bp::environment bp_env = boost::this_process::environment();
for (auto& p : _env)
bp_env[p.first] = p.second;
auto c = redirect_out(launcher)(_command_path, _args,
process_group,
bp::std_in < input,
bp::start_dir(_working_directory),
bp_env,
ios, bp::on_exit(on_exit)
);
if (_time_constraint) {
deadline.expires_from_now(*_time_constraint);
deadline.async_wait([&](boost::system::error_code ec) {
if (ec != boost::asio::error::operation_aborted) {
if (ec) {
s_logger.log(LOG_WARNING) << "Unexpected condition in CommandRunner deadline: " << ec.message();
}
_command_timed_out = true;
_exit_code = 1;
::killpg(process_group.native_handle(), SIGTERM);
deadline.expires_from_now(3s); // grace time
deadline.async_wait([&](boost::system::error_code ec) { if (!ec) process_group.terminate(); }); // timed out
}
});
}
boost::asio::async_write(input, bp::buffer(_stdin_data), [&input](auto ec, auto bytes_written){
if (ec) {
s_logger.log(LOG_WARNING) << "Standard input rejected: " << ec.message() << " after " << bytes_written << " bytes written";
}
may_fail([&] { input.close(); });
});
ios.run();
if (output.valid()) _stdout_str = output.get();
if (error.valid()) _stderr_str = error.get();
// make sure no grand children survive
if (process_group && process_group.joinable() && !process_group.wait_for(1s))
process_group.terminate();
// Note: even kills get the child reaped; 'on_exit' handler is
// actually the 'on wait_pid() complete'). No need for c.wait()
// in this scenario
//// may_fail([&] { if (c.running()) c.wait(); }); // to avoid zombies
} catch(bp::process_error const& e) {
if (e.code().value() != static_cast<int>(std::errc::no_child_process)) throw;
}
Full limited listing
This compiler, but has no public interface. Just for demonstration purposes.
Note:
- this did not go into production as there were cases where the Boost implementation got stuck with asynchronous I / O. The reason for this remains unclear, but I suspect this hairy case
epoll
is fundamentally broken , related to our use cases, wildly "multithreaded and forking. - this does not include our workarounds for
safe_io_service
(which guarantees sync and fork notifications across all active io_services) -
this does not include our patch that adds limited FD inheritance:
auto launcher = [](auto&&... args) { return bp::child(std::forward<decltype(args)>(args)..., bp::posix::fd.restrict_inherit()); };
-
other important things are not shown here like on-fork handlers for global (library) state (they used
pthread_atfork
and the like)
#include <boost/process.hpp>
#include <boost/process/extend.hpp>
#include <boost/process/async.hpp>
#include <boost/process/posix.hpp>
#include <boost/optional.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <signal.h> // ::killpg
#include <iomanip>
#include <map>
///////
// mockups for standalone exposition
//#include "log.h"
enum LOG_LEVELS{LOG_DEBUG, LOG_WARNING, LOG_ERR};
struct DummyLogger {
struct Tx {
template <typename...Ts> friend Tx operator<<(Tx const&, Ts&&...) { return {}; }
};
Tx log(LOG_LEVELS) { return {}; }
} s_logger;
//#include "safe_io_service.h"
namespace mylibrary {
namespace threads {
//this did manage fork notifications
using safe_io_service = boost::asio::io_service;
}
using boost::optional;
namespace io { namespace time {
using clock = std::chrono::high_resolution_clock;
using duration = clock::duration;
using timer = boost::asio::high_resolution_timer;
}
}
}
using namespace std::chrono_literals;
using string_vector_t = std::vector<std::string>;
using string_map_t = std::map<std::string, std::string>;
class CommandRunner {
public: struct IRunnerImpl;
};
struct CommandRunner::IRunnerImpl {
virtual ~IRunnerImpl() = default;
virtual void run() = 0;
virtual void ensure_completed() = 0;
virtual std::string to_string() const = 0;
friend class CommandRunner;
protected:
std::string _working_directory;
mylibrary::optional<mylibrary::io::time::duration> _time_constraint;
std::string _stdin_data;
int _redirected_output_fd = -1;
std::string _redirected_output_fname;
bool _discard_output = false;
int _exit_code;
std::string _stdout_str;
std::string _stderr_str;
bool _command_timed_out = false;
bool _sensitive_args = false;
string_map_t _env;
};
//
//////////////////////////////////////
namespace {
namespace bp = boost::process;
template <typename F> static auto may_fail(F&& f) {
try {
return std::forward<F>(f)();
} catch(std::exception const& e) {
s_logger.log(LOG_DEBUG) << "Ignoring non-fatal error (" << e.what() << ")";
} catch(...) {
s_logger.log(LOG_DEBUG) << "Ignoring non-fatal, unspecified error";
}
}
}
namespace mylibrary { namespace process { namespace with_boost {
struct BoostRunnerImpl : CommandRunner::IRunnerImpl {
BoostRunnerImpl(std::string cmd_path, string_vector_t args) :
_command_path(std::move(cmd_path)),
_args(std::move(args))
{
}
std::string _command_path;
string_vector_t _args;
virtual void run() override {
if (_completed) {
s_logger.log(LOG_DEBUG) << "NOT running already completed command: " << *this;
return;
}
ensure_completed();
}
////////////////////////////
// implementation
virtual void ensure_completed() override {
if (_completed) return;
// Log command and args
s_logger.log(LOG_DEBUG) << "Running command: " << *this;
try {
try {
mylibrary::threads::safe_io_service safe_ios;
boost::asio::io_service& ios = safe_ios;
mylibrary::io::time::timer deadline(ios);
bp::group process_group;
bp::async_pipe input(ios);
std::future<std::string> output, error;
if (_working_directory.empty())
_working_directory = ".";
auto on_exit = [this, &deadline](int exit, std::error_code ec) {
if (!_command_timed_out) {
_exit_code = exit;
}
deadline.cancel();
if (ec) s_logger.log(LOG_WARNING) << "Child process returned " << ec.message();
else s_logger.log(LOG_DEBUG) << "Child process returned";
};
auto launcher = [](auto&&... args) { return bp::child(std::forward<decltype(args)>(args).../*, bp::posix::fd.restrict_inherit()*/); };
auto redirect_out = [&](auto f) {
return [&,f](auto&&... args) {
if (_discard_output) {
if (_redirected_output_fd != -1 && !_redirected_output_fname.empty()) {
s_logger.log(LOG_ERR) << "Ignoring output redirection with set_discard_output. This is a bug.";
}
return f(std::forward<decltype(args)>(args)..., bp::std_out > bp::null, bp::std_err > bp::null);
}
if (_redirected_output_fd != -1 && !_redirected_output_fname.empty()) {
s_logger.log(LOG_WARNING) << "Conflicting output redirection, ignoring filename with descriptor";
}
if (_redirected_output_fd != -1) {
return f(std::forward<decltype(args)>(args)..., bp::posix::fd.bind(1, _redirected_output_fd), bp::std_err > error);
}
return _redirected_output_fname.empty()
? f(std::forward<decltype(args)>(args)..., bp::std_out > output, bp::std_err > error)
: f(std::forward<decltype(args)>(args)..., bp::std_out > _redirected_output_fname, bp::std_err > error);
};
};
bp::environment bp_env = boost::this_process::environment();
for (auto& p : _env)
bp_env[p.first] = p.second;
auto c = redirect_out(launcher)(_command_path, _args,
process_group,
bp::std_in < input,
bp::start_dir(_working_directory),
bp_env,
ios, bp::on_exit(on_exit)
);
if (_time_constraint) {
deadline.expires_from_now(*_time_constraint);
deadline.async_wait([&](boost::system::error_code ec) {
if (ec != boost::asio::error::operation_aborted) {
if (ec) {
s_logger.log(LOG_WARNING) << "Unexpected condition in CommandRunner deadline: " << ec.message();
}
_command_timed_out = true;
_exit_code = 1;
::killpg(process_group.native_handle(), SIGTERM);
deadline.expires_from_now(3s); // grace time
deadline.async_wait([&](boost::system::error_code ec) { if (!ec) process_group.terminate(); }); // timed out
}
});
}
boost::asio::async_write(input, bp::buffer(_stdin_data), [&input](auto ec, auto bytes_written){
if (ec) {
s_logger.log(LOG_WARNING) << "Standard input rejected: " << ec.message() << " after " << bytes_written << " bytes written";
}
may_fail([&] { input.close(); });
});
ios.run();
if (output.valid()) _stdout_str = output.get();
if (error.valid()) _stderr_str = error.get();
// make sure no grand children survive
if (process_group && process_group.joinable() && !process_group.wait_for(1s))
process_group.terminate();
// Note: even kills get the child reaped; 'on_exit' handler is
// actually the 'on wait_pid() complete'). No need for c.wait()
// in this scenario
//// may_fail([&] { if (c.running()) c.wait(); }); // to avoid zombies
} catch(bp::process_error const& e) {
if (e.code().value() != static_cast<int>(std::errc::no_child_process)) throw;
}
} catch(std::exception const& e) {
s_logger.log(LOG_ERR) << "CommandRunner: " << e.what();
_completed = true;
_exit_code = 1;
throw;
} catch(...) {
s_logger.log(LOG_ERR) << "CommandRunner: unexpected error";
_completed = true;
_exit_code = 1;
throw;
}
_completed = true;
}
private:
bool _completed = false;
friend std::ostream& operator<<(std::ostream& os, BoostRunnerImpl const& i) {
os << i._command_path;
if (i._sensitive_args)
os << " (" << i._args.size() << " args)";
else for (auto& arg : i._args)
os << " " << std::quoted(arg);
return os;
}
virtual std::string to_string() const override {
std::ostringstream oss;
oss << *this;
return oss.str();
}
};
} } }
int main(){}
source to share