gRPC实战--如何在NodeJS中有效使用gRPC流

gRPC实战包含一系列文章,包括原创和翻译。最终会造成一个完整的系列,后续会不断完善,增长新的内容:node

=============================================================segmentfault

g10.png

本文将说明如何在NodeJS应用程序的GRPC中使用流。服务器

什么是gRPC中的流

gRPC中的流可帮助咱们在单个RPC调用中发送消息流。并发

g11.png

gRPC 的流式,分为三种类型:负载均衡

  • server-side streaming RPC:服务器端流式 RPC
  • Client-side streaming RPC:客户端流式 RPC
  • Bidirectional streaming RPC:双向流式 RPC

gRPC中的流使用场景

  • 大规模数据包
  • 实时场景

在本文中,咱们将重点关注如下流:ide

  • Server Streaming GRPC:在这种状况下,客户端向服务器发出单个请求,服务器将消息流发送回客户端。
  • Client Streaming GRPC:在这种状况下,客户端将消息流发送到服务器。而后,服务器处理流并将单个响应发送回客户端。

Server Streaming gRPC

如今让咱们为服务器流gRPC建立服务器和客户端代码。函数

建立 .proto 文件

建立一个名为proto的文件夹。在该文件夹中建立一个名为employee.proto的文件。将如下内容复制到employee.proto中:post

syntax = "proto3";

package employee;

service Employee {

  rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}
}


message EmployeeRequest {
  repeated int32 employeeIdList = 1;
}

message EmployeeResponse{
  string message = 1;
}

请参阅个人grpc基础文章,以了解有关.proto文件和协议缓冲区的更多信息。ui

在这里,咱们建立一个名为paySalary的rpc,它接受EmployeeRequest做为请求并发送EmployeeResponse流做为响应。咱们使用关键字流来指示服务器将发送消息流。spa

上面也定义了EmployeeRequestEmployeeResponse。 repeate关键字表示将发送数据列表。

在此示例中,paySalary的请求将是员工ID的列表。服务器将经过一条消息流作出响应,告知是否已向员工支付薪水。

为服务器建立虚拟数据

建立一个名为data.js的文件,并将如下代码复制到其中。

//Hardcode some data for employees
let employees = [{
    id: 1,
    email: "abcd@abcd.com",
    firstName: "First1",
    lastName: "Last1"   
},
{
    id: 2,
    email: "xyz@xyz.com",
    firstName: "First2",
    lastName: "Last2"   
},
{
    id: 3,
    email: "temp@temp.com",
    firstName: "First3",
    lastName: "Last3"   
},
];

exports.employees = employees;

咱们将其用做服务器的数据源。

建立Server

建立一个名为server.js的文件。将如下代码复制到server.js中

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');


let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition)

接下来,将如下代码片断添加到server.js中

let { paySalary } = require('./pay_salary.js');

function main() {
  let server = new grpc.Server();
  server.addService(employee_proto.Employee.service, 
    { paySalary: paySalary }
  );
  server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
  server.start();
}

main();

在上面的脚本中,咱们将启动GRPC Server并将Employee Service和paySalary实现一块儿添加到其中。

可是paySalary函数在pay_salary.js文件中定义。

让咱们建立一个pay_salary.js文件。

将如下脚本添加到pay_salary.js文件中

let { employees } = require('./data.js');
const _ = require('lodash');

function paySalary(call) {
    let employeeIdList = call.request.employeeIdList;
  
    _.each(employeeIdList, function (employeeId) {
      let employee = _.find(employees, { id: employeeId });
      if (employee != null) {
        let responseMessage = "Salary paid for ".concat(
          employee.firstName,
          ", ",
          employee.lastName);
        call.write({ message: responseMessage });
      }
      else{
        call.write({message: "Employee with Id " + employeeId + " not found in record"});
      }
  
    });
    call.end();
  
}
exports.paySalary = paySalary;

paySalary函数将调用做为输入。 call.request将包含客户端发送的请求。

call.request.employeeIdList将包含客户端发送的员工ID的列表。

而后,咱们遍历EmployeeId,并为每一个员工ID进行一些处理。

对于每一个员工ID,咱们最后都调用call.write函数。 call.write将在流中将单个消息写回到客户端。

在这种状况下,对于每位员工,call.write都会发回工资是否已经支付。

处理完全部员工编号后,咱们将调用call.end函数。 call.end指示流已完成。

这是最终的server.js文件

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');


let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition)

let { paySalary } = require('./pay_salary.js');

function main() {
  let server = new grpc.Server();
  server.addService(employee_proto.Employee.service, 
    { paySalary: paySalary }
  );
  server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
  server.start();
}

main();

建立Client

建立一个名为client_grpc_server_stream.js的文件。将如下代码复制到文件中。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');

let packageDefinition = protoLoader.loadSync(
    PROTO_PATH,
    {keepCase: true,
     longs: String,
     enums: String,
     defaults: true,
     oneofs: true
    });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

接下来,将如下脚本片断添加到客户端。

function main() {
  let client = new employee_proto.Employee('localhost:4500',
                                       grpc.credentials.createInsecure());
                                       
  let employeeIdList = [1,10,2];
  let call = client.paySalary({employeeIdList: employeeIdList});

  call.on('data',function(response){
    console.log(response.message);
  });

  call.on('end',function(){
    console.log('All Salaries have been paid');
  });

}

main();

client变量将具备存根,这将有助于咱们在服务器中调用该函数。

employeeIdList是提供给服务器的输入。

let call = client.paySalary({employeeIdList: employeeIdList}); 脚本调用服务器中的paySalary函数,并将employeeIdList做为请求传递。因为服务器将要发送消息流,所以调用对象将帮助咱们侦听流事件。

咱们会侦听呼叫对象中的“数据”事件,以查看流中来自服务器的任何消息。以下面的脚本所示。

call.on('data',function(response){
    console.log(response.message);
  });

在这里,只要咱们从服务器收到任何消息,咱们就只打印响应消息。

咱们在调用对象中侦听“结束”事件,以了解服务器流什么时候结束。以下面的脚本所示。

call.on('end',function(){
    console.log('All Salaries have been paid');
  });

在此流结束时,咱们正在打印“已支付全部薪水”。

这是client_gprc_server_stream.js的完整代码。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');

let packageDefinition = protoLoader.loadSync(
    PROTO_PATH,
    {keepCase: true,
     longs: String,
     enums: String,
     defaults: true,
     oneofs: true
    });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

function main() {
  let client = new employee_proto.Employee('localhost:4500',
                                       grpc.credentials.createInsecure());
                                       
  let employeeIdList = [1,10,2];
  let call = client.paySalary({employeeIdList: employeeIdList});

  call.on('data',function(response){
    console.log(response.message);
  });

  call.on('end',function(){
    console.log('All Salaries have been paid');
  });

}

main();

运行代码

打开命令提示符,而后使用如下脚本启动服务器。

node server.js

打开一个新的命令提示符,并使用如下脚本运行客户端。

node client_grpc_server_stream.js

在运行客户端时,咱们将得到如下输出。

Salary paid for First1, Last1
Employee with Id 10 not found in record
Salary paid for First2, Last2
All Salaries have been paid

在这种状况下,客户端已向服务器发送了3个Id的1,10,2。服务器一一处理ID,而后将消息流发送给客户端。流中的全部消息完成后,将显示消息“已支付全部薪水”。

Client Streaming GRPC

如今,让咱们为客户端流GRPC建立服务器和客户端代码。

建立.proto文件

在先前建立的employee.proto文件中,添加如下内容

service Employee {

  rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}

  rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {}
}

message ReportEmployeeRequest {
  int32 id = 1;
}

message ReportEmployeeResponse{
  string successfulReports = 1;
  string failedReports = 2;
}

在这里,咱们添加了一个名为generateReport的新rpc,它接受ReportEmployeeRequest流做为请求并返回ReportEmployeeResponse做为响应。

所以,向rpc输入的内容是员工ID的流,服务器的响应将是单个响应,其中指出生成了多少报告以及有多少报告失败。

这是咱们更改后的完整的employee.proto文件:

syntax = "proto3";

package employee;

service Employee {

  rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}

  rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {}
}


message EmployeeRequest {
  repeated int32 employeeIdList = 1;
}

message EmployeeResponse{
  string message = 1;
}

message ReportEmployeeRequest {
  int32 id = 1;
}

message ReportEmployeeResponse{
  string successfulReports = 1;
  string failedReports = 2;
}

建立Server

这是添加了新rpc的完整server.js代码:

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');


let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;


let { paySalary } = require('./pay_salary.js');
let { generateReport } = require('./generate_report.js');

function main() {
  let server = new grpc.Server();
  server.addService(employee_proto.Employee.service, 
    { paySalary: paySalary ,
      generateReport: generateReport }
  );
  server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
  server.start();
}

main();

在上面的脚本中,咱们能够看到咱们还向grpc服务器添加了generateReport函数。咱们还能够看到generateReport函数来自generate_report.js文件。

建立一个名为generate_report.js的文件。

将如下脚本添加到文件中:

let { employees } = require('./data.js');
const _ = require('lodash');

function generateReport(call, callback){

    let successfulReports = [];
    let failedReports = [];
    call.on('data',function(employeeStream){
        let employeeId = employeeStream.id;
        let employee = _.find(employees, { id: employeeId });
        if (employee != null) {
          successfulReports.push(employee.firstName);
        }
      else{
          failedReports.push(employeeId);
      }

    });
    call.on('end',function(){
        callback(null,{
            successfulReports: successfulReports.join(),
            failedReports: failedReports.join()
        })
    })
}

exports.generateReport = generateReport;

generateReport函数接受两个输入,即调用和回调

为了从客户端获取消息流,咱们须要在调用对象中监听数据事件。这是在如下脚本中完成的。

call.on('data',function(employeeStream){
        let employeeId = employeeStream.id;
        let employee = _.find(employees, { id: employeeId });
        if (employee != null) {
          successfulReports.push(employee.firstName);
        }
      else{
          failedReports.push(employeeId);
      }

    });

来自客户端的每条消息都会调用data事件。该消息存在于employeeStream变量中。收到消息后,咱们尝试生成报告,并肯定报告是成功仍是失败。

调用对象上的结束事件表示客户端流已结束。如下代码显示了如何监听结束事件。

call.on('end',function(){
        callback(null,{
            successfulReports: successfulReports.join(),
            failedReports: failedReports.join()
        })
    })

在这种状况下,当结束事件发生时,咱们将全部成功和失败报告组合到一个响应对象中,并使用回调对象将其发送回客户端。

建立Client

建立一个名为client_grpc_client_stream.js的文件。将如下脚本添加到其中。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
const _ = require('lodash');

let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

上面的脚本具备与服务器代码相同的功能。

将如下脚本也添加到client_grpc_client_stream.js

function main() {
  let client = new employee_proto.Employee('localhost:4500',
    grpc.credentials.createInsecure());

  let call = client.generateReport(function (error, response) {
    console.log("Reports successfully generated for: ", response.successfulReports);
    console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
  });

  let employeeIdList = [1, 10, 2];
  _.each(employeeIdList, function (employeeId) {
        call.write({ id: employeeId });
  })

  call.end();
}

main();

让咱们看看上面的脚本在作什么。

let call = client.generateReport(function (error, response) {
    console.log("Reports successfully generated for: ", response.successfulReports);
    console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
  });

在脚本的这一部分中,咱们正在建立一个调用对象并调用generateReport函数。一样在generateReport函数内部,咱们指示客户端一旦收到服务器的响应,应该怎么作。在这种状况下,咱们将打印服务器发送回的成功和失败报告。

let employeeIdList = [1, 10, 2];
  _.each(employeeIdList, function (employeeId) {
        call.write({ id: employeeId });
  })

在脚本的以上部分中,咱们遍历了员工ID,并将消息流发送到服务器。咱们使用call.write将消息以流的形式发送到服务器。

最后,一旦咱们在流中发送了全部消息,就可使用call.end函数指示流已完成,以下所示:

call.end();

下面给出了client_grpc_client_stream的完整代码。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
const _ = require('lodash');

let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

function main() {
  let client = new employee_proto.Employee('localhost:4500',
    grpc.credentials.createInsecure());

  let call = client.generateReport(function (error, response) {
    console.log("Reports successfully generated for: ", response.successfulReports);
    console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
  });

  let employeeIdList = [1, 10, 2];
  _.each(employeeIdList, function (employeeId) {
        call.write({ id: employeeId });
  })

  call.end();
}

main();

运行代码

打开命令提示符,而后使用如下脚本启动服务器。

node server.js

打开一个新的命令提示符,并使用如下脚本运行客户端。

node client_grpc_server_stream.js

在运行客户端时,咱们将得到如下输出。

Reports successfully generated for:  First1,First2
Reports failed since Following Employee Id\'s do not exist:  10

在这种状况下,客户端已向服务器发送了3个Id的1,10,2做为消息流。而后,服务器处理流中的消息,并将单个响应发送回客户端,以显示成功的报告数量和失败的报告数量。

相关文章
相关标签/搜索