1# WPA2-Personal OCV tests
2# Copyright (c) 2018, Mathy Vanhoef
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details
6
7from remotehost import remote_compatible
8import binascii, struct
9import logging, time
10logger = logging.getLogger()
11
12import hostapd
13from wpasupplicant import WpaSupplicant
14import hwsim_utils
15from utils import *
16from test_erp import start_erp_as
17from test_ap_ft import ft_params1, ft_params2
18from test_ap_psk import parse_eapol, build_eapol, pmk_to_ptk, eapol_key_mic, recv_eapol, send_eapol, reply_eapol, build_eapol_key_3_4, aes_wrap, pad_key_data
19
20#TODO: Refuse setting up AP with OCV but without MFP support
21#TODO: Refuse to connect to AP that advertises OCV but not MFP
22
def make_ocikde(op_class, channel, seg1_idx):
    """Build the raw bytes of an OCI KDE.

    The element consists of a vendor-specific element ID (221), a one-byte
    length, the RSN OCI KDE selector 00-0F-AC:0D and then op_class, channel
    and seg1_idx, each packed as a single unsigned byte.
    """
    selector = b"\x00\x0f\xac\x0d"  # RSN_KEY_DATA_OCI
    payload = selector + struct.pack("<3B", op_class, channel, seg1_idx)
    # 221 is WLAN_EID_VENDOR_SPECIFIC; the length covers selector + fields.
    header = struct.pack("<BB", 221, len(payload))
    return header + payload
31
def ocv_setup_ap(apdev, params):
    """Start a WPA2-PSK AP with the SSID/passphrase shared by the OCV tests.

    params is updated in place with the hostapd.wpa2_params() settings before
    the AP is added. Returns the tuple (hapd, ssid, passphrase).

    Raises HwsimSkip when adding the AP fails with the "Failed to set hostapd
    parameter ocv" error; any other exception is re-raised unchanged.
    """
    ssid, passphrase = "test-wpa2-ocv", "qwertyuiop"
    wpa2 = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    params.update(wpa2)
    try:
        return hostapd.add_ap(apdev, params), ssid, passphrase
    except Exception as e:
        if "Failed to set hostapd parameter ocv" not in str(e):
            raise
        raise HwsimSkip("OCV not supported")
43
def build_eapol_key_1_2(kck, key_data, replay_counter=3, key_info=0x1382,
                        extra_len=0, descr_type=2, key_len=16):
    """Build the dict describing an EAPOL-Key frame (defaults: message 1/2).

    The nonce, IV, RSC and key ID fields are all zero bytes, key_data is
    stored as given and the length field is 95 + len(key_data) + extra_len.
    eapol_key_mic() is then invoked with kck on the dict, and the dict is
    returned.
    """
    frame = {
        'version': 2,
        'type': 3,
        'length': 95 + len(key_data) + extra_len,
        'descr_type': descr_type,
        'rsn_key_info': key_info,
        'rsn_key_len': key_len,
        # Replay counter is a big-endian unsigned 64-bit value.
        'rsn_replay_counter': struct.pack('>Q', replay_counter),
        'rsn_key_nonce': b"\x00" * 32,
        'rsn_key_iv': b"\x00" * 16,
        'rsn_key_rsc': b"\x00" * 8,
        'rsn_key_id': b"\x00" * 8,
        'rsn_key_data_len': len(key_data),
        'rsn_key_data': key_data,
    }
    eapol_key_mic(kck, frame)
    return frame
63
def build_eapol_key_2_2(kck, key_data, replay_counter=3, key_info=0x0302,
                        extra_len=0, descr_type=2, key_len=16):
    """Build an EAPOL-Key frame dict using the message 2/2 default key_info.

    Thin wrapper around build_eapol_key_1_2(); only the key_info default
    (0x0302) differs.
    """
    return build_eapol_key_1_2(kck, key_data,
                               replay_counter=replay_counter,
                               key_info=key_info,
                               extra_len=extra_len,
                               descr_type=descr_type,
                               key_len=key_len)
68
@remote_compatible
def test_wpa2_ocv(dev, apdev):
    """OCV on 2.4 GHz"""
    ap_conf = {"channel": "1",
               "ieee80211w": "2",
               "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_conf)
    sta = dev[0]
    # Associate once with the station's ocv network parameter set to "0"
    # and once with it set to "1".
    for sta_ocv in ("0", "1"):
        sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv=sta_ocv,
                    ieee80211w="1")
        sta.request("REMOVE_NETWORK all")
        sta.wait_disconnected()
81
@remote_compatible
def test_wpa2_ocv_5ghz(dev, apdev):
    """OCV on 5 GHz"""
    try:
        run_wpa2_ocv_5ghz(dev, apdev)
    finally:
        # Cleanup runs whether or not the test body raised: set_world_reg()
        # (presumably resets the regulatory domain changed via country_code)
        # followed by a scan cache flush on the station.
        sta = dev[0]
        set_world_reg(apdev[0], apdev[1], sta)
        sta.flush_scan_cache()
90
def run_wpa2_ocv_5ghz(dev, apdev):
    """Body of test_wpa2_ocv_5ghz: AP on channel 40 with country code US.

    dev[0] associates, waits for the regulatory domain update and removes the
    network, first with ocv="0" and then with ocv="1".
    """
    ap_conf = {"hw_mode": "a",
               "channel": "40",
               "ieee80211w": "2",
               "country_code": "US",
               "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_conf)
    sta = dev[0]
    for sta_ocv in ("0", "1"):
        sta.connect(ssid, psk=passphrase, scan_freq="5200", ocv=sta_ocv,
                    ieee80211w="1")
        sta.wait_regdom(country_ie=True)
        sta.request("REMOVE_NETWORK all")
        sta.wait_disconnected()
104
@remote_compatible
def test_wpa2_ocv_ht20(dev, apdev):
    """OCV with HT20 channel"""
    ap_conf = {"channel": "6",
               "ieee80211n": "1",
               "ieee80211w": "1",
               "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_conf)
    # Extra connect() arguments per station index: dev[0] associates with
    # disable_ht="1", dev[1] without any extra arguments.
    sta_extra = ({"disable_ht": "1"}, {})
    for sta_ocv in ("0", "1"):
        for idx, extra in enumerate(sta_extra):
            dev[idx].connect(ssid, psk=passphrase, scan_freq="2437",
                             ocv=sta_ocv, ieee80211w="1", **extra)
        # Issue both removals first, then wait for both disconnections.
        for idx in range(2):
            dev[idx].request("REMOVE_NETWORK all")
        for idx in range(2):
            dev[idx].wait_disconnected()
122
@remote_compatible
def test_wpa2_ocv_ht40(dev, apdev):
    """OCV with HT40 channel"""
    try:
        run_wpa2_ocv_ht40(dev, apdev)
    finally:
        # Cleanup runs whether or not the test body raised: set_world_reg()
        # (presumably resets the regulatory domain changed via country_code)
        # followed by scan cache flushes on both stations used by the test.
        set_world_reg(apdev[0], apdev[1], dev[0])
        for idx in (0, 1):
            dev[idx].flush_scan_cache()
132
def run_wpa2_ocv_ht40(dev, apdev):
    """Body of test_wpa2_ocv_ht40.

    For each HT40 configuration an AP is started, dev[0] (disable_ht="1") and
    dev[1] associate with ocv="0" and then ocv="1", and the AP is disabled
    again before moving on to the next configuration.
    """
    clear_scan_cache(apdev[0])
    # (channel, ht_capab, scan frequency, hw_mode)
    setups = (("6", "[HT40-]", "2437", "g"),
              ("6", "[HT40+]", "2437", "g"),
              ("40", "[HT40-]", "5200", "a"),
              ("36", "[HT40+]", "5180", "a"))
    # Extra connect() arguments per station index.
    sta_extra = ({"disable_ht": "1"}, {})
    for channel, capab, freq, mode in setups:
        ap_conf = {"hw_mode": mode,
                   "channel": channel,
                   "country_code": "US",
                   "ieee80211n": "1",
                   "ht_capab": capab,
                   "ieee80211w": "1",
                   "ocv": "1"}
        hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_conf)
        for idx in range(2):
            dev[idx].flush_scan_cache()
        for sta_ocv in ("0", "1"):
            for idx, extra in enumerate(sta_extra):
                dev[idx].connect(ssid, psk=passphrase, scan_freq=freq,
                                 ocv=sta_ocv, ieee80211w="1", **extra)
            dev[0].wait_regdom(country_ie=True)
            for idx in range(2):
                dev[idx].request("REMOVE_NETWORK all")
            for idx in range(2):
                dev[idx].wait_disconnected()
        hapd.disable()
160
@remote_compatible
def test_wpa2_ocv_vht40(dev, apdev):
    """OCV with VHT40 channel"""
    try:
        run_wpa2_ocv_vht40(dev, apdev)
    finally:
        # Cleanup runs whether or not the test body raised: set_world_reg()
        # (presumably resets the regulatory domain changed via country_code)
        # followed by scan cache flushes on all three stations.
        set_world_reg(apdev[0], apdev[1], dev[0])
        for idx in range(3):
            dev[idx].flush_scan_cache()
171
def run_wpa2_ocv_vht40(dev, apdev):
    """Body of test_wpa2_ocv_vht40.

    For each configuration an AP with vht_oper_chwidth "0" is started and
    three stations associate with ocv="0" and then ocv="1": dev[0] with
    disable_ht="1", dev[1] with disable_vht="1" and dev[2] with no extra
    arguments. The AP is disabled after each configuration.
    """
    clear_scan_cache(apdev[0])
    # (channel, ht_capab, scan frequency)
    setups = (("40", "[HT40-]", "5200"),
              ("36", "[HT40+]", "5180"))
    # Extra connect() arguments per station index.
    sta_extra = ({"disable_ht": "1"}, {"disable_vht": "1"}, {})
    for channel, capab, freq in setups:
        ap_conf = {"hw_mode": "a",
                   "channel": channel,
                   "country_code": "US",
                   "ht_capab": capab,
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "vht_oper_chwidth": "0",
                   "vht_oper_centr_freq_seg0_idx": "38",
                   "ieee80211w": "1",
                   "ocv": "1"}
        hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_conf)
        for idx in range(3):
            dev[idx].flush_scan_cache()
        for sta_ocv in ("0", "1"):
            for idx, extra in enumerate(sta_extra):
                dev[idx].connect(ssid, psk=passphrase, scan_freq=freq,
                                 ocv=sta_ocv, ieee80211w="1", **extra)
            dev[0].wait_regdom(country_ie=True)
            for idx in range(3):
                dev[idx].request("REMOVE_NETWORK all")
            for idx in range(3):
                dev[idx].wait_disconnected()
        hapd.disable()
205
@remote_compatible
def test_wpa2_ocv_vht80(dev, apdev):
    """OCV with VHT80 channel"""
    try:
        run_wpa2_ocv_vht80(dev, apdev)
    finally:
        # Cleanup runs whether or not the test body raised: set_world_reg()
        # (presumably resets the regulatory domain changed via country_code)
        # followed by scan cache flushes on all three stations.
        set_world_reg(apdev[0], apdev[1], dev[0])
        for idx in range(3):
            dev[idx].flush_scan_cache()
216
def run_wpa2_ocv_vht80(dev, apdev):
    """Body of test_wpa2_ocv_vht80.

    For each configuration an AP with vht_oper_chwidth "1" and segment 0
    index 42 is started and three stations associate with ocv="0" and then
    ocv="1": dev[0] with disable_ht="1", dev[1] with disable_vht="1" and
    dev[2] with no extra arguments. The AP is disabled after each
    configuration.
    """
    clear_scan_cache(apdev[0])
    # (channel, ht_capab, scan frequency)
    setups = (("40", "[HT40-]", "5200"),
              ("36", "[HT40+]", "5180"))
    # Extra connect() arguments per station index.
    sta_extra = ({"disable_ht": "1"}, {"disable_vht": "1"}, {})
    for channel, capab, freq in setups:
        ap_conf = {"hw_mode": "a",
                   "channel": channel,
                   "country_code": "US",
                   "ht_capab": capab,
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "vht_oper_chwidth": "1",
                   "vht_oper_centr_freq_seg0_idx": "42",
                   "ieee80211w": "1",
                   "ocv": "1"}
        hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_conf)
        for sta_ocv in ("0", "1"):
            for idx, extra in enumerate(sta_extra):
                dev[idx].connect(ssid, psk=passphrase, scan_freq=freq,
                                 ocv=sta_ocv, ieee80211w="1", **extra)
            dev[0].wait_regdom(country_ie=True)
            for idx in range(3):
                dev[idx].request("REMOVE_NETWORK all")
            for idx in range(3):
                dev[idx].wait_disconnected()
        hapd.disable()
247
@remote_compatible
def test_wpa2_ocv_vht160(dev, apdev):
    """OCV with VHT160 channel"""
    try:
        run_wpa2_ocv_vht160(dev, apdev)
    finally:
        # Cleanup runs whether or not the test body raised: set_world_reg()
        # (presumably resets the regulatory domain changed via country_code)
        # followed by scan cache flushes on all three stations.
        set_world_reg(apdev[0], apdev[1], dev[0])
        for idx in range(3):
            dev[idx].flush_scan_cache()
258
def run_wpa2_ocv_vht160(dev, apdev):
    """Body of test_wpa2_ocv_vht160.

    For each configuration an AP with country code ZA, vht_oper_chwidth "2"
    and segment 0 index 114 is started and three stations associate with
    ocv="0" and then ocv="1": dev[0] with disable_ht="1", dev[1] with
    disable_vht="1" and dev[2] with no extra arguments. The AP is disabled
    after each configuration.
    """
    # (channel, ht_capab, scan frequency)
    setups = (("100", "[HT40+]", "5500"),
              ("104", "[HT40-]", "5520"))
    # Extra connect() arguments per station index.
    sta_extra = ({"disable_ht": "1"}, {"disable_vht": "1"}, {})
    for channel, capab, freq in setups:
        ap_conf = {"hw_mode": "a",
                   "channel": channel,
                   "country_code": "ZA",
                   "ht_capab": capab,
                   "vht_capab": "[VHT160]",
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "vht_oper_chwidth": "2",
                   "vht_oper_centr_freq_seg0_idx": "114",
                   "ieee80211w": "1",
                   "ocv": "1"}
        hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_conf)
        for sta_ocv in ("0", "1"):
            for idx, extra in enumerate(sta_extra):
                dev[idx].connect(ssid, psk=passphrase, scan_freq=freq,
                                 ocv=sta_ocv, ieee80211w="1", **extra)
            dev[0].wait_regdom(country_ie=True)
            for idx in range(3):
                dev[idx].request("REMOVE_NETWORK all")
            for idx in range(3):
                dev[idx].wait_disconnected()
        hapd.disable()
289
@remote_compatible
def test_wpa2_ocv_vht80plus80(dev, apdev):
    """OCV with VHT80+80 channel"""
    try:
        run_wpa2_ocv_vht80plus80(dev, apdev)
    finally:
        # Cleanup runs whether or not the test body raised. After
        # set_world_reg(), give a CTRL-EVENT-REGDOM-CHANGE event up to 0.5 s
        # to arrive on dev[0] before flushing the scan caches.
        set_world_reg(apdev[0], apdev[1], dev[0])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        for idx in range(3):
            dev[idx].flush_scan_cache()
301
def run_wpa2_ocv_vht80plus80(dev, apdev):
    """Body of test_wpa2_ocv_vht80plus80.

    For each configuration an AP with vht_oper_chwidth "3" (segment indexes
    42 and 155) is started and three stations associate with ocv="0" and then
    ocv="1". Afterwards all three stations associate once more with ocv="1"
    and the AP is disabled while they are still connected, before the
    stations are disconnected.
    """
    # (channel, ht_capab, scan frequency)
    setups = (("36", "[HT40+]", "5180"),
              ("40", "[HT40-]", "5200"))
    # Extra connect() arguments per station index.
    sta_extra = ({"disable_ht": "1"}, {"disable_vht": "1"}, {})
    for channel, capab, freq in setups:
        ap_conf = {"hw_mode": "a",
                   "channel": channel,
                   "country_code": "US",
                   "ht_capab": capab,
                   "vht_capab": "[VHT160-80PLUS80]",
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "vht_oper_chwidth": "3",
                   "vht_oper_centr_freq_seg0_idx": "42",
                   "vht_oper_centr_freq_seg1_idx": "155",
                   "ieee80211w": "1",
                   "ieee80211d": "1",
                   "ieee80211h": "1",
                   "ocv": "1"}
        hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_conf)
        for sta_ocv in ("0", "1"):
            for idx, extra in enumerate(sta_extra):
                dev[idx].connect(ssid, psk=passphrase, scan_freq=freq,
                                 ocv=sta_ocv, ieee80211w="1", **extra)
            dev[0].wait_regdom(country_ie=True)
            for idx in range(3):
                dev[idx].request("REMOVE_NETWORK all")
            for idx in range(3):
                dev[idx].wait_disconnected()
        # Reconnect every station with ocv="1" (the value the loop above
        # finished with); only dev[0] waits for the regulatory update.
        for idx in range(3):
            dev[idx].connect(ssid, psk=passphrase, scan_freq=freq, ocv="1",
                             ieee80211w="1")
            if idx == 0:
                dev[idx].wait_regdom(country_ie=True)
        hapd.disable()
        for idx in range(3):
            dev[idx].request("DISCONNECT")
        for idx in range(3):
            dev[idx].disconnect_and_stop_scan()
344
class APConnection:
    """Drive the station side of a 4-way handshake against a real hostapd.

    An AP is started with external EAPOL frame I/O enabled, dev is told to
    connect, and the test then crafts EAPOL-Key msg 2/4 and 4/4 frames itself
    so that it can place arbitrary (or no) OCI KDEs in msg 2/4.
    """

    def init_params(self):
        """Reset all attributes to their initial values."""
        # Static parameters
        self.ssid = "test-wpa2-ocv"
        self.passphrase = "qwertyuiop"
        self.psk = "c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7"

        # Dynamic parameters
        for attr in ("hapd", "addr", "rsne", "kck", "kek", "msg", "bssid",
                     "anonce", "snonce", "dev"):
            setattr(self, attr, None)

    def __init__(self, apdev, dev, params):
        """Start the AP, begin association and derive the KCK/KEK.

        params holds the hostapd parameters plus the test-only keys "freq"
        (required, scan frequency for dev) and "ap_ocv" / "sta_ocv"
        (optional, default "1"). These keys are popped from params, which is
        further updated in place with the WPA2 settings. Raises HwsimSkip if
        hostapd rejects the ocv parameter.
        """
        self.init_params()

        # By default, OCV is enabled for both the client and AP. The following
        # parameters can be used to disable OCV for the client or AP.
        ap_ocv = params.pop("ap_ocv", "1")
        sta_ocv = params.pop("sta_ocv", "1")

        freq = params.pop("freq")
        wpa2 = hostapd.wpa2_params(ssid=self.ssid, passphrase=self.passphrase)
        params.update(wpa2)
        params["wpa_pairwise_update_count"] = "10"
        params["ocv"] = ap_ocv
        try:
            self.hapd = hostapd.add_ap(apdev, params)
        except Exception as e:
            if "Failed to set hostapd parameter ocv" not in str(e):
                raise
            raise HwsimSkip("OCV not supported")
        self.hapd.request("SET ext_eapol_frame_io 1")
        self.dev = dev
        dev.request("SET ext_eapol_frame_io 1")

        self.bssid = apdev['bssid']
        # self.psk holds the same hex string that is used as the raw PSK in
        # the connect() call below.
        pmk = binascii.unhexlify(self.psk)

        # The two RSNE variants differ only in the RSN capabilities field
        # (8c40 when the station uses OCV, 8c00 otherwise).
        rsne_hex = ("301a0100000fac040100000fac040100000fac028c400000000fac06"
                    if sta_ocv != "0" else
                    "301a0100000fac040100000fac040100000fac028c000000000fac06")
        self.rsne = binascii.unhexlify(rsne_hex)
        self.snonce = b"\x11" * 32

        dev.connect(self.ssid, raw_psk=self.psk, scan_freq=freq, ocv=sta_ocv,
                    ieee80211w="1", wait_connect=False)
        if "country_code" in params:
            dev.wait_regdom(country_ie=True)
        self.addr = dev.p2p_interface_addr()

        # Wait for EAPOL-Key msg 1/4 from hostapd to determine when associated
        self.msg = recv_eapol(self.hapd)
        self.anonce = self.msg['rsn_key_nonce']
        keys = pmk_to_ptk(pmk, self.addr, self.bssid, self.snonce, self.anonce)
        (ptk, self.kck, self.kek) = keys

    def _send_msg2(self, ocikde):
        """Reply with msg 2/4 carrying RSNE + ocikde and store the response."""
        reply_eapol("2/4", self.hapd, self.addr, self.msg, 0x010a, self.snonce,
                    self.rsne + ocikde, self.kck)
        self.msg = recv_eapol(self.hapd)

    # hapd, addr, rsne, kck, msg, anonce, snonce
    def test_bad_oci(self, logmsg, op_class, channel, seg1_idx):
        """Send msg 2/4 with a bad OCI and expect msg 1/4 to be retransmitted.

        With op_class set to None no OCI KDE is included at all. If the next
        frame from hostapd does not carry the original ANonce with key info
        138 (0x008a), the station is told to disconnect and an exception is
        raised.
        """
        logger.debug("Bad OCI element: " + logmsg)
        ocikde = b'' if op_class is None else make_ocikde(op_class, channel,
                                                          seg1_idx)
        self._send_msg2(ocikde)
        retransmitted = (self.anonce == self.msg['rsn_key_nonce'] and
                         self.msg["rsn_key_info"] == 138)
        if not retransmitted:
            self.dev.request("DISCONNECT")
            raise Exception("Didn't receive retransmitted 1/4")

    def confirm_valid_oci(self, op_class, channel, seg1_idx):
        """Send msg 2/4 with the given OCI and complete the handshake.

        Expects hostapd to answer with a frame carrying the original ANonce
        and key info 5066 (0x13ca), replies with msg 4/4 and then waits up to
        15 seconds in hapd.wait_sta().
        """
        logger.debug("Valid OCI element to complete handshake")
        self._send_msg2(make_ocikde(op_class, channel, seg1_idx))
        got_msg3 = (self.anonce == self.msg['rsn_key_nonce'] and
                    self.msg["rsn_key_info"] == 5066)
        if not got_msg3:
            raise Exception("Didn't receive 3/4 in response to valid 2/4")

        reply_eapol("4/4", self.hapd, self.addr, self.msg, 0x030a, None, None,
                    self.kck)
        self.hapd.wait_sta(timeout=15)
436
@remote_compatible
def test_wpa2_ocv_ap_mismatch(dev, apdev):
    """OCV AP mismatch"""
    ap_conf = {"channel": "1",
               "ieee80211w": "1",
               "freq": "2412"}
    conn = APConnection(apdev[0], dev[0], ap_conf)
    # (log message, op_class, channel, seg1_idx) for each msg 2/4 that the AP
    # is expected to reject; op_class None means no OCI KDE is sent.
    rejected = (("element missing", None, 0, 0),
                ("wrong channel number", 81, 6, 0),
                ("invalid channel number", 81, 0, 0),
                ("wrong operating class", 80, 0, 0),
                ("invalid operating class", 0, 0, 0))
    for case in rejected:
        conn.test_bad_oci(*case)
    conn.confirm_valid_oci(81, 1, 0)

    sta = dev[0]
    sta.dump_monitor()
    sta.request("DISCONNECT")
    sta.wait_disconnected()
454
@remote_compatible
def test_wpa2_ocv_ap_ht_mismatch(dev, apdev):
    """OCV AP mismatch (HT)"""
    clear_scan_cache(apdev[0])
    ap_conf = {"channel": "6",
               "ht_capab": "[HT40-]",
               "ieee80211w": "1",
               "freq": "2437"}
    conn = APConnection(apdev[0], dev[0], ap_conf)
    # (log message, op_class, channel, seg1_idx) for each msg 2/4 that the AP
    # is expected to reject.
    rejected = (("wrong primary channel", 84, 5, 0),
                ("lower bandwidth than negotiated", 81, 6, 0),
                ("bad upper/lower channel", 83, 6, 0))
    for case in rejected:
        conn.test_bad_oci(*case)
    conn.confirm_valid_oci(84, 6, 0)

    sta = dev[0]
    sta.dump_monitor()
    sta.request("DISCONNECT")
    sta.wait_disconnected()
472
@remote_compatible
def test_wpa2_ocv_ap_vht80_mismatch(dev, apdev):
    """OCV AP mismatch (VHT80)"""
    try:
        run_wpa2_ocv_ap_vht80_mismatch(dev, apdev)
    finally:
        # Cleanup runs whether or not the test body raised: set_world_reg()
        # (presumably resets the regulatory domain changed via country_code)
        # followed by a scan cache flush on the station.
        sta = dev[0]
        set_world_reg(apdev[0], apdev[1], sta)
        sta.flush_scan_cache()
481
def run_wpa2_ocv_ap_vht80_mismatch(dev, apdev):
    """Body of test_wpa2_ocv_ap_vht80_mismatch.

    Starts an AP on channel 36 with vht_oper_chwidth "1", sends several
    msg 2/4 frames whose OCI the AP is expected to reject, then completes the
    handshake with OCI (128, 36, 0) and disconnects.
    """
    ap_conf = {"hw_mode": "a",
               "channel": "36",
               "country_code": "US",
               "ht_capab": "[HT40+]",
               "ieee80211w": "1",
               "ieee80211n": "1",
               "ieee80211ac": "1",
               "vht_oper_chwidth": "1",
               "freq": "5180",
               "vht_oper_centr_freq_seg0_idx": "42"}
    conn = APConnection(apdev[0], dev[0], ap_conf)
    # (log message, op_class, channel, seg1_idx)
    rejected = (("wrong primary channel", 128, 38, 0),
                ("wrong primary channel", 128, 32, 0),
                ("smaller bandwidth than negotiated", 116, 36, 0),
                ("smaller bandwidth than negotiated", 115, 36, 0))
    for case in rejected:
        conn.test_bad_oci(*case)
    conn.confirm_valid_oci(128, 36, 0)

    sta = dev[0]
    sta.dump_monitor()
    sta.request("DISCONNECT")
    sta.wait_disconnected()
503
@remote_compatible
def test_wpa2_ocv_ap_vht160_mismatch(dev, apdev):
    """OCV AP mismatch (VHT160)"""
    try:
        run_wpa2_ocv_ap_vht160_mismatch(dev, apdev)
    finally:
        # Cleanup runs whether or not the test body raised. After
        # set_world_reg(), give a CTRL-EVENT-REGDOM-CHANGE event up to 0.5 s
        # to arrive before flushing the scan cache.
        sta = dev[0]
        set_world_reg(apdev[0], apdev[1], sta)
        sta.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        sta.flush_scan_cache()
513
def run_wpa2_ocv_ap_vht160_mismatch(dev, apdev):
    """Body of test_wpa2_ocv_ap_vht160_mismatch.

    Starts an AP on channel 100 (country code ZA) with vht_oper_chwidth "2",
    sends several msg 2/4 frames whose OCI the AP is expected to reject, then
    completes the handshake with OCI (129, 100, 0). The AP is disabled before
    the station is disconnected.
    """
    ap_conf = {"hw_mode": "a",
               "channel": "100",
               "country_code": "ZA",
               "ht_capab": "[HT40+]",
               "ieee80211w": "1",
               "ieee80211n": "1",
               "ieee80211ac": "1",
               "vht_oper_chwidth": "2",
               "freq": "5500",
               "vht_oper_centr_freq_seg0_idx": "114",
               "ieee80211d": "1",
               "ieee80211h": "1"}
    conn = APConnection(apdev[0], dev[0], ap_conf)
    # (log message, op_class, channel, seg1_idx)
    rejected = (("wrong primary channel", 129, 36, 0),
                ("wrong primary channel", 129, 114, 0),
                ("smaller bandwidth (20 Mhz) than negotiated", 121, 100, 0),
                ("smaller bandwidth (40 Mhz) than negotiated", 122, 100, 0),
                ("smaller bandwidth (80 Mhz) than negotiated", 128, 100, 0))
    for case in rejected:
        conn.test_bad_oci(*case)
    conn.confirm_valid_oci(129, 100, 0)

    sta = dev[0]
    sta.dump_monitor()
    ap = conn.hapd
    if ap:
        ap.request("DISABLE")
    sta.disconnect_and_stop_scan()
539
@remote_compatible
def test_wpa2_ocv_ap_vht80plus80_mismatch(dev, apdev):
    """OCV AP mismatch (VHT80+80)"""
    try:
        run_wpa2_ocv_ap_vht80plus80_mismatch(dev, apdev)
    finally:
        # Cleanup runs whether or not the test body raised. After
        # set_world_reg(), give a CTRL-EVENT-REGDOM-CHANGE event up to 0.5 s
        # to arrive before flushing the scan cache.
        sta = dev[0]
        set_world_reg(apdev[0], apdev[1], sta)
        sta.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        sta.flush_scan_cache()
549
def run_wpa2_ocv_ap_vht80plus80_mismatch(dev, apdev):
    """Body of test_wpa2_ocv_ap_vht80plus80_mismatch.

    Starts an AP on channel 36 with vht_oper_chwidth "3" (segment indexes 42
    and 155), sends msg 2/4 frames whose OCI the AP is expected to reject,
    then completes the handshake with OCI (130, 36, 155). The AP is disabled
    before the station is disconnected.
    """
    ap_conf = {"hw_mode": "a",
               "channel": "36",
               "country_code": "US",
               "ht_capab": "[HT40+]",
               "ieee80211w": "1",
               "ieee80211n": "1",
               "ieee80211ac": "1",
               "vht_oper_chwidth": "3",
               "freq": "5180",
               "vht_oper_centr_freq_seg0_idx": "42",
               "ieee80211d": "1",
               "vht_oper_centr_freq_seg1_idx": "155",
               "ieee80211h": "1"}
    conn = APConnection(apdev[0], dev[0], ap_conf)
    # (log message, op_class, channel, seg1_idx)
    rejected = (("using 80 MHz operating class", 128, 36, 155),
                ("wrong frequency segment 1", 130, 36, 138))
    for case in rejected:
        conn.test_bad_oci(*case)
    conn.confirm_valid_oci(130, 36, 155)

    sta = dev[0]
    sta.dump_monitor()
    ap = conn.hapd
    if ap:
        ap.request("DISABLE")
    sta.disconnect_and_stop_scan()
573
@remote_compatible
def test_wpa2_ocv_ap_unexpected1(dev, apdev):
    """OCV and unexpected OCI KDE from station"""
    # OCV disabled on the AP, enabled on the station.
    ap_conf = {"channel": "1",
               "ieee80211w": "1",
               "ap_ocv": "0",
               "sta_ocv": "1",
               "freq": "2412"}
    conn = APConnection(apdev[0], dev[0], ap_conf)
    logger.debug("Client will send OCI KDE even if it was not negotiated")
    conn.confirm_valid_oci(81, 1, 0)

    sta = dev[0]
    sta.dump_monitor()
    sta.request("DISCONNECT")
    sta.wait_disconnected()
589
@remote_compatible
def test_wpa2_ocv_ap_unexpected2(dev, apdev):
    """OCV and unexpected OCI KDE from station"""
    # OCV enabled on the AP, disabled on the station.
    ap_conf = {"channel": "1",
               "ieee80211w": "1",
               "ap_ocv": "1",
               "sta_ocv": "0",
               "freq": "2412"}
    conn = APConnection(apdev[0], dev[0], ap_conf)
    logger.debug("Client will send OCI KDE even if it was not negotiated")
    conn.confirm_valid_oci(81, 1, 0)

    sta = dev[0]
    sta.dump_monitor()
    sta.request("DISCONNECT")
    sta.wait_disconnected()
605
@remote_compatible
def test_wpa2_ocv_ap_retransmit_msg3(dev, apdev):
    """Verify that manually retransmitted msg 3/4 contain a correct OCI"""
    bssid = apdev[0]['bssid']
    ssid = "test-wpa2-ocv"
    passphrase = "qwertyuiop"
    psk = "c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7"

    def next_eapol_tx(src, timeout_msg):
        # Return the next EAPOL-TX event from src or fail with timeout_msg.
        event = src.wait_event(["EAPOL-TX"], timeout=15)
        if event is None:
            raise Exception(timeout_msg)
        return event

    def deliver(dst, sender, event, failure_msg):
        # Feed the frame (third space-separated field of the event) to dst
        # as if it had been received from sender.
        frame = event.split(' ')[2]
        if "OK" not in dst.request("EAPOL_RX " + sender + " " + frame):
            raise Exception(failure_msg)

    params = hostapd.wpa2_params(ssid=ssid)
    params["wpa_psk"] = psk
    params["ieee80211w"] = "1"
    params["ocv"] = "1"
    params['wpa_disable_eapol_key_retries'] = "1"
    try:
        hapd = hostapd.add_ap(apdev[0], params)
    except Exception as e:
        if "Failed to set hostapd parameter ocv" not in str(e):
            raise
        raise HwsimSkip("OCV not supported")
    sta = dev[0]
    hapd.request("SET ext_eapol_frame_io 1")
    sta.request("SET ext_eapol_frame_io 1")
    sta.connect(ssid, psk=passphrase, scan_freq="2412", wait_connect=False,
                ocv="1", ieee80211w="1")
    addr = sta.own_addr()

    # EAPOL-Key msg 1/4
    ev = next_eapol_tx(hapd, "Timeout on EAPOL-TX from hostapd")
    deliver(sta, bssid, ev, "EAPOL_RX to wpa_supplicant failed")

    # EAPOL-Key msg 2/4
    ev = next_eapol_tx(sta, "Timeout on EAPOL-TX from wpa_supplicant")
    deliver(hapd, addr, ev, "EAPOL_RX to hostapd failed")

    # EAPOL-Key msg 3/4: consumed here but never delivered to the station
    ev = next_eapol_tx(hapd, "Timeout on EAPOL-TX from hostapd")
    logger.info("Drop the first EAPOL-Key msg 3/4")

    # Use normal EAPOL TX/RX to handle retries.
    hapd.request("SET ext_eapol_frame_io 0")
    sta.request("SET ext_eapol_frame_io 0")

    # Manually retransmit EAPOL-Key msg 3/4
    resend = hapd.request("RESEND_M3 " + addr)
    if "OK" not in resend:
        raise Exception("RESEND_M3 failed")

    sta.wait_connected()
    hwsim_utils.test_connectivity(sta, hapd)
662
def test_wpa2_ocv_ap_group_hs(dev, apdev):
    """OCV group handshake (AP)"""
    ap_conf = {"channel": "1",
               "ieee80211w": "1",
               "freq": "2412",
               "wpa_strict_rekey": "1"}
    conn = APConnection(apdev[0], dev[0], ap_conf)
    conn.confirm_valid_oci(81, 1, 0)
    ap = conn.hapd

    def expect_group_msg1():
        # The next frame from the AP must have key info 4994 (0x1382).
        conn.msg = recv_eapol(ap)
        if conn.msg["rsn_key_info"] != 4994:
            raise Exception("Didn't receive 1/2 of group key handshake")

    def send_group_msg2(ocikde, counter):
        # Build msg 2/2 around ocikde, drop pending AP events and send it.
        frame = build_eapol_key_2_2(conn.kck, ocikde, replay_counter=counter)
        ap.dump_monitor()
        send_eapol(ap, conn.addr, build_eapol(frame))

    # Let a second station associate through the normal EAPOL path, then
    # switch back to external EAPOL frame I/O.
    ap.request("SET ext_eapol_frame_io 0")
    dev[1].connect(conn.ssid, psk=conn.passphrase, scan_freq="2412", ocv="1",
                   ieee80211w="1")
    ap.wait_sta()
    ap.request("SET ext_eapol_frame_io 1")

    # Trigger a group key handshake
    dev[1].request("DISCONNECT")
    dev[0].dump_monitor()

    # Wait for EAPOL-Key msg 1/2
    expect_group_msg1()

    # Send a EAPOL-Key msg 2/2 with a bad OCI
    logger.info("Bad OCI element")
    send_group_msg2(make_ocikde(1, 1, 1), 3)

    # Wait for retransmitted EAPOL-Key msg 1/2
    expect_group_msg1()

    # Send a EAPOL-Key msg 2/2 with a good OCI
    logger.info("Good OCI element")
    send_group_msg2(make_ocikde(81, 1, 0), 4)

    # Verify that group key handshake has completed: no further msg 1/2 may
    # show up within one second.
    ev = ap.wait_event(["EAPOL-TX"], timeout=1)
    if ev is None:
        return
    reply = parse_eapol(binascii.unhexlify(ev.split(' ')[2]))
    if reply["rsn_key_info"] == 4994:
        raise Exception("AP didn't accept 2/2 of group key handshake")
713
class STAConnection:
    """Drive the AP side of a 4-way handshake against a real station.

    An AP is started with external EAPOL frame I/O enabled on both ends; msg
    1/4 from hostapd is forwarded to the station and msg 2/4 from the station
    is captured. The test then crafts EAPOL-Key msg 3/4 frames itself so that
    it can place arbitrary (or no) OCI KDEs in them.
    """

    def init_params(self):
        """Reset all attributes to their initial values."""
        # Static parameters
        self.ssid = "test-wpa2-ocv"
        self.passphrase = "qwertyuiop"
        self.psk = "c2c6c255af836bed1b3f2f1ded98e052f5ad618bb554e2836757b55854a0eab7"

        # Dynamic parameters
        for attr in ("hapd", "dev", "addr", "rsne", "kck", "kek", "msg",
                     "bssid", "anonce", "snonce", "gtkie", "counter"):
            setattr(self, attr, None)

    def __init__(self, apdev, dev, params, sta_params=None):
        """Start the AP, run msg 1/4 and 2/4, and derive the KCK/KEK.

        params holds the hostapd parameters plus the required test-only key
        "freq" (scan frequency for dev); it is popped from params, which is
        further updated in place with the WPA2 settings. sta_params are extra
        connect() arguments; "ocv" and "ieee80211w" default to "1" and are
        added to the passed dict when missing. Raises HwsimSkip if hostapd
        rejects the ocv parameter.
        """
        self.init_params()
        self.dev = dev
        self.bssid = apdev['bssid']

        freq = params.pop("freq")
        if sta_params is None:
            sta_params = {}
        sta_params.setdefault("ocv", "1")
        sta_params.setdefault("ieee80211w", "1")

        wpa2 = hostapd.wpa2_params(ssid=self.ssid, passphrase=self.passphrase)
        params.update(wpa2)
        params['wpa_pairwise_update_count'] = "10"

        try:
            self.hapd = hostapd.add_ap(apdev, params)
        except Exception as e:
            if "Failed to set hostapd parameter ocv" not in str(e):
                raise
            raise HwsimSkip("OCV not supported")
        for endpoint in (self.hapd, self.dev):
            endpoint.request("SET ext_eapol_frame_io 1")
        # self.psk holds the same hex string that is used as the raw PSK in
        # the connect() call below.
        pmk = binascii.unhexlify(self.psk)

        self.gtkie = binascii.unhexlify("dd16000fac010100dc11188831bf4aa4a8678d2b41498618")
        # The two RSNE variants differ only in the RSN capabilities field
        # (8c40 when the station uses OCV, 8c00 otherwise).
        rsne_hex = ("30140100000fac040100000fac040100000fac028c40"
                    if sta_params["ocv"] != "0" else
                    "30140100000fac040100000fac040100000fac028c00")
        self.rsne = binascii.unhexlify(rsne_hex)

        self.dev.connect(self.ssid, raw_psk=self.psk, scan_freq=freq,
                         wait_connect=False, **sta_params)
        if "country_code" in params:
            self.dev.wait_regdom(country_ie=True)
        self.addr = dev.p2p_interface_addr()

        # Forward msg 1/4 from AP to STA
        self.msg = recv_eapol(self.hapd)
        self.anonce = self.msg['rsn_key_nonce']
        send_eapol(self.dev, self.bssid, build_eapol(self.msg))

        # Capture msg 2/4 from the STA so we can derive the session keys
        self.msg = recv_eapol(dev)
        self.snonce = self.msg['rsn_key_nonce']
        keys = pmk_to_ptk(pmk, self.addr, self.bssid, self.snonce, self.anonce)
        (ptk, self.kck, self.kek) = keys

        # Our own frames continue from the replay counter seen in msg 2/4.
        (seen,) = struct.unpack('>Q', self.msg['rsn_replay_counter'])
        self.counter = seen + 1

    def _send_msg3(self, ocikde):
        """Send msg 3/4 with RSNE + GTK KDE + ocikde and bump the counter."""
        key_data = pad_key_data(self.rsne + self.gtkie + ocikde)
        wrapped = aes_wrap(self.kek, key_data)
        frame = build_eapol_key_3_4(self.anonce, self.kck, wrapped,
                                    replay_counter=self.counter)

        self.dev.dump_monitor()
        send_eapol(self.dev, self.bssid, build_eapol(frame))
        self.counter += 1

    def test_bad_oci(self, logmsg, op_class, channel, seg1_idx, errmsg):
        """Send msg 3/4 with a bad OCI and expect the station to report it.

        With op_class set to None no OCI KDE is included at all. Raises an
        exception unless the station emits the event errmsg within 5 seconds.
        """
        logger.info("Bad OCI element: " + logmsg)
        ocikde = b'' if op_class is None else make_ocikde(op_class, channel,
                                                          seg1_idx)
        self._send_msg3(ocikde)

        if self.dev.wait_event([errmsg], timeout=5) is None:
            raise Exception("Bad OCI not reported")

    def confirm_valid_oci(self, op_class, channel, seg1_idx):
        """Send msg 3/4 with the given OCI and wait for the connection."""
        logger.debug("Valid OCI element to complete handshake")
        self._send_msg3(make_ocikde(op_class, channel, seg1_idx))

        self.dev.wait_connected(timeout=1)
822
@remote_compatible
def test_wpa2_ocv_mismatch_client(dev, apdev):
    """OCV client mismatch"""
    ap_params = {"channel": "1",
                 "ieee80211w": "1",
                 "ocv": "1",
                 "freq": "2412"}
    conn = STAConnection(apdev[0], dev[0], ap_params)

    # (log description, op_class, channel, seg1_idx, expected STA error)
    # op_class None makes test_bad_oci() omit the OCI KDE.
    bad_ocis = (
        ("element missing", None, 0, 0,
         "did not receive mandatory OCI"),
        ("wrong channel number", 81, 6, 0,
         "primary channel mismatch"),
        ("invalid channel number", 81, 0, 0,
         "unable to interpret received OCI"),
        ("wrong operating class", 80, 0, 0,
         "unable to interpret received OCI"),
        ("invalid operating class", 0, 0, 0,
         "unable to interpret received OCI"),
    )
    for logmsg, op_class, channel, seg1_idx, errmsg in bad_ocis:
        conn.test_bad_oci(logmsg, op_class, channel, seg1_idx, errmsg)

    # Finally complete the handshake with a matching OCI
    conn.confirm_valid_oci(81, 1, 0)
842
@remote_compatible
def test_wpa2_ocv_vht160_mismatch_client(dev, apdev):
    """OCV client mismatch (VHT160)"""
    try:
        run_wpa2_ocv_vht160_mismatch_client(dev, apdev)
    finally:
        # Always restore the world regulatory domain and drop cached scan
        # results, whether or not the test body succeeded.
        ap0, ap1, sta = apdev[0], apdev[1], dev[0]
        set_world_reg(ap0, ap1, sta)
        sta.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        sta.flush_scan_cache()
852
def run_wpa2_ocv_vht160_mismatch_client(dev, apdev):
    """Body of the VHT160 OCI mismatch test; the caller handles cleanup.

    Starts an AP on channel 100 with the VHT parameters below, connects
    dev[0] with disable_vht=1 and checks that each mismatching OCI in
    EAPOL-Key msg 3/4 is reported by the STA before a valid OCI completes
    the handshake.
    """
    ap_params = {"hw_mode": "a",
                 "channel": "100",
                 "country_code": "ZA",
                 "ht_capab": "[HT40+]",
                 "ieee80211w": "1",
                 "ieee80211n": "1",
                 "ieee80211ac": "1",
                 "vht_oper_chwidth": "2",
                 "ocv": "1",
                 "vht_oper_centr_freq_seg0_idx": "114",
                 "freq": "5500",
                 "ieee80211d": "1",
                 "ieee80211h": "1"}
    conn = STAConnection(apdev[0], dev[0], ap_params, {"disable_vht": "1"})

    # (log description, op_class, channel, seg1_idx, expected STA error)
    mismatches = (
        ("smaller bandwidth (20 Mhz) than negotiated",
         121, 100, 0, "channel bandwidth mismatch"),
        ("wrong frequency, bandwith, and secondary channel",
         123, 104, 0, "primary channel mismatch"),
        ("wrong upper/lower behaviour",
         129, 104, 0, "primary channel mismatch"),
    )
    for case in mismatches:
        conn.test_bad_oci(*case)
    conn.confirm_valid_oci(122, 100, 0)

    # Tear down the AP and the STA connection
    dev[0].dump_monitor()
    if conn.hapd:
        conn.hapd.request("DISABLE")
    dev[0].disconnect_and_stop_scan()
881
def test_wpa2_ocv_sta_group_hs(dev, apdev):
    """OCV group handshake (STA)"""
    ap_params = {"channel": "1",
                 "ieee80211w": "1",
                 "ocv": "1",
                 "freq": "2412",
                 "wpa_strict_rekey": "1"}

    def connect_sta():
        # Pass a copy so the same base parameters can be used again for the
        # second connection.
        link = STAConnection(apdev[0], dev[0], ap_params.copy())
        link.confirm_valid_oci(81, 1, 0)
        return link

    def send_group_msg_1_2(link, oci, replay_counter):
        # Key Data of group msg 1/2: GTK KDE followed by the OCI KDE
        key_data = link.gtkie + make_ocikde(*oci)
        encrypted = aes_wrap(link.kek, pad_key_data(key_data))
        frame = build_eapol_key_1_2(link.kck, encrypted,
                                    replay_counter=replay_counter)
        send_eapol(dev[0], link.bssid, build_eapol(frame))

    conn = connect_sta()

    # Send a EAPOL-Key msg 1/2 with a bad OCI
    logger.info("Bad OCI element")
    send_group_msg_1_2(conn, (1, 1, 1), 3)

    # We shouldn't get a EAPOL-Key message back
    if dev[0].wait_event(["EAPOL-TX"], timeout=1) is not None:
        raise Exception("Received response to invalid EAPOL-Key 1/2")

    # Reset AP to try with valid OCI
    conn.hapd.disable()
    conn = connect_sta()

    # Send a EAPOL-Key msg 1/2 with a good OCI
    logger.info("Good OCI element")
    send_group_msg_1_2(conn, (81, 1, 0), 4)

    # Wait for EAPOL-Key msg 2/2
    conn.msg = recv_eapol(dev[0])
    if conn.msg["rsn_key_info"] != 0x0302:
        raise Exception("Didn't receive 2/2 of group key handshake")
920
def test_wpa2_ocv_auto_enable_pmf(dev, apdev):
    """OCV on 2.4 GHz with PMF getting enabled automatically"""
    # The AP configuration enables OCV without setting ieee80211w itself.
    ap_params = {"channel": "1",
                 "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    sta = dev[0]
    # The STA requires PMF (ieee80211w=2) and must be able to connect both
    # with OCV disabled and with OCV enabled.
    for sta_ocv in ("0", "1"):
        sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv=sta_ocv,
                    ieee80211w="2")
        sta.request("REMOVE_NETWORK all")
        sta.wait_disconnected()
931
def test_wpa2_ocv_auto_enable_pmf_on_sta(dev, apdev):
    """OCV on 2.4 GHz with PMF getting enabled automatically on STA"""
    ap_params = {"channel": "1",
                 "ieee80211w": "1",
                 "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    # Only ocv is set in the STA network profile; no ieee80211w value is
    # given here.
    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1")
939
def test_wpa2_ocv_sta_override_eapol(dev, apdev):
    """OCV on 2.4 GHz and STA override EAPOL-Key msg 2/4"""
    ap_params = {"channel": "1",
                 "ieee80211w": "2",
                 "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    sta = dev[0]

    # Make the STA use 2462 in its OCI while it actually connects on 2412
    sta.set("oci_freq_override_eapol", "2462")
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2", wait_connect=False)

    outcomes = ["CTRL-EVENT-CONNECTED", "CTRL-EVENT-DISCONNECTED"]
    ev = sta.wait_event(outcomes, timeout=15)
    sta.request("DISCONNECT")
    if ev is None:
        raise Exception("No connection result reported")
    if "CTRL-EVENT-CONNECTED" in ev:
        raise Exception("Unexpected connection")
    if "reason=15" not in ev:
        raise Exception("Unexpected disconnection reason: " + ev)

    # The AP side must have reported the mismatch for msg 2/4
    check_ocv_failure(hapd, "EAPOL-Key msg 2/4", "eapol-key-m2",
                      sta.own_addr())
961
def test_wpa2_ocv_sta_override_sa_query_req(dev, apdev):
    """OCV on 2.4 GHz and STA override SA Query Request"""
    ap_params = {"channel": "1",
                 "ieee80211w": "2",
                 "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    sta = dev[0]

    def trigger_sa_query():
        # UNPROT_DEAUTH is the control command used here to start an
        # SA Query from the STA.
        if "OK" not in sta.request("UNPROT_DEAUTH"):
            raise Exception("Triggering SA Query from the STA failed")

    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2")
    hapd.wait_sta()

    # First round: overridden OCI frequency, the STA is expected to end up
    # disconnected.
    sta.set("oci_freq_override_saquery_req", "2462")
    trigger_sa_query()
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=3) is None:
        raise Exception("Disconnection after failed SA Query not reported")

    # Second round: override removed, the STA must stay connected.
    sta.set("oci_freq_override_saquery_req", "0")
    sta.wait_connected()
    trigger_sa_query()
    check_ocv_failure(hapd, "SA Query Request", "saqueryreq",
                      sta.own_addr())
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=3) is not None:
        raise Exception("SA Query from the STA failed")
986
def test_wpa2_ocv_sta_override_sa_query_resp(dev, apdev):
    """OCV on 2.4 GHz and STA override SA Query Response"""
    ap_params = {"channel": "1",
                 "ieee80211w": "2",
                 "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2")
    # From now on the STA uses 2462 in the OCI of its SA Query Responses
    sta.set("oci_freq_override_saquery_resp", "2462")
    hapd.wait_sta(wait_4way_hs=True)

    # Have the AP start an SA Query towards the STA
    sa_query_cmd = "SA_QUERY " + sta.own_addr()
    if "OK" not in hapd.request(sa_query_cmd):
        raise Exception("SA_QUERY failed")
    check_ocv_failure(hapd, "SA Query Response", "saqueryresp",
                      sta.own_addr())
1001
def check_ocv_failure(dev, frame_txt, frame, addr):
    """Wait for an OCV-FAILURE event on dev and validate its contents.

    dev: object providing wait_event() (AP or STA side).
    frame_txt: human readable frame name, used only in the error message.
    frame: expected value of the frame= field in the event.
    addr: expected value of the addr= field in the event.

    Raises Exception if no event arrives (wait_event timeout=3) or if the
    event lacks the expected addr, frame or "primary channel mismatch"
    error.
    """
    event = dev.wait_event(["OCV-FAILURE"], timeout=3)
    if event is None:
        raise Exception("OCV failure for %s not reported" % frame_txt)

    def expectations():
        # (substring that must be present, error prefix when it is missing);
        # produced lazily so the checks run strictly in this order.
        yield "addr=" + addr, "Unexpected OCV failure addr: "
        yield "frame=" + frame, "Unexpected OCV failure frame: "
        yield "error=primary channel mismatch", "Unexpected OCV failure error: "

    for needle, prefix in expectations():
        if needle not in event:
            raise Exception(prefix + event)
1012
def test_wpa2_ocv_ap_override_eapol_m3(dev, apdev):
    """OCV on 2.4 GHz and AP override EAPOL-Key msg 3/4"""
    # Override is part of the initial AP configuration
    run_wpa2_ocv_ap_override_eapol_m3(dev, apdev, post_enable=False)
1016
def test_wpa2_ocv_ap_override_eapol_m3_post_enable(dev, apdev):
    """OCV on 2.4 GHz and AP override EAPOL-Key msg 3/4 (post enable)"""
    # Override is set only after the AP has been started
    run_wpa2_ocv_ap_override_eapol_m3(dev, apdev, post_enable=True)
1020
def run_wpa2_ocv_ap_override_eapol_m3(dev, apdev, post_enable=False):
    """Check that the STA reports an OCI override in EAPOL-Key msg 3/4.

    With post_enable False the override is part of the AP configuration;
    with post_enable True it is set with hapd.set() after the AP has been
    started. In both cases dev[0] must report the OCV failure and then
    disconnect with reason=15.
    """
    override = ("oci_freq_override_eapol_m3", "2462")
    ap_params = {"channel": "1",
                 "ieee80211w": "2",
                 "ocv": "1"}
    if not post_enable:
        ap_params[override[0]] = override[1]
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()
    if post_enable:
        hapd.set(*override)

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2", wait_connect=False)

    check_ocv_failure(sta, "EAPOL-Key msg 3/4", "eapol-key-m3", bssid)

    disconnect_ev = sta.wait_disconnected()
    if "reason=15" not in disconnect_ev:
        raise Exception("Unexpected disconnection reason: " + disconnect_ev)
1039
def test_wpa2_ocv_ap_override_eapol_g1(dev, apdev):
    """OCV on 2.4 GHz and AP override EAPOL-Key group msg 1/2"""
    # Override is part of the initial AP configuration
    run_wpa2_ocv_ap_override_eapol_g1(dev, apdev, post_enable=False)
1043
def test_wpa2_ocv_ap_override_eapol_g1_post_enable(dev, apdev):
    """OCV on 2.4 GHz and AP override EAPOL-Key group msg 1/2 (post enable)"""
    # Override is set only after the STA has connected
    run_wpa2_ocv_ap_override_eapol_g1(dev, apdev, post_enable=True)
1047
def run_wpa2_ocv_ap_override_eapol_g1(dev, apdev, post_enable=False):
    """Check that the STA reports an OCI override in group msg 1/2.

    With post_enable False the override is part of the AP configuration;
    with post_enable True it is set with hapd.set() after dev[0] has
    connected. A GTK rekey is then requested from the AP and dev[0] must
    report the OCV failure.
    """
    override = ("oci_freq_override_eapol_g1", "2462")
    ap_params = {"channel": "1",
                 "ieee80211w": "2",
                 "ocv": "1"}
    if not post_enable:
        ap_params[override[0]] = override[1]
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2")

    if post_enable:
        hapd.set(*override)
    rekey_result = hapd.request("REKEY_GTK")
    if "OK" not in rekey_result:
        raise Exception("REKEY_GTK failed")
    check_ocv_failure(sta, "EAPOL-Key group msg 1/2", "eapol-key-g1", bssid)
1064
def test_wpa2_ocv_ap_override_saquery_req(dev, apdev):
    """OCV on 2.4 GHz and AP override SA Query Request"""
    ap_params = {"channel": "1",
                 "ieee80211w": "2",
                 "ocv": "1",
                 "oci_freq_override_saquery_req": "2462"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()
    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2")
    hapd.wait_sta(wait_4way_hs=True)

    # Have the AP start an SA Query; the STA must flag the request
    sa_query_cmd = "SA_QUERY " + sta.own_addr()
    if "OK" not in hapd.request(sa_query_cmd):
        raise Exception("SA_QUERY failed")
    check_ocv_failure(sta, "SA Query Request", "saqueryreq", bssid)
1080
def test_wpa2_ocv_ap_override_saquery_resp(dev, apdev):
    """OCV on 2.4 GHz and AP override SA Query Response"""
    ap_params = {"channel": "1",
                 "ieee80211w": "2",
                 "ocv": "1",
                 "oci_freq_override_saquery_resp": "2462"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()
    sta = dev[0]
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="1",
                ieee80211w="2")

    # Start an SA Query from the STA; the AP's response must be flagged
    trigger_result = sta.request("UNPROT_DEAUTH")
    if "OK" not in trigger_result:
        raise Exception("Triggering SA Query from the STA failed")
    check_ocv_failure(sta, "SA Query Response", "saqueryresp", bssid)
1095
def test_wpa2_ocv_ap_override_fils_assoc(dev, apdev, params):
    """OCV on 2.4 GHz and AP override FILS association"""
    # Override is part of the initial AP configuration
    run_wpa2_ocv_ap_override_fils_assoc(dev, apdev, params,
                                        post_enable=False)
1099
def test_wpa2_ocv_ap_override_fils_assoc_post_enable(dev, apdev, params):
    """OCV on 2.4 GHz and AP override FILS association (post enable)"""
    # Override is set only after the AP has been started
    run_wpa2_ocv_ap_override_fils_assoc(dev, apdev, params,
                                        post_enable=True)
1103
def run_wpa2_ocv_ap_override_fils_assoc(dev, apdev, params, post_enable=False):
    """Check that the STA reports an OCI override in FILS association.

    dev/apdev: station and AP device lists; dev[0] and apdev[0] are used.
    params: test case parameters; params['logdir'] is where the ERP
        authentication server writes msk.lst.
    post_enable: when False the override is part of the AP configuration,
        when True it is set with hapd.set() after the AP has been started.

    Raises HwsimSkip if hostapd rejects the ocv parameter (the capability
    checks at the top may raise as well).
    """
    check_fils_capa(dev[0])
    check_erp_capa(dev[0])

    start_erp_as(msk_dump=os.path.join(params['logdir'], "msk.lst"))

    ssid = "test-wpa2-ocv"
    # Keep the AP configuration in its own name instead of rebinding the
    # params argument.
    ap_params = hostapd.wpa2_eap_params(ssid=ssid)
    ap_params['wpa_key_mgmt'] = "FILS-SHA256"
    ap_params['auth_server_port'] = "18128"
    ap_params['erp_send_reauth_start'] = '1'
    ap_params['erp_domain'] = 'example.com'
    ap_params['fils_realm'] = 'example.com'
    ap_params['wpa_group_rekey'] = '1'
    ap_params["ieee80211w"] = "2"
    ap_params["ocv"] = "1"
    if not post_enable:
        ap_params["oci_freq_override_fils_assoc"] = "2462"
    try:
        hapd = hostapd.add_ap(apdev[0], ap_params)
    except Exception as e:
        if "Failed to set hostapd parameter ocv" in str(e):
            raise HwsimSkip("OCV not supported")
        raise
    bssid = hapd.own_addr()
    if post_enable:
        hapd.set("oci_freq_override_fils_assoc", "2462")

    # First connection; presumably this establishes the ERP keys so that
    # the reconnection below can use FILS authentication - confirm.
    dev[0].request("ERP_FLUSH")
    network_id = dev[0].connect(ssid, key_mgmt="FILS-SHA256",
                                eap="PSK", identity="psk.user@example.com",
                                password_hex="0123456789abcdef0123456789abcdef",
                                erp="1", scan_freq="2412", ocv="1",
                                ieee80211w="2")

    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected()

    # Reconnect to the same network; this time the STA must report the
    # OCV failure for the association response.
    dev[0].dump_monitor()
    dev[0].select_network(network_id, freq=2412)

    check_ocv_failure(dev[0], "FILS Association Response", "fils-assoc", bssid)
    dev[0].request("DISCONNECT")
1146
def test_wpa2_ocv_ap_override_ft_assoc(dev, apdev):
    """OCV on 2.4 GHz and AP override FT reassociation"""
    # Override is part of the initial AP configurations
    run_wpa2_ocv_ap_override_ft_assoc(dev, apdev, post_enable=False)
1150
def test_wpa2_ocv_ap_override_ft_assoc_post_enable(dev, apdev):
    """OCV on 2.4 GHz and AP override FT reassociation (post enable)"""
    # Override is set only after both APs have been started
    run_wpa2_ocv_ap_override_ft_assoc(dev, apdev, post_enable=True)
1154
def run_wpa2_ocv_ap_override_ft_assoc(dev, apdev, post_enable=False):
    """Check that the STA reports an OCI override in FT reassociation.

    Two FT APs are started on apdev[0] and apdev[1]. With post_enable False
    the override is part of both AP configurations; with post_enable True
    it is set with set() after both APs are running. dev[0] connects to one
    AP, is told to roam to the other one and must report the OCV failure
    for the roam target. Raises HwsimSkip if the first AP rejects the ocv
    parameter.
    """
    ssid = "test-wpa2-ocv"
    passphrase = "qwertyuiop"
    override = ("oci_freq_override_ft_assoc", "2462")

    def with_ocv(ap_params):
        # Settings shared by both APs
        ap_params["ieee80211w"] = "2"
        ap_params["ocv"] = "1"
        if not post_enable:
            ap_params[override[0]] = override[1]
        return ap_params

    conf0 = with_ocv(ft_params1(ssid=ssid, passphrase=passphrase))
    try:
        hapd0 = hostapd.add_ap(apdev[0], conf0)
    except Exception as e:
        if "Failed to set hostapd parameter ocv" in str(e):
            raise HwsimSkip("OCV not supported")
        raise
    conf1 = with_ocv(ft_params2(ssid=ssid, passphrase=passphrase))
    hapd1 = hostapd.add_ap(apdev[1], conf1)

    if post_enable:
        for hapd in (hapd0, hapd1):
            hapd.set(*override)

    sta = dev[0]
    sta.connect(ssid, key_mgmt="FT-PSK", psk=passphrase,
                scan_freq="2412", ocv="1", ieee80211w="2")

    # Roam to whichever AP the STA is not currently associated with
    current = sta.get_status_field("bssid")
    bssid0 = hapd0.own_addr()
    bssid1 = hapd1.own_addr()
    if current == bssid1:
        target = bssid0
    else:
        target = bssid1

    sta.scan_for_bss(target, freq="2412")
    if "OK" not in sta.request("ROAM " + target):
        raise Exception("ROAM failed")

    check_ocv_failure(sta, "FT Reassociation Response", "ft-assoc", target)
    sta.request("DISCONNECT")
1194
@remote_compatible
def test_wpa2_ocv_no_pmf(dev, apdev):
    """OCV on 2.4 GHz and no PMF on STA"""
    ap_params = {"channel": "1",
                 "ieee80211w": "1",
                 "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    sta = dev[0]

    # Hand-crafted IE for the association request.
    # NOTE(review): presumably an RSNE that advertises OCV without PMF -
    # confirm against the RSN Capabilities encoding.
    ie = "301a0100000fac040100000fac040100000fac0200400000000fac06"
    set_ie_result = sta.request("TEST_ASSOC_IE " + ie)
    if "OK" not in set_ie_result:
        raise Exception("Could not set TEST_ASSOC_IE")

    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="0",
                ieee80211w="0", wait_connect=False)
    outcomes = ["CTRL-EVENT-CONNECTED", "CTRL-EVENT-ASSOC-REJECT"]
    ev = sta.wait_event(outcomes, timeout=10)
    sta.request("DISCONNECT")

    # The association has to be rejected with status code 31
    if ev is None:
        raise Exception("No connection result seen")
    if "CTRL-EVENT-CONNECTED" in ev:
        raise Exception("Unexpected connection")
    if "status_code=31" not in ev:
        raise Exception("Unexpected status code: " + ev)
1216
@remote_compatible
def test_wpa2_ocv_no_pmf_workaround(dev, apdev):
    """OCV on 2.4 GHz and no PMF on STA with workaround"""
    # Same scenario as the non-workaround test, but the AP uses ocv=2 and
    # the connection is expected to succeed.
    ap_params = {"channel": "1",
                 "ieee80211w": "1",
                 "ocv": "2"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    sta = dev[0]

    ie = "301a0100000fac040100000fac040100000fac0200400000000fac06"
    set_ie_result = sta.request("TEST_ASSOC_IE " + ie)
    if "OK" not in set_ie_result:
        raise Exception("Could not set TEST_ASSOC_IE")
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="0",
                ieee80211w="0")
1229
@remote_compatible
def test_wpa2_ocv_no_oci(dev, apdev):
    """OCV on 2.4 GHz and no OCI from STA"""
    params = {"channel": "1",
              "ieee80211w": "1",
              "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], params)
    # Hand-crafted IE for the association request while OCV is disabled in
    # the STA network profile (ocv="0"), so the STA does not add an OCI.
    # NOTE(review): presumably an RSNE that still advertises OCV - confirm.
    ie = "301a0100000fac040100000fac040100000fac0280400000000fac06"
    if "OK" not in dev[0].request("TEST_ASSOC_IE " + ie):
        raise Exception("Could not set TEST_ASSOC_IE")
    dev[0].connect(ssid, psk=passphrase, scan_freq="2412", ocv="0",
                   ieee80211w="1", wait_connect=False)

    # The AP must report the missing OCI in EAPOL-Key msg 2/4
    ev = hapd.wait_event(["OCV-FAILURE"], timeout=10)
    if ev is None:
        raise Exception("No OCV failure reported")
    if "frame=eapol-key-m2 error=did not receive mandatory OCI" not in ev:
        raise Exception("Unexpected error: " + ev)

    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED",
                            "WPA: 4-Way Handshake failed"], timeout=10)
    dev[0].request("DISCONNECT")
    # Check for the timeout first: "in" on None would raise TypeError and
    # hide the intended error message.
    if ev is None:
        raise Exception("4-way handshake failure not reported")
    if "CTRL-EVENT-CONNECTED" in ev:
        raise Exception("Unexpected connection")
1254
@remote_compatible
def test_wpa2_ocv_no_oci_workaround(dev, apdev):
    """OCV on 2.4 GHz and no OCI from STA with workaround"""
    # Same scenario as the non-workaround test, but the AP uses ocv=2 and
    # the connection is expected to succeed.
    ap_params = {"channel": "1",
                 "ieee80211w": "1",
                 "ocv": "2"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)
    sta = dev[0]

    ie = "301a0100000fac040100000fac040100000fac0280400000000fac06"
    set_ie_result = sta.request("TEST_ASSOC_IE " + ie)
    if "OK" not in set_ie_result:
        raise Exception("Could not set TEST_ASSOC_IE")
    sta.connect(ssid, psk=passphrase, scan_freq="2412", ocv="0",
                ieee80211w="1")
1267
def test_wpa2_ocv_without_pmf(dev, apdev):
    """OCV without PMF"""
    ap_params = {"channel": "6",
                 "ieee80211n": "1",
                 "ieee80211w": "1",
                 "ocv": "1"}
    hapd, ssid, passphrase = ocv_setup_ap(apdev[0], ap_params)

    # Turn PMF off on the running AP configuration and try to bring the
    # AP back up; hostapd has to refuse this.
    hapd.disable()
    hapd.set("ieee80211w", "0")
    enable_result = hapd.request("ENABLE")
    if "FAIL" in enable_result:
        return
    raise Exception("OCV without PMF accepted")
1279