# Only ethcard() is exported by a star import of this module.
__all__ = ["ethcard"]
import tuned.logs
# Star import: this module relies on it for Popen, PIPE and call.
from subprocess import *
import re
# Module-wide log object; the code below calls its debug() and warning() methods.
log = tuned.logs.get()
class Nettool:
    """Query and adjust an Ethernet interface's link settings via ``ethtool``.

    Constructing an instance runs ``ethtool <interface>`` and fills in:

    * ``speed`` -- int taken from the leading digits of the ``Speed`` field
      (0 before parsing, 1000 when the field cannot be parsed)
    * ``full_duplex``, ``autoneg``, ``link`` -- booleans
    * ``supported_modes`` / ``advertised_modes`` -- lists of
      ``(speed, is_full_duplex)`` tuples
    * ``supported_autoneg`` / ``advertised_autoneg`` -- booleans

    If ``ethtool`` writes anything to stderr the instance disables itself:
    the status is reset and ``update()``, ``set_speed()`` and
    ``set_max_speed()`` stop doing any work.
    """

    # speed -> [half-duplex bit, full-duplex bit] of the mask handed to
    # "ethtool -s <interface> advertise"; "auto" is a ready-made mask.
    _advertise_values = {
        10: [0x001, 0x002],
        100: [0x004, 0x008],
        1000: [0x010, 0x020],
        2500: [0, 0x8000],
        10000: [0, 0x1000],
        "auto": 0x03F,
    }

    # Section heading in the preprocessed ethtool output -> parser state.
    _sections = {
        "Speed": "speed",
        "Duplex": "duplex",
        "Auto-negotiation": "autoneg",
        "Link detected": "link",
        "Supported link modes": "supported_modes",
        "Supports auto-negotiation": "supported_autoneg",
        "Advertised link modes": "advertised_modes",
        "Advertised auto-negotiation": "advertised_autoneg",
    }

    _re_speed = re.compile(r"(\d+)")
    _re_mode = re.compile(r"(\d+)baseT/(Half|Full)")

    # Becomes True on the instance once ethtool has reported errors.
    _disabled = False

    def __init__(self, interface):
        """Read the current state of *interface* and log it at debug level."""
        self._interface = interface
        self.update()

        log.debug("%s: speed %s, full duplex %s, autoneg %s, link %s" % (interface, self.speed, self.full_duplex, self.autoneg, self.link))
        log.debug("%s: supports: autoneg %s, modes %s" % (interface, self.supported_autoneg, self.supported_modes))
        log.debug("%s: advertises: autoneg %s, modes %s" % (interface, self.advertised_autoneg, self.advertised_modes))

    # Disabled: restoring the "auto" advertisement on destruction.
    # def __del__(self):
    #     if self.supported_autoneg:
    #         self._set_advertise(self._advertise_values["auto"])

    def _clean_status(self):
        """Reset every status attribute to its "nothing known" value."""
        self.speed = 0
        self.full_duplex = False
        self.autoneg = False
        self.link = False

        self.supported_modes = []
        self.supported_autoneg = False

        self.advertised_modes = []
        self.advertised_autoneg = False

    def _calculate_mode(self, modes):
        """Return the advertise bitmask covering *modes*.

        *modes* is an iterable of ``(speed, is_full_duplex)`` entries. Bits
        are OR-ed together so a mode listed twice cannot corrupt the mask,
        and speeds missing from ``_advertise_values`` are skipped because
        they have no bit to contribute.
        """
        mask = 0
        for entry in modes:
            bits = self._advertise_values.get(entry[0])
            if bits is None:
                continue
            mask |= bits[1 if entry[1] else 0]
        return mask

    def _set_autonegotiation(self, enable):
        """Switch auto-negotiation on or off; return True on success.

        Nothing is executed when the recorded state already equals *enable*
        (returns True) or when the interface does not support
        auto-negotiation (returns False).
        """
        if self.autoneg == enable:
            return True

        if not self.supported_autoneg:
            return False

        return 0 == call(["ethtool", "-s", self._interface, "autoneg", "on" if enable else "off"], close_fds=True)

    def _set_advertise(self, value):
        """Advertise the bitmask *value*; return True when ethtool exits with 0.

        Auto-negotiation is enabled first; failure to do so returns False.
        """
        if not self._set_autonegotiation(True):
            return False

        return 0 == call(["ethtool", "-s", self._interface, "advertise", "0x%03x" % value], close_fds=True)

    def get_max_speed(self):
        """Return the highest supported speed, or 1000 when none is known."""
        speeds = [mode[0] for mode in self.supported_modes]
        fastest = max(speeds) if speeds else 0
        if fastest > 0:
            return fastest
        return 1000

    def set_max_speed(self):
        """Advertise the "auto" mask and refresh the status.

        Returns False when the instance is disabled, auto-negotiation is not
        supported, or ethtool fails; True otherwise.
        """
        if self._disabled or not self.supported_autoneg:
            return False

        # The generic "auto" mask is used rather than one computed from
        # supported_modes.
        if not self._set_advertise(self._advertise_values["auto"]):
            return False

        self.update()
        return True

    def set_speed(self, speed):
        """Advertise every supported mode not faster than *speed*.

        Returns False when the instance is disabled, auto-negotiation is not
        supported, or ethtool fails; True otherwise (status is refreshed).
        """
        if self._disabled or not self.supported_autoneg:
            return False

        wanted = 0
        for value, bits in self._advertise_values.items():
            if value == "auto":
                continue
            if value <= speed:
                wanted |= bits[0] | bits[1]

        # Never advertise anything the hardware did not report as supported.
        effective_mode = wanted & self._calculate_mode(self.supported_modes)

        log.debug("%s: set_speed(%d) - effective_mode 0x%03x" % (self._interface, speed, effective_mode))

        if not self._set_advertise(effective_mode):
            return False

        self.update()
        return True

    def update(self):
        """Re-read the interface status by running ``ethtool``.

        Does nothing once the instance is disabled. Any stderr output from
        ethtool is logged, resets the status and disables the instance.
        """
        if self._disabled:
            return

        # Run ethtool; sed strips leading whitespace and breaks the line
        # after every "name:" so that names and values sit on separate lines.
        p_ethtool = Popen(["ethtool", self._interface],
                          stdout=PIPE, stderr=PIPE, close_fds=True,
                          universal_newlines=True)
        p_filter = Popen(["sed", r"s/^\s*//;s/:\s*/:\n/g"],
                         stdin=p_ethtool.stdout, stdout=PIPE,
                         universal_newlines=True,
                         close_fds=True)

        output = p_filter.communicate()[0]
        errors = p_ethtool.communicate()[1]

        if errors != "":
            log.warning("%s: some errors were reported by 'ethtool'" % self._interface)
            log.debug("%s: %s" % (self._interface, errors.replace("\n", r"\n")))
            self._clean_status()
            self._disabled = True
            return

        self._parse_output(output)

    def _parse_output(self, output):
        """Reset the status and refill it from preprocessed ethtool output.

        *output* holds one item per line: a section name ending in ``:``
        followed by its value line(s). Works as a small state machine in
        which the last seen section name selects how value lines are read.
        """
        self._clean_status()

        state = "wait"
        for line in output.split("\n"):
            if line.endswith(":"):
                state = self._sections.get(line[:-1], "wait")
            elif state == "speed":
                found = self._re_speed.match(line)
                # No leading digits: assume 1 Gbit ethernet.
                self.speed = int(found.group(1)) if found else 1000
                state = "wait"
            elif state == "duplex":
                self.full_duplex = line == "Full"
                state = "wait"
            elif state == "autoneg":
                self.autoneg = line in ("yes", "on")
                state = "wait"
            elif state == "link":
                self.link = line == "yes"
                state = "wait"
            elif state == "supported_modes":
                # The list may span several lines, so the state is kept
                # until the next section heading.
                self._append_modes(self.supported_modes, line)
            elif state == "supported_autoneg":
                self.supported_autoneg = line == "Yes"
                state = "wait"
            elif state == "advertised_modes":
                if line != "Not reported":
                    self._append_modes(self.advertised_modes, line)
            elif state == "advertised_autoneg":
                self.advertised_autoneg = line == "Yes"
                state = "wait"

    def _append_modes(self, modes, line):
        """Append the ``(speed, is_full_duplex)`` entries on *line* to *modes*."""
        for token in line.split():
            found = self._re_mode.match(token)
            if found is None:
                # Unrecognised entry: assume 1 Gbit full duplex works and
                # ignore the remainder of this line.
                modes.append((1000, True))
                break
            speed, duplex = found.group(1, 2)
            modes.append((int(speed), duplex == "Full"))
def ethcard(interface):
    """Return the shared Nettool for *interface*, creating it on first use."""
    known = ethcard.list
    if interface in known:
        return known[interface]

    card = Nettool(interface)
    known[interface] = card
    return card


# Cache of Nettool instances keyed by interface, kept on the function object.
ethcard.list = {}