# Anemon Dhcp # Copyright (C) 2005 Mathieu Ignacio -- mignacio@april.org # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA import operator from struct import unpack from struct import pack from dhcp_basic_packet import * from dhcp_constants import * from type_ipv4 import ipv4 from type_strlist import strlist class DhcpPacket(DhcpBasicPacket): # Useful function for debugging def PrintHeaders(self): print "# Header fields\n" print "readable_dhcp_headers = {" for opt in ['op','htype','hlen','hops','xid','secs','flags', 'ciaddr','yiaddr','siaddr','giaddr','chaddr','sname','file'] : begin = DhcpFields[opt][0] end = DhcpFields[opt][0]+DhcpFields[opt][1] data = self.packet_data[begin:end] if DhcpFieldsTypes[opt] == "int" : result = str(data[0]) if DhcpFieldsTypes[opt] == "int2" : result = str(data[0]*256+data[0]) if DhcpFieldsTypes[opt] == "int4" : result = str(ipv4(data).int()) if DhcpFieldsTypes[opt] == "str" : result = strlist(data).str() if DhcpFieldsTypes[opt] == "ipv4" : result = ipv4(data).str() if DhcpFieldsTypes[opt] == "hwmac" : result = "".join(map(chr,data)) line = "\t'"+opt+"':"+str(data)+",\t# "+result print line print "\t'end':'true'}" # Useful function for debugging def PrintOptions(self): print "# Options fields" print "readable_dhcp_options = {" for opt in self.options_data.keys(): data = self.options_data[opt] result = "" optnum = DhcpOptions[opt] if DhcpOptionsTypes[optnum] == "char" : result = str(data[0]) if DhcpOptionsTypes[optnum] == "16-bits" : result = str(data[0]*256+data[0]) if DhcpOptionsTypes[optnum] == "32bits" : result = str(ipv4(data).int()) if DhcpOptionsTypes[optnum] == "string" : result = strlist(data).str() if DhcpOptionsTypes[optnum] == "ipv4" : result = ipv4(data).str() if DhcpOptionsTypes[optnum] == "ipv4+" : for i in range(0,len(data),4) : if len(data[i:i+4]) == 4 : result += ipv4(data[i:i+4]).str() + " - " line = "\t'"+opt+"':"+str(data)+",\t# "+result print line print "\t'end':'true'}" # FIXME: This is called from IsDhcpSomethingPacket, but is this really # needed? Or maybe this testing should be done in # DhcpBasicPacket.DecodePacket(). # Test Packet Type def IsDhcpSomethingPacket(self,type): if self.IsDhcpPacket() == False : return False if self.IsOption("dhcp_message_type") == False : return False if self.GetOption("dhcp_message_type") != type : return False return True def IsDhcpDiscoverPacket(self): return self.IsDhcpSomethingPacket([1]) def IsDhcpOfferPacket(self): return self.IsDhcpSomethingPacket([2]) def IsDhcpRequestPacket(self): return self.IsDhcpSomethingPacket([3]) def IsDhcpDeclinePacket(self): return self.IsDhcpSomethingPacket([4]) def IsDhcpAckPacket(self): return self.IsDhcpSomethingPacket([5]) def IsDhcpNackPacket(self): return self.IsDhcpSomethingPacket([6]) def IsDhcpReleasePacket(self): return self.IsDhcpSomethingPacket([7]) def IsDhcpInformPacket(self): return self.IsDhcpSomethingPacket([8]) def GetMultipleOptions(self,options=()): result = {} for each in options: result[each] = self.GetOption(each) return result def SetMultipleOptions(self,options={}): for each in options.keys(): self.SetOption(each,options[each]) # Creating Response Packet # Server-side functions # From RFC 2132 page 28/29 def CreateDhcpOfferPacketFrom(self,src): # src = discover packet self.SetOption("htype",src.GetOption("htype")) self.SetOption("xid",src.GetOption("xid")) self.SetOption("flags",src.GetOption("flags")) self.SetOption("giaddr",src.GetOption("giaddr")) self.SetOption("chaddr",src.GetOption("chaddr")) self.SetOption("ip_address_lease_time",src.GetOption("ip_address_lease_time")) self.TransformToDhcpOfferPacket() def TransformToDhcpOfferPacket(self): self.SetOption("dhcp_message_type",[2]) self.SetOption("op",[2]) self.SetOption("hlen",[6]) self.DeleteOption("secs") self.DeleteOption("ciaddr") self.DeleteOption("request_ip_address") self.DeleteOption("parameter_request_list") self.DeleteOption("client_identifier") self.DeleteOption("maximum_message_size") """ Dhcp ACK packet creation """ def CreateDhcpAckPacketFrom(self,src): # src = request or inform packet self.SetOption("htype",src.GetOption("htype")) self.SetOption("xid",src.GetOption("xid")) self.SetOption("ciaddr",src.GetOption("ciaddr")) self.SetOption("flags",src.GetOption("flags")) self.SetOption("giaddr",src.GetOption("giaddr")) self.SetOption("chaddr",src.GetOption("chaddr")) self.SetOption("ip_address_lease_time_option",src.GetOption("ip_address_lease_time_option")) self.TransformToDhcpAckPacket() def TransformToDhcpAckPacket(self): # src = request or inform packet self.SetOption("op",[2]) self.SetOption("hlen",[6]) self.SetOption("dhcp_message_type",[5]) self.DeleteOption("secs") self.DeleteOption("request_ip_address") self.DeleteOption("parameter_request_list") self.DeleteOption("client_identifier") self.DeleteOption("maximum_message_size") """ Dhcp NACK packet creation """ def CreateDhcpNackPacketFrom(self,src): # src = request or inform packet self.SetOption("htype",src.GetOption("htype")) self.SetOption("xid",src.GetOption("xid")) self.SetOption("flags",src.GetOption("flags")) self.SetOption("giaddr",src.GetOption("giaddr")) self.SetOption("chaddr",src.GetOption("chaddr")) self.TransformToDhcpNackPacket() def TransformToDhcpNackPacket(self): self.SetOption("op",[2]) self.SetOption("hlen",[6]) self.DeleteOption("secs") self.DeleteOption("ciaddr") self.DeleteOption("yiaddr") self.DeleteOption("siaddr") self.DeleteOption("sname") self.DeleteOption("file") self.DeleteOption("request_ip_address") self.DeleteOption("ip_address_lease_time_option") self.DeleteOption("parameter_request_list") self.DeleteOption("client_identifier") self.DeleteOption("maximum_message_size") self.SetOption("dhcp_message_type",[6]) """ GetClientIdentifier """ def GetClientIdentifier(self) : if self.IsOption("client_identifier") : return self.GetOption("client_identifier") return [] def GetGiaddr(self) : return self.GetOption("giaddr") def GetHardwareAddress(self) : length = self.GetOption("hlen")[0] full_hw = self.GetOption("chaddr") if length!=[] and length