restapi做为先后端交互的枢纽:面对大批量的前端请求,须要确保回复的及时性。使用缓存是一项有效工具。咱们能够把多数前端请求的回复response存入缓存,特别是一些须要大量计算才能获取的回复值,更能够大大提升后端的反应速度。值得庆幸的是akka-http已经提供了对缓存的支持,是基于java8 caffein的一套缓存操做工具包的。下面就介绍一下akka-http的caching。前端
akka-http caching 有个依赖:java
"com.typesafe.akka" %% "akka-http-caching" % akkaHttpVersion,
先从缓存存储结构开始,看看下面的一段缓存结构定义:算法
import akka.http.scaladsl.util.FastFuture import akka.http.caching.scaladsl.Cache import akka.http.caching.scaladsl.CachingSettings import akka.http.caching.LfuCache val defaultCachingSettings = CachingSettings(sys) val lfuCacheSettings = //最少使用排除算法缓存
defaultCachingSettings.lfuCacheSettings .withInitialCapacity(128) //起始单位
.withMaxCapacity(1024) //最大单位
.withTimeToLive(1.hour) //最长存留时间
.withTimeToIdle(30.minutes) //最长未使用时间
val cachingSettings = defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings) //key -> String
val lfuCache: Cache[String, Option[Map[String, Any]]] = LfuCache(cachingSettings)
lfuCache是一种基于使用频率算法的缓存管理系统,这个咱们就没必要多说了。最好能拿个例子来示范解释:恰好手头有个获取用户信息的http请求样板:sql
val route = pathPrefix(pathName) { pathPrefix("getuserinfo") { (get & parameter('userid)) { userid => {
val userinfo = posRepo.getUserInfo(userid) userinfo match { case Some(ui) => complete(toJson(ui)) case None => complete(toJson(Map[String, Any](("TERMINALID" -> "")))) } } } } def getUserInfo(userid: String): Option[UserInfo] = { val sql = "SELECT CUSTOMERS.SHOPID AS SHOPID, TERMINALID, DEVICEID, IMPSVCURL FROM CUSTOMERS INNER JOIN TERMINALS " +
" ON CUSTOMERS.SHOPID=TERMINALS.SHOPID " +
" WHERE (CUSTOMERS.DISABLED=0 AND TERMINALS.DISABLED=0) " +
" AND (CUSTOMERS.EXPDATE > GETDATE() AND TERMINALS.EXPDATE > GETDATE()) AND TERMINALID='" + userid + "'" val rows = query[Map[String, Any]]("mpos", sql, rsc.resultSet2Map) val futUI: Future[Option[Map[String, Any]]] = rows.runWith(Sink.lastOption) Await.result(futUI, 3 seconds) }
当收到前端 http://mycom.com/pos/getuserinfo?userid=1234 这样的请求时须要从数据库里读取用户信息数据及进行一些转换处理。这个请求调用得频率较高、数据库读取也比较耗时,是个实在的例子。咱们来看看如何实现缓存管理:数据库
在akka-http里能够用两种方式来实现缓存管理:一、直接用cache工具,二、用akka-http提供的Directive: cache, alwaysCache后端
咱们先看看如何直接使用cache操做,先看看Cache的构建:api
abstract class Cache[K, V] extends akka.http.caching.javadsl.Cache[K, V] { cache =>
/** * Returns either the cached Future for the given key or evaluates the given value generating * function producing a `Future[V]`. */ def apply(key: K, genValue: () => Future[V]): Future[V]
Cache[K,V]是以K为键,一个()=> Future[V]为值的结构,也就是说咱们须要把一个获取Future值的函数存在缓存里:缓存
pathPrefix("getuserinfo") { (get & parameter('userid)) { userid => {
val userinfo = lfuCache.getOrLoad(userid, _ => posRepo.futureUserInfo(userid)) onComplete(userinfo) { _ match { case Success(oui) => oui match { case Some(ui) => complete(toJson(ui)) case None => complete(toJson(Map[String, Any](("TERMINALID" -> "")))) } case Failure(_) => complete(toJson(Map[String, Any](("TERMINALID" -> "")))) } } } } def futureUserInfo(userid: String): Future[Option[Map[String, Any]]] = { val sql = "SELECT CUSTOMERS.SHOPID AS SHOPID, TERMINALID, DEVICEID, IMPSVCURL FROM CUSTOMERS INNER JOIN TERMINALS " +
" ON CUSTOMERS.SHOPID=TERMINALS.SHOPID " +
" WHERE (CUSTOMERS.DISABLED=0 AND TERMINALS.DISABLED=0) " +
" AND (CUSTOMERS.EXPDATE > GETDATE() AND TERMINALS.EXPDATE > GETDATE()) AND TERMINALID='" + userid + "'" val rows = query[Map[String, Any]]("mpos", sql, rsc.resultSet2Map) rows.runWith(Sink.lastOption) }
首先咱们须要把getUserInfo修改为futureUserInfo,而后传入cache.getOrLoad():app
/** * Returns either the cached Future for the given key, or applies the given value loading * function on the key, producing a `Future[V]`. */ def getOrLoad(key: K, loadValue: K => Future[V]): Future[V]
跟着咱们再试试用akka-http的Directive, cache和alwaysCache。这两个是同一个东西,只是cache多了个是否使用缓存这么个控制,是经过request-header Cache-Control来实现的,如:Cache-Control`(`no-cache`)。cache函数是这样定义的;
函数
def cache[K](cache: Cache[K, RouteResult], keyer: PartialFunction[RequestContext, K]): Directive0
这个函数返回Directive0, 能够直接对应 { ... complete(...) },因此cache能够把一个route包嵌在里面如:
cache(myCache, simpleKeyer) { complete { i += 1 i.toString } }
simpleKeyer是个K对应函数:在咱们这个例子里K -> Uri, Cache[Uri,RouteResult]。这里有个现成的构建器:routeCache[Uri]
/** * Creates an [[LfuCache]] with default settings obtained from the system's configuration. */ def routeCache[K](implicit s: ActorSystem): Cache[K, RouteResult] = LfuCache[K, RouteResult](s)
不过这个LfuCache使用了application.conf里面的cachingSettings. 咱们想直接控制lfuCache构建,因此能够用:
val lfuCache = LfuCache[Uri,RouteResult](cachingSettings)
alwaysCache的具体使用和上面的cache.getOrLoad相同:
import akka.http.scaladsl.model.{HttpMethods, StatusCodes, Uri} import akka.http.scaladsl.util.FastFuture import akka.http.caching.scaladsl.Cache import akka.http.caching.scaladsl.CachingSettings import akka.http.caching.LfuCache import akka.http.scaladsl.server.RequestContext import akka.http.scaladsl.server.RouteResult import akka.http.scaladsl.server.directives.CachingDirectives._ import scala.concurrent.duration._ import scala.util._ val defaultCachingSettings = CachingSettings(sys) val lfuCacheSettings = //最少使用排除算法缓存
defaultCachingSettings.lfuCacheSettings .withInitialCapacity(128) //起始单位
.withMaxCapacity(1024) //最大单位
.withTimeToLive(1.hour) //最长存留时间
.withTimeToIdle(30.minutes) //最长未使用时间
val cachingSettings = defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings) //Uri->key, RouteResult -> value
val lfuCache = LfuCache[Uri,RouteResult](cachingSettings) //Example keyer for non-authenticated GET requests
val simpleKeyer: PartialFunction[RequestContext, Uri] = { val isGet: RequestContext => Boolean = _.request.method == HttpMethods.GET // val isAuthorized: RequestContext => Boolean = // _.request.headers.exists(_.is(Authorization.lowercaseName))
val result: PartialFunction[RequestContext, Uri] = { case r: RequestContext if isGet(r) => r.request.uri } result } val route = pathPrefix(pathName) { pathPrefix("getuserinfo") { (get & parameter('userid)) { userid => {
alwaysCache(lfuCache,simpleKeyer) { onComplete(posRepo.futureUserInfo(userid)) { _ match { case Success(oui) => oui match { case Some(ui) => complete(toJson(ui)) case None => complete(toJson(Map[String, Any](("TERMINALID" -> "")))) } case Failure(_) => complete(toJson(Map[String, Any](("TERMINALID" -> "")))) } } } } } } ~
好了,我觉着可能直接调用cache.getOrLoad会更好些,由于akka-http还在不停的变,java8caffein应该不会再调整了吧。