diff options
Diffstat (limited to 'project2/proj2_s4498062/dns_tests.py')
-rwxr-xr-x | project2/proj2_s4498062/dns_tests.py | 74 |
1 files changed, 73 insertions, 1 deletions
diff --git a/project2/proj2_s4498062/dns_tests.py b/project2/proj2_s4498062/dns_tests.py index 4a054b6..6a5a16e 100755 --- a/project2/proj2_s4498062/dns_tests.py +++ b/project2/proj2_s4498062/dns_tests.py @@ -1,19 +1,26 @@ #!/usr/bin/env python2 """ Tests for the DNS resolver and server """ +# pylint: disable=too-many-public-methods, invalid-name + +from random import randint +import socket import sys import time import unittest from dns.cache import RecordCache from dns.classes import Class +from dns.message import Header, Message, Question from dns.resolver import Resolver from dns.resource import ResourceRecord, CNAMERecordData from dns.types import Type + portnr = 5300 server = '127.0.0.1' + class TestResolver(unittest.TestCase): """Test cases for the resolver with caching disabled""" @@ -40,6 +47,11 @@ class TestResolverCache(unittest.TestCase): TTL = 3 + def __init__(self, *pargs): + self.resolv = None + self.cache = None + super(TestResolverCache, self).__init__(*pargs) + def setup_resolver(self): """Setup a resolver with an invalid cache""" self.resolv = Resolver(True, self.TTL) @@ -68,8 +80,68 @@ class TestResolverCache(unittest.TestCase): class TestServer(unittest.TestCase): - pass + """Test cases for the server""" + + TIMEOUT = 5 + + def setUp(self): + self.resolv = Resolver(False, 0) + def do_query(self, hostname, type_=Type.A, class_=Class.IN): + """Send a query to the server""" + ident = randint(0, 65535) + header = Header(ident, 0, 1, 0, 0, 0) + header.qr = 0 + header.opcode = 0 + header.rd = 1 + req = Message(header, [Question(hostname, type_, class_)]) + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(self.TIMEOUT) + sock.sendto(req.to_bytes(), ('127.0.0.1', portnr)) + + try: + data = sock.recv(512) + resp = Message.from_bytes(data) + if resp.header.ident == ident: + return resp.answers, resp.authorities, resp.additionals + except socket.timeout: + pass + return [], [], [] + + # pylint: disable=too-many-arguments + def assert_match(self, rec, name=None, type_=None, class_=None, data=None): + """Assert that a ResourceRecord matches some properties""" + self.assertTrue( + (name is None or rec.name == name) and + (type_ is None or rec.type_ == type_) and + (class_ is None or rec.class_ == class_) and + (data is None or rec.rdata.data == data)) + + def test_solve_auth(self): + """Test solving a name for which we are authoritative""" + answs, auths, adds = self.do_query('cloogle.org') + self.assertEqual(answs, []) + self.assert_match( + auths[0], 'cloogle.org', Type.A, Class.IN, '84.22.111.158') + self.assertEqual(adds, []) + + def test_solve_zone(self): + """Test solving a name in our zone""" + answs, auths, adds = self.do_query('sub.cloogle.org') + self.assertEquals(answs + auths + adds, []) + + def test_solve_outside(self): + """Test solving a name outside our zone""" + for name in ['camilstaps.nl', 'cname.camilstaps.nl']: + answs, auths, adds = self.do_query(name) + _, aliases, addrs = self.resolv.gethostbyname(name) + + for record in answs + auths + adds: + if record.type_ == Type.A: + self.assertIn(record.rdata.data, addrs) + elif record.type_ == Type.CNAME: + self.assertIn(record.rdata.data, aliases) if __name__ == "__main__": # Parse command line arguments |