AbtestingGateway 复制请求到其余服务上

perface

小道一句:本文为原创文章,某SDN博主抄袭个人文章不带出处的。html

最近有需求,须要把服务a发给服务b的请求复制给服务c,服务a发给服务b的时候会通过nginx,这个nginx是有lua脚原本辅助工做的。说白了,这个nginx+lua就是abtestingGateway。
架构图以下:
nginx

下面看看怎么实现abtestingGateway来复制流量。redis

分流条件

customercode和user字段的数据都在请求里的json数据里面。json

  1. 匹配到customercode等于咱们指定的值后把流量分到服务B。
  2. 在条件1的基础上,判断是否有users这个字段,有的话服务b,服务C同时发,不然只发服务B。
  3. 若是没有匹配到customercode,那么就转发到服务C上。

分流策略添加

这个请参考咱们的另外一篇博文 abtestingGateway 分流策略添加架构

编写复制请求的代码

我在abtestingGateway下,在customercode.lua(lib/abtesting/diversion目录下)文件里面添加代码函数

逻辑是这样的:
当abtestingGateway拿到customercode后,从redis里面拿取对应的upstream,若是upstream是服务b的upstream,那么就须要复制流量咯,反之。
因此咱们在getUpstream里添加一段代码post

_M.getUpstream = function(self, customercode)
    if not tostring(customercode) then
        return nil
    end
    
    local database, key = self.database, self.policyLib
    
    local backend, err = database:hget(key, customercode)
    if not backend then error{ERRORINFO.REDIS_ERROR, err} end
    
    if backend == ngx.null then backend = nil end
    
    -- 下面就是新添加的代码 begin
    if backend == "newacm" then   -- newacm就是咱们配置的upstream名字
        copyRequest()
    end     
    -- end

    local new_acm_uri = ngx.var.new_acm_uri
    if new_acm_uri and backend then
        backend = backend..new_acm_uri
    end   
    return backend
end

匹配到指定的upstream后,那么就走copyRequest的方法了,代码以下:测试

_M.copyRequestToOldAcm = function(self)
    if  ngx.var.copy_switch == "on" then -- 分流开关,on为打开
        copyRequest()
    end
end

function copyRequest() -- 匹配到咱们指定的customerCode之后,那么就复制一份请求发送到老的ACM上,缘由是由于发送给老的acm就可新老一块同步数据了
        users_exist = false  -- 有这个users那么这个标志位为true,没有的话就是false
        action = ngx.var.request_method

        local postData = ngx.req.get_post_args()  --若是post请求里面没有这个customercode参数,
        if postData then  -- post 请求
            local errinfo =  ERRORINFO.UNKNOWN_ERROR
            if postData then
                for k, v in pairs(postData) do  --这个k为未json反序列化的数据,v为布尔值
                    local after_json_k = cjson.decode(k)  --提交上来的数据为json格式的,须要反序列化一下
                    if after_json_k then
                        for k1 ,v1 in pairs(after_json_k) do
                            if k1 == "customer" then
                               if v1.users then
                                   users_exist = true
                               end
                            end
                        end
                    end
                end
            end
        end

        if users_exist == true then   --没有匹配到users在post请求数据里,那么就不须要复制一份请求到老的的acm上了,直接return就好了。不然须要复制
            log:info("the request has users arguments ,so  needn't copy the request to the old acm!")
            return false
        else
            log:info("the request doesn't have users arguments, so need to copy the request to  the old acm!")           
        end

        if action == "POST" then
                --ngx.req.read_body() -- 解析 body 参数以前必定要先读取 body
                local arg = ngx.req.get_post_args() -- post须要参数传递
                arry = {method = ngx.HTTP_POST, body = ngx.var.request_body, args=arg}
        else
                arry = {method = ngx.HTTP_GET}
        end

        oldacm = ngx.location.capture_multi {
            { "/oldacm" .. ngx.var.request_uri, arry},
        }
        if oldacm.status == ngx.HTTP_OK then -- 只返回online server的结果
            -- ngx.say(online.body)
            log:info("copy request result is ok!!")
            return true    
        else
            -- ngx.status = ngx.HTTP_NOT_FOUND
            --for i,r in pairs(oldacm) do   end
            log:info("copy request result is failed!! the response body is -->"..oldacm.body)
            return false
        end     
                                                
end

而后咱们在diversion/diversion.lua里面添加下这段代码,添加的这段代码起到这个做用的,abtestingGateway在运行的时候,只要匹配到了咱们的规则,好比customercode=123456,匹配到了customercode=123456,那么abtestingGateway会从redis获取这个规则对应的upstream,而后把这个upstream放在nginx的内存里面,在接下来的60秒之内,只要匹配到规则就直接从内存里面拿取,不走getUpstream这个函数了,因此也触发不了复制请求的代码块了。这就是为何咱们还要添加下面这些代码:ui

95 local copyRequest = function()
 96     local cm =    require("abtesting.diversion.customercode")
 97     local cr   = cm:copyRequestToOldAcm()
 98     return cr
 99 end
264 local info = "get upstream ["..ups.."] according to ["..idx.."] userinfo ["..usertable[idx].."] in cache 1"
266 log:info(info)
267 if string.find(ups,"newacm") == 1 then
268     copyRequest()
269 end

左边的数字是代码所处的行数,能够参考行数来添加代码。lua

最后咱们须要在nginx的配置文件里面添加oldacm这个location,否则会提示404咯

location /oldacm/ {
        proxy_pass http://stable/;  # 服务B ip
    }

而后重启nginx便可咯。测试,没啥问题。

相关文章
相关标签/搜索