How to save the result by deserializing Serde Zero-copy futures Hyper Chunk?

I am using futures, tokio, hyper and serde_json to query and deserialize some data that I need to save before my next request. My initial thought was to create a structure containing hyper::Chunk

, and deserialized data that was borrowed from Chunk

but couldn't use the lifetime correctly. I tried using a rental box , but I can't seem to get this to work. I may be using the lifetime 'buffer

before I declare the buffer Vec

, but perhaps I am confusing something else:

#[rental]
pub struct ChunkJson<T: serde::de::Deserialize<'buffer>> {
    buffer: Vec<u8>,
    json: T
}

      

Is there a way to make the rights of life correct or should I just use DeserializeOwned

and discard the null copy?

For more context, the following code works (periodically deserializing the JSON from two urls, saving the results so we can do something with them). I would like to change the types X

and Y

to use Cow<'a, str>

for my fields, changing from DeserializeOwned

to Deserialize<'a>

. For this to work, I need to save the slice that has been deserialized for each, but I don't know how. I'm looking for examples that use Serde null copy deserialization and keep the result, or some ideas for restructuring my code to work.

#[macro_use]
extern crate serde_derive;

extern crate serde;
extern crate serde_json;
extern crate futures;
extern crate tokio_core;
extern crate tokio_periodic;
extern crate hyper;

use std::collections::HashMap;
use std::error::Error;

use futures::future;
use futures::Future;
use futures::stream::Stream;
use hyper::Client;


fn stream_json<'a, T: serde::de::DeserializeOwned + Send + 'a>
    (handle: &tokio_core::reactor::Handle,
     url: String,
     period: u64)
     -> Box<Stream<Item = T, Error = Box<Error>> + 'a> {
    let client = Client::new(handle);
    let timer = tokio_periodic::PeriodicTimer::new(handle).unwrap();
    timer
        .reset(::std::time::Duration::new(period, 0))
        .unwrap();
    Box::new(futures::Stream::zip(timer.from_err::<Box<Error>>(), futures::stream::unfold( (), move |_| {
            let uri = url.parse::<hyper::Uri>().unwrap();
            let get = client.get(uri).from_err::<Box<Error>>().and_then(|res| {
                res.body().concat().from_err::<Box<Error>>().and_then(|chunks| {
                    let p: Result<T, Box<Error>> = serde_json::from_slice::<T>(chunks.as_ref()).map_err(|e| Box::new(e) as Box<Error>);
                    match p {
                        Ok(json) => future::ok((json, ())),
                        Err(err) => future::err(err)
                    }
                })
            });
            Some(get)
        })).map(|x| { x.1 }))
}

#[derive(Serialize, Deserialize, Debug)]
pub struct X {
    foo: String,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Y {
    bar: String,
}

fn main() {

    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let x_stream = stream_json::<HashMap<String, X>>(&handle, "http://localhost/X".to_string(), 2);
    let y_stream = stream_json::<HashMap<String, Y>>(&handle, "http://localhost/Y".to_string(), 5);
    let mut xy_stream = x_stream.merge(y_stream);

    let mut last_x = HashMap::new();
    let mut last_y = HashMap::new();

    loop {
        match core.run(futures::Stream::into_future(xy_stream)) {
            Ok((Some(item), stream)) => {
                match item {
                    futures::stream::MergedItem::First(x) => last_x = x,
                    futures::stream::MergedItem::Second(y) => last_y = y,
                    futures::stream::MergedItem::Both(x, y) => {
                        last_x = x;
                        last_y = y;
                    }
                }
                println!("\nx = {:?}", &last_x);
                println!("y = {:?}", &last_y);
                // Do more stuff with &last_x and &last_y

                xy_stream = stream;
            }
            Ok((None, stream)) => xy_stream = stream,
            Err(_) => {
                panic!("error");
            }
        }
    }
}

      

+3


source to share


1 answer


When trying to solve a complex programming problem, it is very helpful to remove as much as possible. Take your code and remove whatever you can until the problem goes away. Change your code a little and keep deleting until you can no longer. Then, turn the problem around and build from the smallest part and go back to the error. Doing both of these will show you where the problem is.

First, make sure we are deserializing correctly:

extern crate serde;
extern crate serde_json;
#[macro_use]
extern crate serde_derive;

use std::borrow::Cow;

#[derive(Debug, Deserialize)]
pub struct Example<'a> {
    #[serde(borrow)]
    name: Cow<'a, str>,
    key: bool,
}

impl<'a> Example<'a> {
    fn info(&self) {
        println!("{:?}", self);
        match self.name {
            Cow::Borrowed(_) => println!("Is borrowed"),
            Cow::Owned(_) => println!("Is owned"),
        }
    }
}

fn main() {
    let data: Vec<_> = br#"{"key": true, "name": "alice"}"#.to_vec();

    let decoded: Example = serde_json::from_slice(&data).expect("Couldn't deserialize");
    decoded.info();
}

      

I forgot to add the attribute here #[serde(borrow)]

, so I'm glad I did this test!

We can then enter the rental box:

#[macro_use]
extern crate rental;

rental! {
    mod holding {
        use super::*;

        #[rental]
        pub struct VecHolder {
            data: Vec<u8>,
            parsed: Example<'data>,
        }
    }
}

fn main() {
    let data: Vec<_> = br#"{"key": true, "name": "alice"}"#.to_vec();

    let holder = holding::VecHolder::try_new(data, |data| {
        serde_json::from_slice(data)
    });
    let holder = match holder {
        Ok(holder) => holder,
        Err(_) => panic!("Unable to construct rental"),
    };

    holder.rent(|example| example.info());

    // Make sure we can move the data and it still valid
    let holder2 = { holder };
    holder2.rent(|example| example.info());
}

      

Then we'll try to create a lease Chunk

:

#[rental]
pub struct ChunkHolder {
    data: Chunk,
    parsed: Example<'data>,
}

      



Unfortunately this fails:

  --> src/main.rs:29:1
   |
29 | rental! {
   | ^
   |
   = help: message: Field `data` must have an angle-bracketed type parameter or be `String`.

      

Oops! While checking the lease documents , we can add #[target_ty_hack="[u8]"]

to the box data

. This leads to:

error[E0277]: the trait bound `hyper::Chunk: rental::__rental_prelude::StableDeref` is not satisfied
  --> src/main.rs:29:1
   |
29 | rental! {
   | ^ the trait `rental::__rental_prelude::StableDeref` is not implemented for `hyper::Chunk`
   |
   = note: required by `rental::__rental_prelude::static_assert_stable_deref`

      

This is annoying; since we cannot implement this feature for Chunk

, we just need to insert Chunk

, proving that it has a stable address:

#[rental]
pub struct ChunkHolder {
    data: Box<Chunk>,
    parsed: Example<'data>,
}

      

I also looked to see if there is a way to return Vec<u8>

from Chunk

, but it doesn't exist. This would be a different solution with less distribution and indirection.

At this point, โ€œeverythingโ€ is left to integrate this back into the futures code. It's a lot of work for everyone but you to recreate this, but I don't foresee any obvious problems with this.

+3


source







All Articles