1# Hotspot 2.0 filtering tests
2# Copyright (c) 2015, Intel Deutschland GmbH
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import hostapd
8import hwsim_utils
9import socket
10import subprocess
11import binascii
12from utils import HwsimSkip, require_under_vm
13import os
14import time
15from test_ap_hs20 import build_arp, build_na, hs20_ap_params
16from test_ap_hs20 import interworking_select, interworking_connect
17import struct
18import logging
19logger = logging.getLogger()
20
21class IPAssign(object):
22    def __init__(self, iface, addr, ipv6=False):
23        self._iface = iface
24        self._addr = addr
25        self._cmd = ['ip']
26        if ipv6:
27            self._cmd.append('-6')
28        self._cmd.append('addr')
29        self._ipv6 = ipv6
30    def __enter__(self):
31        subprocess.call(self._cmd + ['add', self._addr, 'dev', self._iface])
32        if self._ipv6:
33            # wait for DAD to finish
34            while True:
35                o = subprocess.check_output(self._cmd + ['show', 'tentative', 'dev', self._iface]).decode()
36                if self._addr not in o:
37                    break
38                time.sleep(0.1)
39    def __exit__(self, type, value, traceback):
40        subprocess.call(self._cmd + ['del', self._addr, 'dev', self._iface])
41
42def hs20_filters_connect(dev, apdev, disable_dgaf=False, proxy_arp=False):
43    bssid = apdev[0]['bssid']
44    params = hs20_ap_params()
45    params['hessid'] = bssid
46
47    # Do not disable dgaf, to test that the station drops unicast IP packets
48    # encrypted with GTK.
49    params['disable_dgaf'] = '0'
50    params['proxy_arp'] = '1'
51    params['ap_isolate'] = '1'
52    params['bridge'] = 'ap-br0'
53
54    try:
55        hapd = hostapd.add_ap(apdev[0], params)
56    except:
57        # For now, do not report failures due to missing kernel support.
58        raise HwsimSkip("Could not start hostapd - assume proxyarp not supported in the kernel")
59
60    subprocess.call(['brctl', 'setfd', 'ap-br0', '0'])
61    subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up'])
62
63    dev[0].hs20_enable()
64
65    id = dev[0].add_cred_values({'realm': "example.com",
66                                 'username': "hs20-test",
67                                 'password': "password",
68                                 'ca_cert': "auth_serv/ca.pem",
69                                 'domain': "example.com",
70                                 'update_identifier': "1234"})
71    interworking_select(dev[0], bssid, "home", freq="2412")
72    interworking_connect(dev[0], bssid, "TTLS")
73
74    time.sleep(0.1)
75
76    return dev[0], hapd
77
78def _test_ip4_gtk_drop(devs, apdevs, params, dst):
79    require_under_vm()
80    procfile = '/proc/sys/net/ipv4/conf/%s/drop_unicast_in_l2_multicast' % devs[0].ifname
81    if not os.path.exists(procfile):
82        raise HwsimSkip("kernel doesn't have capability")
83
84    [dev, hapd] = hs20_filters_connect(devs, apdevs)
85    with IPAssign(dev.ifname, '10.0.0.1/24'):
86        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
87        s.bind(("10.0.0.1", 12345))
88        s.settimeout(0.1)
89
90        pkt = dst
91        pkt += hapd.own_addr().replace(':', '')
92        pkt += '0800'
93        pkt += '45000020786840004011ae600a0000040a000001'
94        pkt += '30393039000c0000'
95        pkt += '61736466' # "asdf"
96        if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
97            raise Exception("DATA_TEST_FRAME failed")
98        try:
99            logger.info(s.recvfrom(1024))
100            logger.info("procfile=" + procfile + " val=" + open(procfile, 'r').read().rstrip())
101            raise Exception("erroneously received frame!")
102        except socket.timeout:
103            # this is the expected behaviour
104            pass
105
106def test_ip4_gtk_drop_bcast(devs, apdevs, params):
107    """Hotspot 2.0 frame filtering - IPv4 GTK drop broadcast"""
108    _test_ip4_gtk_drop(devs, apdevs, params, dst='ffffffffffff')
109
110def test_ip4_gtk_drop_mcast(devs, apdevs, params):
111    """Hotspot 2.0 frame filtering - IPv4 GTK drop multicast"""
112    _test_ip4_gtk_drop(devs, apdevs, params, dst='ff0000000000')
113
114def _test_ip6_gtk_drop(devs, apdevs, params, dst):
115    require_under_vm()
116    dev = devs[0]
117    procfile = '/proc/sys/net/ipv6/conf/%s/drop_unicast_in_l2_multicast' % devs[0].ifname
118    if not os.path.exists(procfile):
119        raise HwsimSkip("kernel doesn't have capability")
120
121    [dev, hapd] = hs20_filters_connect(devs, apdevs)
122
123    with IPAssign(dev.ifname, 'fdaa::1/48', ipv6=True):
124        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
125        s.bind(("fdaa::1", 12345))
126        s.settimeout(0.1)
127
128        pkt = dst
129        pkt += hapd.own_addr().replace(':', '')
130        pkt += '86dd'
131        pkt += '60000000000c1140fdaa0000000000000000000000000002fdaa0000000000000000000000000001'
132        pkt += '30393039000cde31'
133        pkt += '61736466' # "asdf"
134        if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
135            raise Exception("DATA_TEST_FRAME failed")
136        try:
137            logger.info(s.recvfrom(1024))
138            logger.info("procfile=" + procfile + " val=" + open(procfile, 'r').read().rstrip())
139            raise Exception("erroneously received frame!")
140        except socket.timeout:
141            # this is the expected behaviour
142            pass
143
144def test_ip6_gtk_drop_bcast(devs, apdevs, params):
145    """Hotspot 2.0 frame filtering - IPv6 GTK drop broadcast"""
146    _test_ip6_gtk_drop(devs, apdevs, params, dst='ffffffffffff')
147
148def test_ip6_gtk_drop_mcast(devs, apdevs, params):
149    """Hotspot 2.0 frame filtering - IPv6 GTK drop multicast"""
150    _test_ip6_gtk_drop(devs, apdevs, params, dst='ff0000000000')
151
152def test_ip4_drop_gratuitous_arp(devs, apdevs, params):
153    """Hotspot 2.0 frame filtering - IPv4 drop gratuitous ARP"""
154    require_under_vm()
155    procfile = '/proc/sys/net/ipv4/conf/%s/drop_gratuitous_arp' % devs[0].ifname
156    if not os.path.exists(procfile):
157        raise HwsimSkip("kernel doesn't have capability")
158
159    [dev, hapd] = hs20_filters_connect(devs, apdevs)
160
161    with IPAssign(dev.ifname, '10.0.0.2/24'):
162        # add an entry that can be updated by gratuitous ARP
163        subprocess.call(['ip', 'neigh', 'add', '10.0.0.1', 'lladdr', '02:00:00:00:00:ff', 'nud', 'reachable', 'dev', dev.ifname])
164        # wait for lock-time
165        time.sleep(1)
166        try:
167            ap_addr = hapd.own_addr()
168            cl_addr = dev.own_addr()
169            pkt = build_arp(cl_addr, ap_addr, 2, ap_addr, '10.0.0.1', ap_addr, '10.0.0.1')
170            pkt = binascii.hexlify(pkt).decode()
171
172            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
173                raise Exception("DATA_TEST_FRAME failed")
174
175            # Wait for frames to be processed
176            time.sleep(0.1)
177
178            if hapd.own_addr() in subprocess.check_output(['ip', 'neigh', 'show']).decode():
179                raise Exception("gratuitous ARP frame updated erroneously")
180        finally:
181            subprocess.call(['ip', 'neigh', 'del', '10.0.0.1', 'dev', dev.ifname])
182
183def test_ip6_drop_unsolicited_na(devs, apdevs, params):
184    """Hotspot 2.0 frame filtering - IPv6 drop unsolicited NA"""
185    require_under_vm()
186    procfile = '/proc/sys/net/ipv6/conf/%s/drop_unsolicited_na' % devs[0].ifname
187    if not os.path.exists(procfile):
188        raise HwsimSkip("kernel doesn't have capability")
189
190    [dev, hapd] = hs20_filters_connect(devs, apdevs)
191
192    with IPAssign(dev.ifname, 'fdaa::1/48', ipv6=True):
193        # add an entry that can be updated by unsolicited NA
194        subprocess.call(['ip', '-6', 'neigh', 'add', 'fdaa::2', 'lladdr', '02:00:00:00:00:ff', 'nud', 'reachable', 'dev', dev.ifname])
195        try:
196            ap_addr = hapd.own_addr()
197            cl_addr = dev.own_addr()
198            pkt = build_na(ap_addr, 'fdaa::2', 'ff02::1', 'fdaa::2', flags=0x20,
199                           opt=binascii.unhexlify('0201' + ap_addr.replace(':', '')))
200            pkt = binascii.hexlify(pkt).decode()
201
202            if "OK" not in hapd.request('DATA_TEST_FRAME ' + pkt):
203                raise Exception("DATA_TEST_FRAME failed")
204
205            # Wait for frames to be processed
206            time.sleep(0.1)
207
208            if hapd.own_addr() in subprocess.check_output(['ip', 'neigh', 'show']).decode():
209                raise Exception("unsolicited NA frame updated erroneously")
210        finally:
211            subprocess.call(['ip', '-6', 'neigh', 'del', 'fdaa::2', 'dev', dev.ifname])
212