以前在k8s与网络--Flannel解读一文中,咱们主要讲了Flannel总体的工做原理。今天主要针对Flannel v0.10.0版本进行源码分析。首先须要理解三个比较重要的概念:ios
总体的代码组织以下:正则表达式
除了可执行文件的入口 main.go以外,有backend,network,pkg和subnet这么几个代码相关的文件夹。docker
name | 默认值 | 说明 |
---|---|---|
etcd-endpoints | http://127.0.0.1:4001,http://127.0.0.1:2379 | etcd终端节点列表 |
etcd-prefix | /coreos.com/network | etcd 前缀 |
etcd-keyfile | 无 | SSL key文件 |
etcd-certfile | 无 | SSL certification 文件 |
etcd-cafile | 无 | SSL Certificate Authority 文件 |
etcd-username | 无 | 经过BasicAuth访问etcd 的用户名 |
etcd-password | 无 | 经过BasicAuth访问etcd 的密码 |
iface | 无 | 完整的网卡名或ip地址 |
iface-regex | 无 | 正则表达式表示的网卡名或ip地址 |
subnet-file | /run/flannel/subnet.env | 存放运行时须要的一些变量 (subnet, MTU, ... )的文件名 |
public-ip | 无 | 主机IP |
subnet-lease-renew-margin | 60分钟 | 在租约到期以前多长时间进行更新 |
ip-masq | false | 是否为覆盖网络外部的流量设置IP假装规则 |
kube-subnet-mgr | false | 是否使用k8s做为subnet的实现方式 |
kube-api-url | "" | Kubernetes API server URL ,若是集群内部署,则不须要设置,作好rbac受权便可 |
kubeconfig-file | "" | kubeconfig file 位置,若是集群内部署,则不须要设置,作好rbac受权便可 |
healthz-ip | 0.0.0.0 | 要监听的healthz服务器的IP地址 |
healthz-port | 0 | 要监听的healthz服务器的端口,0 表示停用 |
从main函数开始分析,主要步骤以下:json
if opts.subnetLeaseRenewMargin >= 24*60 || opts.subnetLeaseRenewMargin <= 0 { log.Error("Invalid subnet-lease-renew-margin option, out of acceptable range") os.Exit(1) }
须要小于等于24h,大于0。segmentfault
假如主机有多个网卡,flannel会使用哪个?
这就和我们前面提到的iface和iface-regex两个参数有关。这两个参数每个能够指定多个。flannel将按照下面的优先顺序来选取:
1) 若是”–iface”和”—-iface-regex”都未指定时,则直接选取默认路由所使用的输出网卡后端
2) 若是”–iface”参数不为空,则依次遍历其中的各个实例,直到找到和该网卡名或IP匹配的实例为止api
3) 若是”–iface-regex”参数不为空,操做方式和2)相同,惟一不一样的是使用正则表达式去匹配服务器
最后,对于集群间交互的Public IP,咱们一样能够经过启动参数”–public-ip”进行指定。不然,将使用上文中获取的网卡的IP做为Public IP。网络
外部接口的定义以下:数据结构
type ExternalInterface struct { Iface *net.Interface IfaceAddr net.IP ExtAddr net.IP }
func newSubnetManager() (subnet.Manager, error) { if opts.kubeSubnetMgr { return kube.NewSubnetManager(opts.kubeApiUrl, opts.kubeConfigFile) } cfg := &etcdv2.EtcdConfig{ Endpoints: strings.Split(opts.etcdEndpoints, ","), Keyfile: opts.etcdKeyfile, Certfile: opts.etcdCertfile, CAFile: opts.etcdCAFile, Prefix: opts.etcdPrefix, Username: opts.etcdUsername, Password: opts.etcdPassword, } // Attempt to renew the lease for the subnet specified in the subnetFile prevSubnet := ReadSubnetFromSubnetFile(opts.subnetFile) return etcdv2.NewLocalManager(cfg, prevSubnet) }
子网管理器负责子网的建立、更新、添加、删除、监听等,主要和 etcd 打交道,定义:
type Manager interface { GetNetworkConfig(ctx context.Context) (*Config, error) AcquireLease(ctx context.Context, attrs *LeaseAttrs) (*Lease, error) RenewLease(ctx context.Context, lease *Lease) error WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error) WatchLeases(ctx context.Context, cursor interface{}) (LeaseWatchResult, error) Name() string }
config, err := getConfig(ctx, sm) if err == errCanceled { wg.Wait() os.Exit(0) }
这个配置主要是管理网络的配置,须要在flannel启动以前写到etcd中。例如:
{ "Network": "10.0.0.0/8", "SubnetLen": 20, "SubnetMin": "10.10.0.0", "SubnetMax": "10.99.0.0", "Backend": { "Type": "udp", "Port": 7890 } }
/coreos.com/network/config 保存着上面网络配置数据。
详细解读一下:
bm := backend.NewManager(ctx, sm, extIface) be, err := bm.GetBackend(config.BackendType) if err != nil { log.Errorf("Error fetching backend: %s", err) cancel() wg.Wait() os.Exit(1) } bn, err := be.RegisterNetwork(ctx, config) if err != nil { log.Errorf("Error registering network: %s", err) cancel() wg.Wait() os.Exit(1) } ... log.Info("Running backend.") wg.Add(1) go func() { bn.Run(ctx) wg.Done() }()
backend管理器
type manager struct { ctx context.Context sm subnet.Manager extIface *ExternalInterface mux sync.Mutex active map[string]Backend wg sync.WaitGroup }
主要是提供了GetBackend(backendType string) (Backend, error)方法,根据配置文件的设置backend标志,生产对应的backend。
此处注意
go func() { <-bm.ctx.Done() // TODO(eyakubovich): this obviosly introduces a race. // GetBackend() could get called while we are here. // Currently though, all backends' Run exit only // on shutdown bm.mux.Lock() delete(bm.active, betype) bm.mux.Unlock() bm.wg.Done() }()
在生产backend之后,会启动一个协程,在flanneld退出运行以前,将会执行激活的backend map中删除操做。
最后run方法:
func (n *RouteNetwork) Run(ctx context.Context) { wg := sync.WaitGroup{} log.Info("Watching for new subnet leases") evts := make(chan []subnet.Event) wg.Add(1) go func() { subnet.WatchLeases(ctx, n.SM, n.SubnetLease, evts) wg.Done() }() n.routes = make([]netlink.Route, 0, 10) wg.Add(1) go func() { n.routeCheck(ctx) wg.Done() }() defer wg.Wait() for { select { case evtBatch := <-evts: n.handleSubnetEvents(evtBatch) case <-ctx.Done(): return } } }
run方法中主要是执行:
事件主要是subnet.EventAdded和subnet.EventRemoved两个。
添加子网事件发生时的处理步骤:检查参数是否正常,根据参数构建路由表项,把路由表项添加到主机,把路由表项添加到本身的数据结构中。
删除子网事件发生时的处理步骤:检查参数是否正常,根据参数构建路由表项,把路由表项从主机删除,把路由表项从管理的数据结构中删除
除了上面的核心的逻辑,还有一些iptables规则和SubnetFile相关的操做。
// Set up ipMasq if needed if opts.ipMasq { go network.SetupAndEnsureIPTables(network.MasqRules(config.Network, bn.Lease())) } // Always enables forwarding rules. This is needed for Docker versions >1.13 (https://docs.docker.com/engine/userguide/networking/default_network/container-communication/#container-communication-between-hosts) // In Docker 1.12 and earlier, the default FORWARD chain policy was ACCEPT. // In Docker 1.13 and later, Docker sets the default policy of the FORWARD chain to DROP. go network.SetupAndEnsureIPTables(network.ForwardRules(config.Network.String()))
能够看出主要是调用了network文件里的SetupAndEnsureIPTables方法。
PS在Docker 1.13及更高版本中,Docker设置了FORWARD的默认策略是drop,因此须要flannel作一些工做。