gRPC实战包含一系列文章,包括原创和翻译。最终会造成一个完整的系列,后续会不断完善,增长新的内容:node
=============================================================segmentfault
本文将说明如何在NodeJS应用程序的GRPC中使用流。服务器
gRPC中的流可帮助咱们在单个RPC调用中发送消息流。并发
gRPC 的流式,分为三种类型:负载均衡
在本文中,咱们将重点关注如下流:ide
如今让咱们为服务器流gRPC建立服务器和客户端代码。函数
建立一个名为proto的文件夹。在该文件夹中建立一个名为employee.proto的文件。将如下内容复制到employee.proto中:post
syntax = "proto3"; package employee; service Employee { rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {} } message EmployeeRequest { repeated int32 employeeIdList = 1; } message EmployeeResponse{ string message = 1; }
请参阅个人grpc基础文章,以了解有关.proto文件和协议缓冲区的更多信息。ui
在这里,咱们建立一个名为paySalary的rpc,它接受EmployeeRequest做为请求并发送EmployeeResponse流做为响应。咱们使用关键字流来指示服务器将发送消息流。spa
上面也定义了EmployeeRequest和EmployeeResponse。 repeate关键字表示将发送数据列表。
在此示例中,paySalary的请求将是员工ID的列表。服务器将经过一条消息流作出响应,告知是否已向员工支付薪水。
建立一个名为data.js的文件,并将如下代码复制到其中。
//Hardcode some data for employees let employees = [{ id: 1, email: "abcd@abcd.com", firstName: "First1", lastName: "Last1" }, { id: 2, email: "xyz@xyz.com", firstName: "First2", lastName: "Last2" }, { id: 3, email: "temp@temp.com", firstName: "First3", lastName: "Last3" }, ]; exports.employees = employees;
咱们将其用做服务器的数据源。
建立一个名为server.js的文件。将如下代码复制到server.js中
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition)
接下来,将如下代码片断添加到server.js中
let { paySalary } = require('./pay_salary.js'); function main() { let server = new grpc.Server(); server.addService(employee_proto.Employee.service, { paySalary: paySalary } ); server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure()); server.start(); } main();
在上面的脚本中,咱们将启动GRPC Server并将Employee Service和paySalary实现一块儿添加到其中。
可是paySalary函数在pay_salary.js文件中定义。
让咱们建立一个pay_salary.js文件。
将如下脚本添加到pay_salary.js文件中
let { employees } = require('./data.js'); const _ = require('lodash'); function paySalary(call) { let employeeIdList = call.request.employeeIdList; _.each(employeeIdList, function (employeeId) { let employee = _.find(employees, { id: employeeId }); if (employee != null) { let responseMessage = "Salary paid for ".concat( employee.firstName, ", ", employee.lastName); call.write({ message: responseMessage }); } else{ call.write({message: "Employee with Id " + employeeId + " not found in record"}); } }); call.end(); } exports.paySalary = paySalary;
paySalary函数将调用做为输入。 call.request将包含客户端发送的请求。
call.request.employeeIdList将包含客户端发送的员工ID的列表。
而后,咱们遍历EmployeeId,并为每一个员工ID进行一些处理。
对于每一个员工ID,咱们最后都调用call.write函数。 call.write将在流中将单个消息写回到客户端。
在这种状况下,对于每位员工,call.write都会发回工资是否已经支付。
处理完全部员工编号后,咱们将调用call.end函数。 call.end指示流已完成。
这是最终的server.js文件
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition) let { paySalary } = require('./pay_salary.js'); function main() { let server = new grpc.Server(); server.addService(employee_proto.Employee.service, { paySalary: paySalary } ); server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure()); server.start(); } main();
建立一个名为client_grpc_server_stream.js的文件。将如下代码复制到文件中。
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, {keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;
接下来,将如下脚本片断添加到客户端。
function main() { let client = new employee_proto.Employee('localhost:4500', grpc.credentials.createInsecure()); let employeeIdList = [1,10,2]; let call = client.paySalary({employeeIdList: employeeIdList}); call.on('data',function(response){ console.log(response.message); }); call.on('end',function(){ console.log('All Salaries have been paid'); }); } main();
client变量将具备存根,这将有助于咱们在服务器中调用该函数。
employeeIdList是提供给服务器的输入。
let call = client.paySalary({employeeIdList: employeeIdList});
脚本调用服务器中的paySalary函数,并将employeeIdList做为请求传递。因为服务器将要发送消息流,所以调用对象将帮助咱们侦听流事件。
咱们会侦听呼叫对象中的“数据”事件,以查看流中来自服务器的任何消息。以下面的脚本所示。
call.on('data',function(response){ console.log(response.message); });
在这里,只要咱们从服务器收到任何消息,咱们就只打印响应消息。
咱们在调用对象中侦听“结束”事件,以了解服务器流什么时候结束。以下面的脚本所示。
call.on('end',function(){ console.log('All Salaries have been paid'); });
在此流结束时,咱们正在打印“已支付全部薪水”。
这是client_gprc_server_stream.js的完整代码。
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, {keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee; function main() { let client = new employee_proto.Employee('localhost:4500', grpc.credentials.createInsecure()); let employeeIdList = [1,10,2]; let call = client.paySalary({employeeIdList: employeeIdList}); call.on('data',function(response){ console.log(response.message); }); call.on('end',function(){ console.log('All Salaries have been paid'); }); } main();
打开命令提示符,而后使用如下脚本启动服务器。
node server.js
打开一个新的命令提示符,并使用如下脚本运行客户端。
node client_grpc_server_stream.js
在运行客户端时,咱们将得到如下输出。
Salary paid for First1, Last1 Employee with Id 10 not found in record Salary paid for First2, Last2 All Salaries have been paid
在这种状况下,客户端已向服务器发送了3个Id的1,10,2。服务器一一处理ID,而后将消息流发送给客户端。流中的全部消息完成后,将显示消息“已支付全部薪水”。
如今,让咱们为客户端流GRPC建立服务器和客户端代码。
在先前建立的employee.proto文件中,添加如下内容
service Employee { rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {} rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {} } message ReportEmployeeRequest { int32 id = 1; } message ReportEmployeeResponse{ string successfulReports = 1; string failedReports = 2; }
在这里,咱们添加了一个名为generateReport的新rpc,它接受ReportEmployeeRequest流做为请求并返回ReportEmployeeResponse做为响应。
所以,向rpc输入的内容是员工ID的流,服务器的响应将是单个响应,其中指出生成了多少报告以及有多少报告失败。
这是咱们更改后的完整的employee.proto文件:
syntax = "proto3"; package employee; service Employee { rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {} rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {} } message EmployeeRequest { repeated int32 employeeIdList = 1; } message EmployeeResponse{ string message = 1; } message ReportEmployeeRequest { int32 id = 1; } message ReportEmployeeResponse{ string successfulReports = 1; string failedReports = 2; }
这是添加了新rpc的完整server.js代码:
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee; let { paySalary } = require('./pay_salary.js'); let { generateReport } = require('./generate_report.js'); function main() { let server = new grpc.Server(); server.addService(employee_proto.Employee.service, { paySalary: paySalary , generateReport: generateReport } ); server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure()); server.start(); } main();
在上面的脚本中,咱们能够看到咱们还向grpc服务器添加了generateReport函数。咱们还能够看到generateReport函数来自generate_report.js文件。
建立一个名为generate_report.js的文件。
将如下脚本添加到文件中:
let { employees } = require('./data.js'); const _ = require('lodash'); function generateReport(call, callback){ let successfulReports = []; let failedReports = []; call.on('data',function(employeeStream){ let employeeId = employeeStream.id; let employee = _.find(employees, { id: employeeId }); if (employee != null) { successfulReports.push(employee.firstName); } else{ failedReports.push(employeeId); } }); call.on('end',function(){ callback(null,{ successfulReports: successfulReports.join(), failedReports: failedReports.join() }) }) } exports.generateReport = generateReport;
generateReport函数接受两个输入,即调用和回调
为了从客户端获取消息流,咱们须要在调用对象中监听数据事件。这是在如下脚本中完成的。
call.on('data',function(employeeStream){ let employeeId = employeeStream.id; let employee = _.find(employees, { id: employeeId }); if (employee != null) { successfulReports.push(employee.firstName); } else{ failedReports.push(employeeId); } });
来自客户端的每条消息都会调用data事件。该消息存在于employeeStream变量中。收到消息后,咱们尝试生成报告,并肯定报告是成功仍是失败。
调用对象上的结束事件表示客户端流已结束。如下代码显示了如何监听结束事件。
call.on('end',function(){ callback(null,{ successfulReports: successfulReports.join(), failedReports: failedReports.join() }) })
在这种状况下,当结束事件发生时,咱们将全部成功和失败报告组合到一个响应对象中,并使用回调对象将其发送回客户端。
建立一个名为client_grpc_client_stream.js的文件。将如下脚本添加到其中。
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); const _ = require('lodash'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;
上面的脚本具备与服务器代码相同的功能。
将如下脚本也添加到client_grpc_client_stream.js。
function main() { let client = new employee_proto.Employee('localhost:4500', grpc.credentials.createInsecure()); let call = client.generateReport(function (error, response) { console.log("Reports successfully generated for: ", response.successfulReports); console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports); }); let employeeIdList = [1, 10, 2]; _.each(employeeIdList, function (employeeId) { call.write({ id: employeeId }); }) call.end(); } main();
让咱们看看上面的脚本在作什么。
let call = client.generateReport(function (error, response) { console.log("Reports successfully generated for: ", response.successfulReports); console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports); });
在脚本的这一部分中,咱们正在建立一个调用对象并调用generateReport函数。一样在generateReport函数内部,咱们指示客户端一旦收到服务器的响应,应该怎么作。在这种状况下,咱们将打印服务器发送回的成功和失败报告。
let employeeIdList = [1, 10, 2]; _.each(employeeIdList, function (employeeId) { call.write({ id: employeeId }); })
在脚本的以上部分中,咱们遍历了员工ID,并将消息流发送到服务器。咱们使用call.write将消息以流的形式发送到服务器。
最后,一旦咱们在流中发送了全部消息,就可使用call.end函数指示流已完成,以下所示:
call.end();
下面给出了client_grpc_client_stream的完整代码。
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); const _ = require('lodash'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee; function main() { let client = new employee_proto.Employee('localhost:4500', grpc.credentials.createInsecure()); let call = client.generateReport(function (error, response) { console.log("Reports successfully generated for: ", response.successfulReports); console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports); }); let employeeIdList = [1, 10, 2]; _.each(employeeIdList, function (employeeId) { call.write({ id: employeeId }); }) call.end(); } main();
打开命令提示符,而后使用如下脚本启动服务器。
node server.js
打开一个新的命令提示符,并使用如下脚本运行客户端。
node client_grpc_server_stream.js
在运行客户端时,咱们将得到如下输出。
Reports successfully generated for: First1,First2 Reports failed since Following Employee Id\'s do not exist: 10
在这种状况下,客户端已向服务器发送了3个Id的1,10,2做为消息流。而后,服务器处理流中的消息,并将单个响应发送回客户端,以显示成功的报告数量和失败的报告数量。