上一篇我写的文章Tensorflow Rust实战上篇. 这一次咱们看看使用tensorflow创建了什么,并经过http接口提供服务。随着Actix Web1.0版本发布,我认为用它构建一些东西将是一个很好的时机。html
本文假设您对Futures及其运做方式有必定的了解。我将尽可能用更简单的术语解释,但理解Futures生态系统将很是有效地帮助阅读本文。为此,我建议你从tokio开始。git
有些人建议在深刻Futures以前等待async/await和friends功能发布。我认为你如今应该亲自动手:异步编程老是颇有挑战性。
再一次为了避免耐烦的人,您能够在actix-web分支上找到参考代码:
https://github.com/cetra3/mtc...github
这里的API很是简单。咱们想模仿咱们在命令行上所作的事情:提交一张图片,返回结果是一张图片。为了使事情变得有趣,咱们将提供一种方式:将边界框以JSON数组返回。web
关于经过http协议提交二进制数据,我首先想到了几种选择:算法
我认为最简单是原始数据,因此让咱们这样作! multipart/form-data可能ok,可是你必须处理多个图像的时候呢? JSON格式彷佛有点浪费,由于您不可避免地必须使用base64或相似的方式转换二进制数据。编程
因此咱们的API是这样的:json
在咱们上一篇博客中,咱们只是简单地使用main函数来执行全部操做,但咱们必须一些重构才能与actix一块儿使用。咱们但愿将MTCNN行为封装为结构,能够传递和转移。最终目标是在应用程序状态下使用它。segmentfault
让咱们将结构包含咱们想要的一切:api
首先,咱们建立一个新文件mtcnn.rs并加上结构体定义。数组
use tensorflow::{Graph, Session, Tensor}; pub struct Mtcnn { graph: Graph, session: Session, min_size: Tensor<f32>, thresholds: Tensor<f32>, factor: Tensor<f32> }
而后,如今咱们只是用new方法填充初始化内容。因为其中一些值的初始化并不是绝对可靠,咱们将返回Result:
pub fn new() -> Result<Self, Box<dyn Error>> { let model = include_bytes!("mtcnn.pb"); let mut graph = Graph::new(); graph.import_graph_def(&*model, &ImportGraphDefOptions::new())?; let session = Session::new(&SessionOptions::new(), &graph)?; let min_size = Tensor::new(&[]).with_values(&[40f32])?; let thresholds = Tensor::new(&[3]).with_values(&[0.6f32, 0.7f32, 0.7f32])?; let factor = Tensor::new(&[]).with_values(&[0.709f32])?; Ok(Self { graph, session, min_size, thresholds, factor }) }
我将在这里开始加快节奏,因此若是你遇到困难或不肯定发生了什么,请查看 Tensorflow Rust实战上篇,以解释这里发生的事情。
咱们已经添加了全部须要跑一个会话的东西。让咱们建立一个须要API作什么的方法:提交一张图片,响应一些边界框(框出人脸的位置):
pub fn run(&self, img: &DynamicImage) -> Result<Vec<BBoxes>, Status> { ... }
再一次,咱们响应了一个Result类型,由于在某些状况下run方法会失败。咱们使用Status类型来表示响应错误的类型。
像咱们先前的main方法,咱们须要压平图片的输入:
let input = { let mut flattened: Vec<f32> = Vec::new(); for (_x, _y, rgb) in img.pixels() { flattened.push(rgb[2] as f32); flattened.push(rgb[1] as f32); flattened.push(rgb[0] as f32); } Tensor::new(&[img.height() as u64, img.width() as u64, 3]) .with_values(&flattened)? };
而后咱们将提供全部相关输入。这与咱们以前的main方法相同,但咱们只是从self中借用值,而不是为每次运行建立它们:
let mut args = SessionRunArgs::new(); args.add_feed( &self.graph.operation_by_name_required("min_size")?, 0, &self.min_size, ); args.add_feed( &self.graph.operation_by_name_required("thresholds")?, 0, &self.thresholds, ); args.add_feed( &self.graph.operation_by_name_required("factor")?, 0, &self.factor, ); args.add_feed(&self.graph.operation_by_name_required("input")?, 0, &input);
接下来,咱们抓住咱们想要的输出:
let bbox = args.request_fetch(&self.graph.operation_by_name_required("box")?, 0); let prob = args.request_fetch(&self.graph.operation_by_name_required("prob")?, 0);
如今咱们设置了全部参数,咱们能够跑session了:
&self.session.run(&mut args)?;
噢哦!咱们获得一个编译器错误:
error[E0596]: cannot borrow `self.session` as mutable, as it is behind a `&` reference --> src/mtcnn.rs:68:10 | 36 | pub fn run(&self, img: &DynamicImage) -> Result<DynamicImage, Box<dyn Error>> { | ----- help: consider changing this to be a mutable reference: `&mut self` ... 68 | &self.session.run(&mut args)?; | ^^^^^^^^^^^^ `self` is a `&` reference, so the data it refers to cannot be borrowed as mutable
事实证实,Session::run()方法采用&mut self。咱们能够作些什么来解决这个问题:
咱们选择了第三种方式!
更新你的 Cargo.toml,指定git而不是cargo里的crate版本号:
tensorflow = { git = "https://github.com/tensorflow/rust"}
自从咱们的main方法以来,这一点都没有改变。咱们获取边界框,将它们放入咱们的BBox结构中:
//Our bounding box extents let bbox_res: Tensor<f32> = args.fetch(bbox)?; //Our facial probability let prob_res: Tensor<f32> = args.fetch(prob)?; //Let's store the results as a Vec<BBox> let mut bboxes = Vec::new(); let mut i = 0; let mut j = 0; //While we have responses, iterate through while i < bbox_res.len() { //Add in the 4 floats from the `bbox_res` array. //Notice the y1, x1, etc.. is ordered differently to our struct definition. bboxes.push(BBox { y1: bbox_res[i], x1: bbox_res[i + 1], y2: bbox_res[i + 2], x2: bbox_res[i + 3], prob: prob_res[j], // Add in the facial probability }); //Step `i` ahead by 4. i += 4; //Step `i` ahead by 1. j += 1; } debug!("BBox Length: {}, BBoxes:{:#?}", bboxes.len(), bboxes); Ok(bboxes)
到此,咱们的run方法完成了。
咱们打算响应表明BBox结构体的JSON,因此添加serde_derive中的Serialize(序列化相关模块):
use serde_derive::Serialize; #[derive(Copy, Clone, Debug, Serialize)] pub struct BBox { pub x1: f32, pub y1: f32, pub x2: f32, pub y2: f32, pub prob: f32, }
咱们将要添加一个方法,输入一张图片和一个边界框数组,响应输出的图片:
pub fn overlay(img: &DynamicImage, bboxes: &Vec<BBox>) -> DynamicImage
这里也没有多大的变化,只是响应了一张图片而不是保存一个文件:
//Let's clone the input image let mut output_image = img.clone(); //Iterate through all bounding boxes for bbox in bboxes { //Create a `Rect` from the bounding box. let rect = Rect::at(bbox.x1 as i32, bbox.y1 as i32) .of_size((bbox.x2 - bbox.x1) as u32, (bbox.y2 - bbox.y1) as u32); //Draw a green line around the bounding box draw_hollow_rect_mut(&mut output_image, rect, LINE_COLOUR); } output_image
好的,咱们已经完成了咱们的Mtcnn结构体和方法!咱们能够进一步吗?是的,绝对能够!但就目前而言,我认为这就是咱们所须要的。咱们已经封装了行为并建立了一个很好用的几个函数。
咱们再也不将它用做命令行程序,而是用做自托管的Web应用程序。由于咱们再也不有输入和输出文件,因此咱们须要更改应用程序所需的参数。
我认为咱们最初应该拿到的惟一参数是监听地址,即便这样咱们也应该使用合理的默认值。因此让咱们经过structopt的帮助来制做这个很是小的demo:
#[derive(StructOpt)] struct Opt { #[structopt( short = "l", long = "listen", help = "Listen Address", default_value = "127.0.0.1:8000" )] listen: String, }
Actix Web使用log crate来显示errors和debug message。
让咱们使用log替代println!。我喜欢使用pretty_env_logger,由于它将不一样的级别打印为不一样的颜色,而且咱们可使用有用的时间戳。
pretty_env_logger仍然使用环境变量。那就让咱们设置环境变量RUST_LOG,而后启动咱们的logger。
//Set the `RUST_LOG` var if none is provided if env::var("RUST_LOG").is_err() { env::set_var("RUST_LOG", "mtcnn=DEBUG,actix_web=DEBUG"); } //Create a timestamped logger pretty_env_logger::init_timed();
这为咱们的app和actix web设置了DEBUG级别日志,但容许咱们经过环境变量更改日志级别。
咱们须要将一些状态传递给actix使用:Mtcnn结构体和run方法。你能够经过多种方式传递状态提供actix,但最简单的方法应该是App::data方法。当咱们正在进入一个多线程世界时,咱们将不得不考虑Send/Sync。
好的,那么咱们如何在线程之间分享数据呢?好吧,做为第一步,我会看看std::sync。因为咱们知道mtcnn的run函数不须要可变引用,只须要不可变self引用,咱们能够将它包装在Arc中。若是咱们不得不使用可变引用,那么可能也须要Mutex,可是若是咱们使用tensorflow-rust的主分支,能够避免这种状况。
那么让咱们建立一个Arc:
let mtcnn = Arc::new(Mtcnn::new()?);
如今能够实例化服务:
HttpServer::new(move || { App::new() //Add in our mtcnn struct, we clone the reference for each worker thread .data(mtcnn.clone()) //Add in a logger to see the requests coming through .wrap(middleware::Logger::default()) // Add in some routes here .service( ... ) }) .bind(&opt.listen)? // Use the listener from the command arguments .run()
总结一下咱们已完成的事情:
Actix Web是一个异步框架,使用tokio。咱们的function是同步,须要一些时间才能处理完成。换句话说,咱们的请求是阻塞的。咱们能够混合使用同步和异步,固然,处理起来有点麻烦。
Actix 1.0大量使用Extractors,Extractors为方法定义提供彻底不一样形式。您指定但愿接口接收的内容,actix将为您进行串联起来。请注意:这确实意味着在运行以前不能发现错误。我在web::Data参数中使用了错误的类型签名时的一个示例。
那么咱们须要从咱们的请求中提取什么?request body的bytes和mtcnn:
fn handle_request( stream: web::Payload, mtcnn: web::Data<Arc<Mtcnn>>, ) -> impl Future<Item = HttpResponse, Error = ActixError> { ... }
咱们将在mtcnn中使用这种类型(web::Data<Arc<Mtcnn>>),所以让咱们为它建立一个类型别名:
type WebMtcnn = web::Data<Arc<Mtcnn>>;
注:这里的payload指的是http请求中header后面的部分。
咱们须要一种从payload中检索图像并返回Future的方法。 web::Payload结构体实现了Stream将Item设置为Bytes。
从流中得到单个字节是没有意义的,咱们想要得到整个批次并对图像进行解码!所以,让咱们将Stream转换为Future,并将咱们将要得到的全部单个字节合并到一个大的字节桶中。听起来很复杂,但幸运的是Stream有一个方法:concat2。
concat2是一个很是强大的组合器,它容许咱们将单个Stream轮询的结果加入到一个集合中,若是该项实现了Extend(以及一些其它的trait),Bytes就会支持扩展。
所以就像这样:
stream.concat2().and_then(....)
咱们须要解决的第二件事是:若是咱们要解码出图像,那么会阻止线程直到解码完成。若是它是一个巨大的图像,它可能须要几毫秒!所以,咱们但愿确保在发生这种状况时咱们不会发生阻塞。幸运的是,actix web有一种方法能够将阻塞代码包装为future:
stream.concat2().and_then(move |bytes| { web::block(move || { image::load_from_memory(&bytes) }) })
咱们采用stream,将其转换为 future 和 bytes,而后使用 web::block 将字节解码为后台线程中的图像并返回结果。load_from_memory 函数返回了一个Result,这意味着咱们能够将其用做返回类型。
所以,咱们的 Item 被转换为 Bytes 再到 DynamicImage,但咱们尚未处理错误类型,没法编译经过。咱们的错误类型应该是什么?让咱们使用 actix_web::Error 做为 ActixError:
use actix_web::{Error as ActixError} fn get_image(stream: web::Payload) -> impl Future<Item = DynamicImage, Error = ActixError> { stream.concat2().and_then(move |bytes| { web::block(move || { image::load_from_memory(&bytes) }) }) }
好吧,当咱们尝试编译时,出现了错误:
error[E0271]: type mismatch resolving `<impl futures::future::Future as futures::future::IntoFuture>::Error == actix_http::error::PayloadError` --> src/main.rs:67:22 | 67 | stream.concat2().and_then(move |bytes| { | ^^^^^^^^ expected enum `actix_threadpool::BlockingError`, found enum `actix_http::error::PayloadError` | = note: expected type `actix_threadpool::BlockingError<image::image::ImageError>` found type `actix_http::error::PayloadError` 还有一些未列出的内容...
当您组合 stream 时,将它们映射为 future,以及尝试从这些组合器得到一些输出时,您实际上处理的是Item类型 和 Error类型 。
处理多种类型的响应结果会使代码变得丑陋,这里不像 Result类型可使用问号(?)自动调整到正确的错误。当 ops::Try 和 async/await语法变得稳定的时候,事情可能变得简单,可是如今,咱们必须想办法处理这些错误类型。
咱们可使用 from_err() 方法。做用跟问号(?)基本相同,区别是from_err做用于future。咱们有两个正在处理的future:来自stream的字节数组 和 来自阻塞闭包的图像。咱们有3种错误类型:the Payload error, the Image load from memory error, and the blocking error:
fn get_image(stream: web::Payload) -> impl Future<Item = DynamicImage, Error = ActixError> { stream.concat2().from_err().and_then(move |bytes| { web::block(move || { image::load_from_memory(&bytes) }).from_err() }) }
最重要的是,咱们须要run起来:
mtcnn.run(&img)
可是咱们想要在一个线程池里跑起来:
web::block(|| mtcnn.run(&img))
让咱们看看函数声明。至少咱们须要图像和mtcnn结构体。而后咱们想要返回BBox的Vec。咱们保持错误类型相同,所以咱们将使用ActixError类型。
函数声明以下:
fn get_bboxes(img: DynamicImage, mtcnn: WebMtcnn) -> impl Future<Item = Vec<BBox>, Error = ActixError>
咱们须要在 web::block 上使用 from_err() 来转换错误类型,使用move来将图像提供给闭包:
fn get_bboxes(img: DynamicImage, mtcnn: WebMtcnn) -> impl Future<Item = Vec<BBox>, Error = ActixError> { web::block(move || mtcnn.run(&img)).from_err() }
但仍是会发生了编译错误:
error[E0277]: `*mut tensorflow_sys::TF_Status` cannot be sent between threads safely --> src/main.rs:75:5 | 75 | web::block(move || mtcnn.run(&img)).from_err() | ^^^^^^^^^^ `*mut tensorflow_sys::TF_Status` cannot be sent between threads safely | = help: within `tensorflow::Status`, the trait `std::marker::Send` is not implemented for `*mut tensorflow_sys::TF_Status` = note: required because it appears within the type `tensorflow::Status` = note: required by `actix_web::web::block`
tensorflow::Status,它是错误类型,不能在线程之间发送。
快捷方式是将error转换成String:
fn get_bboxes(img: DynamicImage, mtcnn: WebMtcnn) -> impl Future<Item = Vec<BBox>, Error = ActixError> { web::block(move || mtcnn.run(&img).map_err(|e| e.to_string())).from_err() }
由于String实现了Send,所以容许跨越线程间发送Result。
好的,咱们有2个函数,一个用于从请求中获取图像,另外一个用于获取边界框。咱们要返回回json HttpResponse:
fn return_bboxes( stream: web::Payload, mtcnn: WebMtcnn, ) -> impl Future<Item = HttpResponse, Error = ActixError> { // Get the image from the input stream get_image(stream) // Get the bounding boxes from the image .and_then(move |img| get_bboxes(img, mtcnn)) // Map the bounding boxes to a json HttpResponse .map(|bboxes| HttpResponse::Ok().json(bboxes)) }
接着,在App里添接口定义:
HttpServer::new(move || { App::new() .data(mtcnn.clone()) .wrap(middleware::Logger::default()) // our new API service .service(web::resource("/api/v1/bboxes").to_async(return_bboxes)) }) .bind(&opt.listen)? .run()
run起来,用 curl 来提交一个请求:
$ curl --data-binary @rustfest.jpg http://localhost:8000/api/v1/bboxes [{"x1":471.4591,"y1":287.59888,"x2":495.3053,"y2":317.25327,"prob":0.9999908}....
使用 jmespath 来获取120张脸:
$ curl -s --data-binary @rustfest.jpg http://localhost:8000/api/v1/bboxes | jp "length(@)" 120
咱们想要的另外一个API调用是返回一个覆盖了边界框的图像。 这不是一个很大的延伸,但在图像上绘制框确定是一个阻塞动做,因此咱们将其发送到线程池中运行。
让咱们包装叠加函数,将其转换为future:
fn get_overlay(img: DynamicImage, bboxes: Vec<BBox>) -> impl Future<Item = Vec<u8>, Error = ActixError> { web::block(move || { let output_img = overlay(&img, &bboxes); ... }).from_err() }
咱们想要返回一个u8字节的Vec,这样咱们就能够在返回体中使用它。 因此咱们须要分配缓冲区并以JPEG格式写入:
let mut buffer = vec![]; output_img.write_to(&mut buffer, JPEG)?; // write out our buffer Ok(buffer)
将目前为止的函数尝试编译一次:
fn get_overlay(img: DynamicImage, bboxes: Vec<BBox>) -> impl Future<Item = Vec<u8>, Error = ActixError> { web::block(move || { let output_img = overlay(&img, &bboxes); let mut buffer = Vec::new(); output_img.write_to(&mut buffer, JPEG)?; // write out our buffer Ok(buffer) }).from_err() }
还差一点, 咱们缺乏一个类型注解:
error[E0282]: type annotations needed --> src/main.rs:82:5 | 82 | web::block(move || { | ^^^^^^^^^^ cannot infer type for `E`
为何这里是类型问题?关联到这一行:
Ok(buffer) // What's the `Error` type here?
目前,惟一的错误类型来自write_to方法,即ImageError。 可是这一行没有错误类型,多是任何东西。
我想到三种方法处理这个问题:
方法一:在web::block中声明错误
web::block::<_,_,ImageError>
这看上去有点凌乱,但能够编译经过。
方法二:使用 as 声明 Result 类型:
Ok(buffer) as Result<_, ImageError>
方法三:使用map在成功时返回一个buffer:
output_img.write_to(&mut buffer, JPEG).map(|_| buffer)
我认为为了可读性,#2多是最简单的。 web::block函数须要3个类型的参数,这些参数在第一次阅读代码时可能会引发混淆。 #3也不错,但我以为它看起来有点奇怪。
最终个人选择:
fn get_overlay(img: DynamicImage, bboxes: Vec<BBox>) -> impl Future<Item = Vec<u8>, Error = ActixError> { web::block(move || { let output_img = overlay(&img, &bboxes); let mut buffer = Vec::new(); output_img.write_to(&mut buffer, JPEG)?; // Type annotations required for the `web::block` Ok(buffer) as Result<_, ImageError> }).from_err() }
好的,咱们拥有了一些返回future的方法,future返回边界框和叠加图像。 让咱们将它们拼接在一块儿并返回一个HttpResponse:
fn return_overlay( stream: web::Payload, mtcnn: WebMtcnn, ) -> impl Future<Item = HttpResponse, Error = ActixError> { //... magic happens here }
第一步是从字节流中获取图像:
get_image(stream)
而后咱们想要获取边界框:
get_image(stream).and_then(move |img| { get_bboxes(img, mtcnn) })
如今咱们想要得到叠加图像。 咱们有一个问题,如何使用image? get_bboxes返回future的图像,而后计算image上的人脸返回一个边界框数组。 这里有几个选择。 当咱们将image传递给get_bboxes时,咱们能够克隆image,但这会发生内存拷贝。 咱们能够等待 Pin 和 async/await 语法完成,而后可能更容易处理它。
或者咱们能够调整咱们的get_bboxes方法:
fn get_bboxes( img: DynamicImage, mtcnn: WebMtcnn, ) -> impl Future<Item = (DynamicImage, Vec<BBox>), Error = ActixError> { web::block(move || { mtcnn .run(&img) .map_err(|e| e.to_string()) //Return both the image and the bounding boxes .map(|bboxes| (img, bboxes)) }) .from_err() }
记录把 return_bboxes 方法也修改了:
fn return_bboxes( stream: web::Payload, mtcnn: WebMtcnn, ) -> impl Future<Item = HttpResponse, Error = ActixError> { get_image(stream) .and_then(move |img| get_bboxes(img, mtcnn)) .map(|(_img, bboxes)| HttpResponse::Ok().json(bboxes)) }
若是rust能够将元组变成命令参数,那就太好了。 不幸的是不适合咱们,因此咱们须要建立一个闭包:
//Create our image overlay .and_then(|(img, bbox)| get_overlay(img, bbox)) .map(|buffer| { // Return a `HttpResponse` here })
咱们的 HttpResponse 须要将 buffer 包装到一个body:
HttpResponse::with_body(StatusCode::OK, buffer.into())
将 Content-Type设置为jpeg:
let mut response = HttpResponse::with_body(StatusCode::OK, buffer.into()); response .headers_mut() .insert(CONTENT_TYPE, HeaderValue::from_static("image/jpeg"));
获取叠加层的最终实现:
fn return_overlay( stream: web::Payload, mtcnn: WebMtcnn, ) -> impl Future<Item = HttpResponse, Error = ActixError> { get_image(stream) .and_then(move |img| { get_bboxes(img, mtcnn) }) .and_then(|(img, bbox) | get_overlay(img, bbox)) .map(|buffer| { let mut response = HttpResponse::with_body(StatusCode::OK, buffer.into()); response .headers_mut() .insert(CONTENT_TYPE, HeaderValue::from_static("image/jpeg")); response }) }
在App注册此接口:
HttpServer::new(move || { App::new() .data(mtcnn.clone()) //Add in our data handler //Add in a logger to see the requets coming through .wrap(middleware::Logger::default()) //JSON bounding boxes .service(web::resource("/api/v1/bboxes").to_async(return_bboxes)) //Image overlay .service(web::resource("/api/v1/overlay").to_async(return_overlay)) }
run一下:
$ curl --data-binary @rustfest.jpg http://localhost:8000/api/v1/bboxes > output.jpg
结果:
咱们逐步将CLI应用程序转换为HTTP服务,并尝试了异步编程。如您所见,actix web是一个很是通用的Web框架。 我对它的兴趣来自于拥有构建Web应用程序所需的全部功能:多组件,线程池,高效率。虽然actix写异步还不是很优雅,但将来可期,由于我认为不少开发人员都在努力解决这个问题。
若是您正在寻找更多的actix示例,这个示例仓库是您最好的选择:https://github.com/actix/exam...
我期待看到社区将来的建设!