How can I use hyper::Client from another thread?

I have multiple threads doing heavy operations, and I need to use an HTTP client in the middle of that work. I am using Hyper v0.11 as the HTTP client and would like to reuse connections, so I need to share the same hyper::Client in order to keep connections open (in keep-alive mode).

The client cannot be shared between threads (it implements neither Sync nor Send). Here is a small piece of code showing what I tried:

let mut core = Core::new().expect("Create Client Event Loop");
let handle = core.handle();

let remote = core.remote();

let client = Client::new(&handle.clone());

thread::spawn(move || {

    // intensive operations...

    let response = &client.get("http://google.com".parse().unwrap()).and_then(|res| {
        println!("Response: {}", res.status());
        Ok(())
    });

    remote.clone().spawn(|_| {
        response.map(|_| { () }).map_err(|_| { () })
    });

    // more intensive operations...
});
core.run(futures::future::empty::<(), ()>()).unwrap();

      

This code doesn't compile:

thread::spawn(move || {
^^^^^^^^^^^^^ within `[closure@src/load-balancer.rs:46:19: 56:6 client:hyper::Client<hyper::client::HttpConnector>, remote:std::sync::Arc<tokio_core::reactor::Remote>]`, the trait `std::marker::Send` is not implemented for `std::rc::Weak<std::cell::RefCell<tokio_core::reactor::Inner>>`

thread::spawn(move || {
^^^^^^^^^^^^^ within `[closure@src/load-balancer.rs:46:19: 56:6 client:hyper::Client<hyper::client::HttpConnector>, remote:std::sync::Arc<tokio_core::reactor::Remote>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::RefCell<hyper::client::pool::PoolInner<tokio_proto::util::client_proxy::ClientProxy<tokio_proto::streaming::message::Message<hyper::http::MessageHead<hyper::http::RequestLine>, hyper::Body>, tokio_proto::streaming::message::Message<hyper::http::MessageHead<hyper::http::RawStatus>, tokio_proto::streaming::body::Body<hyper::Chunk, hyper::Error>>, hyper::Error>>>>`
...
remote.clone().spawn(|_| {
               ^^^^^ the trait `std::marker::Sync` is not implemented for `futures::Future<Error=hyper::Error, Item=hyper::Response> + 'static`

      

Is there a way to reuse the same client from different threads, or is there a different approach?



1 answer


The short answer is no, but it's better that way.

Each Client object contains a pool of connections. Here is how Hyper's Pool is defined in version 0.11.0:

pub struct Pool<T> {
    inner: Rc<RefCell<PoolInner<T>>>,
}

      

Since inner is reference-counted with Rc and borrow-checked at runtime with RefCell, the pool is certainly not thread-safe. If you could move this Client to a new thread, that object would hold a pool that lives on another thread, which would be a source of data races.
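To illustrate the difference, here is a minimal sketch using only the standard library. PoolInner here is a hypothetical stand-in, not hyper's real type, and the Arc<Mutex<...>> variant is shown only for contrast; it is not how hyper 0.11 is implemented.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for a connection pool; not hyper's PoolInner.
struct PoolInner {
    idle: Vec<String>,
}

// Compiles only for types that may be moved to another thread.
fn is_send<T: Send>(_: &T) -> bool {
    true
}

// Thread-safe variant: atomic reference counting plus a lock.
fn shared_across_threads() -> usize {
    let shared = Arc::new(Mutex::new(PoolInner { idle: Vec::new() }));
    assert!(is_send(&shared));

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                shared.lock().unwrap().idle.push(format!("conn-{}", i));
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    let n = shared.lock().unwrap().idle.len();
    n
}

fn main() {
    // Single-threaded variant, shaped like Pool's `inner` field.
    let local = Rc::new(RefCell::new(PoolInner { idle: Vec::new() }));
    local.borrow_mut().idle.push("conn-0".to_string());

    // is_send(&local); // would not compile: Rc<RefCell<_>> is not Send

    println!("single-threaded pool size: {}", local.borrow().idle.len());
    println!("shared pool size: {}", shared_across_threads());
}
```

The price of the thread-safe variant is a lock on every access, which is the synchronization cost discussed next.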

This implementation is understandable. Reusing an HTTP connection across multiple threads is unusual, because it requires synchronized access to a resource that is mostly I/O-bound. This fits well with Tokio's asynchronous nature. It actually makes more sense to perform multiple requests on the same thread and let the Tokio core take care of sending and receiving messages asynchronously, without waiting for each response in sequence. In addition, computationally intensive tasks can be executed by a CPU pool from futures_cpupool. With that in mind, the code below works fine:



extern crate tokio_core;
extern crate hyper;
extern crate futures;
extern crate futures_cpupool;

use tokio_core::reactor::Core;
use hyper::client::Client;
use futures::Future;
use futures_cpupool::CpuPool;

fn main() {

    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = Client::new(&handle.clone());
    let pool = CpuPool::new(1);

    println!("Begin!");
    let req = client.get("http://google.com".parse().unwrap())
        .and_then(|res| {
            println!("Response: {}", res.status());
            Ok(())
        });
    let intensive = pool.spawn_fn(|| {
        println!("I'm working hard!!!");
        std::thread::sleep(std::time::Duration::from_secs(1));
        println!("Phew!");
        Ok(())
    });

    let task = req.join(intensive)
        .map(|_|{
            println!("End!");
        });
    core.run(task).unwrap();
}

      

If the response does not arrive too late, the output will be:

Begin!
I'm working hard!!!
Response: 302 Found
Phew!
End!

      

If you have multiple tasks running on separate threads, the problem becomes open-ended, since several architectures are feasible. One is to delegate all communication to a single actor, so that all other worker threads send their data to it. Alternatively, you can have one Client object per worker, each with its own separate connection pool.
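The single-actor option can be sketched roughly as follows, using only std::sync::mpsc channels. The Request struct, the fake_fetch function and the example.com URLs are hypothetical placeholders; in a real program the actor thread would own the Core and the hyper::Client and perform the actual requests there.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message: a URL plus a channel on which to send the reply.
struct Request {
    url: String,
    reply: mpsc::Sender<String>,
}

// Hypothetical stand-in for the HTTP call the actor would make.
fn fake_fetch(url: &str) -> String {
    format!("fetched {}", url)
}

fn run(workers: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<Request>();

    // The actor: the only thread that would ever touch the client.
    let actor = thread::spawn(move || {
        for req in rx {
            let body = fake_fetch(&req.url);
            // Ignore the error if the worker has already gone away.
            let _ = req.reply.send(body);
        }
    });

    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || {
                // intensive operations...
                let (reply_tx, reply_rx) = mpsc::channel();
                tx.send(Request {
                    url: format!("http://example.com/{}", i),
                    reply: reply_tx,
                })
                .unwrap();
                // more intensive operations, then collect the reply.
                reply_rx.recv().unwrap()
            })
        })
        .collect();

    // Drop the original sender so the actor loop ends once all workers finish.
    drop(tx);

    let mut results: Vec<String> =
        handles.into_iter().map(|h| h.join().unwrap()).collect();
    actor.join().unwrap();
    results.sort();
    results
}

fn main() {
    for line in run(3) {
        println!("{}", line);
    }
}
```

Only messages cross thread boundaries here, so the non-Send client never has to leave the thread that created it.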
