1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
#!/usr/bin/env python3
import socket
import struct
import math
def parse_tcp(packet):
    """Split a TCP segment into selected header fields and its payload.

    Args:
        packet: Bytes-like object that starts at the first byte of the
            TCP header.

    Returns:
        A tuple ``(src_port, dst_port, seqn, ackn, flags, data)``.
        ``flags`` is byte 13 of the header and ``data`` is everything that
        follows the header (options are treated as part of the header).

    Raises:
        IndexError: If ``packet`` has fewer than 13 bytes.
        struct.error: If ``packet`` has fewer than 14 bytes.
    """
    # The data offset lives in the UPPER nibble of byte 12 and counts 32-bit
    # words; the lower nibble holds reserved bits and must not be included.
    header_length = (packet[12] >> 4) * 4
    # Only the first 14 bytes are decoded; the variable-length options are
    # skipped by slicing the payload at header_length.
    src_port, dst_port, seqn, ackn, flags = struct.unpack(
        "!HHIIxB", packet[:14])
    data = packet[header_length:]
    return src_port, dst_port, seqn, ackn, flags, data
def parse_udp(packet):
    """Decode the fixed 8-byte UDP header and return it with the payload.

    Args:
        packet: Bytes-like object that starts at the first byte of the
            UDP header.

    Returns:
        A tuple ``(src_port, dst_port, data_len, data, checksum)`` where
        ``data`` is everything after the first 8 bytes.

    Raises:
        struct.error: If ``packet`` has fewer than 8 bytes.
    """
    udp_header_size = 8
    fields = struct.unpack("!HHHH", packet[:udp_header_size])
    source_port, dest_port, length_field, udp_checksum = fields
    payload = packet[udp_header_size:]
    return source_port, dest_port, length_field, payload, udp_checksum
def parse_ip(packet):
    """Decode selected IPv4 header fields and separate the payload.

    Args:
        packet: Bytes-like object that starts at the first byte of the
            IP header.

    Returns:
        A tuple ``(header_length, header, data)``. ``header_length`` is the
        low nibble of byte 0 multiplied by 4, ``header`` is a dict with the
        keys ``'length'``, ``'protocol'``, ``'source'`` and
        ``'destination'`` (addresses as 4-byte strings), and ``data`` is
        everything after ``header_length`` bytes.

    Raises:
        IndexError: If ``packet`` is empty.
        struct.error: If ``packet`` has fewer than 20 bytes.
    """
    # Header length is expressed in 32-bit words in the low nibble of byte 0.
    ihl_bytes = 4 * (packet[0] & 0x0f)
    # Only the fixed 20-byte part is decoded; pad bytes ('x') skip the
    # fields that are not reported.
    total_length, proto, src_addr, dst_addr = struct.unpack(
        "!xxHxxxxxBxx4s4s", packet[:20])
    fields = {
        'length': total_length,
        'protocol': proto,
        'source': src_addr,
        'destination': dst_addr,
    }
    return ihl_bytes, fields, packet[ihl_bytes:]
def format_ip(addr):
    """Return the items of ``addr`` as decimal numbers joined by dots."""
    octets = ['%d' % part for part in addr]
    return '.'.join(octets)
def parse_eth(packet):
    """Decode an Ethernet frame header, with or without an 802.1Q VLAN tag.

    Args:
        packet: Bytes-like object that starts at the destination MAC
            address of the frame.

    Returns:
        A tuple ``(dst, src, typecode, data)``: the two 6-byte MAC
        addresses, the EtherType as an integer, and the bytes that follow
        the header.

    Raises:
        struct.error: If ``packet`` is shorter than the header being
            decoded (14 bytes, or 18 bytes for a VLAN-tagged frame).
    """
    # Bytes 12-13 carry either the EtherType or, for a tagged frame, the
    # 0x8100 tag identifier. The slice must span both bytes to match.
    if packet[12:14] == b'\x81\x00':
        # Skip the 4-byte VLAN tag; the real EtherType follows it.
        dst, src, typecode = struct.unpack("!6s6sxxxxH", packet[:18])
        data = packet[18:]
    else:
        dst, src, typecode = struct.unpack("!6s6sH", packet[:14])
        data = packet[14:]
    return dst, src, typecode, data
def format_mac(addr):
    """Return the items of ``addr`` as two-digit lowercase hex joined by colons."""
    hex_pairs = ['%02x' % byte for byte in addr]
    return ':'.join(hex_pairs)
def main():
    """Capture frames on a raw packet socket and print a summary of each.

    For every received frame the Ethernet header is printed; IP packets
    (EtherType 0x0800) additionally get an IP line, and UDP (protocol 0x11)
    or TCP (protocol 0x06) payloads get a transport line. Frames too short
    to decode are reported and skipped instead of ending the capture.

    The loop runs until the process is interrupted; the socket is closed on
    the way out.

    NOTE(review): opening an AF_PACKET raw socket is Linux-specific and
    presumably needs elevated privileges -- confirm for the target system.
    """
    # Protocol 0x0003 is passed in network byte order; presumably this is
    # ETH_P_ALL (receive every protocol) -- see packet(7).
    with socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                       socket.ntohs(0x0003)) as s:
        while True:
            raw, _ = s.recvfrom(2 ** 16 - 1)
            try:
                eth_dst, eth_src, eth_type, eth_data = parse_eth(raw)
                # Source first, so the arrow points the same way as on the
                # IP line below.
                print("ETH: {} --> {} ({})".format(
                    format_mac(eth_src), format_mac(eth_dst), eth_type))
                if eth_type == 0x0800:  # IP
                    ip_header_len, ip_header, ip_payload = parse_ip(eth_data)
                    print("IP: {} --> {} ({:04x})".format(
                        format_ip(ip_header['source']),
                        format_ip(ip_header['destination']),
                        ip_header['protocol']))
                    if ip_header['protocol'] == 0x11:  # UDP
                        src_port, dst_port, _, udp_data, _ = parse_udp(
                            ip_payload)
                        print("UDP: :{} --> :{}".format(
                            src_port, dst_port))
                    elif ip_header['protocol'] == 0x06:  # TCP
                        (src_port, dst_port, seqn, ackn, flags,
                         tcp_data) = parse_tcp(ip_payload)
                        # Width 4 covers the "0x" prefix plus two hex
                        # digits, so the flags byte is zero-padded.
                        print("TCP: :{} --> :{} / SEQ:{} ACK:{} ({:#04x})"
                              .format(src_port, dst_port, seqn, ackn, flags))
            except (struct.error, IndexError) as exc:
                # A truncated frame must not terminate the capture loop.
                print("Skipping malformed frame: {}".format(exc))
            print()
# Start capturing only when run as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|