OVS-Ryu实现流量监控+拓扑发现

笔记 SDN OVS mininet Ryu

测试环境为 Mininet,用真实环境或虚拟机也是可以的。

Ryu控制器(`hurray_monitor4.py`)去除了流表下发的过程,所以只是发现物理拓扑而不能真实连通(因为没有路由规则)。

```python
# Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_3
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
from ryu.lib.packet import ether_types
from operator import attrgetter
from ryu.lib import hub
from ryu.topology.api import get_switch, get_link
from ryu.topology import event, switches
import logging

app_manager.LOG.setLevel(logging.ERROR)


class HurraySwitch(app_manager.RyuApp):
    """Ryu app that polls flow/port statistics and the discovered topology.

    Every 10 seconds a background green thread asks each registered
    datapath for its flow and port statistics and reads the switch/link
    lists from the Ryu topology API.  The results are written to the
    application logger.

    Only the table-miss entry is installed.  The packet-in handler is not
    registered (its decorator is commented out) and its flow installation
    is commented out as well.
    """

    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self, *args, **kwargs):
        super(HurraySwitch, self).__init__(*args, **kwargs)
        self.mac_to_port = {}   # dpid -> {mac address -> port number}
        self.datapaths = {}     # dpid -> datapath, maintained on state change
        self.topology_api_app = self
        self.switchs = []       # dpids collected by the last get_topology()
        self.links = []         # (src dpid, dst dpid) pairs from get_topology()
        self.logger.setLevel(logging.INFO)
        # Spawn the poller last so every attribute it reads already exists.
        self.monitor_thread = hub.spawn(self._monitor)

    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
    def switch_features_handler(self, ev):
        """Install the table-miss entry that sends packets to the controller."""
        datapath = ev.msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        # install table-miss flow entry
        #
        # We specify NO BUFFER to max_len of the output action due to
        # OVS bug. At this moment, if we specify a lesser number, e.g.,
        # 128, OVS will send Packet-In with invalid buffer_id and
        # truncated packet data. In that case, we cannot output packets
        # correctly.  The bug has been fixed in OVS v2.1.0.
        match = parser.OFPMatch()
        actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,
                                          ofproto.OFPCML_NO_BUFFER)]
        self.add_flow(datapath, 0, match, actions)

    def add_flow(self, datapath, priority, match, actions, buffer_id=None):
        """Send a flow-mod that applies ``actions`` to packets matching ``match``.

        ``buffer_id`` is forwarded to the flow-mod whenever it is given.
        The check is ``is not None`` rather than truthiness so that a
        buffer id of 0 is not silently dropped.
        """
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
                                             actions)]
        flow_mod_args = dict(datapath=datapath, priority=priority,
                             match=match, instructions=inst)
        if buffer_id is not None:
            flow_mod_args['buffer_id'] = buffer_id
        datapath.send_msg(parser.OFPFlowMod(**flow_mod_args))

    # NOTE: this decorator is intentionally commented out, so the handler
    # below is never invoked.  Restore it to register the handler for
    # packet-in events.
    #@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
    def _packet_in_handler(self, ev):
        """Learn the source MAC and forward the packet with a packet-out."""
        # If you hit this you might want to increase
        # the "miss_send_length" of your switch
        if ev.msg.msg_len < ev.msg.total_len:
            self.logger.debug("packet truncated: only %s of %s bytes",
                              ev.msg.msg_len, ev.msg.total_len)
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        in_port = msg.match['in_port']

        pkt = packet.Packet(msg.data)
        eth = pkt.get_protocols(ethernet.ethernet)[0]

        if eth.ethertype == ether_types.ETH_TYPE_LLDP:
            # ignore lldp packet
            return
        dst = eth.dst
        src = eth.src

        dpid = datapath.id
        self.mac_to_port.setdefault(dpid, {})

        self.logger.debug("packet in %s %s %s %s", dpid, src, dst, in_port)

        # learn a mac address to avoid FLOOD next time.
        self.mac_to_port[dpid][src] = in_port

        if dst in self.mac_to_port[dpid]:
            out_port = self.mac_to_port[dpid][dst]
        else:
            out_port = ofproto.OFPP_FLOOD

        actions = [parser.OFPActionOutput(out_port)]

        # install a flow to avoid packet_in next time
        if out_port != ofproto.OFPP_FLOOD:
            # Only consumed by the disabled add_flow() calls below.
            match = parser.OFPMatch(in_port=in_port, eth_dst=dst)
            # verify if we have a valid buffer_id, if yes avoid to send both
            # flow_mod & packet_out
            ###################################################
            # if msg.buffer_id != ofproto.OFP_NO_BUFFER:
            #     self.add_flow(datapath, 1, match, actions, msg.buffer_id)
            #     return
            # else:
            #     self.add_flow(datapath, 1, match, actions)
        data = None
        if msg.buffer_id == ofproto.OFP_NO_BUFFER:
            data = msg.data

        out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,
                                  in_port=in_port, actions=actions, data=data)
        datapath.send_msg(out)

    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        """Add or remove the datapath from ``self.datapaths`` on state change."""
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if datapath.id not in self.datapaths:
                self.logger.info('register datapath: %016x', datapath.id)
                self.datapaths[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                self.logger.info('unregister datapath: %016x', datapath.id)
                del self.datapaths[datapath.id]

    def _monitor(self):
        """Poll statistics and topology forever, sleeping 10 s between rounds."""
        while True:
            # Iterate over a snapshot: _state_change_handler may add or
            # remove entries while requests are being sent, and mutating a
            # dict during iteration raises RuntimeError.
            for dp in list(self.datapaths.values()):
                self._request_stats(dp)
            self.get_topology()
            hub.sleep(10)

    def _request_stats(self, datapath):
        """Send a flow-stats and a port-stats request to ``datapath``."""
        self.logger.debug('send stats request: %016x', datapath.id)
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        req = parser.OFPFlowStatsRequest(datapath)
        datapath.send_msg(req)

        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
        datapath.send_msg(req)

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        """Log one table row per priority-1 flow entry in the reply."""
        body = ev.msg.body

        # Header columns are padded to the widths used by the row format
        # below (16, 8, 17, 8, 8, 8) so the table lines up.
        self.logger.info('datapath         '
                         'in-port  eth-dst           '
                         'out-port packets  bytes')
        self.logger.info('---------------- '
                         '-------- ----------------- '
                         '-------- -------- --------')
        # Only priority-1 entries are listed; the table-miss entry is
        # installed with priority 0 and is therefore skipped.
        for stat in sorted((flow for flow in body if flow.priority == 1),
                           key=lambda flow: (flow.match['in_port'],
                                             flow.match['eth_dst'])):
            self.logger.info('%016x %8x %17s %8x %8d %8d',
                             ev.msg.datapath.id,
                             stat.match['in_port'], stat.match['eth_dst'],
                             stat.instructions[0].actions[0].port,
                             stat.packet_count, stat.byte_count)

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev):
        """Log one table row per port in the reply, ordered by port number."""
        body = ev.msg.body

        # Header columns are padded to the widths used by the row format
        # below (16 for the datapath id, 8 for every other column).
        self.logger.info('datapath         port     '
                         'rx-pkts  rx-bytes rx-error '
                         'tx-pkts  tx-bytes tx-error')
        self.logger.info('---------------- -------- '
                         '-------- -------- -------- '
                         '-------- -------- --------')
        for stat in sorted(body, key=attrgetter('port_no')):
            self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
                             ev.msg.datapath.id, stat.port_no,
                             stat.rx_packets, stat.rx_bytes, stat.rx_errors,
                             stat.tx_packets, stat.tx_bytes, stat.tx_errors)

    #@set_ev_cls(event.EventSwitchEnter)
    def get_topology(self):
        """Refresh ``self.switchs`` and ``self.links`` and log both lists.

        ``self.switchs`` holds the ``dpid`` value of every switch reported
        by ``get_switch``; ``self.links`` holds ``(src dpid, dst dpid)``
        tuples for every link reported by ``get_link``.
        """
        self.logger.debug('start')
        switch_list = get_switch(self.topology_api_app, None)
        links_list = get_link(self.topology_api_app, None)

        self.switchs = [switch.to_dict()['dpid'] for switch in switch_list]

        link_dicts = [link.to_dict() for link in links_list]
        self.links = [(dic['src']['dpid'], dic['dst']['dpid'])
                      for dic in link_dicts]

        self.logger.info('************************ SWITCHS *************************')
        self.logger.info(self.switchs)
        self.logger.info('************************ LINKS *************************')
        self.logger.info(self.links)
        self.logger.info('*************************************************')
```
如果将原文件第83行的注释去掉(即恢复 `_packet_in_handler` 上方被注释掉的 `@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)` 装饰器),则和`simple_switch_13.py`是一样的,自动实现流表,可以ping通,但拓扑不能有环(这个文件里没有生成树实现)。

控制器运行(带GUI,可以到8080网页端查看拓扑):

```
ryu-manager --verbose --observe-link gui_topology/gui_topology.py hurray_monitor4.py
```

Mininet运行(控制器地址请不要填错):

```
mn --topo=tree,3 --controller remote,ip=????
或
mn --topo=torus,3,3 --controller remote
或
mn --topo=linear,6 --controller remote
等
```

控制器每10s监控一次拓扑结构和流表流量(如果有流表的话)。

运行结果如图:

![](https://file.hurray0.com/uploads/menu/54/d19177a410148959b9a45925b24acf75.png)