summaryrefslogtreecommitdiff
path: root/project2/proj2_s4498062/dns_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'project2/proj2_s4498062/dns_tests.py')
-rwxr-xr-xproject2/proj2_s4498062/dns_tests.py74
1 files changed, 73 insertions, 1 deletions
diff --git a/project2/proj2_s4498062/dns_tests.py b/project2/proj2_s4498062/dns_tests.py
index 4a054b6..6a5a16e 100755
--- a/project2/proj2_s4498062/dns_tests.py
+++ b/project2/proj2_s4498062/dns_tests.py
@@ -1,19 +1,26 @@
#!/usr/bin/env python2
""" Tests for the DNS resolver and server """
+# pylint: disable=too-many-public-methods, invalid-name
+
+from random import randint
+import socket
import sys
import time
import unittest
from dns.cache import RecordCache
from dns.classes import Class
+from dns.message import Header, Message, Question
from dns.resolver import Resolver
from dns.resource import ResourceRecord, CNAMERecordData
from dns.types import Type
+
portnr = 5300
server = '127.0.0.1'
+
class TestResolver(unittest.TestCase):
"""Test cases for the resolver with caching disabled"""
@@ -40,6 +47,11 @@ class TestResolverCache(unittest.TestCase):
TTL = 3
+ def __init__(self, *pargs):
+ self.resolv = None
+ self.cache = None
+ super(TestResolverCache, self).__init__(*pargs)
+
def setup_resolver(self):
"""Setup a resolver with an invalid cache"""
self.resolv = Resolver(True, self.TTL)
@@ -68,8 +80,68 @@ class TestResolverCache(unittest.TestCase):
class TestServer(unittest.TestCase):
- pass
+ """Test cases for the server"""
+
+ TIMEOUT = 5
+
+ def setUp(self):
+ self.resolv = Resolver(False, 0)
+ def do_query(self, hostname, type_=Type.A, class_=Class.IN):
+ """Send a query to the server"""
+ ident = randint(0, 65535)
+ header = Header(ident, 0, 1, 0, 0, 0)
+ header.qr = 0
+ header.opcode = 0
+ header.rd = 1
+ req = Message(header, [Question(hostname, type_, class_)])
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ sock.settimeout(self.TIMEOUT)
+ sock.sendto(req.to_bytes(), ('127.0.0.1', portnr))
+
+ try:
+ data = sock.recv(512)
+ resp = Message.from_bytes(data)
+ if resp.header.ident == ident:
+ return resp.answers, resp.authorities, resp.additionals
+ except socket.timeout:
+ pass
+ return [], [], []
+
+ # pylint: disable=too-many-arguments
+ def assert_match(self, rec, name=None, type_=None, class_=None, data=None):
+ """Assert that a ResourceRecord matches some properties"""
+ self.assertTrue(
+ (name is None or rec.name == name) and
+ (type_ is None or rec.type_ == type_) and
+ (class_ is None or rec.class_ == class_) and
+ (data is None or rec.rdata.data == data))
+
+ def test_solve_auth(self):
+ """Test solving a name for which we are authoritative"""
+ answs, auths, adds = self.do_query('cloogle.org')
+ self.assertEqual(answs, [])
+ self.assert_match(
+ auths[0], 'cloogle.org', Type.A, Class.IN, '84.22.111.158')
+ self.assertEqual(adds, [])
+
+ def test_solve_zone(self):
+ """Test solving a name in our zone"""
+ answs, auths, adds = self.do_query('sub.cloogle.org')
+ self.assertEquals(answs + auths + adds, [])
+
+ def test_solve_outside(self):
+ """Test solving a name outside our zone"""
+ for name in ['camilstaps.nl', 'cname.camilstaps.nl']:
+ answs, auths, adds = self.do_query(name)
+ _, aliases, addrs = self.resolv.gethostbyname(name)
+
+ for record in answs + auths + adds:
+ if record.type_ == Type.A:
+ self.assertIn(record.rdata.data, addrs)
+ elif record.type_ == Type.CNAME:
+ self.assertIn(record.rdata.data, aliases)
if __name__ == "__main__":
# Parse command line arguments