1# Test cases for X.509 certificate checking
2# Copyright (c) 2019, The Linux Foundation
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import os
8try:
9    import OpenSSL
10    openssl_imported = True
11except ImportError:
12    openssl_imported = False
13
14from utils import HwsimSkip
15import hostapd
16from test_ap_eap import check_domain_suffix_match, check_altsubject_match_support, check_domain_match
17
def check_cert_check_support():
    """Skip the calling test case when the OpenSSL Python module is missing.

    Raises:
        HwsimSkip: if the module-level ``openssl_imported`` flag is False,
            i.e. ``import OpenSSL`` failed when this file was loaded.
    """
    if openssl_imported:
        return
    raise HwsimSkip("OpenSSL python method not available")
21
def start_hapd(apdev, server_cert="auth_serv/server.pem"):
    """Start the "cert-check" AP used by the tests in this file.

    Args:
        apdev: AP device description handed to hostapd.add_ap().
        server_cert: Path of the server certificate to configure; defaults
            to the pre-generated auth_serv/server.pem.

    Returns:
        Whatever hostapd.add_ap() returns for the configured AP.
    """
    # Network/security settings (WPA-EAP with CCMP, IEEE 802.1X enabled).
    params = {"ssid": "cert-check"}
    params["wpa"] = "2"
    params["wpa_key_mgmt"] = "WPA-EAP"
    params["rsn_pairwise"] = "CCMP"
    params["ieee8021x"] = "1"

    # EAP server settings; only the server certificate is caller-selectable,
    # the CA certificate, private key and DH parameters are fixed.
    params["eap_server"] = "1"
    params["eap_user_file"] = "auth_serv/eap_user.conf"
    params["ca_cert"] = "auth_serv/ca.pem"
    params["server_cert"] = server_cert
    params["private_key"] = "auth_serv/server.key"
    params["dh_file"] = "auth_serv/dh.conf"

    return hostapd.add_ap(apdev, params)
32
def load_certs():
    """Load the CA certificate, CA private key and stock server certificate.

    All three are read as PEM from the auth_serv directory (relative to the
    current working directory).

    Returns:
        Tuple ``(cacert, cakey, servercert)`` of pyOpenSSL objects.
    """
    crypto = OpenSSL.crypto
    pem = crypto.FILETYPE_PEM

    def read_file(path):
        # Return the raw bytes of the given file.
        with open(path, "rb") as f:
            return f.read()

    cacert = crypto.load_certificate(pem, read_file("auth_serv/ca.pem"))
    cakey = crypto.load_privatekey(pem, read_file("auth_serv/ca-key.pem"))
    servercert = crypto.load_certificate(pem,
                                         read_file("auth_serv/server.pem"))
    return cacert, cakey, servercert
48
def start_cert(servercert, cacert, cn='server.w1.fi', v3=True):
    """Create a new, still unsigned, server certificate.

    The certificate reuses the public key of ``servercert`` (so that the
    existing server private key keeps matching), gets a fixed serial number
    and a validity window placed around the current time.

    Args:
        servercert: Existing certificate whose public key is copied.
        cacert: Issuer certificate, used for the authorityKeyIdentifier
            extension.
        cn: Subject commonName to set.
        v3: When True, set the version field to 2 and add the
            basicConstraints, subjectKeyIdentifier and authorityKeyIdentifier
            extensions. When False, leave the version untouched and add no
            extensions.

    Returns:
        The OpenSSL.crypto.X509 object; callers may add more extensions
        before passing it to sign_cert().
    """
    crypto = OpenSSL.crypto

    cert = crypto.X509()
    cert.set_serial_number(12345)
    # Validity is relative to the current time: starts slightly in the past
    # and ends a bit in the future.
    cert.gmtime_adj_notBefore(-10)
    cert.gmtime_adj_notAfter(1000)
    cert.set_pubkey(servercert.get_pubkey())

    subject = cert.get_subject()
    subject.CN = cn
    cert.set_subject(subject)

    if not v3:
        return cert

    cert.set_version(2)
    extensions = [
        crypto.X509Extension(b"basicConstraints", True, b"CA:FALSE"),
        # The key identifier is derived from the certificate itself, so the
        # public key has to be set before this point.
        crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash",
                             subject=cert),
        crypto.X509Extension(b"authorityKeyIdentifier", False,
                             b"keyid:always", issuer=cacert),
    ]
    cert.add_extensions(extensions)
    return cert
69
def sign_cert(cert, cert_file, cakey, cacert):
    """Sign a certificate with the CA key and store it as a PEM file.

    Args:
        cert: Certificate to finalize; its issuer is set to the subject of
            ``cacert`` before signing.
        cert_file: Path of the PEM file to (over)write.
        cakey: CA private key used for the SHA-256 signature.
        cacert: CA certificate providing the issuer name.
    """
    issuer = cacert.get_subject()
    cert.set_issuer(issuer)
    cert.sign(cakey, "sha256")
    with open(cert_file, 'wb') as out:
        pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                              cert)
        out.write(pem)
76
def check_connect(dev, fail=False, wait_error=None, **kwargs):
    """Run one EAP-TTLS/PAP connection attempt against the "cert-check" AP.

    Args:
        dev: Station object providing connect(), wait_event(),
            wait_connected(), request(), wait_disconnected() and
            dump_monitor().
        fail: When True, the attempt is expected to end with
            CTRL-EVENT-EAP-FAILURE instead of a connection.
        wait_error: Optional event name that has to be seen before the EAP
            failure event; only used when ``fail`` is True.
        **kwargs: Extra network parameters passed through to dev.connect()
            (e.g. domain_match, domain_suffix_match, altsubject_match).

    Raises:
        Exception: if EAP does not start, or an expected error/failure event
            is not reported in time.
    """
    dev.connect("cert-check", key_mgmt="WPA-EAP", eap="TTLS",
                identity="pap user", anonymous_identity="ttls",
                password="password",
                ca_cert="auth_serv/ca.pem", phase2="auth=PAP",
                scan_freq="2412", wait_connect=False, **kwargs)

    if dev.wait_event(["CTRL-EVENT-EAP-STARTED"], timeout=10) is None:
        raise Exception("EAP not started")

    if not fail:
        dev.wait_connected()
    else:
        # The specific error (if requested) has to show up before the
        # generic EAP failure indication.
        if wait_error and dev.wait_event([wait_error], timeout=5) is None:
            raise Exception("Specific error not reported")
        if dev.wait_event(["CTRL-EVENT-EAP-FAILURE"], timeout=5) is None:
            raise Exception("EAP failure not reported")

    # Clean up so that the next attempt starts from a disconnected state with
    # no pending events.
    for cmd in ("REMOVE_NETWORK all", "ABORT_SCAN"):
        dev.request(cmd)
    dev.wait_disconnected()
    dev.dump_monitor()
100
def test_cert_check_basic(dev, apdev, params):
    """Basic test with generated X.509 server certificate"""
    check_cert_check_support()
    pem_path = os.path.join(params['logdir'], "cert_check_basic.pem")
    cacert, cakey, servercert = load_certs()

    # Generate a certificate without the v3 extensions and make sure a plain
    # connection (no additional matching rules) still works with it.
    sign_cert(start_cert(servercert, cacert, v3=False), pem_path, cakey,
              cacert)
    hapd = start_hapd(apdev[0], server_cert=pem_path)
    check_connect(dev[0])
111
def test_cert_check_v3(dev, apdev, params):
    """Basic test with generated X.509v3 server certificate"""
    check_cert_check_support()
    pem_path = os.path.join(params['logdir'], "cert_check_v3.pem")
    cacert, cakey, servercert = load_certs()

    # start_cert() defaults to v3=True, i.e., the generated certificate
    # carries the basic set of extensions.
    sign_cert(start_cert(servercert, cacert), pem_path, cakey, cacert)
    hapd = start_hapd(apdev[0], server_cert=pem_path)
    check_connect(dev[0])
122
def test_cert_check_dnsname(dev, apdev, params):
    """Certificate check with multiple dNSName values"""
    check_cert_check_support()
    check_domain_suffix_match(dev[0])
    check_domain_match(dev[0])
    pem_path = os.path.join(params['logdir'], "cert_check_dnsname.pem")
    cacert, cakey, servercert = load_certs()

    # Server certificate with CN "server" and three dNSName entries in
    # subjectAltName.
    cert = start_cert(servercert, cacert, cn="server")
    san_entries = ["DNS:one.example.com", "DNS:two.example.com",
                   "DNS:three.example.com"]
    san = OpenSSL.crypto.X509Extension(b"subjectAltName", False,
                                       ",".join(san_entries).encode())
    cert.add_extensions([san])
    sign_cert(cert, pem_path, cakey, cacert)
    hapd = start_hapd(apdev[0], server_cert=pem_path)

    sta = dev[0]
    # Arguments shared by all attempts that must be rejected due to a
    # certificate validation error.
    rejected = {"fail": True, "wait_error": "CTRL-EVENT-EAP-TLS-CERT-ERROR"}

    # Sanity check without any domain constraints.
    check_connect(sta)

    # domain_suffix_match values that are expected to be accepted.
    suffix_accept = ["two.example.com",
                     "one.example.com",
                     "tWo.Example.com",
                     "three.example.com",
                     "no.match.example.com;two.example.com;no.match.example.org",
                     "no.match.example.com;example.com;no.match.example.org",
                     "no.match.example.com;no.match.example.org;example.com",
                     "example.com",
                     "com"]
    for value in suffix_accept:
        check_connect(sta, domain_suffix_match=value)

    # domain_suffix_match values that are expected to be rejected.
    suffix_reject = ["four.example.com",
                     "foo.one.example.com",
                     "no.match.example.org;no.match.example.com",
                     "xample.com"]
    for value in suffix_reject:
        check_connect(sta, domain_suffix_match=value, **rejected)

    # domain_match values that are expected to be accepted.
    full_accept = ["one.example.com",
                   "two.example.com",
                   "three.example.com",
                   "no.match.example.com;two.example.com;no.match.example.org",
                   "tWo.Example.Com"]
    for value in full_accept:
        check_connect(sta, domain_match=value)

    # domain_match values that are expected to be rejected.
    full_reject = ["four.example.com",
                   "foo.one.example.com",
                   "example.com",
                   "xample.com",
                   "no.match.example.org;no.match.example.com",
                   "ne.example.com"]
    for value in full_reject:
        check_connect(sta, domain_match=value, **rejected)
179
def test_cert_check_dnsname_wildcard(dev, apdev, params):
    """Certificate check with multiple dNSName wildcard values"""
    check_cert_check_support()
    check_domain_suffix_match(dev[0])
    check_domain_match(dev[0])
    # Use a file name unique to this test case so that the generated
    # certificate does not overwrite the one stored in the log directory by
    # test_cert_check_dnsname.
    cert_file = os.path.join(params['logdir'],
                             "cert_check_dnsname_wildcard.pem")
    cacert, cakey, servercert = load_certs()

    # Server certificate with CN "server" and a mix of wildcard and
    # non-wildcard dNSName entries in subjectAltName.
    cert = start_cert(servercert, cacert, cn="server")
    dns = ["DNS:*.one.example.com", "DNS:two.example.com",
           "DNS:*.three.example.com"]
    cert.add_extensions([OpenSSL.crypto.X509Extension(b"subjectAltName", False,
                                                      ",".join(dns).encode())])
    sign_cert(cert, cert_file, cakey, cacert)
    hapd = start_hapd(apdev[0], server_cert=cert_file)

    # Sanity check without any domain constraints.
    check_connect(dev[0])

    # domain_suffix_match values that are expected to be accepted.
    tests = ["two.example.com",
             "one.example.com",
             "tWo.Example.com",
             "three.example.com",
             "no.match.example.com;two.example.com;no.match.example.org",
             "no.match.example.com;example.com;no.match.example.org",
             "no.match.example.com;no.match.example.org;example.com",
             "example.com",
             "com"]
    for match in tests:
        check_connect(dev[0], domain_suffix_match=match)

    # domain_suffix_match values that are expected to be rejected.
    tests = ["four.example.com",
             "foo.one.example.com",
             "no.match.example.org;no.match.example.com",
             "xample.com"]
    for match in tests:
        check_connect(dev[0], fail=True,
                      wait_error="CTRL-EVENT-EAP-TLS-CERT-ERROR",
                      domain_suffix_match=match)

    # domain_match values that are expected to be accepted; the wildcard
    # entries are given in the same literal form as in the certificate.
    tests = ["*.one.example.com",
             "two.example.com",
             "*.three.example.com",
             "no.match.example.com;two.example.com;no.match.example.org",
             "tWo.Example.Com"]
    for match in tests:
        check_connect(dev[0], domain_match=match)

    # domain_match values that are expected to be rejected, including the
    # wildcard entry with its "*." prefix removed.
    tests = ["four.example.com",
             "foo.one.example.com",
             "example.com",
             "xample.com",
             "no.match.example.org;no.match.example.com",
             "one.example.com"]
    for match in tests:
        check_connect(dev[0], fail=True,
                      wait_error="CTRL-EVENT-EAP-TLS-CERT-ERROR",
                      domain_match=match)
236
def test_cert_check_dnsname_alt(dev, apdev, params):
    """Certificate check with multiple dNSName values using altsubject_match"""
    check_cert_check_support()
    check_altsubject_match_support(dev[0])
    pem_path = os.path.join(params['logdir'], "cert_check_dnsname_alt.pem")
    cacert, cakey, servercert = load_certs()

    # Server certificate with CN "server" and a mix of wildcard and
    # non-wildcard dNSName entries in subjectAltName.
    cert = start_cert(servercert, cacert, cn="server")
    san_entries = ["DNS:*.one.example.com", "DNS:two.example.com",
                   "DNS:*.three.example.com"]
    san = OpenSSL.crypto.X509Extension(b"subjectAltName", False,
                                       ",".join(san_entries).encode())
    cert.add_extensions([san])
    sign_cert(cert, pem_path, cakey, cacert)
    hapd = start_hapd(apdev[0], server_cert=pem_path)

    sta = dev[0]

    # altsubject_match values that are expected to be accepted.
    accepted = ["DNS:*.one.example.com",
                "DNS:two.example.com",
                "DNS:*.three.example.com",
                "DNS:*.three.example.com;DNS:two.example.com;DNS:*.one.example.com",
                "DNS:foo.example.org;DNS:two.example.com;DNS:bar.example.org"]
    for value in accepted:
        check_connect(sta, altsubject_match=value)

    # altsubject_match values that are expected to be rejected with a
    # certificate error.
    refused = ["DNS:one.example.com",
               "DNS:four.example.com;DNS:five.example.com"]
    for value in refused:
        check_connect(sta, fail=True,
                      wait_error="CTRL-EVENT-EAP-TLS-CERT-ERROR",
                      altsubject_match=value)
266
def test_cert_check_dnsname_cn(dev, apdev, params):
    """Certificate check with dNSName in CN"""
    check_cert_check_support()
    check_domain_suffix_match(dev[0])
    check_domain_match(dev[0])
    pem_path = os.path.join(params['logdir'], "cert_check_dnsname_cn.pem")
    cacert, cakey, servercert = load_certs()

    # No subjectAltName extension is added here; the domain name is only
    # present in the subject CN.
    sign_cert(start_cert(servercert, cacert, cn="server.example.com"),
              pem_path, cakey, cacert)
    hapd = start_hapd(apdev[0], server_cert=pem_path)

    sta = dev[0]
    # Arguments shared by all attempts that must be rejected due to a
    # certificate validation error.
    rejected = {"fail": True, "wait_error": "CTRL-EVENT-EAP-TLS-CERT-ERROR"}

    # Sanity check without any domain constraints.
    check_connect(sta)

    # domain_suffix_match values that are expected to be accepted.
    suffix_accept = ["server.example.com",
                     "example.com",
                     "eXample.Com",
                     "no.match.example.com;example.com;no.match.example.org",
                     "no.match.example.com;server.example.com;no.match.example.org",
                     "com"]
    for value in suffix_accept:
        check_connect(sta, domain_suffix_match=value)

    # domain_suffix_match values that are expected to be rejected.
    suffix_reject = ["aaa.example.com",
                     "foo.server.example.com",
                     "no.match.example.org;no.match.example.com",
                     "xample.com"]
    for value in suffix_reject:
        check_connect(sta, domain_suffix_match=value, **rejected)

    # domain_match values that are expected to be accepted.
    full_accept = ["server.example.com",
                   "no.match.example.com;server.example.com;no.match.example.org",
                   "sErver.Example.Com"]
    for value in full_accept:
        check_connect(sta, domain_match=value)

    # domain_match values that are expected to be rejected.
    full_reject = ["aaa.example.com",
                   "foo.server.example.com",
                   "example.com",
                   "no.match.example.org;no.match.example.com",
                   "xample.com"]
    for value in full_reject:
        check_connect(sta, domain_match=value, **rejected)
313