1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""Run the tools/testing/selftests/net/csum testsuite."""
5
6from os import path
7
8from lib.py import ksft_run, ksft_exit, KsftSkipEx
9from lib.py import EthtoolFamily, NetDrvEpEnv
10from lib.py import bkg, cmd, wait_port_listen
11
12def test_receive(cfg, ipv4=False, extra_args=None):
13    """Test local nic checksum receive. Remote host sends crafted packets."""
14    if not cfg.have_rx_csum:
15        raise KsftSkipEx(f"Test requires rx checksum offload on {cfg.ifname}")
16
17    if ipv4:
18        ip_args = f"-4 -S {cfg.remote_v4} -D {cfg.v4}"
19    else:
20        ip_args = f"-6 -S {cfg.remote_v6} -D {cfg.v6}"
21
22    rx_cmd = f"{cfg.bin_local} -i {cfg.ifname} -n 100 {ip_args} -r 1 -R {extra_args}"
23    tx_cmd = f"{cfg.bin_remote} -i {cfg.ifname} -n 100 {ip_args} -r 1 -T {extra_args}"
24
25    with bkg(rx_cmd, exit_wait=True):
26        wait_port_listen(34000, proto="udp")
27        cmd(tx_cmd, host=cfg.remote)
28
29
30def test_transmit(cfg, ipv4=False, extra_args=None):
31    """Test local nic checksum transmit. Remote host verifies packets."""
32    if (not cfg.have_tx_csum_generic and
33        not (cfg.have_tx_csum_ipv4 and ipv4) and
34        not (cfg.have_tx_csum_ipv6 and not ipv4)):
35        raise KsftSkipEx(f"Test requires tx checksum offload on {cfg.ifname}")
36
37    if ipv4:
38        ip_args = f"-4 -S {cfg.v4} -D {cfg.remote_v4}"
39    else:
40        ip_args = f"-6 -S {cfg.v6} -D {cfg.remote_v6}"
41
42    # Cannot randomize input when calculating zero checksum
43    if extra_args != "-U -Z":
44        extra_args += " -r 1"
45
46    rx_cmd = f"{cfg.bin_remote} -i {cfg.ifname} -L 1 -n 100 {ip_args} -R {extra_args}"
47    tx_cmd = f"{cfg.bin_local} -i {cfg.ifname} -L 1 -n 100 {ip_args} -T {extra_args}"
48
49    with bkg(rx_cmd, host=cfg.remote, exit_wait=True):
50        wait_port_listen(34000, proto="udp", host=cfg.remote)
51        cmd(tx_cmd)
52
53
54def test_builder(name, cfg, ipv4=False, tx=False, extra_args=""):
55    """Construct specific tests from the common template.
56
57       Most tests follow the same basic pattern, differing only in
58       Direction of the test and optional flags passed to csum."""
59    def f(cfg):
60        if ipv4:
61            cfg.require_v4()
62        else:
63            cfg.require_v6()
64
65        if tx:
66            test_transmit(cfg, ipv4, extra_args)
67        else:
68            test_receive(cfg, ipv4, extra_args)
69
70    if ipv4:
71        f.__name__ = "ipv4_" + name
72    else:
73        f.__name__ = "ipv6_" + name
74    return f
75
76
77def check_nic_features(cfg) -> None:
78    """Test whether Tx and Rx checksum offload are enabled.
79
80       If the device under test has either off, then skip the relevant tests."""
81    cfg.have_tx_csum_generic = False
82    cfg.have_tx_csum_ipv4 = False
83    cfg.have_tx_csum_ipv6 = False
84    cfg.have_rx_csum = False
85
86    ethnl = EthtoolFamily()
87    features = ethnl.features_get({"header": {"dev-index": cfg.ifindex}})
88    for f in features["active"]["bits"]["bit"]:
89        if f["name"] == "tx-checksum-ip-generic":
90            cfg.have_tx_csum_generic = True
91        elif f["name"] == "tx-checksum-ipv4":
92            cfg.have_tx_csum_ipv4 = True
93        elif f["name"] == "tx-checksum-ipv6":
94            cfg.have_tx_csum_ipv6 = True
95        elif f["name"] == "rx-checksum":
96            cfg.have_rx_csum = True
97
98
99def main() -> None:
100    with NetDrvEpEnv(__file__, nsim_test=False) as cfg:
101        check_nic_features(cfg)
102
103        cfg.bin_local = path.abspath(path.dirname(__file__) + "/../../../net/lib/csum")
104        cfg.bin_remote = cfg.remote.deploy(cfg.bin_local)
105
106        cases = []
107        for ipv4 in [True, False]:
108            cases.append(test_builder("rx_tcp", cfg, ipv4, False, "-t"))
109            cases.append(test_builder("rx_tcp_invalid", cfg, ipv4, False, "-t -E"))
110
111            cases.append(test_builder("rx_udp", cfg, ipv4, False, ""))
112            cases.append(test_builder("rx_udp_invalid", cfg, ipv4, False, "-E"))
113
114            cases.append(test_builder("tx_udp_csum_offload", cfg, ipv4, True, "-U"))
115            cases.append(test_builder("tx_udp_zero_checksum", cfg, ipv4, True, "-U -Z"))
116
117        ksft_run(cases=cases, args=(cfg, ))
118    ksft_exit()
119
120
121if __name__ == "__main__":
122    main()
123