go http请求转发

go http请求转发

1.说明

  • 平常开发中会遇到须要将请求转发到其它服务器的需求:前端

    • 1.若是是前端进行转发,须要解决跨域的问题;
    • 2.后端转发到目标服务器,并返回数据到client;

咱们只讨论后端如何处理转发。后端

2. 原理

  • 转发须要对数据流解决的问题:跨域

    • 1.向目标服务器发送请求,并获取数据
    • 2.将数据返回到client
    • 3.对于client整个过程透明,不被感知到请求被转发

3. 方案

  • 解决方案:服务器

    • 1.服务端使用tcp链接模拟http请求
    • 2.使用标准库提供的设置项进行代理
  • 实现cookie

    • 1.tcp模拟转发并发

      • 先和目标服务器创建tcp链接
      • 开启goroutinue,而后将请求数据发送到目标服务器,同时将接受到的数据进行回写
      • 链接采用链接池的方式进行处理,代码较为简单,不做过多赘述
      • 此方法由于彻底屏蔽了http细节,因此有数据读取超时问题,其实就是不知道数据流是否读写结束。在http协议中通常经过Content-Length可知数据体长度,可是在未设置此头部信息时(流传输),就很难肯定数据是否读写结束。看到有文提出经过判断最后的“\r\n\r\n\r\n\r\n”来肯定结束,可是这并不严谨。在没有绝对的协商约束下,会不经意的截断body中内容,致使数据丢失。
      • 可采用设置读写超时来结束读写操做阻塞的问题,可是时间设置长短可能影响并发性能。(SetDeadLine,SetReadDeadLine,SetWriteDeadLine)
      package proxy
       
       import (
           "io"
           "log"
           "net"
           "sync"
           "sync/atomic"
           "time"
       )
       
       /**
       封装代理服务,对于http链接反馈又超时处理,注意超时问题
       */
       
       var pool = make(chan net.Conn, 100)
       
       type conn struct {
           conn  net.Conn
           wg    *sync.WaitGroup
           lock  sync.Mutex
           state int32
       }
       
       const (
           maybeValid = iota
           isValid
           isInvalid
           isInPool
           isClosed
       )
       
       type timeoutErr interface {
           Timeout() bool
       }
       
       func isTimeoutError(err error) bool {
           timeoutErr, _ := err.(timeoutErr)
           if timeoutErr == nil {
               return false
           }
           return timeoutErr.Timeout()
       }
       
       func (cn *conn) Read(b []byte) (n int, err error) {
           n, err = cn.conn.Read(b)
           if err != nil {
               if !isTimeoutError(err) {
                   atomic.StoreInt32(&cn.state, isInvalid)
               }
           } else {
               atomic.StoreInt32(&cn.state, isValid)
           }
           return
       }
       
       func (cn *conn) Write(b []byte) (n int, err error) {
           n, err = cn.conn.Write(b)
           if err != nil {
               if !isTimeoutError(err) {
                   atomic.StoreInt32(&cn.state, isInvalid)
               }
           } else {
               atomic.StoreInt32(&cn.state, isValid)
           }
           return
       }
       
       func (cn *conn) Close() error {
           atomic.StoreInt32(&cn.state, isClosed)
           return cn.conn.Close()
       }
       
       func getConn() (*conn, error) {
           var cn net.Conn
           var err error
           select {
           case cn = <-pool:
               //service.Logger.Info().Msg("get conn from pool")
           default:
               cn, err = net.Dial("tcp", "127.0.0.1:8090")
               //service.Logger.Info().Msg("get conn by new")
           }
           if err != nil {
               service.Logger.Error().Err(err).Msgf("dial to dest %s failed ", "127.0.0.1:8090")
               return nil, err
           }
           return &conn{
               conn:  cn,
               wg:    &sync.WaitGroup{},
               state: maybeValid,
           }, nil
       }
       
       func release(cn *conn) error {
           state := atomic.LoadInt32(&cn.state)
           switch state {
           case isInPool, isClosed:
               return nil
           case isInvalid:
               return cn.conn.Close()
           }
           cn.lock.Lock()
           defer cn.lock.Unlock()
           select {
           case pool <- cn.conn:
               //service.Logger.Info().Msgf("%d  %d put conn to pool",os.Getpid(),os.Getppid())
               atomic.StoreInt32(&cn.state, isInPool)
               return nil
           default:
               return cn.Close()
           }
       }
       
       func Handle(conn net.Conn) {
           if conn == nil {
               return
           }
           defer conn.Close()
           conn.SetDeadline(time.Now().Add(time.Millisecond * 100))  //设置读写超时
           client, err := getConn()
           if err != nil {
               return
           }
       
           defer release(client)
           client.conn.SetDeadline(time.Now().Add(time.Millisecond * 100)) //设置读写超时
       
           client.wg.Add(2)
           //进行转发
           go func() {
               if _, err := io.Copy(client, conn); err != nil {
                   service.Logger.Err(err).Msg("copy data to svr")
               }
               client.wg.Done()
           }()
           go func() {
               if _, err := io.Copy(conn, client); err != nil {
                   service.Logger.Err(err).Msg("copy data to conn")
               }
               client.wg.Done()
           }()
       
           client.wg.Wait()
       }
       
       func StartProxySvr() <-chan struct{} {
           exit := make(chan struct{}, 1)
           proxy_server, err := net.Listen("tcp", "8889")
           if err != nil {
               log.Printf("proxy server listen error: %v\n", err)
               exit <- struct{}{}
               return exit
           }
       
           for {
               conn, err := proxy_server.Accept()
               if err != nil {
                   log.Printf("proxy server accept error: %v\n", err)
                   exit <- struct{}{}
                   return exit
               }
               go Handle(conn)
           }
       }
    • 2.使用原生提供的http代理app

      • http.Client中的Transport可用来设置目标服务的addr
      • 详细内容请看源码说明,下文提供一个中间件样例来进行请求转发tcp

        type Client struct {
             // Transport specifies the mechanism by which individual
             // HTTP requests are made.
             // If nil, DefaultTransport is used.
             Transport RoundTripper
         
             // CheckRedirect specifies the policy for handling redirects.
             // If CheckRedirect is not nil, the client calls it before
             // following an HTTP redirect. The arguments req and via are
             // the upcoming request and the requests made already, oldest
             // first. If CheckRedirect returns an error, the Client's Get
             // method returns both the previous Response (with its Body
             // closed) and CheckRedirect's error (wrapped in a url.Error)
             // instead of issuing the Request req.
             // As a special case, if CheckRedirect returns ErrUseLastResponse,
             // then the most recent response is returned with its body
             // unclosed, along with a nil error.
             //
             // If CheckRedirect is nil, the Client uses its default policy,
             // which is to stop after 10 consecutive requests.
             CheckRedirect func(req *Request, via []*Request) error
         
             // Jar specifies the cookie jar.
             //
             // The Jar is used to insert relevant cookies into every
             // outbound Request and is updated with the cookie values
             // of every inbound Response. The Jar is consulted for every
             // redirect that the Client follows.
             //
             // If Jar is nil, cookies are only sent if they are explicitly
             // set on the Request.
             Jar CookieJar
         
             // Timeout specifies a time limit for requests made by this
             // Client. The timeout includes connection time, any
             // redirects, and reading the response body. The timer remains
             // running after Get, Head, Post, or Do return and will
             // interrupt reading of the Response.Body.
             //
             // A Timeout of zero means no timeout.
             //
             // The Client cancels requests to the underlying Transport
             // as if the Request's Context ended.
             //
             // For compatibility, the Client will also use the deprecated
             // CancelRequest method on Transport if found. New
             // RoundTripper implementations should use the Request's Context
             // for cancelation instead of implementing CancelRequest.
             Timeout time.Duration
         }
         
         //中间件样例
             http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                     proxy := func(_ *http.Request) (*url.URL, error) {
                         return url.Parse("target ip:port")//127.0.0.1:8099
                     }
                     transport := &http.Transport{
                         Proxy: proxy,
                         DialContext: (&net.Dialer{
                             Timeout:   30 * time.Second,
                             KeepAlive: 30 * time.Second,
                             DualStack: true,
                         }).DialContext,
                         MaxIdleConns:          100,
                         IdleConnTimeout:       90 * time.Second,
                         TLSHandshakeTimeout:   10 * time.Second,
                         ExpectContinueTimeout: 1 * time.Second,
                         MaxIdleConnsPerHost:   100,
                     }
         
                     client := &http.Client{Transport: transport}
                     url := "http://" + r.RemoteAddr + r.RequestURI
                     req, err := http.NewRequest(r.Method, url, r.Body)
                     //注: 设置Request头部信息
                     for k, v := range r.Header {
                         for _, vv := range v {
                             req.Header.Add(k, vv)
                         }
                     }
         
                     resp, err := client.Do(req)
                     if err != nil {
                         return
                     }
                     defer resp.Body.Close()
                     //注: 设置Response头部信息
                     for k, v := range resp.Header {
                         for _, vv := range v {
                             w.Header().Add(k, vv)
                         }
                     }
                     data, _ := ioutil.ReadAll(resp.Body)
                     w.Write(data)
         
             })

结束

本文是我的对工做中遇到的问题的总结,不够全面和深刻还请多多指教。谢谢!性能