#!/usr/bin/env python
"""Python implementation of the Unix traceroute program"""
import argparse
import os
import socket
import struct
import sys
import time
ICMP_ECHO_REQUEST = 8
ICMP_ECHO_REPLY = 0
ICMP_TIME_EXCEEDED = 11
def checksum(str_):
"""The ICMP checksum as defined in RFC 792"""
str_ = bytearray(str_)
csum = 0
count_to = (len(str_) // 2) * 2
for count in range(0, count_to, 2):
this_val = str_[count+1] * 256 + str_[count]
csum = csum + this_val
csum = csum & 0xffffffff
if count_to < len(str_):
csum = csum + str_[-1]
csum = csum & 0xffffffff
csum = (csum >> 16) + (csum & 0xffff)
csum = csum + (csum >> 16)
answer = ~csum
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
def receive_one_ping(my_socket, rec_id, timeout):
"""Attempt to receive a ping.
Arguments:
my_socket -- a Python socket to receive from
rec_id -- only receive packets with this identifier (in case of echo reply)
timeout -- timeout in seconds
Return value:
A tuple (responding server, icmp message type)
"""
start_time = time.time()
while (start_time + timeout - time.time()) > 0:
try:
rec_packet, _ = my_socket.recvfrom(1024)
except socket.timeout:
break # timed out
# Fetch the ICMPHeader fromt the IP
icmp_header = rec_packet[20:28]
source_ip = '%d.%d.%d.%d' % struct.unpack("BBBB", rec_packet[12:16])
icmp_type, _, _, packet_id, _ = struct.unpack("bbHHh", icmp_header)
if icmp_type == ICMP_TIME_EXCEEDED or \
icmp_type == ICMP_ECHO_REPLY and packet_id == rec_id:
return (source_ip, icmp_type)
return None, None
def send_one_ping(my_socket, dest_addr, snd_id, ttl=30):
"""Send an ICMP echo message"""
# Make a dummy packet with a 0 checksum
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, 0, snd_id, 1)
data = struct.pack("d", time.time())
# Calculate the checksum on the data and the dummy header.
my_checksum = socket.htons(checksum(header + data))
if sys.platform == 'darwin':
my_checksum = my_checksum & 0xffff
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, snd_id, 1)
packet = header + data
my_socket.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
my_socket.sendto(packet, (dest_addr, 1))
def do_one_ping(dest_addr, timeout, ttl=30):
"""Send one echo and receive either an echo reply or a time exceeded msg"""
icmp = socket.getprotobyname("icmp")
my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
my_socket.settimeout(timeout)
my_id = os.getpid() & 0xffff
send_one_ping(my_socket, dest_addr, my_id, ttl)
response = receive_one_ping(my_socket, my_id, timeout)
my_socket.close()
return response
def traceroute(host, timeout=1.0, hops=10):
"""Python version of the Unix traceroute program"""
dest = socket.gethostbyname(host)
print 'traceroute to %s (%s), %d hops max' % (host, dest, hops)
i = 0
while i < hops:
i += 1
address, icmp_type = do_one_ping(dest, timeout, ttl=i)
if address == None:
print '%2d *' % i
else:
name, _, _ = socket.gethostbyaddr(address)
print '%2d %s (%s)' % (i, name, address)
if icmp_type == ICMP_ECHO_REPLY:
break
def main():
"""Wrapper for traceroute to get host from the command line"""
parser = argparse.ArgumentParser(description='Traceroute')
parser.add_argument(
'host', metavar='HOST', type=str, nargs=1,
help='The host to traceroute to')
parser.add_argument(
'--timeout', type=float, default=1.0,
help='Number of seconds to wait for response to a probe (default 1.0)')
parser.add_argument(
'--hops', type=int, default=10,
help='Maximum number of hops (default 10)')
args = parser.parse_args()
traceroute(args.host[0], timeout=args.timeout, hops=args.hops)
if __name__ == '__main__':
main()