最新消息: USBMI致力于为网友们分享Windows、安卓、IOS等主流手机系统相关的资讯以及评测、同时提供相关教程、应用、软件下载等服务。

Rust

IT圈 admin 33浏览 0评论

Rust

Rust__使用tonic实现一元及流式(Unary and Streaming)RPC的实例

  本文展示了如何通过tonic(gRPC的rust实现)实现一元RPC和流式RPC。实例为通过Unary RPC实现对服务器中哈希表的单个读写,以及通过Streaming RPC进行批量读写。流式RPC可以使得客户端一边发送,服务器一边处理,不需要等到客户端全部发送完再处理,适用于数据量较大的批量处理情况。
完整代码Github

1.定义协议

//myproto.proto
syntax = "proto3";
package myproto;message KvPair {Key key = 1;Value val = 2;
}message Key {string key = 1;
}message Value {string val = 1;
}message ReplyState {string reply_info = 1;KvPair kvpair = 2;
}message RequestState {string request_info = 1;
}service MyRpc {//simple rpcrpc SetKv(KvPair) returns (ReplyState) {}rpc GetKv(Key) returns (ReplyState) {}// A server-to-client streaming RPC.rpc GetKvList(RequestState) returns (stream KvPair) {}// A client-to-server streaming RPC.rpc SetKvList(stream KvPair) returns (ReplyState) {}
}

2.将Proto转换为Rust数据结构

通过tonic自带的tonic_build工具(基于prost)将proto数据及方法转换为Rust数据结构及方法,build.rs在构建源文件前进行预构建,生成Rust代码。

fn main() {tonic_build::configure().out_dir("src/pb").compile(&["proto/my_proto.proto"], &["."]).expect("failed to compile protos");
}

构建后会在src/pb目录下生成myproto.rs文件,文件部分内容如下所示:

(1)该文件中包含在rust下实现的proto中的数据结构:

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KvPair {#[prost(message, optional, tag="1")]pub key: ::core::option::Option<Key>,#[prost(message, optional, tag="2")]pub val: ::core::option::Option<Value>,
}

并创建了2个mod

pub mod my_rpc_client
pub mod my_rpc_server

(2)在 mod my_rpc_client下创建了结构体:

pub struct MyRpcClient<T> {inner: tonic::client::Grpc<T>,
}

并为该结构体泛型为tonic::transport::Channel时实现了特化方法:

impl MyRpcClient<tonic::transport::Channel> {/// Attempt to create a new client by connecting to a given endpoint.pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>whereD: std::convert::TryInto<tonic::transport::Endpoint>,D::Error: Into<StdError>,{let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;Ok(Self::new(conn))}}

以及一些常规方法如:
pub fn new(inner: T) -> Self
pub fn send_gzip(mut self) -> Self
pub fn accept_gzip(mut self) -> Self
......

还有我们自定义的方法:

pub async fn set_kv(&mut self,request: impl tonic::IntoRequest<super::KvPair>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status>{...}
pub async fn get_kv(&mut self,request: impl tonic::IntoRequest<super::Key>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status>{...}
pub async fn get_kv_list(&mut self,request: impl tonic::IntoRequest<super::RequestState>,) -> Result<tonic::Response<tonic::codec::Streaming<super::KvPair>>,tonic::Status,> {...}
pub async fn set_kv_list(&mut self,request: impl tonic::IntoStreamingRequest<Message = super::KvPair>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status> {...}

(3)在 mod my_rpc_server下创建了trait及结构体:

#[async_trait]
pub trait MyRpc: Send + Sync + 'static {///simple rpcasync fn set_kv(&self,request: tonic::Request<super::KvPair>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;async fn get_kv(&self,request: tonic::Request<super::Key>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;///Server streaming response type for the GetKvList method.type GetKvListStream: futures_core::Stream<Item = Result<super::KvPair, tonic::Status>,>+ Send+ 'static;/// A server-to-client streaming RPC.async fn get_kv_list(&self,request: tonic::Request<super::RequestState>,) -> Result<tonic::Response<Self::GetKvListStream>, tonic::Status>;/// A client-to-server streaming RPC.async fn set_kv_list(&self,request: tonic::Request<tonic::Streaming<super::KvPair>>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;}
	#[derive(Debug)]pub struct MyRpcServer<T: MyRpc> {inner: _Inner<T>,accept_compression_encodings: (),send_compression_encodings: (),}struct _Inner<T>(Arc<T>);

为该结构实现:

	impl<T, B> tonic::codegen::Service<http::Request<B>> for MyRpcServer<T>whereT: MyRpc,B: Body + Send + 'static,B::Error: Into<StdError> + Send + 'static,impl<T: MyRpc> Clone for MyRpcServer<T>impl<T: MyRpc> Clone for _Inner<T>impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T>impl<T: MyRpc> tonic::transport::NamedService for MyRpcServer<T>

3.服务器代码

(1)首先引入myproto.rs文件

pub mod myproto {include!("pb/myproto.rs");
}

(2)然后建立RPC服务结构体并实现MyRpc trait
注意:在异步协程环境下要使用futures_util::lock::Mutex;而不是普通的Mutex,防止死锁。

#[derive(Debug)]
pub struct MyRpcService {table: Arc<Mutex<HashMap<Key, Value>>>,
}#[tonic::async_trait]
impl MyRpc for MyRpcService {async fn set_kv(&self,_request: Request<KvPair>,) -> Result<Response<ReplyState>, Status> {println!("set_kv = {:?}", _request);let kvpair = _request.into_inner();let k = kvpair.key.expect("key should not none.");let v = kvpair.val.expect("value should not none.");let tb = self.table.clone();if let Some(val) = tb.lock().await.insert(k.clone(), v) {return Ok(Response::new(ReplyState {reply_info: "update value, return old.".into(),kvpair: Some(KvPair {key: Some(k),val: Some(val.clone()),}),}));}Ok(Response::new(ReplyState{reply_info: "set new kvpair.".into(),kvpair: Some(KvPair {key: Some(k),val: None,}),}))}async fn get_kv(&self,_request: Request<Key>,) -> Result<Response<ReplyState>, Status> {println!("get_kv = {:?}", _request);let k = _request.into_inner();let tb = self.table.clone();if let Some(val) = tb.lock().await.get(&k) {return Ok(Response::new(ReplyState {reply_info: "get success.".into(),kvpair: Some(KvPair {key: Some(k),val: Some(val.clone()),}),}))}Ok(Response::new(ReplyState {reply_info: "get failed.".into(),kvpair: Some(KvPair {key: Some(k),val: None,}),}))}   ///Server streaming response type for the GetKvList method.type GetKvListStream = ReceiverStream<Result<KvPair, Status>>;/// A server-to-client streaming RPC.async fn get_kv_list(&self,_request: Request<RequestState>,) -> Result<Response<Self::GetKvListStream>, Status> {println!("get_kv_list = {:?}", _request);let (tx, rx) = mpsc::channel(10);let tb = self.table.clone();tokio::spawn(async move {for (k, v) in tb.lock().await.iter() {println!("  => send k = {:?}, v = {:?}", k, v);tx.send(Ok(KvPair {key: Some(k.clone()),val: Some(v.clone()),})).await.unwrap();}println!(" /// done sending");});Ok(Response::new(ReceiverStream::new(rx)))}/// A client-to-server streaming RPC.async fn set_kv_list(&self,_request: Request<tonic::Streaming<KvPair>>,) -> Result<Response<ReplyState>, Status> {println!("set_kv_list = {:?}", _request);let tb = self.table.clone();let mut stream = _request.into_inner();while let Some(kvpair) = stream.next().await {let kvpair = kvpair?;//stream.next().await -> Option<Result<...>>let k = kvpair.key;let v = kvpair.val;tb.lock().await.insert(k.unwrap(), v.unwrap());}Ok(Response::new(ReplyState {reply_info: "set all kvpair done.".into(),kvpair: None,}))}
}

(3)编写主函数
  在主函数中启动RPC服务

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>>{let addr = "[::1]:10000".parse().unwrap();let my_rpc = MyRpcService {table: Arc::new(Mutex::new(HashMap::new())),};let svc = MyRpcServer::new(my_rpc);Server::builder().add_service(svc).serve(addr).await?;Ok(())
}

4.客户端代码

(1)客户端代码引入myproto.rs文件

pub mod myproto {include!("pb/myproto.rs");
}

(2)实现批量写入和读取函数

async fn print_kv_list(client: &mut MyRpcClient<Channel>) -> Result<(), Box<dyn Error>> {let rqs = RequestState {request_info: "get all.".into(),};let mut stream = client.get_kv_list(Request::new(rqs)).await?.into_inner();while let Some(kvpair) = stream.message().await? {println!("KvPair = {:?}", kvpair);}Ok(())
}async fn run_set_kv_list(client: &mut MyRpcClient<Channel>) -> Result<(), Box<dyn Error>> {let mut pairs = vec![];for i in 0..3 {pairs.push(KvPair {key: Some(Key{key: i.to_string()}),val: Some(Value{val: i.to_string()})})}println!("pairs num = {:?}", pairs.len());let request = Request::new(stream::iter(pairs));match client.set_kv_list(request).await {Ok(response) => println!("ReplyState = {:?}", response.into_inner()),Err(e) => println!("something went wrong: {:?}", e),}Ok(())
}

(3)客户端主函数

  在主函数中调用RPC服务

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {let mut client = MyRpcClient::connect("http://[::1]:10000").await?;println!("*** SIMPLE RPC ***");let response0 = client.set_kv(Request::new(KvPair {key: Some(Key{key: "a".into()}),val: Some(Value{val: "1". into()}),})).await?;println!("RESPONSE0 = {:?}", response0);let response1 = client.get_kv(Request::new(Key{key: "a".into()})).await?;println!("RESPONSE1 = {:?}", response1);println!("\n*** CLIENT STREAMING ***");run_set_kv_list(&mut client).await?;println!("\n*** SERVER STREAMING ***");print_kv_list(&mut client).await?;Ok(())
}

5.简单测试

(1)服务器打印情况

(2)客户端打印情况

通过标准输出的内容可以看到:
(1)客户端通过set_kv("a", 1)发送请求到服务器,服务器向客户端返回了响应RESPONSE0,表示已经成功写入。
(2)客户端调用get_kv("a")向服务器发送请求,此时客户端收到了RESPONSE1,得到了结果("a", 1)
(3)客户端调用set_kv_list(),收到了批量写入成功的响应。
(4)客户端调用get_kv_list(),收到了服务器存储的所有KvPair

Rust

Rust__使用tonic实现一元及流式(Unary and Streaming)RPC的实例

  本文展示了如何通过tonic(gRPC的rust实现)实现一元RPC和流式RPC。实例为通过Unary RPC实现对服务器中哈希表的单个读写,以及通过Streaming RPC进行批量读写。流式RPC可以使得客户端一边发送,服务器一边处理,不需要等到客户端全部发送完再处理,适用于数据量较大的批量处理情况。
完整代码Github

1.定义协议

//myproto.proto
syntax = "proto3";
package myproto;message KvPair {Key key = 1;Value val = 2;
}message Key {string key = 1;
}message Value {string val = 1;
}message ReplyState {string reply_info = 1;KvPair kvpair = 2;
}message RequestState {string request_info = 1;
}service MyRpc {//simple rpcrpc SetKv(KvPair) returns (ReplyState) {}rpc GetKv(Key) returns (ReplyState) {}// A server-to-client streaming RPC.rpc GetKvList(RequestState) returns (stream KvPair) {}// A client-to-server streaming RPC.rpc SetKvList(stream KvPair) returns (ReplyState) {}
}

2.将Proto转换为Rust数据结构

通过tonic自带的tonic_build工具(基于prost)将proto数据及方法转换为Rust数据结构及方法,build.rs在构建源文件前进行预构建,生成Rust代码。

fn main() {tonic_build::configure().out_dir("src/pb").compile(&["proto/my_proto.proto"], &["."]).expect("failed to compile protos");
}

构建后会在src/pb目录下生成myproto.rs文件,文件部分内容如下所示:

(1)该文件中包含在rust下实现的proto中的数据结构:

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KvPair {#[prost(message, optional, tag="1")]pub key: ::core::option::Option<Key>,#[prost(message, optional, tag="2")]pub val: ::core::option::Option<Value>,
}

并创建了2个mod

pub mod my_rpc_client
pub mod my_rpc_server

(2)在 mod my_rpc_client下创建了结构体:

pub struct MyRpcClient<T> {inner: tonic::client::Grpc<T>,
}

并为该结构体泛型为tonic::transport::Channel时实现了特化方法:

impl MyRpcClient<tonic::transport::Channel> {/// Attempt to create a new client by connecting to a given endpoint.pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>whereD: std::convert::TryInto<tonic::transport::Endpoint>,D::Error: Into<StdError>,{let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;Ok(Self::new(conn))}}

以及一些常规方法如:
pub fn new(inner: T) -> Self
pub fn send_gzip(mut self) -> Self
pub fn accept_gzip(mut self) -> Self
......

还有我们自定义的方法:

pub async fn set_kv(&mut self,request: impl tonic::IntoRequest<super::KvPair>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status>{...}
pub async fn get_kv(&mut self,request: impl tonic::IntoRequest<super::Key>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status>{...}
pub async fn get_kv_list(&mut self,request: impl tonic::IntoRequest<super::RequestState>,) -> Result<tonic::Response<tonic::codec::Streaming<super::KvPair>>,tonic::Status,> {...}
pub async fn set_kv_list(&mut self,request: impl tonic::IntoStreamingRequest<Message = super::KvPair>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status> {...}

(3)在 mod my_rpc_server下创建了trait及结构体:

#[async_trait]
pub trait MyRpc: Send + Sync + 'static {///simple rpcasync fn set_kv(&self,request: tonic::Request<super::KvPair>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;async fn get_kv(&self,request: tonic::Request<super::Key>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;///Server streaming response type for the GetKvList method.type GetKvListStream: futures_core::Stream<Item = Result<super::KvPair, tonic::Status>,>+ Send+ 'static;/// A server-to-client streaming RPC.async fn get_kv_list(&self,request: tonic::Request<super::RequestState>,) -> Result<tonic::Response<Self::GetKvListStream>, tonic::Status>;/// A client-to-server streaming RPC.async fn set_kv_list(&self,request: tonic::Request<tonic::Streaming<super::KvPair>>,) -> Result<tonic::Response<super::ReplyState>, tonic::Status>;}
	#[derive(Debug)]pub struct MyRpcServer<T: MyRpc> {inner: _Inner<T>,accept_compression_encodings: (),send_compression_encodings: (),}struct _Inner<T>(Arc<T>);

为该结构实现:

	impl<T, B> tonic::codegen::Service<http::Request<B>> for MyRpcServer<T>whereT: MyRpc,B: Body + Send + 'static,B::Error: Into<StdError> + Send + 'static,impl<T: MyRpc> Clone for MyRpcServer<T>impl<T: MyRpc> Clone for _Inner<T>impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T>impl<T: MyRpc> tonic::transport::NamedService for MyRpcServer<T>

3.服务器代码

(1)首先引入myproto.rs文件

pub mod myproto {include!("pb/myproto.rs");
}

(2)然后建立RPC服务结构体并实现MyRpc trait
注意:在异步协程环境下要使用futures_util::lock::Mutex;而不是普通的Mutex,防止死锁。

#[derive(Debug)]
pub struct MyRpcService {table: Arc<Mutex<HashMap<Key, Value>>>,
}#[tonic::async_trait]
impl MyRpc for MyRpcService {async fn set_kv(&self,_request: Request<KvPair>,) -> Result<Response<ReplyState>, Status> {println!("set_kv = {:?}", _request);let kvpair = _request.into_inner();let k = kvpair.key.expect("key should not none.");let v = kvpair.val.expect("value should not none.");let tb = self.table.clone();if let Some(val) = tb.lock().await.insert(k.clone(), v) {return Ok(Response::new(ReplyState {reply_info: "update value, return old.".into(),kvpair: Some(KvPair {key: Some(k),val: Some(val.clone()),}),}));}Ok(Response::new(ReplyState{reply_info: "set new kvpair.".into(),kvpair: Some(KvPair {key: Some(k),val: None,}),}))}async fn get_kv(&self,_request: Request<Key>,) -> Result<Response<ReplyState>, Status> {println!("get_kv = {:?}", _request);let k = _request.into_inner();let tb = self.table.clone();if let Some(val) = tb.lock().await.get(&k) {return Ok(Response::new(ReplyState {reply_info: "get success.".into(),kvpair: Some(KvPair {key: Some(k),val: Some(val.clone()),}),}))}Ok(Response::new(ReplyState {reply_info: "get failed.".into(),kvpair: Some(KvPair {key: Some(k),val: None,}),}))}   ///Server streaming response type for the GetKvList method.type GetKvListStream = ReceiverStream<Result<KvPair, Status>>;/// A server-to-client streaming RPC.async fn get_kv_list(&self,_request: Request<RequestState>,) -> Result<Response<Self::GetKvListStream>, Status> {println!("get_kv_list = {:?}", _request);let (tx, rx) = mpsc::channel(10);let tb = self.table.clone();tokio::spawn(async move {for (k, v) in tb.lock().await.iter() {println!("  => send k = {:?}, v = {:?}", k, v);tx.send(Ok(KvPair {key: Some(k.clone()),val: Some(v.clone()),})).await.unwrap();}println!(" /// done sending");});Ok(Response::new(ReceiverStream::new(rx)))}/// A client-to-server streaming RPC.async fn set_kv_list(&self,_request: Request<tonic::Streaming<KvPair>>,) -> Result<Response<ReplyState>, Status> {println!("set_kv_list = {:?}", _request);let tb = self.table.clone();let mut stream = _request.into_inner();while let Some(kvpair) = stream.next().await {let kvpair = kvpair?;//stream.next().await -> Option<Result<...>>let k = kvpair.key;let v = kvpair.val;tb.lock().await.insert(k.unwrap(), v.unwrap());}Ok(Response::new(ReplyState {reply_info: "set all kvpair done.".into(),kvpair: None,}))}
}

(3)编写主函数
  在主函数中启动RPC服务

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>>{let addr = "[::1]:10000".parse().unwrap();let my_rpc = MyRpcService {table: Arc::new(Mutex::new(HashMap::new())),};let svc = MyRpcServer::new(my_rpc);Server::builder().add_service(svc).serve(addr).await?;Ok(())
}

4.客户端代码

(1)客户端代码引入myproto.rs文件

pub mod myproto {include!("pb/myproto.rs");
}

(2)实现批量写入和读取函数

async fn print_kv_list(client: &mut MyRpcClient<Channel>) -> Result<(), Box<dyn Error>> {let rqs = RequestState {request_info: "get all.".into(),};let mut stream = client.get_kv_list(Request::new(rqs)).await?.into_inner();while let Some(kvpair) = stream.message().await? {println!("KvPair = {:?}", kvpair);}Ok(())
}async fn run_set_kv_list(client: &mut MyRpcClient<Channel>) -> Result<(), Box<dyn Error>> {let mut pairs = vec![];for i in 0..3 {pairs.push(KvPair {key: Some(Key{key: i.to_string()}),val: Some(Value{val: i.to_string()})})}println!("pairs num = {:?}", pairs.len());let request = Request::new(stream::iter(pairs));match client.set_kv_list(request).await {Ok(response) => println!("ReplyState = {:?}", response.into_inner()),Err(e) => println!("something went wrong: {:?}", e),}Ok(())
}

(3)客户端主函数

  在主函数中调用RPC服务

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {let mut client = MyRpcClient::connect("http://[::1]:10000").await?;println!("*** SIMPLE RPC ***");let response0 = client.set_kv(Request::new(KvPair {key: Some(Key{key: "a".into()}),val: Some(Value{val: "1". into()}),})).await?;println!("RESPONSE0 = {:?}", response0);let response1 = client.get_kv(Request::new(Key{key: "a".into()})).await?;println!("RESPONSE1 = {:?}", response1);println!("\n*** CLIENT STREAMING ***");run_set_kv_list(&mut client).await?;println!("\n*** SERVER STREAMING ***");print_kv_list(&mut client).await?;Ok(())
}

5.简单测试

(1)服务器打印情况

(2)客户端打印情况

通过标准输出的内容可以看到:
(1)客户端通过set_kv("a", 1)发送请求到服务器,服务器向客户端返回了响应RESPONSE0,表示已经成功写入。
(2)客户端调用get_kv("a")向服务器发送请求,此时客户端收到了RESPONSE1,得到了结果("a", 1)
(3)客户端调用set_kv_list(),收到了批量写入成功的响应。
(4)客户端调用get_kv_list(),收到了服务器存储的所有KvPair

发布评论

评论列表 (0)

  1. 暂无评论