Building Microservices with Akka HTTP: A CDC Approach


This article comes from the Cloud+ Community translation program; translated by 阿小庆.

Building microservices is not easy, especially when the microservices become more and more numerous, and many of them may be provided and maintained by different teams; these microservices interact with each other and change rapidly.

Documentation, team communication and testing are the three pillars of success, but done in the wrong way they generate more complexity than advantage.

We can use tools like Swagger (for documentation), Docker (for test environments), Selenium (for end-to-end testing) and so on, but we still end up wasting a lot of time whenever an API changes: it is never clear who should coordinate the change, or how to set up a suitable environment for the integration tests, which need production data (hopefully anonymized) that can take a very long time to obtain.

There is no single right answer to all these problems, but I think one thing can help a lot: start from the consumer's point of view!

What does this mean? Usually, when developing a web application, we start from the model and process definitions and dive into development using a TDD (test-driven development) approach: write the tests first, thinking about what we really want and how we are going to use it. But what about microservices? In that case, it starts from the consumer: what does the consumer expect from the other service, and how does it want to interact with it?

This is what I call consumer-driven contract (CDC) testing. With this approach, the consumer itself defines the data format and interaction details it needs, and drives the generation of a contract file. The producer then implements its own logic against that contract file, and keeps verifying it in the continuous integration environment.

Business case

For example, suppose we want to implement a new feature in "My Library": we need to introduce categories (Categories), and we want to know how many of them there are. The idea is to split the logic into two services: a producer (Producer) that provides the list of all categories, and a consumer (Consumer) that counts them.


Very simple, but enough to build a good base structure and an understanding of CDC.

Technology stack

For this article I chose Scala as the language and Akka HTTP as the framework. I think it is a very good technology that satisfies all the basic requirements for building microservices:

  • Easy to implement
  • Fast
  • Robust
  • Well supported and documented

On the data side, I chose Slick as the library to abstract the database interaction, and FlyWay as the database migration framework. They are both robust and stable, and I have used them many times without problems.

Last but not least: test support! I love ScalaTest, since it has always been part of my Scala projects, but what about our CDC?

For CDC there is a very good framework available for many platforms: Pact.

With Pact we can define our consumer contract files and verify them against both the provider and the consumer of a microservice interface. I suggest spending a few minutes reading the home page of the official Pact website; it explains the reasoning behind it very well.

As I said, Pact is available for many platforms; in our case, with Consumer and Producer written in Scala, there is only one implementation we can use: Scala-Pact.

Implementation

For simplicity I created a single SBT project containing both the consumer and the producer, but they can easily be split apart and used as templates. You can find the source code at github.com/mariniss/my…

Let's start our microservice implementation, CDC-style! First of all we have to define our project. We can easily create a new Scala project with SBT and define the build.sbt as follows:

build.sbt

name := "myLibrary-contracts"
version := "0.1"
scalaVersion := "2.12.4"
enablePlugins(ScalaPactPlugin)
libraryDependencies ++= Seq(
 //Common dependencies
 "com.typesafe.akka"  %% "akka-stream"             % "2.4.20",
 "com.typesafe.akka"  %% "akka-http"               % "10.0.11", // Akka HTTP项目的标准依赖关系
 "com.typesafe.akka"  %% "akka-http-spray-json"    % "10.0.11", // 用于JSON序列化和反序列化
 "org.slf4j"           % "slf4j-simple"            % "1.7.25",  // 用于日志记录
 "org.scalatest"      %% "scalatest"               % "3.0.1"   % "test",  // 测试框架
 "org.scalamock"      %% "scalamock"               % "4.0.0"   % "test",  // 模拟框架
 "com.typesafe.akka"  %% "akka-stream-testkit"     % "2.4.20"  % "test",
 "com.typesafe.akka"  %% "akka-testkit"            % "2.4.20"  % "test",
 "com.typesafe.akka"  %% "akka-http-testkit"       % "10.0.11" % "test",
 "com.itv"            %% "scalapact-argonaut-6-2"  % "2.2.0"   % "test",
 "com.itv"            %% "scalapact-scalatest"     % "2.2.0"   % "test",
 "com.itv"            %% "scalapact-http4s-0-16-2" % "2.2.0"   % "test",
 //Producer dependencies
 "com.typesafe.slick" %% "slick"                   % "3.2.1",
 "com.typesafe.slick" %% "slick-hikaricp"          % "3.2.1",
 "com.h2database"      % "h2"                      % "1.4.196",
 "org.flywaydb"        % "flyway-core"             % "5.0.7"
)
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest,
 "-y", "org.scalatest.WordSpec",
 "-y", "org.scalatest.FunSpec")
parallelExecution in Test := false

As you can see: the standard dependencies for an Akka HTTP project (shared by provider and consumer), spray-json for JSON serialization and deserialization, SLF4J for logging, scalatest and scalamock as test and mocking frameworks, and Scala-Pact for the CDC tests.

The producer-specific dependencies are only for database support; as you can see I am using H2 (an in-memory database), but you can easily replace it with another database.

The test environment also has some specific configuration. Parallel execution is disabled only because we have both the producer and the consumer in the same project; if they ran in parallel (as we will see later) we could have problems during the Pact file generation and consumption. Also, I implemented the tests in two different styles, WordSpec and FunSpec: the first for all the unit tests, the second for the Pact tests. Feel free to use whichever you prefer.

Consumer implementation

Now that we have the basic project structure, we can start creating the Pact test on the consumer side, where we define our expectations towards the provider (Provider) for a given scenario/state.

MyLibraryClientPactSpec.scala

package com.fm.mylibrary.consumer.pact
import com.fm.mylibrary.consumer.MyLibraryClient
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.model.JsonProtocol._
import com.itv.scalapact.ScalaPactForger._
import org.scalatest.{FunSpec, Matchers}
import spray.json._
class MyLibraryClientPactSpec extends FunSpec with Matchers {
 describe("Connecting to the MyLibrary server") {
   it("should be able to fetch the categories"){
     val categories = List(Category("Java"), Category("DevOps"))
     forgePact
       .between("ScalaConsumer")
       .and("myLibraryServer")
       .addInteraction(
         interaction
           .description("Fetching categories")
           .given("Categories: [Java, DevOps]")
           .uponReceiving(
             method = GET,
             path = "/search/category",
             query = None)
           .willRespondWith(
             status = 200,
             headers = Map("Content-Type" -> "application/json"),
             body = categories.toJson.toString())
       )
       .runConsumerTest { mockConfig =>
         val results = new MyLibraryClient().fetchCategories()
         results.isDefined shouldEqual true
         results.get.size shouldEqual 2
         results.get.forall(c => categories.contains(c)) shouldEqual true
       }
   }
 }
}

Scala-Pact is very easy to use thanks to the ScalaPactForger object, which lets us build the contract definition and expectations with a few lines of code. In more detail:

  • Definition of the contract participants: .between("ScalaConsumer").and("myLibraryServer")
  • Definition of the interaction between the participants:

.addInteraction(
  interaction
    .description("Fetching categories")
    .given("Categories: [Java, DevOps]")
    .uponReceiving(method = GET, path = "/search/category", query = None)
    .willRespondWith(
      status = 200,
      headers = Map("Content-Type" -> "application/json"),
      body = categories.toJson.toString()))

What really matters here is the description of the system state (given) in which the interaction must behave as described, the request executed by the consumer (uponReceiving), and the expected response (willRespondWith). Keep in mind that all the HTTP elements must match: method, URL, headers, body and query.

  • Definition of the actual test that verifies the consumer contract: this code runs against the scenario above; a stub server will respond to the only HTTP request defined in the interaction section (if the request matches the definition), and the test verifies that the Consumer makes its requests exactly as specified in the contract. You can also add more checks (assertions) on the result values handled by the Consumer.

.runConsumerTest { mockConfig =>
  val results = new MyLibraryClient().fetchCategories()
  results.isDefined shouldEqual true
  results.get.size shouldEqual 2
  results.get.forall(c => categories.contains(c)) shouldEqual true
}

Of course, we can add more scenarios and interactions, and we can also define more contracts for many producers. I suggest identifying the basic scenarios and interactions that describe normal usage through the "happy path" plus the standard error cases, and leaving all the detailed tests, with the various cases tied to their implementation, to the unit tests.

Now you can try to compile and run the test, but since we don't have the client and the model yet, we need to add the basic logic to make the test pass.

I think we can proceed in two ways: build the client directly (since we already have a test for it), or refine the definition of our client by creating unit tests and working on it in pure TDD style. Let's go with the second option:

MyLibraryClientSpec.scala

package com.fm.mylibrary.consumer
import akka.http.scaladsl.model._
import com.fm.mylibrary.model.Category
import scala.concurrent.Future
class MyLibraryClientSpec extends BaseTestAppClient {
 implicit val myLibraryServerUrl:String = "//test"
 "Fetch categories" must {
   "execute the HTTP request to get all categories and returns them" in {
     val request = HttpRequest(HttpMethods.GET, "//test/search/category")
     val responseEntity = HttpEntity(bytes = """[{"name": "Java"}, {"name": "DevOps"}]""".getBytes,
                                     contentType = ContentTypes.`application/json`)
     val response = HttpResponse(status = StatusCodes.OK, entity = responseEntity)
     requestExecutor.expects(request).returning(Future.successful(response))
     val results = new MyLibraryClient().fetchCategories()
     results.isDefined shouldEqual true
     results.get.size shouldEqual 2
     results.get.contains(Category("Java")) shouldEqual true
     results.get.contains(Category("DevOps")) shouldEqual true
   }
 }
}

A pretty standard test: we expect MyLibraryClient to expose a function that returns a list of "Category" objects, using an external function that takes an HttpRequest and returns an HttpResponse.

As you can see, that external dependency is not provided explicitly; that is because I want to pass it as an "implicit" value. It is one way to help create testable code, but I strongly suggest not to abuse it, because it can make the code hard to read, especially for people new to Scala.
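A stripped-down sketch of the pattern (all names here are hypothetical, and a plain String => String function stands in for the real HttpRequest => Future[HttpResponse] executor): because the collaborator is an implicit value, a test can put a stub in scope while production code provides the real HTTP layer.

```scala
// Standalone sketch with hypothetical names; the real client uses
// an implicit HttpRequest => Future[HttpResponse] instead.
object ImplicitInjectionDemo extends App {
  type Executor = String => String // stand-in for the HTTP layer

  // the collaborator is picked up implicitly, so tests can swap it
  class Client(implicit val exec: Executor) {
    def fetch(): String = exec("/search/category")
  }

  // "test" wiring: an implicit stub in scope replaces the real executor
  implicit val stub: Executor = _ => """[{"name":"Java"}]"""

  val client = new Client
  assert(client.fetch() == """[{"name":"Java"}]""")
}
```

The same mechanism is what lets MyLibraryClientSpec inject its mocked requestExecutor without changing the client's call sites.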

I also like to define a trait with all the necessary dependencies, to build the test cases easily:

BaseTestAppClient.scala

package com.fm.mylibrary.consumer
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.testkit.{ImplicitSender, TestKit}
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scala.concurrent.{ExecutionContextExecutor, Future}
class BaseTestAppClient extends TestKit(ActorSystem("BaseTestAppClient"))
           with WordSpecLike
           with ImplicitSender
           with Matchers
           with BeforeAndAfterAll
           with MockFactory {
 implicit val actorSystem: ActorSystem = system
 implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
 implicit val executionContext: ExecutionContextExecutor = system.dispatcher
 implicit val requestExecutor = mockFunction[HttpRequest, Future[HttpResponse]]
 override def afterAll {
   TestKit.shutdownActorSystem(system)
 }
}

It defines the actor system and the HTTP request executor function used in our tests.

Now that we have the test, let's implement some logic:

MyLibraryClient.scala

package com.fm.mylibrary.consumer
import akka.actor.ActorSystem
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.model.JsonProtocol._
import scala.concurrent.{ExecutionContextExecutor, Future}
class MyLibraryClient(implicit val myLibraryServerUrl: String,
                      implicit val actorSystem: ActorSystem,
                      implicit val materializer: ActorMaterializer,
                      implicit val executionContext: ExecutionContextExecutor,
                      implicit val requestExecutor: HttpRequest => Future[HttpResponse]) extends BaseHttpClient {
 def fetchCategories(): Option[List[Category]] = executeSyncRequest(
   RequestBuilding.Get(s"$myLibraryServerUrl/search/category"),
   response =>
     if(response.status == StatusCodes.OK)
       Unmarshal(response.entity).to[Option[List[Category]]]
     else
       Future.successful(None)
 )
}

Category.scala

package com.fm.mylibrary.model
case class Category (name: String)

This one is relatively easy to implement. I declared the dependencies implicitly, but they can be made explicit to improve the readability of the code.

Next I created a trait that defines the basic components for every HTTP client (just one for now), with a function to execute an HTTP request synchronously:

BaseHttpClient.scala

package com.fm.mylibrary.consumer
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import scala.language.postfixOps
trait BaseHttpClient {
 implicit def actorSystem: ActorSystem
 implicit def materializer: ActorMaterializer
 implicit def executionContext: ExecutionContextExecutor
 implicit def requestExecutor: HttpRequest => Future[HttpResponse]
 val awaitTime: FiniteDuration = 5000 millis
 def executeSyncRequest[T](request: HttpRequest, responseHandler: HttpResponse => Future[T]): T = {
   val response: Future[T] = requestExecutor(request).flatMap({ response =>
     responseHandler(response)
   })
   Await.result(response, awaitTime)
 }
}

Now the unit tests should run nicely, and if we made no mistakes we should get a green run. Feel free to add more tests and refactor the client to adapt the structure to your taste (you can find more tests in the repository).

We can also try to run the Pact test (MyLibraryClientPactSpec), but it will fail, because it has to perform a real HTTP call: the scala-pact framework starts a real HTTP server that accepts and responds to the requests described in the contract.

We are almost done with what we want to implement; what is missing is basically the object that defines the actor system and the function that executes the HTTP calls:

MyLibraryAppClient.scala

package com.fm.mylibrary.consumer.app
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import scala.concurrent.{ExecutionContextExecutor, Future}
object MyLibraryAppClient {
 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
 implicit val requestExecutor: HttpRequest => Future[HttpResponse] = Http().singleRequest(_)
}

It is an object, so we can import it wherever we need to use our client, as you can see in the Pact test: import com.fm.mylibrary.consumer.app.MyLibraryAppClient._

Of course you can use other approaches, but please be consistent in your choice, and avoid mixing different approaches/structures in the same or similar projects.

We can finally run the contract test! If you are lucky, you should get an output like this:

> Adding interactions:
> - Interaction(None,Some(Categories: [Java, DevOps]),Fetching categories,InteractionRequest(Some(GET),Some(/search/category),None,None,None,None),InteractionResponse(Some(200),Some(Map(Content-Type -> application/json)),Some([{"name":"Java"},{"name":"DevOps"}]),None))
[ScalaTest-run-running-MyLibraryClientPactSpec] INFO org.http4s.blaze.channel.nio1.NIO1SocketServerGroup - Service bound to address /127.0.0.1:55653
> ScalaPact stub running at: http://localhost:55653
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.ServerChannelGroup - Connection to /127.0.0.1:55666 accepted at Tue Feb 13 11:43:08 GMT 2018.
[http4s-blaze-client-1] INFO org.http4s.client.PoolManager - Shutting down connection pool: allocated=1 idleQueue.size=1 waitQueue.size=0
[DEBUG] [02/13/2018 11:43:09.376] [ScalaTest-run-running-MyLibraryClientPactSpec] [EventStream(akka://default)] logger log1-Logging$DefaultLogger started
[DEBUG] [02/13/2018 11:43:09.377] [ScalaTest-run-running-MyLibraryClientPactSpec] [EventStream(akka://default)] Default Loggers started
[DEBUG] [02/13/2018 11:43:09.595] [ScalaTest-run-running-MyLibraryClientPactSpec] [AkkaSSLConfig(akka://default)] Initializing AkkaSSLConfig extension...
[DEBUG] [02/13/2018 11:43:09.598] [ScalaTest-run-running-MyLibraryClientPactSpec] [AkkaSSLConfig(akka://default)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@db2cd5
[DEBUG] [02/13/2018 11:43:09.834] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] (Re-)starting host connection pool to localhost:55653
[DEBUG] [02/13/2018 11:43:10.123] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] InputBuffer (max-open-requests = 32) now filled with 1 request after enqueuing GET /search/category Empty
[DEBUG] [02/13/2018 11:43:10.127] [default-akka.actor.default-dispatcher-2] [default/Pool(shared->http://localhost:55653)] [0] Unconnected -> Loaded(1)
[DEBUG] [02/13/2018 11:43:10.137] [default-akka.actor.default-dispatcher-2] [default/Pool(shared->http://localhost:55653)] [0] <unconnected> Establishing connection...
[DEBUG] [02/13/2018 11:43:10.167] [default-akka.actor.default-dispatcher-2] [default/Pool(shared->http://localhost:55653)] [0] <unconnected> pushing request to connection: GET /search/category Empty
[DEBUG] [02/13/2018 11:43:10.179] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/0] Resolving localhost before connecting
[DEBUG] [02/13/2018 11:43:10.200] [default-akka.actor.default-dispatcher-5] [akka://default/system/IO-DNS] Resolution request for localhost from Actor[akka://default/system/IO-TCP/selectors/$a/0#871918912]
[DEBUG] [02/13/2018 11:43:10.209] [default-akka.actor.default-dispatcher-5] [akka://default/system/IO-TCP/selectors/$a/0] Attempting connection to [localhost/127.0.0.1:55653]
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.ServerChannelGroup - Connection to /127.0.0.1:55669 accepted at Tue Feb 13 11:43:10 GMT 2018.
[DEBUG] [02/13/2018 11:43:10.212] [default-akka.actor.default-dispatcher-5] [akka://default/system/IO-TCP/selectors/$a/0] Connection established to [localhost:55653]
[DEBUG] [02/13/2018 11:43:10.291] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] [0] </127.0.0.1:55669->localhost:55653> Received response: GET /search/category Empty -> 200 OK Strict(35 bytes)
[DEBUG] [02/13/2018 11:43:10.296] [default-akka.actor.default-dispatcher-8] [default/Pool(shared->http://localhost:55653)] [0] </127.0.0.1:55669->localhost:55653> Finished reading response entity for GET /search/category Empty -> 200 OK Strict(35 bytes)
[DEBUG] [02/13/2018 11:43:10.298] [default-akka.actor.default-dispatcher-5] [default/Pool(shared->http://localhost:55653)] [0] Loaded(1) -> Idle
[ScalaTest-run-running-MyLibraryClientPactSpec] INFO org.http4s.blaze.channel.ServerChannel - Closing NIO1 channel /127.0.0.1:55653 at Tue Feb 13 11:43:10 GMT 2018
[ScalaTest-run-running-MyLibraryClientPactSpec] INFO org.http4s.blaze.channel.nio1.NIO1SocketServerGroup - Closing NIO1SocketServerGroup
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-0
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-1
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-2
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-3
[blaze-nio1-acceptor] INFO org.http4s.blaze.channel.nio1.SelectorLoop - Shutting down SelectorLoop blaze-nio-fixed-selector-pool-4
[DEBUG] [02/13/2018 11:43:10.355] [default-akka.actor.default-dispatcher-3] [default/Pool(shared->http://localhost:55653)] [0] </127.0.0.1:55669->localhost:55653> connection was closed by peer while no requests were in flight
[DEBUG] [02/13/2018 11:43:10.360] [default-akka.actor.default-dispatcher-3] [default/Pool(shared->http://localhost:55653)] [0] Idle -> Unconnected
Process finished with exit code 0

I used IntelliJ IDEA CE to run the tests, but you can run them directly with sbt using these commands:

  • sbt test: runs all the tests that extend FunSpec and WordSpec (as defined in build.sbt)
  • sbt pactTest: runs all the Pact tests

The test verifies the consumer contract and generates the contract/agreement that the provider will have to respect. You can find the generated files under target/pacts; they are JSON files following the Pact structure. The generated one should look like this:

ScalaConsumer_myLibraryServer.json

{
 "provider" : {
   "name" : "myLibraryServer"
 },
 "consumer" : {
   "name" : "ScalaConsumer"
 },
 "interactions" : [
   {
     "request" : {
       "method" : "GET",
       "path" : "/search/category"
     },
     "description" : "Fetching categories",
     "response" : {
       "status" : 200,
       "headers" : {
         "Content-Type" : "application/json"
       },
       "body" : [
         {
           "name" : "Java"
         },
         {
           "name" : "DevOps"
         }
       ]
     },
     "providerState" : "Categories: [Java, DevOps]"
   }
 ]
}

As you can see it is very simple: the definition of the two participants (provider and consumer) with the possible interactions.

So far so good. But you can add more logic, more clients, more contracts, more services, etc. The project in the Git repository also contains a small service with the business logic that does the actual job of counting the categories. Here is the code: CategoriesServiceSpec.scala

package com.fm.mylibrary.consumer.service
import com.fm.mylibrary.consumer.MyLibraryClient
import com.fm.mylibrary.model.Category
import org.scalamock.scalatest.MockFactory
import org.scalatest.{Matchers, WordSpec}
class CategoriesServiceSpec extends WordSpec with Matchers with MockFactory {
 private val mockMyLibraryClient = mock[MyLibraryClient]
 private val service = new CategoriesService(mockMyLibraryClient)
 "Count Categories" must {
   "return the number of all categories fetched form MyLibrary" in {
     val javaCategory = Category("Java")
     val devopsCategory = Category("DevOps")
     (mockMyLibraryClient.fetchCategories _).expects().returning(Some(List(javaCategory, devopsCategory)))
     val result = service.countCategories()
     result shouldBe 2
   }
   "return 0 in case of the fetch form MyLibrary fails" in {
     (mockMyLibraryClient.fetchCategories _).expects().returning(None)
     val result = service.countCategories()
     result shouldBe 0
   }
 }
}

CategoriesService.scala

package com.fm.mylibrary.consumer.service
import com.fm.mylibrary.consumer.MyLibraryClient
class CategoriesService(val myLibraryClient: MyLibraryClient) extends {
 def countCategories(): Int = myLibraryClient.fetchCategories() match {
   case None => 0
   case Some(categories) =>
     categories.size
 }
}

I did not use any dependency injection framework, because I believe that if a microservice needs a DI framework it has become too big and complex; but if you think otherwise, feel free to use one. I have used Google Guice in the past and it seemed quite good.

Producer (Provider) implementation

Once we have defined our Consumer side with the contract files, we can move to the producer and implement it using the contract generated by the consumer.

As always, we start from the tests. For the producer we will have two kinds of tests: one to verify the contract, and unit tests to verify the business logic in detail. The server implementation is usually much bigger than the client's, so I think it is better to start from the unit tests, and once we have a working application we can create the test that verifies the pact (or contract).

Also, I always suggest an incremental approach (even on small projects), so in this case we can build a server that exposes one API returning a static list of two categories (as defined in the Pact file), and then add configuration support, database support, migration support, and so on.

Here is the unit test for our API:

CategoriesRoutesSpec.scala

package com.fm.mylibrary.producer
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.model.JsonProtocol._
class CategoriesRoutesSpec extends BaseTestAppServer {
 "The service" should {
   "return an empty JSon array if there are no categories" in {
     Get("/search/category") ~> routes ~> check {
       responseAs[List[Category]] shouldBe List(Category("DevOps"), Category("Java"))
     }
   }
 }
}

And the base test class BaseTestAppServer, with all the test dependencies:

BaseTestAppServer.scala

package com.fm.mylibrary.producer
import akka.http.scaladsl.testkit.ScalatestRouteTest
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import scala.concurrent.ExecutionContextExecutor
class BaseTestAppServer extends WordSpec
           with ScalatestRouteTest
           with Matchers
           with MockFactory
           with Routes
           with BeforeAndAfterAll {
 implicit val executionContext: ExecutionContextExecutor = system.dispatcher
}

The test is implemented with the Akka HTTP Route TestKit (you can find the official documentation here); it allows building tests on routes in this format:

REQUEST ~> ROUTE ~> check {
    ASSERTIONS 
}

The BaseTestAppServer class contains the basic dependencies WordSpec, ScalatestRouteTest, Matchers, MockFactory, BeforeAndAfterAll, and the trait that defines the application's routes: Routes.

Of course it doesn't compile or pass yet, since there is no implementation, so let's define our routes:

Routes.scala

package com.fm.mylibrary.producer
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.Materializer
import com.fm.mylibrary.model.Category
import scala.concurrent.ExecutionContext
import spray.json._
import com.fm.mylibrary.model.JsonProtocol._
trait Routes {
 implicit val materializer: Materializer
 implicit val executionContext: ExecutionContext
 val searchRoutes: Route = {
   pathPrefix("search" / "category") {
     get {
       complete(
         List(Category("DevOps"), Category("Java")).toJson
       )
     }
   }
 }
 val routes: Route = searchRoutes
}

I used spray-json for the JSON marshalling/unmarshalling, and it requires defining the protocol (or format) used for the conversion; you can see the import of this object in the code: import com.fm.mylibrary.model.JsonProtocol._. It is also necessary to import spray.json._, which provides all the conversion functions; in this case I am using toJson, which looks for an implicit definition of the protocol (or format) for the specific object to convert.

JsonProtocol.scala

package com.fm.mylibrary.model
import spray.json._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
object JsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
 implicit val categoryFormat = jsonFormat1(Category)
}

There is no need to define converters for List, Array, Option, and so on, because they are provided by DefaultJsonProtocol in spray-json.
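As a quick standalone illustration (assuming only the spray-json library, with a Category case class mirroring the article's model): once the single-field format is defined with jsonFormat1, the List format comes for free from DefaultJsonProtocol, in both directions.

```scala
// Minimal standalone sketch; Category mirrors the article's model class.
import spray.json._

case class Category(name: String)

object JsonProtocolDemo extends DefaultJsonProtocol {
  implicit val categoryFormat: RootJsonFormat[Category] = jsonFormat1(Category)

  def main(args: Array[String]): Unit = {
    // the List format is derived automatically by DefaultJsonProtocol
    val json = List(Category("Java"), Category("DevOps")).toJson.compactPrint
    assert(json == """[{"name":"Java"},{"name":"DevOps"}]""")

    // the same implicit format drives parsing back into the model
    val parsed = json.parseJson.convertTo[List[Category]]
    assert(parsed == List(Category("Java"), Category("DevOps")))
  }
}
```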

There are other similar libraries, such as Argonaut and JSON4S; feel free to evaluate them all and pick the one that best fits your needs.

If we run the test again we should now get a green line. Again: add more tests, to cover every single case. Before we can check that our service respects the consumer contract, we have to finish the basic service by defining the Akka HTTP application:

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.Routes
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}
object MyLibraryAppServer extends App
         with Routes
         with DebuggingDirectives {
 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
 val log = actorSystem.log
 def startApplication(): Unit = {
   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = "localhost", port = 9000).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }
 def stopApplication(): Unit = {
   actorSystem.terminate()
 }
 startApplication()
}

This class defines two methods, one needed to start our server and one to stop it; it also defines the actor system and the execution context that will be used in the route handling.

It extends scala.App, the trait that provides the main method, so you can run this class and it will start an HTTP server serving the defined routes.

But first, let's check whether the contract is respected; we can easily verify it with a test class like this:

MyLibraryServerPactSpec.scala

package com.fm.mylibrary.producer.pact
import com.fm.mylibrary.producer.app.MyLibraryAppServer
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
import com.itv.scalapact.ScalaPactVerify._
class MyLibraryServerPactSpec extends FunSpec with Matchers with BeforeAndAfterAll {
 override def beforeAll() {
   MyLibraryAppServer.main(Array())
 }
 override def afterAll() {
   MyLibraryAppServer.stopApplication()
 }
 describe("Verifying MyLibrary server") {
   it("should be able to respect the contract"){
     verifyPact
       .withPactSource(loadFromLocal("target/pacts"))
       .noSetupRequired
       .runVerificationAgainst("localhost", 9999)
   }
 }
}

It uses the verifyPact object, which works much like forgePact: we define the source of the Pact files (target/pacts in our case, but it could be a shared location or a Pact Broker), the eventual code needed to set up the data or environment required by the interactions, and finally the host and port where the server is listening for requests.

So, matching the Consumer test, we expect scala-pact to perform real HTTP calls, which means we need to set up the application to handle them. We can do that in several ways; I chose what for me is the safest and simplest solution: start the server exactly as in production by invoking the main method of MyLibraryAppServer before the tests run, and shut it down afterwards. We can use this approach when the application is simple; when it is not, we can implement a specific test runner for this kind of test, but I suggest staying as close as possible to the production setup.

Running the test, we should get a pass and an output like this:

[DEBUG] [02/13/2018 16:45:09.053] [ScalaTest-run] [EventStream(akka://default)] logger log1-Logging$DefaultLogger started
[DEBUG] [02/13/2018 16:45:09.054] [ScalaTest-run] [EventStream(akka://default)] Default Loggers started
[DEBUG] [02/13/2018 16:45:09.110] [ScalaTest-run] [AkkaSSLConfig(akka://default)] Initializing AkkaSSLConfig extension...
[DEBUG] [02/13/2018 16:45:09.112] [ScalaTest-run] [AkkaSSLConfig(akka://default)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@1bb571c
[DEBUG] [02/13/2018 16:45:10.244] [default-akka.actor.default-dispatcher-3] [akka://default/system/IO-TCP/selectors/$a/0] Successfully bound to /127.0.0.1:9000
[INFO] [02/13/2018 16:45:10.256] [default-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(default)] application is up and running at 127.0.0.1:9000
Attempting to use local pact files at: 'target/pacts'
Looking for pact files in: target/pacts
Found directory: C:\Dev\git-1.0.6\home\src-rnd\myLibrary-contracts\target\pacts
Loading pact file: ScalaConsumer_myLibraryServer.json
Verifying against 'localhost' on port '9000' with a timeout of 2 second(s).
--------------------
Attempting to run provider state: Categories: [Java, DevOps]
Provider state ran successfully
--------------------
[DEBUG] [02/13/2018 16:45:10.883] [default-akka.actor.default-dispatcher-4] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [02/13/2018 16:45:11.146] [default-akka.actor.default-dispatcher-2] [akka.actor.ActorSystemImpl(default)] log: Response for
  Request : HttpRequest(HttpMethod(GET),http://localhost:9000/search/category,List(Host: localhost:9000, User-Agent: scala-pact/0.16.2, Timeout-Access: <function1>),HttpEntity.Strict(none/none,ByteString()),HttpProtocol(HTTP/1.1))
  Response: Complete(HttpResponse(200 OK,List(),HttpEntity.Strict(application/json,[{"name":"DevOps"},{"name":"Java"}]),HttpProtocol(HTTP/1.1)))
[http4s-blaze-client-1] INFO org.http4s.client.PoolManager - Shutting down connection pool: allocated=1 idleQueue.size=1 waitQueue.size=0
[DEBUG] [02/13/2018 16:45:11.262] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/1] Closing connection due to IO error java.io.IOException: An existing connection was forcibly closed by the remote host
Results for pact between ScalaConsumer and myLibraryServer
 - [  OK  ] Fetching categories
[DEBUG] [02/13/2018 16:45:11.391] [default-akka.actor.default-dispatcher-9] [EventStream] shutting down: StandardOutLogger started
[DEBUG] [02/13/2018 16:45:11.391] [default-akka.actor.default-dispatcher-7] [akka://default/system/IO-TCP/selectors/$a/0] Monitored actor [Actor[akka://default/user/StreamSupervisor-0/$a#-487633161]] terminated
Process finished with exit code 0

If it doesn't run, make sure target/pacts contains the contract file generated by MyLibraryClientPactSpec.

The consumer contract seems to be respected, so we can continue the implementation, adding an external configuration file, database support and database migration support.

Adding external configuration is easy: just create the file application.conf under src/main/resources, with all the configuration values, that is:

application.conf

akka {
 loglevel = DEBUG
}
http {
 interface = "0.0.0.0"
 port = 9000
}
database = {
 url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1"
 driver = org.h2.Driver
 connectionPool = disabled
 keepAliveConnection = true
 user = "sa"
 password = ""
}

Then you can create a trait that handles it, loading the configuration and exposing the corresponding named constants:

Config.scala

package com.fm.mylibrary.producer
import com.typesafe.config.ConfigFactory
trait Config {
 private val config = ConfigFactory.load()
 private val httpConfig = config.getConfig("http")
 private val databaseConfig = config.getConfig("database")
 val httpInterface: String = httpConfig.getString("interface")
 val httpPort: Int = httpConfig.getInt("port")
 val databaseUrl: String = databaseConfig.getString("url")
 val databaseUser: String = databaseConfig.getString("user")
 val databasePassword: String = databaseConfig.getString("password")
}

By default, ConfigFactory.load() loads the configuration from src/main/resources/application.conf.
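As a small self-contained sketch of the same getConfig/getString/getInt calls used by the Config trait (assuming only the typesafe-config library): ConfigFactory.parseString can stand in for what load() would read from application.conf, which makes it handy for quick experiments.

```scala
// Standalone sketch; parseString mimics the content of application.conf.
import com.typesafe.config.ConfigFactory

object ConfigDemo extends App {
  val config = ConfigFactory.parseString(
    """
      |http {
      |  interface = "0.0.0.0"
      |  port = 9000
      |}
    """.stripMargin)

  // the same access pattern used by the Config trait
  val httpConfig = config.getConfig("http")
  assert(httpConfig.getString("interface") == "0.0.0.0")
  assert(httpConfig.getInt("port") == 9000)
}
```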

We can also put a test version of the configuration in src/test/resources:

application.conf

akka {
 loglevel = DEBUG
}
http {
 interface = "localhost"
 port = 9999
}
database = {
 url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1"
 driver = org.h2.Driver
 connectionPool = disabled
 keepAliveConnection = true
 user = "sa"
 password = ""
}

In this case there isn't much difference, since I am using an in-memory database.

Using it in the main class is very easy: just add the trait to the class and replace the static values with the corresponding constants:

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.{Config, Routes}
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}
object MyLibraryAppServer extends App
         with Routes
         with Config
         with DebuggingDirectives {
 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
 val log = actorSystem.log
 def startApplication(): Unit = {
   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = httpInterface, port = httpPort).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }
 def stopApplication(): Unit = {
   actorSystem.terminate()
 }
 startApplication()
}

You can also use the configuration in the Pact test, so that it uses the right server address:

MyLibraryServerPactSpec.scala

package com.fm.mylibrary.producer.pact
import com.fm.mylibrary.producer.Config
import com.fm.mylibrary.producer.app.MyLibraryAppServer
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
import com.itv.scalapact.ScalaPactVerify._
class MyLibraryServerPactSpec extends FunSpec with Matchers with BeforeAndAfterAll with Config {
 override def beforeAll() {
   MyLibraryAppServer.main(Array())
 }
 override def afterAll() {
   MyLibraryAppServer.stopApplication()
 }
 describe("Verifying MyLibrary server") {
   it("should be able to respect the contract"){
     verifyPact
       .withPactSource(loadFromLocal("target/pacts"))
       .noSetupRequired
       .runVerificationAgainst(httpInterface, httpPort)
   }
 }
}

Now we can finally add database support, with migrations.

First of all we have to define our entity (or table); in our case we only need one: Category.

CategoryEntity.scala

package com.fm.mylibrary.producer.entity
import com.fm.mylibrary.model.Category
import slick.jdbc.H2Profile.api._
trait CategoryEntity {
 class Categories(tag: Tag) extends Table[Category](tag, "CATEGORY") {
   def name = column[String]("NAME", O.PrimaryKey)
   def * = name  <> (Category.apply, Category.unapply)
 }
 protected val categories = TableQuery[Categories]
}

This is a standard Slick table definition; as you can see, the table has only one column, which is also the primary key, and it is tied to the Category model class through Table[Category].

It can be instantiated from the Category class, as the definition def * = name <> (Category.apply, Category.unapply) shows; just make sure the model class implements both apply and unapply. The easiest way is to define the model as a case class.
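A minimal illustration of why a case class is enough: the compiler generates on the companion object exactly the apply/unapply pair that Slick's <> projection needs.

```scala
// Standalone sketch: the compiler-generated companion of a case class
// already provides the apply/unapply pair used by Slick's <> projection.
object ApplyUnapplyDemo extends App {
  case class Category(name: String)

  val c = Category.apply("Java")               // generated factory
  assert(Category.unapply(c) == Some("Java")) // generated extractor

  // this is exactly the pair referenced by:
  //   def * = name <> (Category.apply, Category.unapply)
}
```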

The last instruction defines the TableQuery object, which is necessary to execute any kind of query against that table. Now let's define the main entry point for any database interaction; I implemented it as a trait that can be mixed into any class that needs database access:

DatabaseSupport.scala

package com.fm.mylibrary.producer.db
import slick.jdbc.H2Profile
import slick.jdbc.H2Profile.api._
trait DatabaseSupport {
 val db: H2Profile.backend.Database = Database.forConfig("database")
 def closeDB(): Unit = db.close
}

We can now define the DAO layer needed to operate on the Categories table. I created it in the same file as CategoryEntity, but you can move it to a different file if you want to use a different package:

CategoryEntity.scala

package com.fm.mylibrary.producer.entity
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.producer.db.DatabaseSupport
import slick.jdbc.H2Profile.api._
import scala.concurrent.Future
trait CategoryEntity {
 class Categories(tag: Tag) extends Table[Category](tag, "CATEGORY") {
   def name = column[String]("NAME", O.PrimaryKey)
   def * = name  <> (Category.apply, Category.unapply)
 }
 protected val categories = TableQuery[Categories]
}
class CategoryDAO extends CategoryEntity with DatabaseSupport {
 def insertOrUpdate(category: Category): Future[Int] =
       db.run(categories.insertOrUpdate(category))
 def findAll(): Future[Seq[Category]] =
       db.run(categories.result)
}

CategoryDAO extends both CategoryEntity and DatabaseSupport: the first provides the category table query object, the second provides the database instance used to execute the queries.

I've implemented only two methods, which is enough for our tests. As you can see, I used the basic methods provided by Slick, and since the Categories entity and the Category model are mapped to each other, the DAO can return the model directly without any explicit conversion. You can find more examples and information about implementing entities and DAOs with Slick in the official documentation.

I usually don't write tests for DAOs that only use the standard queries provided by the library; I don't see much point in testing the library's own methods, and they are covered by the route tests anyway. However, if a DAO implements complex queries involving multiple tables, I strongly suggest unit-testing all the possible cases.

To start our application we now need a database with the category table. We could create it by hand, or let the machine do the work for us: we can implement a database migration that applies any required schema change at application startup.

As we did for the database support, we can implement a trait that provides the migration capability:

DatabaseMigrationSupport.scala

package com.fm.mylibrary.producer.db
import com.fm.mylibrary.producer.Config
import org.flywaydb.core.Flyway
trait DatabaseMigrationSupport extends Config {
 private val flyway = new Flyway()
 flyway.setDataSource(databaseUrl, databaseUser, databasePassword)
 def migrateDB(): Unit = {
   flyway.migrate()
 }
 def reloadSchema(): Unit = {
   flyway.clean()
   flyway.migrate()
 }
}

This exposes two methods: one applies the migrations incrementally, the other cleans the schema and re-runs the whole migration from scratch. It mixes in the Config trait to obtain the database connection details.

By default, Flyway looks for the SQL migration scripts in src/main/resources/db/migration, and it expects file names in a specific format: V<version>__<description>.sql (version number, double underscore, description).

See the official migration documentation for more information.

So, our first migration script creates the category table:

V1__Create_Category.sql

CREATE TABLE category (
 name VARCHAR(255) NOT NULL PRIMARY KEY
);

And we can execute it at server startup:

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.db.DatabaseMigrationSupport
import com.fm.mylibrary.producer.{Config, Routes}
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}
object MyLibraryAppServer extends App
         with Routes
         with Config
         with DatabaseMigrationSupport
         with DebuggingDirectives {
 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
 val log = actorSystem.log
 def startApplication(): Unit = {
   migrateDB()
   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = httpInterface, port = httpPort).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }
 def stopApplication(): Unit = {
   actorSystem.terminate()
 }
 startApplication()
}

We mixed in DatabaseMigrationSupport and added the migrateDB() call before the HTTP binding.

The last step is to wire the new data source into the business logic, changing the route so that the categories are retrieved from the DB:

Routes.scala

package com.fm.mylibrary.producer
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.Materializer
import com.fm.mylibrary.producer.entity.CategoryDAO
import scala.concurrent.ExecutionContext
import spray.json._
import com.fm.mylibrary.model.JsonProtocol._
trait Routes {
 implicit val materializer: Materializer
 implicit val executionContext: ExecutionContext
 private val categoryEntityDAO = new CategoryDAO()
 val searchRoutes: Route = {
   pathPrefix("search" / "category") {
     get {
       complete(
         categoryEntityDAO.findAll()
             .map(_.toJson)
       )
     }
   }
 }
 val routes: Route = searchRoutes
}

We simply replaced the static list with a call to the DAO's findAll method.

You can see that the DAO is instantiated inside the trait; if the logic grows more complex, I suggest turning it into a required parameter (implicit or class property) so that it can be injected from the outside. In our case there's no need: the logic is very simple, and on the testing side we use an in-memory database, so there's no point in mocking it.
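The injection idea can be sketched with plain Scala: the routing logic depends on an abstract DAO member, so a test can plug in a stub instead of a database-backed implementation. The CategoryDAO and Routes shapes below are simplified stand-ins, not the article's actual classes:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

case class Category(name: String)

// Simplified DAO interface: just the one call the route needs
trait CategoryDAO {
  def findAll(): Future[Seq[Category]]
}

// The route logic depends only on the abstract member, not on a
// concrete database-backed DAO
trait Routes {
  def categoryDAO: CategoryDAO
  def searchCategories(): Future[Seq[String]] =
    categoryDAO.findAll().map(_.map(_.name))
}

// Injecting a stub "from the outside", as a unit test would
object TestRoutes extends Routes {
  val categoryDAO: CategoryDAO = new CategoryDAO {
    def findAll(): Future[Seq[Category]] =
      Future.successful(Seq(Category("Java"), Category("DevOps")))
  }
}

val names: Seq[String] = Await.result(TestRoutes.searchCategories(), Duration.Inf)
```

Swapping the stub for a real DAO is then a one-line change in the object that wires the application together.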

Back to the route test: it would now fail because there is no data, so let's add some. We can easily do that with a trait that implements a method adding a couple of categories:

MockData.scala

package com.fm.mylibrary.producer.db
import com.fm.mylibrary.model.Category
import com.fm.mylibrary.producer.entity.CategoryDAO
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration.Duration
trait MockData {
 implicit val executionContext: ExecutionContext
 def addMockCategories(): Unit = {
   val categoryEntityDAO = new CategoryDAO()
   val setupFuture = for {
     c1 <- categoryEntityDAO.insertOrUpdate(Category("Java"))
     c2 <- categoryEntityDAO.insertOrUpdate(Category("DevOps"))
   } yield c1 + c2
   Await.result(setupFuture, Duration.Inf)
 }
}
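The for-comprehension in MockData chains the two inserts sequentially and yields the sum of their row counts. The same pattern with plain Futures, as a standalone sketch (fakeInsert is a hypothetical stand-in for the DAO's insertOrUpdate):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for insertOrUpdate: pretend every insert affects one row
def fakeInsert(name: String): Future[Int] = Future.successful(1)

// Each step starts only after the previous Future has completed,
// mirroring the setupFuture in the MockData trait
val setupFuture: Future[Int] = for {
  c1 <- fakeInsert("Java")
  c2 <- fakeInsert("DevOps")
} yield c1 + c2

// Blocking is acceptable for one-off test setup,
// but should be avoided in request-handling code
val inserted: Int = Await.result(setupFuture, Duration.Inf)
```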

We add it to MyLibraryAppServer and BaseTestAppServer, so that we can easily verify the application with both the route tests and the Pact tests:

MyLibraryAppServer.scala

package com.fm.mylibrary.producer.app
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.stream.ActorMaterializer
import com.fm.mylibrary.producer.db.{DatabaseMigrationSupport, MockData}
import com.fm.mylibrary.producer.{Config, Routes}
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}
object MyLibraryAppServer extends App
         with Routes
         with Config
         with DatabaseMigrationSupport
         with MockData
         with DebuggingDirectives {
 implicit val actorSystem: ActorSystem = ActorSystem()
 implicit val materializer: ActorMaterializer = ActorMaterializer()
 implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher
 val log = actorSystem.log
 def startApplication(): Unit = {
   migrateDB()
   addMockCategories()
   Http().bindAndHandle(handler = logRequestResult("log")(routes), interface = httpInterface, port = httpPort).onComplete {
     case Success(b) => log.info(s"application is up and running at ${b.localAddress.getHostName}:${b.localAddress.getPort}")
     case Failure(e) => log.error(s"could not start application: {}", e.getMessage)
   }
 }
 def stopApplication(): Unit = {
   actorSystem.terminate()
 }
 startApplication()
}

BaseTestAppServer.scala

package com.fm.mylibrary.producer
import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.fm.mylibrary.producer.db.{DatabaseMigrationSupport, MockData}
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import scala.concurrent.ExecutionContextExecutor
class BaseTestAppServer extends WordSpec
           with ScalatestRouteTest
           with Matchers
           with MockFactory
           with DatabaseMigrationSupport
           with MockData
           with Routes
           with BeforeAndAfterAll {
 implicit val executionContext: ExecutionContextExecutor = system.dispatcher
 override def beforeAll(): Unit = {
   migrateDB()
   addMockCategories()
 }
}

If we run all the tests now, everything should pass; you can do that with the sbt test command.

If we start the server with the sbt run command and execute GET /search/category, we should get our two categories:

[{"name": "Java"}, {"name": "DevOps"}] (the JSON rendering of the two mock categories)

Summary

Consumer-driven contract testing is a great technique that can save a lot of time and avoid many of the problems related to integration testing.

The whole implementation is "contract-centric": it forces us to think first about how the consumer wants to obtain a given service and what exactly we have to provide, and it frees us from setting up a full infrastructure just to run integration tests between services.

On the other hand, Scala-Pact is not very well documented, so setting up complex tests can be quite challenging, and the only way I found was to browse its examples and source code.

We've seen a very simple example, hardly usable as-is in a real environment, but hopefully you can use it as a starting point for your next microservice.

More about CDC and Pact

I've shown you the most basic usage of Pact, which is probably not enough for a real environment where many teams work "concurrently" on many producers and consumers; there, communication, automation, and the tooling to support them become essential.

With CDC and Pact you have to automate the contract handling (publishing and verification) and hook it into your CI/CD (Continuous Integration / Continuous Delivery) pipeline, so that a consumer cannot go to production unless the related producers respect its contracts, and no producer can release if it breaks any contract.

So I strongly suggest going through the official Pact documentation and bringing Pact Broker into your CI/CD pipeline; it's an application that provides the following features (from the official docs):

  • Allows you to release customer value quickly and confidently by deploying your services independently and avoiding the bottleneck of integration tests
  • Solves the problem of how to share contracts and verification results between consumer and provider projects
  • Tells you which versions of your applications can be deployed safely together
  • Allows you to ensure backwards compatibility between multiple consumer and provider versions (e.g. in a mobile or multi-tenant environment)
  • Provides API documentation of your applications that is guaranteed to be up to date
  • Shows you real examples of how your services interact
  • Allows you to visualise the relationships between your services

Feel free to ask any questions, and if you need advice I'll be very happy to help.

Further reading: https://www.cnblogs.com/jinjiangongzuoshi/p/7815243.html




This article was published on the Tencent Cloud+ Community with the author's authorization; original link: https://cloud.tencent.com/developer/article/1149167?fromSource=waitui

