#!/usr/bin/env python
"""Python implementation of the Unix traceroute program.

Probes are ICMP echo requests sent over a raw socket with an increasing
IP time-to-live.  Each hop is identified from the ICMP "time exceeded"
message it returns; the trace ends when the destination answers with an
echo reply or the hop limit is reached.
"""

import argparse
import os
import socket
import struct
import sys
import time

ICMP_ECHO_REQUEST = 8
ICMP_ECHO_REPLY = 0
ICMP_TIME_EXCEEDED = 11

# ICMP echo header: type, code, checksum, identifier, sequence number.
# All fields are unsigned; the 16-bit fields use native byte order, so an
# identifier written by send_one_ping() reads back unchanged when a reply
# (or a quoted copy of the probe) is parsed by receive_one_ping().
_ICMP_HEADER_FORMAT = "BBHHH"
_ICMP_HEADER_SIZE = struct.calcsize(_ICMP_HEADER_FORMAT)
_MIN_IP_HEADER_SIZE = 20


def checksum(str_):
    """Return the ICMP checksum (RFC 792) of a byte string.

    Arguments:
    str_ -- the bytes to checksum (anything accepted by bytearray())

    Return value:
    The 16-bit ones' complement of the ones' complement sum of the data,
    expressed as the integer obtained by reading the two checksum bytes
    in network (big-endian) order.
    """
    data = bytearray(str_)
    if len(data) % 2:
        # Odd length: pad with a zero byte so the data splits into words.
        data.append(0)

    total = 0
    for index in range(0, len(data), 2):
        # Words are accumulated low byte first; the final swap below
        # converts the result to its network-order reading.
        total += data[index + 1] * 256 + data[index]

    # Fold every carry back into the low 16 bits (end-around carry).
    while total >> 16:
        total = (total >> 16) + (total & 0xffff)

    answer = ~total & 0xffff
    return (answer >> 8) | ((answer << 8) & 0xff00)


def _ip_payload_offset(packet, start):
    """Return the offset just past the IPv4 header found at *start*.

    The header length is taken from the IHL field so headers carrying
    options are handled.  Returns None when the packet is too short or
    the IHL value is below the 20-byte minimum.
    """
    if len(packet) < start + _MIN_IP_HEADER_SIZE:
        return None
    header_len = (struct.unpack_from("B", packet, start)[0] & 0x0f) * 4
    if header_len < _MIN_IP_HEADER_SIZE:
        return None
    return start + header_len


def _match_reply(packet, rec_id):
    """Return (source_ip, icmp_type) if *packet* answers our probe.

    Returns None for malformed packets, for ICMP types other than echo
    reply / time exceeded, and for replies that belong to another
    identifier.
    """
    icmp_start = _ip_payload_offset(packet, 0)
    if icmp_start is None or len(packet) < icmp_start + _ICMP_HEADER_SIZE:
        return None

    source_ip = '%d.%d.%d.%d' % struct.unpack_from("BBBB", packet, 12)
    icmp_type, _, _, packet_id, _ = struct.unpack_from(
        _ICMP_HEADER_FORMAT, packet, icmp_start)

    if icmp_type == ICMP_ECHO_REPLY:
        if packet_id == rec_id:
            return (source_ip, icmp_type)
        return None

    if icmp_type != ICMP_TIME_EXCEEDED:
        return None

    # A time exceeded message quotes the IP header and the first bytes of
    # the datagram that expired.  When that quote is present, require it
    # to be our own echo request so that messages triggered by other
    # senders are ignored.  If the quote is missing or truncated the
    # message is accepted, as a best effort.
    quoted_ip = icmp_start + _ICMP_HEADER_SIZE
    quoted_icmp = _ip_payload_offset(packet, quoted_ip)
    if quoted_icmp is not None and \
            len(packet) >= quoted_icmp + _ICMP_HEADER_SIZE:
        protocol = struct.unpack_from("B", packet, quoted_ip + 9)[0]
        quoted_type, _, _, quoted_id, _ = struct.unpack_from(
            _ICMP_HEADER_FORMAT, packet, quoted_icmp)
        if protocol != socket.IPPROTO_ICMP or \
                quoted_type != ICMP_ECHO_REQUEST or quoted_id != rec_id:
            return None
    return (source_ip, icmp_type)


def receive_one_ping(my_socket, rec_id, timeout):
    """Attempt to receive a ping.

    Arguments:
    my_socket -- a Python socket to receive from
    rec_id -- only receive packets with this identifier
    timeout -- timeout in seconds

    Return value:
    A tuple (responding server, icmp message type), or (None, None) if
    no matching packet arrived before the timeout expired.
    """
    deadline = time.time() + timeout
    previous_timeout = my_socket.gettimeout()
    try:
        while True:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            # Wait only for the time that is left, so unrelated packets
            # cannot stretch the overall wait beyond the deadline.
            my_socket.settimeout(remaining)
            try:
                rec_packet, _ = my_socket.recvfrom(1024)
            except socket.timeout:
                break  # timed out
            match = _match_reply(rec_packet, rec_id)
            if match is not None:
                return match
    finally:
        # Leave the socket configured the way the caller handed it over.
        my_socket.settimeout(previous_timeout)
    return None, None


def send_one_ping(my_socket, dest_addr, snd_id, ttl=30):
    """Send an ICMP echo message.

    Arguments:
    my_socket -- a raw ICMP socket to send on
    dest_addr -- IPv4 address of the destination
    snd_id -- 16-bit identifier placed in the echo header
    ttl -- IP time-to-live for this probe
    """
    # Make a dummy header with a 0 checksum.
    placeholder = struct.pack(
        _ICMP_HEADER_FORMAT, ICMP_ECHO_REQUEST, 0, 0, snd_id, 1)
    data = struct.pack("d", time.time())

    # Calculate the checksum on the data and the dummy header.  htons()
    # plus native-order packing puts the two bytes on the wire in network
    # order; the mask keeps the value inside the unsigned 16-bit field.
    my_checksum = socket.htons(checksum(placeholder + data)) & 0xffff
    header = struct.pack(
        _ICMP_HEADER_FORMAT, ICMP_ECHO_REQUEST, 0, my_checksum, snd_id, 1)

    # IPPROTO_IP is the option level for IP_TTL and, unlike SOL_IP, is
    # defined by the socket module on every platform.
    my_socket.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl)
    my_socket.sendto(header + data, (dest_addr, 1))


def do_one_ping(dest_addr, timeout, ttl=30):
    """Send one echo and receive either an echo reply or a time exceeded msg.

    Arguments:
    dest_addr -- IPv4 address of the destination
    timeout -- seconds to wait for an answer
    ttl -- IP time-to-live for the probe

    Return value:
    The tuple returned by receive_one_ping().  Errors raised while
    creating or using the raw socket propagate to the caller.
    """
    icmp = socket.getprotobyname("icmp")
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    try:
        my_socket.settimeout(timeout)
        my_id = os.getpid() & 0xffff
        send_one_ping(my_socket, dest_addr, my_id, ttl)
        return receive_one_ping(my_socket, my_id, timeout)
    finally:
        # Release the socket even when sending or receiving fails.
        my_socket.close()


def _resolve_name(address):
    """Return the host name for *address*, or *address* if lookup fails."""
    try:
        return socket.gethostbyaddr(address)[0]
    except (socket.herror, socket.gaierror):
        return address


def traceroute(host, timeout=1.0, hops=10):
    """Python version of the Unix traceroute program.

    Arguments:
    host -- host name or IPv4 address to trace
    timeout -- seconds to wait for the answer to each probe
    hops -- maximum number of hops to probe

    One line is printed per hop; '*' marks a probe that got no answer.
    """
    dest = socket.gethostbyname(host)
    print('traceroute to %s (%s), %d hops max' % (host, dest, hops))
    for ttl in range(1, hops + 1):
        address, icmp_type = do_one_ping(dest, timeout, ttl=ttl)
        if address is None:
            print('%2d *' % ttl)
            continue
        print('%2d %s (%s)' % (ttl, _resolve_name(address), address))
        if icmp_type == ICMP_ECHO_REPLY:
            break  # the destination itself answered


def main():
    """Wrapper for traceroute to get host from the command line"""
    parser = argparse.ArgumentParser(description='Traceroute')
    parser.add_argument(
        'host', metavar='HOST', type=str, nargs=1,
        help='The host to traceroute to')
    parser.add_argument(
        '--timeout', type=float, default=1.0,
        help='Number of seconds to wait for response to a probe (default 1.0)')
    parser.add_argument(
        '--hops', type=int, default=10,
        help='Maximum number of hops (default 10)')
    args = parser.parse_args()
    try:
        traceroute(args.host[0], timeout=args.timeout, hops=args.hops)
    except socket.gaierror as err:
        # Report an unresolvable host as a message rather than a traceback.
        sys.exit('traceroute: cannot resolve %s: %s' % (args.host[0], err))


if __name__ == '__main__':
    main()