#!/usr/bin/env python

# Copyright (c) 2007, Bartosz Ptaszynski
#
# macsearch.py is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

"""

@author: Bartosz Ptaszynski
@copyright: (C) 2007 Bartosz Ptaszynski
@license: GNU General Public License (GPL)
"""

# Standard Library imports
import re
from os import popen
from sys import argv
from optparse import OptionParser
from datetime import datetime

# 3rd Party Library imports
from sqlobject import *
from IPy import IP


class MAC(SQLObject):
    """Database row for one MAC address reported by a polled device.

    Rows are created and refreshed by ``MACDiscoveryAgent._db_addMAC``.
    """

    # IP address of the device that was polled, stored as text.
    ip = StringCol(length=16)
    # MAC address as returned by ``MACDiscoveryAgent._normalizeMAC``.
    mac = StringCol(length=18)
    # Device type string extracted from the device's snmpwalk output.
    device_type = StringCol(length=6)
    # Pass the callable itself, not its result: ``datetime.now()`` would be
    # evaluated once when the class is defined, stamping every new row with
    # the import time instead of the time the row is inserted.
    last_seen = DateTimeCol(default=datetime.now)
    # Site name extracted from the device's snmpwalk output; empty if unknown.
    site_name = StringCol(default="")


class Subnet(SQLObject):
    """Database row for one subnet that ``MACDiscoveryAgent.sweep`` polls."""

    # Subnet in a notation that ``IPy.IP`` can parse, e.g. "192.168.105.44/30".
    # NOTE(review): alternateID is expected to make the column unique, which is
    # what lets ``addSubnet`` report a duplicate -- confirm against SQLObject.
    subnet = StringCol(length=20, alternateID=True)


class MACDiscoveryAgent(object):
    """Collect MAC addresses from network devices over SNMP into a database.

    Every address of every registered subnet is polled with the ``snmpwalk``
    command-line tool.  The MAC addresses found are stored as ``MAC`` rows so
    that ``lookupMAC`` can answer from the database afterwards.
    """

    # OID arguments appended to the snmpwalk command line.  The leading space
    # separates the OID from the IP address placed in front of it.
    snmp_site_name = " SNMPv2-MIB::sysName.0"
    snmp_macs = " enterprises.161.19.3.3.4.1.1"
    snmp_device_type = " enterprises.161.19.3.3.1.7.0"

    # Characters accepted between the hex digits of a MAC address.
    _mac_separators = re.compile(r"[-:.\s]")

    def __init__(self, community="Canopy", snmp_version="2c", db_uri="sqlite:/:memory:"):
        """Store the SNMP settings and connect to the database.

        community    -- SNMP community string passed to ``snmpwalk -c``.
        snmp_version -- SNMP protocol version passed to ``snmpwalk -v``.
        db_uri       -- SQLObject connection URI of the database to use.
        """
        self.community = community
        self.snmp_version = snmp_version
        # Command prefix; the IP address and the OID are appended per call.
        self.snmpwalk = "snmpwalk -c " + community + " -v " + snmp_version + " "
        self.db_uri = db_uri
        self._db_connect()

    def _init_db(self):
        """Create the ``MAC`` and ``Subnet`` tables if they are missing.

        Returns True when both tables are available, False when creating
        them failed.
        """
        try:
            # ifNotExists keeps an already existing MAC table from aborting
            # the creation of a Subnet table that is still missing.
            MAC.createTable(ifNotExists=True)
            Subnet.createTable(ifNotExists=True)
        except Exception:
            return False
        return True

    def _db_connect(self):
        """Connect to the database at ``self.db_uri`` and set up its tables."""
        sqlhub.processConnection = connectionForURI(self.db_uri)
        self._init_db()

    def _db_addMAC(self, ip, mac, device_type, site_name):
        """Add a MAC to the database, or refresh ``last_seen`` if it exists.

        A row counts as existing when ``mac``, ``ip`` and ``device_type`` all
        match; ``site_name`` is only used when a new row is created.
        """
        # selectBy returns a result set, not a row, so it has to be
        # materialised before it can be tested for emptiness or updated.
        existing = list(MAC.selectBy(mac=mac, ip=ip, device_type=device_type))
        if existing:
            now = datetime.now()
            for row in existing:
                row.last_seen = now
        else:
            MAC(ip=ip, mac=mac, device_type=device_type, site_name=site_name)

    def _db_removeSubnet(self, subnet):
        """Remove a subnet from the set that ``sweep`` polls.

        Not implemented yet: this is currently a no-op.
        """
        pass

    def _db_removeMAC(self, mac):
        """Completely remove a MAC address from the database.

        Not implemented yet: this is currently a no-op.
        """
        pass

    def _db_lookupMAC(self, mac):
        """Return the query result of all ``MAC`` rows whose ``mac`` equals *mac*."""
        return MAC.selectBy(mac=mac)

    def _normalizeMAC(self, mac):
        """Return *mac* as upper-case hex digits without separators.

        Dashes, colons, dots and any whitespace are removed, so
        "00-e0-4c-a0-28-cc", "00:E0:4C:A0:28:CC", "00 E0 4C A0 28 CC" and
        "00e0.4ca0.28cc" all become "00E04CA028CC".
        """
        return self._mac_separators.sub("", mac).upper()

    def _run_snmpwalk(self, ip, oid):
        """Run snmpwalk for *oid* against *ip* and return what it printed.

        *oid* must carry its own leading space (see the ``snmp_*`` class
        attributes).  The pipe is always closed, even if reading fails.
        """
        pipe = popen(self.snmpwalk + ip + oid)
        try:
            return pipe.read()
        finally:
            pipe.close()

    def _getMACs(self, ip):
        """Poll the device at *ip* and store every MAC address it reports."""
        print(ip)
        # TODO: use a regexp instead of fixed offsets.  The slices presumably
        # strip the "OID = TYPE: " prefix and the line ending of the snmpwalk
        # output -- confirm against real device output.
        device_type = self._run_snmpwalk(ip, self.snmp_device_type)[52:-2]
        # Keep the line endings: the [-19:-2] slice below counts them.
        mac_lines = self._run_snmpwalk(ip, self.snmp_macs).splitlines(True)
        site_name = self._run_snmpwalk(ip, self.snmp_site_name)[32:]
        for line in mac_lines:
            self._db_addMAC(
                ip=ip,
                mac=self._normalizeMAC(line[-19:-2]),
                device_type=device_type,
                site_name=site_name,
            )

    def addSubnet(self, subnet):
        """Add a subnet that ``sweep`` should poll to the database.

        Returns True if the subnet was stored, False if storing it failed
        (for example because the database rejected the row).
        """
        try:
            Subnet(subnet=subnet)
        except Exception:
            return False
        return True

    def sweep(self):
        """Poll every address that IPy yields for every registered subnet."""
        for subnet in Subnet.select():
            for ip in IP(subnet.subnet):
                self._getMACs(str(ip))

    def lookupMAC(self, mac):
        """Look up a MAC address in the database.

        *mac* is normalized first, so any notation accepted by
        ``_normalizeMAC`` may be used.

        Returns the first stored ``MAC`` row whose ``device_type`` contains
        "SM".  If there is no such row, the whole query result is returned
        instead: an iterable (possibly empty) of the ``MAC`` rows of the other
        devices that reported this address (APs and BHs, according to the
        original author).  Each row exposes ``ip``, ``device_type``,
        ``site_name`` and ``last_seen``.
        """
        mac = self._normalizeMAC(mac)
        results = self._db_lookupMAC(mac)
        for result in results:
            if "SM" in str(result.device_type):
                return result
        return results


# Example usage below...
# NOTE: these statements run whenever the module is loaded, i.e. also on
# import, and they sweep the example subnet over the network.

m = MACDiscoveryAgent()
m.addSubnet("192.168.105.44/30")
print("Sweeping the network...")
m.sweep()
print("Sweeping completed.")
print("Looking up the MAC address...")
res = m.lookupMAC('00 E0 4C A0 28 CC')
# lookupMAC returns a single MAC row when it found the SM, otherwise the
# (possibly empty) result set of the other devices that know this address.
if isinstance(res, MAC):
    print("Found it: %s : %s : %s : last seen: %s"
          % (res.device_type, res.ip, res.site_name, res.last_seen))
else:
    print("Couldn't find the SM, But found following items with this MAC in their MAC table")
    for i in res:
        print("%s : %s : %s : last seen  %s"
              % (i.device_type, i.ip, i.site_name, i.last_seen))