1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
#!/usr/bin/env python
"""Python implementation of the Unix traceroute program"""
import argparse
import os
import socket
import struct
import sys
import time
ICMP_ECHO_REQUEST = 8
ICMP_ECHO_REPLY = 0
ICMP_TIME_EXCEEDED = 11
def checksum(str_):
"""The ICMP checksum as defined in RFC 792"""
str_ = bytearray(str_)
csum = 0
count_to = (len(str_) // 2) * 2
for count in range(0, count_to, 2):
this_val = str_[count+1] * 256 + str_[count]
csum = csum + this_val
csum = csum & 0xffffffff
if count_to < len(str_):
csum = csum + str_[-1]
csum = csum & 0xffffffff
csum = (csum >> 16) + (csum & 0xffff)
csum = csum + (csum >> 16)
answer = ~csum
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
def receive_one_ping(my_socket, rec_id, timeout):
"""Attempt to receive a ping.
Arguments:
my_socket -- a Python socket to receive from
rec_id -- only receive packets with this identifier (in case of echo reply)
timeout -- timeout in seconds
Return value:
A tuple (responding server, icmp message type)
"""
start_time = time.time()
while (start_time + timeout - time.time()) > 0:
try:
rec_packet, _ = my_socket.recvfrom(1024)
except socket.timeout:
break # timed out
# Fetch the ICMPHeader fromt the IP
icmp_header = rec_packet[20:28]
source_ip = '%d.%d.%d.%d' % struct.unpack("BBBB", rec_packet[12:16])
icmp_type, _, _, packet_id, _ = struct.unpack("bbHHh", icmp_header)
if icmp_type == ICMP_TIME_EXCEEDED or \
icmp_type == ICMP_ECHO_REPLY and packet_id == rec_id:
return (source_ip, icmp_type)
return None, None
def send_one_ping(my_socket, dest_addr, snd_id, ttl=30):
"""Send an ICMP echo message"""
# Make a dummy packet with a 0 checksum
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, 0, snd_id, 1)
data = struct.pack("d", time.time())
# Calculate the checksum on the data and the dummy header.
my_checksum = socket.htons(checksum(header + data))
if sys.platform == 'darwin':
my_checksum = my_checksum & 0xffff
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, snd_id, 1)
packet = header + data
my_socket.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
my_socket.sendto(packet, (dest_addr, 1))
def do_one_ping(dest_addr, timeout, ttl=30):
"""Send one echo and receive either an echo reply or a time exceeded msg"""
icmp = socket.getprotobyname("icmp")
my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
my_socket.settimeout(timeout)
my_id = os.getpid() & 0xffff
send_one_ping(my_socket, dest_addr, my_id, ttl)
response = receive_one_ping(my_socket, my_id, timeout)
my_socket.close()
return response
def traceroute(host, timeout=1.0, hops=10):
"""Python version of the Unix traceroute program"""
dest = socket.gethostbyname(host)
print 'traceroute to %s (%s), %d hops max' % (host, dest, hops)
i = 0
while i < hops:
i += 1
address, icmp_type = do_one_ping(dest, timeout, ttl=i)
if address == None:
print '%2d *' % i
else:
name, _, _ = socket.gethostbyaddr(address)
print '%2d %s (%s)' % (i, name, address)
if icmp_type == ICMP_ECHO_REPLY:
break
def main():
"""Wrapper for traceroute to get host from the command line"""
parser = argparse.ArgumentParser(description='Traceroute')
parser.add_argument(
'host', metavar='HOST', type=str, nargs=1,
help='The host to traceroute to')
parser.add_argument(
'--timeout', type=float, default=1.0,
help='Number of seconds to wait for response to a probe (default 1.0)')
parser.add_argument(
'--hops', type=int, default=10,
help='Maximum number of hops (default 10)')
args = parser.parse_args()
traceroute(args.host[0], timeout=args.timeout, hops=args.hops)
if __name__ == '__main__':
main()
|