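FloodlightProvider uses the Netty library to manage the threads and connections to the switches.
Each OpenFlow message is handled on a Netty thread, which runs all the logic that every module has associated with that message.
Other modules can also register for specific events, such as switch connection or disconnection and port status notifications.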
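To receive OpenFlow messages, a module must implement the IOFMessageListener interface.
To listen for OpenFlow messages, the module first has to register with FloodlightProvider.
It does so by calling the addOFMessageListener method of IFloodlightProviderService (implemented by the Controller class).
The core work is done in the ListenerDispatcher class.
Each time a listener is added, the dispatcher works out which listeners are terminals (listeners that no other listener depends on), because the final callback order is obtained by a DFS traversal that starts at these terminals and walks backward through their prerequisites.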
@Override
public synchronized void addOFMessageListener(OFType type, IOFMessageListener listener) {
    // First check whether a ListenerDispatcher already exists for this type
    ListenerDispatcher<OFType, IOFMessageListener> ldd = messageListeners.get(type);
    if (ldd == null) {
        ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
        messageListeners.put(type, ldd);
    }
    // Register the listener for messages of this type
    ldd.addListener(type, listener);
}
volatile List<T> listeners = new ArrayList<T>();
Each OpenFlow message type has exactly one ListenerDispatcher object, and its listeners are kept in the listeners list.
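private boolean ispre(U type, T l1, T l2) {
    return (l2.isCallbackOrderingPrereq(type, l1.getName()) ||
            l1.isCallbackOrderingPostreq(type, l2.getName()));
}
ispre reports the relative order of the two listeners passed in: it returns true when l1 must run before l2, either because l2 names l1 as a prerequisite or because l1 names l2 as a post-requisite.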
public void addListener(U type, T listener) {
    List<T> newlisteners = new ArrayList<T>();
    if (listeners != null)
        newlisteners.addAll(listeners);
    newlisteners.add(listener);

    // Find nodes without outgoing edges
    List<T> terminals = new ArrayList<T>();
    for (T i : newlisteners) {
        boolean isterm = true;
        for (T j : newlisteners) {
            if (ispre(type, i, j)) {
                // i must run before j, so i has an outgoing edge and is not a terminal
                isterm = false;
                break;
            }
        }
        if (isterm) {
            // no other listener has to run after i
            terminals.add(i);
        }
    }

    if (terminals.size() == 0) {
        logger.error("No listener dependency solution: " +
                     "No listeners without incoming dependencies");
        listeners = newlisteners;
        return;
    }

    // visit depth-first traversing in the opposite order from
    // the dependencies. Note we will not generally detect cycles
    HashSet<T> visited = new HashSet<T>();
    List<T> ordering = new ArrayList<T>();
    for (T term : terminals) {
        // build the ordering
        visit(newlisteners, type, visited, ordering, term);
    }
    listeners = ordering;
}

private void visit(List<T> newlisteners, U type, HashSet<T> visited,
                   List<T> ordering, T listener) {
    if (!visited.contains(listener)) {
        visited.add(listener);
        for (T i : newlisteners) {
            if (ispre(type, i, listener)) {
                visit(newlisteners, type, visited, ordering, i);
            }
        }
        // every prerequisite is already in the list, so append this listener
        ordering.add(listener);
    }
}
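To make the ordering step concrete, here is a minimal sketch of the same idea using plain strings in place of listeners. The class name ListenerOrderSketch, the prereqs map and the dependency edges in demo() are illustrative assumptions, not Floodlight code; only the terminal search and the backward DFS mirror the methods above.

```java
import java.util.*;

// Hypothetical stand-in for the ordering logic: module names instead of listeners.
class ListenerOrderSketch {
    // True when a must run before b (b lists a as a prerequisite).
    static boolean isPre(Map<String, Set<String>> prereqs, String a, String b) {
        return prereqs.getOrDefault(b, Collections.emptySet()).contains(a);
    }

    static List<String> order(List<String> names, Map<String, Set<String>> prereqs) {
        // Terminals: names that are not a prerequisite of any other name.
        List<String> terminals = new ArrayList<>();
        for (String i : names) {
            boolean isTerminal = true;
            for (String j : names) {
                if (isPre(prereqs, i, j)) { isTerminal = false; break; }
            }
            if (isTerminal) terminals.add(i);
        }
        // DFS backward from each terminal; a node is appended after its prerequisites.
        Set<String> visited = new HashSet<>();
        List<String> ordering = new ArrayList<>();
        for (String t : terminals) visit(names, prereqs, visited, ordering, t);
        return ordering;
    }

    static void visit(List<String> names, Map<String, Set<String>> prereqs,
                      Set<String> visited, List<String> ordering, String node) {
        if (visited.add(node)) {
            for (String i : names) {
                if (isPre(prereqs, i, node)) visit(names, prereqs, visited, ordering, i);
            }
            ordering.add(node);
        }
    }

    // Illustrative dependency edges (assumed for the example).
    static List<String> demo() {
        Map<String, Set<String>> prereqs = new HashMap<>();
        prereqs.put("topology", new HashSet<>(Arrays.asList("linkdiscovery")));
        prereqs.put("devicemanager", new HashSet<>(Arrays.asList("topology")));
        prereqs.put("forwarding", new HashSet<>(Arrays.asList("topology", "devicemanager")));
        return order(Arrays.asList("forwarding", "topology", "linkdiscovery", "devicemanager"),
                     prereqs);
    }
}
```

With these assumed edges, "forwarding" is the only terminal, and the traversal emits each module only after everything it depends on.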
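public interface IListener<T> {
    // Status value that tells the dispatcher whether to keep processing
    public enum Command {
        CONTINUE, STOP
    }

    public String getName();

    // Whether the module called name must run before this listener
    public boolean isCallbackOrderingPrereq(T type, String name);

    // Whether the module called name must run after this listener
    public boolean isCallbackOrderingPostreq(T type, String name);
}

The IOFMessageListener interface extends IListener and additionally defines the receive method:

public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx);

It returns CONTINUE or STOP. The following sections look at how each module that implements this interface overrides these methods.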
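The effect of the CONTINUE and STOP values can be sketched with a small dispatch loop. This is a simplified model under stated assumptions: DispatchSketch, its Listener interface and the string message are hypothetical, and the loop only shows the general idea that a STOP result ends processing for later listeners.

```java
import java.util.*;

// Hypothetical, simplified model of dispatching one message to ordered listeners.
class DispatchSketch {
    enum Command { CONTINUE, STOP }

    interface Listener {
        String getName();
        Command receive(String msg);
    }

    // Calls listeners in order and returns the names of those that saw the message.
    static List<String> dispatch(List<Listener> ordered, String msg) {
        List<String> seen = new ArrayList<>();
        for (Listener l : ordered) {
            seen.add(l.getName());
            if (l.receive(msg) == Command.STOP) {
                break; // later listeners never see this message
            }
        }
        return seen;
    }

    // Helper that builds a listener which always returns the given result.
    static Listener listener(final String name, final Command result) {
        return new Listener() {
            public String getName() { return name; }
            public Command receive(String msg) { return result; }
        };
    }

    static List<String> demo() {
        return dispatch(Arrays.asList(
                listener("linkdiscovery", Command.CONTINUE),
                listener("topology", Command.STOP),
                listener("forwarding", Command.CONTINUE)), "PACKET_IN");
    }
}
```

In demo(), the second listener returns STOP, so the third one is never called.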
The IOFMessageListener methods as overridden by the TopologyManager module:

@Override
public String getName() {
    return MODULE_NAME; // "topology" here; every module has its own MODULE_NAME
}

@Override
public boolean isCallbackOrderingPrereq(OFType type, String name) {
    // The module whose MODULE_NAME is "linkdiscovery" must run before this one
    return "linkdiscovery".equals(name);
}

@Override
public boolean isCallbackOrderingPostreq(OFType type, String name) {
    return false;
}

@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
    switch (msg.getType()) {
        case PACKET_IN:
            ctrIncoming.increment(); // increment the counter
            // delegate to the packet-in handler
            return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx);
        default:
            break;
    }
    return Command.CONTINUE;
}
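The IDE's Type Hierarchy view can be used to find the modules that take part in Packet-In processing and to work out their order.
The basic data structure is FloodlightContext, a context object in which Floodlight listeners can register information and from which they can later retrieve the context associated with an event.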
public class FloodlightContext {
    protected ConcurrentHashMap<String, Object> storage =
            new ConcurrentHashMap<String, Object>();

    public ConcurrentHashMap<String, Object> getStorage() {
        return storage;
    }
}
It creates a ConcurrentHashMap named storage.
public class FloodlightContextStore<V> {

    @SuppressWarnings("unchecked")
    public V get(FloodlightContext bc, String key) {
        return (V) bc.storage.get(key);
    }

    public void put(FloodlightContext bc, String key, V value) {
        bc.storage.put(key, value);
    }

    public void remove(FloodlightContext bc, String key) {
        bc.storage.remove(key);
    }
}
The following FloodlightContextStore object holds the PACKET-IN payload; the stored object is of type Ethernet.
public static final FloodlightContextStore<Ethernet> bcStore = new FloodlightContextStore<Ethernet>();
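The typed-store pattern above can be tried in isolation. The sketch below uses stand-in classes (ContextSketch, Context, Store) and a hypothetical key name; it stores a String where Floodlight stores an Ethernet object, purely to stay self-contained.

```java
import java.util.concurrent.ConcurrentHashMap;

// Stand-in classes that mimic the context / typed-store pattern shown above.
class ContextSketch {
    static class Context {
        final ConcurrentHashMap<String, Object> storage = new ConcurrentHashMap<>();
    }

    // A typed view over the untyped storage map.
    static class Store<V> {
        @SuppressWarnings("unchecked")
        V get(Context c, String key) { return (V) c.storage.get(key); }
        void put(Context c, String key, V value) { c.storage.put(key, value); }
        void remove(Context c, String key) { c.storage.remove(key); }
    }

    // Hypothetical key, used only in this example.
    static final String KEY_PAYLOAD = "sketch.pi_payload";
    static final Store<String> payloadStore = new Store<>();

    // One listener writes a value; a later listener reads it without casting.
    static String demo() {
        Context cntx = new Context();
        payloadStore.put(cntx, KEY_PAYLOAD, "ethernet-frame");
        return payloadStore.get(cntx, KEY_PAYLOAD);
    }

    // After remove, the key is gone and get returns null.
    static boolean removedIsNull() {
        Context cntx = new Context();
        payloadStore.put(cntx, KEY_PAYLOAD, "ethernet-frame");
        payloadStore.remove(cntx, KEY_PAYLOAD);
        return payloadStore.get(cntx, KEY_PAYLOAD) == null;
    }
}
```

The design choice here is that the cast lives in one place (the store), so each module reads its value with the right static type.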
The receive method of IOFMessageListener:

@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
    switch (msg.getType()) {
        case PACKET_IN:
            ctrIncoming.increment();
            return this.handlePacketIn(sw.getId(), (OFPacketIn) msg, cntx);
        default:
            break;
    }
    return Command.CONTINUE;
}

It mainly delegates to the handlePacketIn() method:

protected Command handlePacketIn(DatapathId sw, OFPacketIn pi, FloodlightContext cntx) {
    // Extract the Ethernet payload of the Packet-In
    Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
            IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
    OFPort inPort = (pi.getVersion().compareTo(OFVersion.OF_12) < 0 ? pi.getInPort()
            : pi.getMatch().get(MatchField.IN_PORT));
    if (eth.getPayload() instanceof BSN) {
        BSN bsn = (BSN) eth.getPayload();
        if (bsn == null) return Command.STOP;
        if (bsn.getPayload() == null) return Command.STOP;
        // It could be a packet other than BSN LLDP, therefore
        // continue with the regular processing.
        if (bsn.getPayload() instanceof LLDP == false)
            return Command.CONTINUE;
        return handleLldp((LLDP) bsn.getPayload(), sw, inPort, false, cntx);
    } else if (eth.getPayload() instanceof LLDP) {
        return handleLldp((LLDP) eth.getPayload(), sw, inPort, true, cntx);
    } else if (eth.getEtherType().getValue() < 1536 && eth.getEtherType().getValue() >= 17) {
        long destMac = eth.getDestinationMACAddress().getLong();
        if ((destMac & LINK_LOCAL_MASK) == LINK_LOCAL_VALUE) {
            ctrLinkLocalDrops.increment();
            if (log.isTraceEnabled()) {
                log.trace("Ignoring packet addressed to 802.1D/Q "
                        + "reserved address.");
            }
            return Command.STOP;
        }
    } else if (eth.getEtherType().getValue() < 17) {
        log.error("Received invalid ethertype of {}.", eth.getEtherType());
        return Command.STOP;
    }

    if (ignorePacketInFromSource(eth.getSourceMACAddress())) {
        ctrIgnoreSrcMacDrops.increment();
        return Command.STOP;
    }

    // If packet-in is from a quarantine port, stop processing.
    NodePortTuple npt = new NodePortTuple(sw, inPort);
    if (quarantineQueue.contains(npt)) {
        ctrQuarantineDrops.increment();
        return Command.STOP;
    }

    return Command.CONTINUE;
}
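The link-local test in handlePacketIn is a mask-and-compare on the 48-bit destination MAC. The excerpt does not show the two constants, so the values below are assumptions chosen to match the 802.1D/Q reserved block 01:80:c2:00:00:00 through 01:80:c2:00:00:0f that the log message refers to; LinkLocalSketch and macToLong are hypothetical helpers.

```java
// Sketch of the reserved-address check; constant values are assumed, not taken from the excerpt.
class LinkLocalSketch {
    // Keep the top 44 bits, ignore the last hex digit of the address.
    static final long LINK_LOCAL_MASK = 0xfffffffffff0L;
    // 01:80:c2:00:00:00, the start of the reserved block.
    static final long LINK_LOCAL_VALUE = 0x0180c2000000L;

    // Parses "aa:bb:cc:dd:ee:ff" into a 48-bit value held in a long.
    static long macToLong(String mac) {
        long value = 0;
        for (String part : mac.split(":")) {
            value = (value << 8) | Integer.parseInt(part, 16);
        }
        return value;
    }

    static boolean isLinkLocal(String mac) {
        return (macToLong(mac) & LINK_LOCAL_MASK) == LINK_LOCAL_VALUE;
    }
}
```

For example, 01:80:c2:00:00:0e falls inside the block, while 01:80:c2:00:00:10 and an ordinary unicast address such as 00:11:22:33:44:55 do not.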
The receive method of IOFMessageListener:

@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
    switch (msg.getType()) {
        case PACKET_IN:
            ctrIncoming.increment();
            return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx);
        default:
            break;
    }
    return Command.CONTINUE;
}

It mainly delegates to the processPacketInMessage() method:

protected Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, FloodlightContext cntx) {
    // get the packet-in switch.
    Ethernet eth = IFloodlightProviderService.bcStore.
            get(cntx, IFloodlightProviderService.CONTEXT_PI_PAYLOAD);

    if (eth.getPayload() instanceof BSN) {
        BSN bsn = (BSN) eth.getPayload();
        if (bsn == null) return Command.STOP;
        if (bsn.getPayload() == null) return Command.STOP;

        // It may not be a BSN LLDP packet; continue with regular processing
        if (bsn.getPayload() instanceof LLDP == false)
            return Command.CONTINUE;

        doFloodBDDP(sw.getId(), pi, cntx);
        return Command.STOP;
    } else {
        return dropFilter(sw.getId(), pi, cntx);
    }
}
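The device manager learns about devices through PACKET-IN messages. It pulls information out of each PACKET-IN and classifies devices according to how the entity classifier is set up. By default, the entity classifier uses the MAC address and VLAN to identify a device; together, these two attributes define a unique device. The device manager also learns other attributes, such as IP addresses.
An important part of this information is the device's attachment point. When a PACKET-IN is received on a switch, an attachment point is created for that device. The device manager also ages out attachment points, IP addresses, and devices themselves based on time; the last-seen timestamp is what drives this aging process.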
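As a rough illustration of identifying a device by MAC address plus VLAN and aging entries by last-seen time, consider the sketch below. DeviceTableSketch and all of its members are hypothetical and far simpler than the real device manager, which tracks entities, attachment points and IP addresses separately.

```java
import java.util.*;

// Hypothetical table keyed by (MAC, VLAN) with last-seen timestamps.
class DeviceTableSketch {
    static final class Key {
        final long mac;
        final int vlan;
        Key(long mac, int vlan) { this.mac = mac; this.vlan = vlan; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return mac == k.mac && vlan == k.vlan;
        }
        @Override public int hashCode() { return Objects.hash(mac, vlan); }
    }

    private final Map<Key, Long> lastSeen = new HashMap<>();

    // Learn a device, or refresh its timestamp if it is already known.
    void learn(long mac, int vlan, long nowMillis) {
        lastSeen.put(new Key(mac, vlan), nowMillis);
    }

    // Drop every device that has not been seen within timeoutMillis.
    void ageOut(long nowMillis, long timeoutMillis) {
        lastSeen.values().removeIf(ts -> nowMillis - ts > timeoutMillis);
    }

    int size() { return lastSeen.size(); }

    // Returns {size before aging, size after aging}.
    static int[] demo() {
        DeviceTableSketch table = new DeviceTableSketch();
        table.learn(0xaaL, 10, 0);
        table.learn(0xaaL, 10, 500);   // same MAC and VLAN: same device, refreshed
        table.learn(0xaaL, 20, 100);   // same MAC, other VLAN: a different device
        int before = table.size();
        table.ageOut(1000, 600);       // the VLAN 20 entry is 900 ms old and expires
        return new int[] { before, table.size() };
    }
}
```

The same MAC on two VLANs counts as two devices, and only the entry that was refreshed recently survives the aging pass.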
The receive method of IOFMessageListener:

@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
    switch (msg.getType()) {
        case PACKET_IN:
            cntIncoming.increment();
            return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx);
        default:
            break;
    }
    return Command.CONTINUE;
}

It mainly delegates to the processPacketInMessage() method:

protected Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, FloodlightContext cntx) {
    Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
            IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
    OFPort inPort = (pi.getVersion().compareTo(OFVersion.OF_12) < 0 ? pi.getInPort()
            : pi.getMatch().get(MatchField.IN_PORT));

    // Extract source entity information
    Entity srcEntity = getSourceEntityFromPacket(eth, sw.getId(), inPort);
    if (srcEntity == null) {
        cntInvalidSource.increment();
        return Command.STOP;
    }

    // Learn from ARP packet for special VRRP settings.
    // In VRRP settings, the source MAC address and sender MAC
    // addresses can be different. In such cases, we need to learn
    // the IP to MAC mapping of the VRRP IP address. The source
    // entity will not have that information. Hence, a separate call
    // to learn devices in such cases.
    learnDeviceFromArpResponseData(eth, sw.getId(), inPort);

    // Learn/lookup device information
    Device srcDevice = learnDeviceByEntity(srcEntity);
    if (srcDevice == null) {
        cntNoSource.increment();
        return Command.STOP;
    }

    // Store the source device in the context
    fcStore.put(cntx, CONTEXT_SRC_DEVICE, srcDevice);

    // Find the device matching the destination from the entity
    // classes of the source.
    if (eth.getDestinationMACAddress().getLong() == 0) {
        cntInvalidDest.increment();
        return Command.STOP;
    }
    Entity dstEntity = getDestEntityFromPacket(eth);
    Device dstDevice = null;
    if (dstEntity != null) {
        dstDevice = findDestByEntity(srcDevice.getEntityClass(), dstEntity);
        if (dstDevice != null)
            fcStore.put(cntx, CONTEXT_DST_DEVICE, dstDevice);
        else
            cntNoDest.increment();
    } else {
        cntNoDest.increment();
    }

    if (logger.isTraceEnabled()) {
        logger.trace("Received PI: {} on switch {}, port {} *** eth={}" +
                " *** srcDev={} *** dstDev={} *** ",
                new Object[] { pi, sw.getId().toString(), inPort, eth,
                        srcDevice, dstDevice });
    }

    snoopDHCPClientName(eth, srcDevice);

    return Command.CONTINUE;
}
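When Floodlight starts, no virtual networks exist, so hosts cannot talk to each other.
Once a user creates a virtual network, hosts can be added to it.
The module inserts itself into the PACKET-IN processing chain ahead of forwarding.
Once a PACKET-IN message is received, the module looks up the source and destination MAC addresses. If the two MAC addresses are on the same virtual network, the module returns Command.CONTINUE and processing of the flow continues. If they are not on the same virtual network, it returns Command.STOP and the packet is dropped.
This module can be used in OpenStack deployments.
The default configuration file that includes this module is located at: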
src/main/resources/neutron.properties
The receive method of IOFMessageListener:

@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
    switch (msg.getType()) {
        case PACKET_IN:
            return processPacketIn(sw, (OFPacketIn) msg, cntx);
        default:
            break;
    }
    log.warn("Received unexpected message {}", msg);
    return Command.CONTINUE;
}

It mainly delegates to the processPacketIn() method:

protected Command processPacketIn(IOFSwitch sw, OFPacketIn msg, FloodlightContext cntx) {
    Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
            IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
    Command ret = Command.STOP;
    String srcNetwork = macToGuid.get(eth.getSourceMACAddress());
    // If the host is on an unknown network we deny it.
    // We make exceptions for ARP and DHCP.
    if (eth.isBroadcast() || eth.isMulticast() || isDefaultGateway(eth) || isDhcpPacket(eth)) {
        ret = Command.CONTINUE;
    } else if (srcNetwork == null) {
        log.trace("Blocking traffic from host {} because it is not attached to any network.",
                eth.getSourceMACAddress().toString());
        ret = Command.STOP;
    } else if (oneSameNetwork(eth.getSourceMACAddress(), eth.getDestinationMACAddress())) {
        // if they are on the same network continue
        ret = Command.CONTINUE;
    }

    if (log.isTraceEnabled())
        log.trace("Results for flow between {} and {} is {}",
                new Object[] {eth.getSourceMACAddress(), eth.getDestinationMACAddress(), ret});
    /*
     * TODO - figure out how to still detect gateways while using
     * drop mods
    if (ret == Command.STOP) {
        if (!(eth.getPayload() instanceof ARP))
            doDropFlow(sw, msg, cntx);
    }
    */
    return ret;
}
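The same-network decision described for this module can be reduced to a map lookup. The following is a simplified sketch: VirtualNetworkSketch, its string MAC addresses and network IDs are hypothetical, and the default-gateway and DHCP exceptions from the method above are left out on purpose.

```java
import java.util.*;

// Simplified model of the virtual network filter decision.
class VirtualNetworkSketch {
    enum Command { CONTINUE, STOP }

    // Maps a host MAC to the ID of the virtual network it was added to.
    private final Map<String, String> macToGuid = new HashMap<>();

    void addHost(String mac, String networkId) {
        macToGuid.put(mac, networkId);
    }

    Command decide(String srcMac, String dstMac, boolean broadcastOrMulticast) {
        if (broadcastOrMulticast) {
            return Command.CONTINUE;           // always let these through
        }
        String srcNetwork = macToGuid.get(srcMac);
        if (srcNetwork == null) {
            return Command.STOP;               // source is on no known network
        }
        return srcNetwork.equals(macToGuid.get(dstMac))
                ? Command.CONTINUE             // same virtual network
                : Command.STOP;                // different or unknown destination network
    }

    // Builds a filter with three example hosts on two networks.
    static VirtualNetworkSketch example() {
        VirtualNetworkSketch filter = new VirtualNetworkSketch();
        filter.addHost("00:00:00:00:00:01", "net-a");
        filter.addHost("00:00:00:00:00:02", "net-a");
        filter.addHost("00:00:00:00:00:03", "net-b");
        return filter;
    }
}
```

With the example hosts, traffic between the first two hosts continues, traffic from the first to the third is stopped, and so is anything from a host that was never added.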
The receive method of IOFMessageListener:

@Override
public net.floodlightcontroller.core.IListener.Command receive(IOFSwitch sw, OFMessage msg,
        FloodlightContext cntx) {
    switch (msg.getType()) {
        case PACKET_IN:
            return processPacketIn(sw, (OFPacketIn) msg, cntx);
        default:
            break;
    }
    log.warn("Received unexpected message {}", msg);
    return Command.CONTINUE;
}

It mainly delegates to the processPacketIn() method:

private net.floodlightcontroller.core.IListener.Command processPacketIn(IOFSwitch sw,
        OFPacketIn pi, FloodlightContext cntx) {
    Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
            IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
    IPacket pkt = eth.getPayload();

    if (eth.isBroadcast() || eth.isMulticast()) {
        // handle ARP for VIP
        if (pkt instanceof ARP) {
            // retrieve arp to determine target IP address
            ARP arpRequest = (ARP) eth.getPayload();
            IPv4Address targetProtocolAddress = arpRequest.getTargetProtocolAddress();

            if (vipIpToId.containsKey(targetProtocolAddress.getInt())) {
                String vipId = vipIpToId.get(targetProtocolAddress.getInt());
                vipProxyArpReply(sw, pi, cntx, vipId);
                return Command.STOP;
            }
        }
    } else {
        // currently only load balance IPv4 packets - no-op for other traffic
        if (pkt instanceof IPv4) {
            IPv4 ip_pkt = (IPv4) pkt;

            // If match Vip and port, check pool and choose member
            int destIpAddress = ip_pkt.getDestinationAddress().getInt();

            if (vipIpToId.containsKey(destIpAddress)) {
                IPClient client = new IPClient();
                client.ipAddress = ip_pkt.getSourceAddress();
                client.nw_proto = ip_pkt.getProtocol();
                if (ip_pkt.getPayload() instanceof TCP) {
                    TCP tcp_pkt = (TCP) ip_pkt.getPayload();
                    client.srcPort = tcp_pkt.getSourcePort();
                    client.targetPort = tcp_pkt.getDestinationPort();
                }
                if (ip_pkt.getPayload() instanceof UDP) {
                    UDP udp_pkt = (UDP) ip_pkt.getPayload();
                    client.srcPort = udp_pkt.getSourcePort();
                    client.targetPort = udp_pkt.getDestinationPort();
                }
                if (ip_pkt.getPayload() instanceof ICMP) {
                    client.srcPort = TransportPort.of(8);
                    client.targetPort = TransportPort.of(0);
                }

                LBVip vip = vips.get(vipIpToId.get(destIpAddress));
                if (vip == null) // fix dereference violations
                    return Command.CONTINUE;
                LBPool pool = pools.get(vip.pickPool(client));
                if (pool == null) // fix dereference violations
                    return Command.CONTINUE;
                LBMember member = members.get(pool.pickMember(client));
                if (member == null) // fix dereference violations
                    return Command.CONTINUE;

                // for chosen member, check device manager and find and push routes, in both directions
                pushBidirectionalVipRoutes(sw, pi, cntx, client, member);

                // packet out based on table rule
                pushPacket(pkt, sw, pi.getBufferId(),
                        (pi.getVersion().compareTo(OFVersion.OF_12) < 0) ? pi.getInPort()
                                : pi.getMatch().get(MatchField.IN_PORT),
                        OFPort.TABLE, cntx, true);

                return Command.STOP;
            }
        }
    }
    // bypass non-load-balanced traffic for normal processing (forwarding)
    return Command.CONTINUE;
}
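The excerpt calls pool.pickMember(client) without showing how a member is chosen. One simple policy that fits that call shape is round robin; the sketch below is an assumption for illustration, not the code behind pickMember, and RoundRobinPoolSketch and its member IDs are hypothetical.

```java
import java.util.*;

// Hypothetical pool that hands out member IDs in round-robin order.
class RoundRobinPoolSketch {
    private final List<String> members = new ArrayList<>();
    private int previousIndex = -1;

    void addMember(String memberId) {
        members.add(memberId);
    }

    // Returns the next member ID, or null when the pool is empty.
    String pickMember() {
        if (members.isEmpty()) {
            return null;
        }
        previousIndex = (previousIndex + 1) % members.size();
        return members.get(previousIndex);
    }

    // Picks three times from a two-member pool.
    static List<String> demo() {
        RoundRobinPoolSketch pool = new RoundRobinPoolSketch();
        pool.addMember("member-1");
        pool.addMember("member-2");
        return Arrays.asList(pool.pickMember(), pool.pickMember(), pool.pickMember());
    }
}
```

Returning null for an empty pool lines up with the null checks in processPacketIn, which fall back to Command.CONTINUE when no member is available.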
The receive method of IOFMessageListener:

@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
    switch (msg.getType()) {
        case PACKET_IN:
            IRoutingDecision decision = null;
            if (cntx != null) {
                decision = RoutingDecision.rtStore.get(cntx, IRoutingDecision.CONTEXT_DECISION);
            }
            return this.processPacketInMessage(sw, (OFPacketIn) msg, decision, cntx);
        default:
            break;
    }
    return Command.CONTINUE;
}

It mainly delegates to the processPacketInMessage() method:
public abstract Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, IRoutingDecision decision, FloodlightContext cntx);
Forwarding, the subclass of ForwardingBase, overrides this method to implement the actual behavior:

@Override
public Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, IRoutingDecision decision,
        FloodlightContext cntx) {
    Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,
            IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
    // We found a routing decision (i.e. Firewall is enabled... it's the only thing that makes RoutingDecisions)
    if (decision != null) {
        if (log.isTraceEnabled()) {
            log.trace("Forwarding decision={} was made for PacketIn={}",
                    decision.getRoutingAction().toString(), pi);
        }

        switch (decision.getRoutingAction()) {
            case NONE:
                // don't do anything
                return Command.CONTINUE;
            case FORWARD_OR_FLOOD:
            case FORWARD:
                doForwardFlow(sw, pi, decision, cntx, false);
                return Command.CONTINUE;
            case MULTICAST:
                // treat as broadcast
                doFlood(sw, pi, decision, cntx);
                return Command.CONTINUE;
            case DROP:
                doDropFlow(sw, pi, decision, cntx);
                return Command.CONTINUE;
            default:
                log.error("Unexpected decision made for this packet-in={}", pi,
                        decision.getRoutingAction());
                return Command.CONTINUE;
        }
    } else { // No routing decision was found. Forward to destination or flood if bcast or mcast.
        if (log.isTraceEnabled()) {
            log.trace("No decision was made for PacketIn={}, forwarding", pi);
        }

        if (eth.isBroadcast() || eth.isMulticast()) {
            doFlood(sw, pi, decision, cntx);
        } else {
            doForwardFlow(sw, pi, decision, cntx, false);
        }
    }

    return Command.CONTINUE;
}
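The branching in this method can be summarized as a small decision table. The sketch below is a stand-alone model, assuming a hypothetical ForwardingDecisionSketch class that returns a label instead of installing flows; the enum constants mirror the routing actions named in the switch above.

```java
// Stand-alone model of how a routing decision maps to an action label.
class ForwardingDecisionSketch {
    enum RoutingAction { NONE, FORWARD_OR_FLOOD, FORWARD, MULTICAST, DROP }

    // decision may be null, meaning no earlier module made a routing decision.
    static String actionFor(RoutingAction decision, boolean broadcastOrMulticast) {
        if (decision == null) {
            // No decision: flood broadcast/multicast, otherwise forward to the destination.
            return broadcastOrMulticast ? "flood" : "forward";
        }
        switch (decision) {
            case NONE:
                return "nothing";
            case FORWARD_OR_FLOOD:
            case FORWARD:
                return "forward";
            case MULTICAST:
                return "flood";   // treated as broadcast
            case DROP:
                return "drop";
            default:
                return "nothing";
        }
    }
}
```

For instance, actionFor(null, true) yields "flood", while an explicit DROP decision yields "drop" regardless of the destination address type.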