1# Hotspot 2.0 filtering tests 2# Copyright (c) 2015, Intel Deutschland GmbH 3# 4# This software may be distributed under the terms of the BSD license. 5# See README for more details. 6 7import hostapd 8import hwsim_utils 9import socket 10import subprocess 11import binascii 12from utils import HwsimSkip, require_under_vm 13import os 14import time 15from test_ap_hs20 import build_arp, build_na, hs20_ap_params 16from test_ap_hs20 import interworking_select, interworking_connect 17import struct 18import logging 19logger = logging.getLogger() 20 21class IPAssign(object): 22 def __init__(self, iface, addr, ipv6=False): 23 self._iface = iface 24 self._addr = addr 25 self._cmd = ['ip'] 26 if ipv6: 27 self._cmd.append('-6') 28 self._cmd.append('addr') 29 self._ipv6 = ipv6 30 def __enter__(self): 31 subprocess.call(self._cmd + ['add', self._addr, 'dev', self._iface]) 32 if self._ipv6: 33 # wait for DAD to finish 34 while True: 35 o = subprocess.check_output(self._cmd + ['show', 'tentative', 'dev', self._iface]).decode() 36 if self._addr not in o: 37 break 38 time.sleep(0.1) 39 def __exit__(self, type, value, traceback): 40 subprocess.call(self._cmd + ['del', self._addr, 'dev', self._iface]) 41 42def hs20_filters_connect(dev, apdev, disable_dgaf=False, proxy_arp=False): 43 bssid = apdev[0]['bssid'] 44 params = hs20_ap_params() 45 params['hessid'] = bssid 46 47 # Do not disable dgaf, to test that the station drops unicast IP packets 48 # encrypted with GTK. 49 params['disable_dgaf'] = '0' 50 params['proxy_arp'] = '1' 51 params['ap_isolate'] = '1' 52 params['bridge'] = 'ap-br0' 53 54 try: 55 hapd = hostapd.add_ap(apdev[0], params) 56 except: 57 # For now, do not report failures due to missing kernel support. 58 raise HwsimSkip("Could not start hostapd - assume proxyarp not supported in the kernel") 59 60 subprocess.call(['brctl', 'setfd', 'ap-br0', '0']) 61 subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up']) 62 63 dev[0].hs20_enable() 64 65 id = dev[0].add_cred_values({'realm': "example.com", 66 'username': "hs20-test", 67 'password': "password", 68 'ca_cert': "auth_serv/ca.pem", 69 'domain': "example.com", 70 'update_identifier': "1234"}) 71 interworking_select(dev[0], bssid, "home", freq="2412") 72 interworking_connect(dev[0], bssid, "TTLS") 73 74 time.sleep(0.1) 75 76 return dev[0], hapd 77 78def _test_ip4_gtk_drop(devs, apdevs, params, dst): 79 require_under_vm() 80 procfile = '/proc/sys/net/ipv4/conf/%s/drop_unicast_in_l2_multicast' % devs[0].ifname 81 if not os.path.exists(procfile): 82 raise HwsimSkip("kernel doesn't have capability") 83 84 [dev, hapd] = hs20_filters_connect(devs, apdevs) 85 with IPAssign(dev.ifname, '10.0.0.1/24'): 86 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 87 s.bind(("10.0.0.1", 12345)) 88 s.settimeout(0.1) 89 90 pkt = dst 91 pkt += hapd.own_addr().replace(':', '') 92 pkt += '0800' 93 pkt += '45000020786840004011ae600a0000040a000001' 94 pkt += '30393039000c0000' 95 pkt += '61736466' # "asdf" 96 if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt): 97 raise Exception("DATA_TEST_FRAME failed") 98 try: 99 logger.info(s.recvfrom(1024)) 100 logger.info("procfile=" + procfile + " val=" + open(procfile, 'r').read().rstrip()) 101 raise Exception("erroneously received frame!") 102 except socket.timeout: 103 # this is the expected behaviour 104 pass 105 106def test_ip4_gtk_drop_bcast(devs, apdevs, params): 107 """Hotspot 2.0 frame filtering - IPv4 GTK drop broadcast""" 108 _test_ip4_gtk_drop(devs, apdevs, params, dst='ffffffffffff') 109 110def test_ip4_gtk_drop_mcast(devs, apdevs, params): 111 """Hotspot 2.0 frame filtering - IPv4 GTK drop multicast""" 112 _test_ip4_gtk_drop(devs, apdevs, params, dst='ff0000000000') 113 114def _test_ip6_gtk_drop(devs, apdevs, params, dst): 115 require_under_vm() 116 dev = devs[0] 117 procfile = '/proc/sys/net/ipv6/conf/%s/drop_unicast_in_l2_multicast' % devs[0].ifname 118 if not os.path.exists(procfile): 119 raise HwsimSkip("kernel doesn't have capability") 120 121 [dev, hapd] = hs20_filters_connect(devs, apdevs) 122 123 with IPAssign(dev.ifname, 'fdaa::1/48', ipv6=True): 124 s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 125 s.bind(("fdaa::1", 12345)) 126 s.settimeout(0.1) 127 128 pkt = dst 129 pkt += hapd.own_addr().replace(':', '') 130 pkt += '86dd' 131 pkt += '60000000000c1140fdaa0000000000000000000000000002fdaa0000000000000000000000000001' 132 pkt += '30393039000cde31' 133 pkt += '61736466' # "asdf" 134 if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt): 135 raise Exception("DATA_TEST_FRAME failed") 136 try: 137 logger.info(s.recvfrom(1024)) 138 logger.info("procfile=" + procfile + " val=" + open(procfile, 'r').read().rstrip()) 139 raise Exception("erroneously received frame!") 140 except socket.timeout: 141 # this is the expected behaviour 142 pass 143 144def test_ip6_gtk_drop_bcast(devs, apdevs, params): 145 """Hotspot 2.0 frame filtering - IPv6 GTK drop broadcast""" 146 _test_ip6_gtk_drop(devs, apdevs, params, dst='ffffffffffff') 147 148def test_ip6_gtk_drop_mcast(devs, apdevs, params): 149 """Hotspot 2.0 frame filtering - IPv6 GTK drop multicast""" 150 _test_ip6_gtk_drop(devs, apdevs, params, dst='ff0000000000') 151 152def test_ip4_drop_gratuitous_arp(devs, apdevs, params): 153 """Hotspot 2.0 frame filtering - IPv4 drop gratuitous ARP""" 154 require_under_vm() 155 procfile = '/proc/sys/net/ipv4/conf/%s/drop_gratuitous_arp' % devs[0].ifname 156 if not os.path.exists(procfile): 157 raise HwsimSkip("kernel doesn't have capability") 158 159 [dev, hapd] = hs20_filters_connect(devs, apdevs) 160 161 with IPAssign(dev.ifname, '10.0.0.2/24'): 162 # add an entry that can be updated by gratuitous ARP 163 subprocess.call(['ip', 'neigh', 'add', '10.0.0.1', 'lladdr', '02:00:00:00:00:ff', 'nud', 'reachable', 'dev', dev.ifname]) 164 # wait for lock-time 165 time.sleep(1) 166 try: 167 ap_addr = hapd.own_addr() 168 cl_addr = dev.own_addr() 169 pkt = build_arp(cl_addr, ap_addr, 2, ap_addr, '10.0.0.1', ap_addr, '10.0.0.1') 170 pkt = binascii.hexlify(pkt).decode() 171 172 if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt): 173 raise Exception("DATA_TEST_FRAME failed") 174 175 # Wait for frames to be processed 176 time.sleep(0.1) 177 178 if hapd.own_addr() in subprocess.check_output(['ip', 'neigh', 'show']).decode(): 179 raise Exception("gratuitous ARP frame updated erroneously") 180 finally: 181 subprocess.call(['ip', 'neigh', 'del', '10.0.0.1', 'dev', dev.ifname]) 182 183def test_ip6_drop_unsolicited_na(devs, apdevs, params): 184 """Hotspot 2.0 frame filtering - IPv6 drop unsolicited NA""" 185 require_under_vm() 186 procfile = '/proc/sys/net/ipv6/conf/%s/drop_unsolicited_na' % devs[0].ifname 187 if not os.path.exists(procfile): 188 raise HwsimSkip("kernel doesn't have capability") 189 190 [dev, hapd] = hs20_filters_connect(devs, apdevs) 191 192 with IPAssign(dev.ifname, 'fdaa::1/48', ipv6=True): 193 # add an entry that can be updated by unsolicited NA 194 subprocess.call(['ip', '-6', 'neigh', 'add', 'fdaa::2', 'lladdr', '02:00:00:00:00:ff', 'nud', 'reachable', 'dev', dev.ifname]) 195 try: 196 ap_addr = hapd.own_addr() 197 cl_addr = dev.own_addr() 198 pkt = build_na(ap_addr, 'fdaa::2', 'ff02::1', 'fdaa::2', flags=0x20, 199 opt=binascii.unhexlify('0201' + ap_addr.replace(':', ''))) 200 pkt = binascii.hexlify(pkt).decode() 201 202 if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt): 203 raise Exception("DATA_TEST_FRAME failed") 204 205 # Wait for frames to be processed 206 time.sleep(0.1) 207 208 if hapd.own_addr() in subprocess.check_output(['ip', 'neigh', 'show']).decode(): 209 raise Exception("unsolicited NA frame updated erroneously") 210 finally: 211 subprocess.call(['ip', '-6', 'neigh', 'del', 'fdaa::2', 'dev', dev.ifname]) 212