summaryrefslogtreecommitdiff
path: root/project2/proj2_s4498062/dns/message.py
blob: baaba1746bc0f861da12bbaf5d930a514e5b9153 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
""" DNS messages

This module contains classes for DNS messages, their header section and
question fields. See section 4 of RFC 1035 for more info.
"""

import struct

from dns.domainname import Parser, Composer
from dns.resource import ResourceRecord


class Message(object):
    """ DNS message """

    def __init__(
            self, header, questions=None, answers=None, authorities=None,
            additionals=None):
        """ Create a new DNS message

        Args:
            header (Header): the header section
            questions ([Question]): the question section
            answers ([ResourceRecord]): the answer section
            authorities ([ResourceRecord]): the authority section
            additionals ([ResourceRecord]): the additional section
        """
        if questions is None:
            questions = []
        if answers is None:
            answers = []
        if authorities is None:
            authorities = []
        if additionals is None:
            additionals = []

        self.header = header
        self.questions = questions
        self.answers = answers
        self.authorities = authorities
        self.additionals = additionals

    @property
    def resources(self):
        """ Getter for all resource records """
        return self.answers + self.authorities + self.additionals

    def to_bytes(self):
        """ Convert Message to bytes """
        composer = Composer()

        # Add header
        result = self.header.to_bytes()

        # Add questions
        for question in self.questions:
            offset = len(result)
            result += question.to_bytes(offset, composer)

        # Add answers
        for answer in self.answers:
            offset = len(result)
            result += answer.to_bytes(offset, composer)

        # Add authorities
        for authority in self.authorities:
            offset = len(result)
            result += authority.to_bytes(offset, composer)

        # Add additionals
        for additional in self.additionals:
            offset = len(result)
            result += additional.to_bytes(offset, composer)

        return result

    @classmethod
    def from_bytes(cls, packet):
        """ Create Message from bytes

        Args:
            packet (bytes): byte representation of the message
        """
        parser = Parser()

        # Parse header
        header, offset = Header.from_bytes(packet), 12

        # Parse questions
        questions = []
        for _ in range(header.qd_count):
            question, offset = Question.from_bytes(packet, offset, parser)
            questions.append(question)

        # Parse answers
        answers = []
        for _ in range(header.an_count):
            answer, offset = ResourceRecord.from_bytes(packet, offset, parser)
            answers.append(answer)

        # Parse authorities
        authorities = []
        for _ in range(header.ns_count):
            auth, offset = ResourceRecord.from_bytes(packet, offset, parser)
            authorities.append(auth)

        # Parse additionals
        additionals = []
        for _ in range(header.ar_count):
            addit, offset = ResourceRecord.from_bytes(packet, offset, parser)
            additionals.append(addit)

        return cls(header, questions, answers, authorities, additionals)


class Header(object):
    """ The header section of a DNS message

    Contains a number of properties which are accessible as normal member
    variables.

    See section 4.1.1 of RFC 1035 for their meaning.
    """

    # pylint: disable=missing-docstring, invalid-name

    def __init__(self, ident, flags, qd_count, an_count, ns_count, ar_count):
        """ Create a new Header object

        Args:
            ident (int): identifier
            qd_count (int): number of entries in question section
            an_count (int): number of entries in answer section
            ns_count (int): number of entries in authority section
            ar_count (int): number of entries in additional section
        """
        self.ident = ident
        self._flags = flags
        self.qd_count = qd_count
        self.an_count = an_count
        self.ns_count = ns_count
        self.ar_count = ar_count

    def to_bytes(self):
        """ Convert header to bytes """
        return struct.pack("!6H",
                           self.ident,
                           self._flags,
                           self.qd_count,
                           self.an_count,
                           self.ns_count,
                           self.ar_count)

    @classmethod
    def from_bytes(cls, packet):
        """ Convert Header from bytes """
        if len(packet) < 12:
            raise ValueError("header is too short")
        return cls(*struct.unpack_from("!6H", packet))

    @property
    def flags(self):
        return self._flags
    @flags.setter
    def flags(self, value):
        if value >= (1 << 16):
            raise ValueError("value too big for flags")
        self._flags = value

    @property
    def qr(self):
        return self._flags & (1 << 15)
    @qr.setter
    def qr(self, value):
        if value:
            self._flags |= (1 << 15)
        else:
            self._flags &= ~(1 << 15)

    @property
    def opcode(self):
        return (self._flags & (((1 << 4) - 1) << 11)) >> 11
    @opcode.setter
    def opcode(self, value):
        if value > 0b1111:
            raise ValueError("invalid opcode")
        self._flags &= ~(((1 << 4) - 1) << 11)
        self._flags |= value << 11

    @property
    def aa(self):
        return self._flags & (1 << 10)
    @aa.setter
    def aa(self, value):
        if value:
            self._flags |= (1 << 10)
        else:
            self._flags &= ~(1 << 10)

    @property
    def tc(self):
        return self._flags & (1 << 9)
    @tc.setter
    def tc(self, value):
        if value:
            self._flags |= (1 << 9)
        else:
            self._flags &= ~(1 << 9)

    @property
    def rd(self):
        return self._flags & (1 << 8)
    @rd.setter
    def rd(self, value):
        if value:
            self._flags |= (1 << 8)
        else:
            self._flags &= ~(1 << 8)

    @property
    def ra(self):
        return self._flags & (1 << 7)
    @ra.setter
    def ra(self, value):
        if value:
            self._flags |= (1 << 7)
        else:
            self._flags &= ~(1 << 7)

    @property
    def z(self):
        return self._flags & (((1 << 3) - 1) << 4) >> 4
    @z.setter
    def z(self, value):
        if value:
            raise ValueError("non-zero zero flag")

    @property
    def rcode(self):
        return self._flags & ((1 << 4) - 1)
    @rcode.setter
    def rcode(self, value):
        if value > 0b1111:
            raise ValueError("invalid return code")
        self._flags &= ~((1 << 4) - 1)
        self._flags |= value


class Question(object):
    """ An entry in the question section.

    See section 4.1.2 of RFC 1035 for more info.
    """

    def __init__(self, qname, qtype, qclass):
        """ Create a new entry in the question section

        Args:
            qname (str): QNAME
            qtype (Type): QTYPE
            qclass (Class): QCLASS
        """
        self.qname = qname
        self.qtype = qtype
        self.qclass = qclass

    def to_bytes(self, offset, composer):
        """ Convert Question to bytes """
        bqname = composer.to_bytes(offset, [self.qname])
        bqtype = struct.pack("!H", self.qtype)
        bqclass = struct.pack("!H", self.qclass)
        return bqname + bqtype + bqclass

    @classmethod
    def from_bytes(cls, packet, offset, parser):
        """ Convert Question from bytes """
        qnames, offset = parser.from_bytes(packet, offset, 1)
        qname = qnames[0]
        qtype, qclass = struct.unpack_from("!2H", packet, offset)
        return cls(qname, qtype, qclass), offset + 4