1 /* 2 * Sigma Control API DUT (station/AP) 3 * Copyright (c) 2010, Atheros Communications, Inc. 4 * Copyright (c) 2011-2015, Qualcomm Atheros, Inc. 5 * All Rights Reserved. 6 * Licensed under the Clear BSD license. See README for more details. 7 */ 8 9 #include "sigma_dut.h" 10 #include "wpa_helpers.h" 11 12 #define TG_MAX_CLIENTS_CONNECTIONS 1 13 14 15 static int cmd_traffic_agent_config(struct sigma_dut *dut, 16 struct sigma_conn *conn, 17 struct sigma_cmd *cmd) 18 { 19 struct sigma_stream *s; 20 const char *val; 21 char buf[100]; 22 23 if (dut->num_streams == MAX_SIGMA_STREAMS) { 24 send_resp(dut, conn, SIGMA_ERROR, "errorCode,No more " 25 "concurrent traffic streams supported"); 26 return 0; 27 } 28 29 s = &dut->streams[dut->num_streams]; 30 free(s->stats); 31 memset(s, 0, sizeof(*s)); 32 s->sock = -1; 33 s->no_timestamps = dut->no_timestamps; 34 35 val = get_param(cmd, "profile"); 36 if (!val) 37 return -1; 38 39 if (strcasecmp(val, "File_Transfer") == 0) 40 s->profile = SIGMA_PROFILE_FILE_TRANSFER; 41 else if (strcasecmp(val, "Multicast") == 0) 42 s->profile = SIGMA_PROFILE_MULTICAST; 43 else if (strcasecmp(val, "IPTV") == 0) 44 s->profile = SIGMA_PROFILE_IPTV; 45 else if (strcasecmp(val, "Transaction") == 0) 46 s->profile = SIGMA_PROFILE_TRANSACTION; 47 else if (strcasecmp(val, "Start_Sync") == 0) 48 s->profile = SIGMA_PROFILE_START_SYNC; 49 else if (strcasecmp(val, "Uapsd") == 0) 50 s->profile = SIGMA_PROFILE_UAPSD; 51 else { 52 send_resp(dut, conn, SIGMA_INVALID, "errorCode,Unsupported " 53 "profile"); 54 return 0; 55 } 56 57 val = get_param(cmd, "direction"); 58 if (!val) 59 return -1; 60 if (strcasecmp(val, "send") == 0) 61 s->sender = 1; 62 else if (strcasecmp(val, "receive") == 0) 63 s->sender = 0; 64 else 65 return -1; 66 67 val = get_param(cmd, "destination"); 68 if (val) { 69 if (inet_aton(val, &s->dst) == 0) 70 return -1; 71 } 72 73 val = get_param(cmd, "source"); 74 if (val) { 75 if (inet_aton(val, &s->src) == 0) 76 return -1; 77 } 78 79 val = get_param(cmd, "destinationPort"); 80 if (val) 81 s->dst_port = atoi(val); 82 83 val = get_param(cmd, "sourcePort"); 84 if (val) 85 s->src_port = atoi(val); 86 87 val = get_param(cmd, "frameRate"); 88 if (val) 89 s->frame_rate = atoi(val); 90 91 val = get_param(cmd, "duration"); 92 if (val) 93 s->duration = atoi(val); 94 95 val = get_param(cmd, "payloadSize"); 96 if (val) 97 s->payload_size = atoi(val); 98 99 val = get_param(cmd, "startDelay"); 100 if (val) 101 s->start_delay = atoi(val); 102 103 val = get_param(cmd, "maxCnt"); 104 if (val) 105 s->max_cnt = atoi(val); 106 107 val = get_param(cmd, "trafficClass"); 108 if (val) { 109 if (strcasecmp(val, "Voice") == 0) 110 s->tc = SIGMA_TC_VOICE; 111 else if (strcasecmp(val, "Video") == 0) 112 s->tc = SIGMA_TC_VIDEO; 113 else if (strcasecmp(val, "Background") == 0) 114 s->tc = SIGMA_TC_BACKGROUND; 115 else if (strcasecmp(val, "BestEffort") == 0) 116 s->tc = SIGMA_TC_BEST_EFFORT; 117 else 118 return -1; 119 } 120 121 val = get_param(cmd, "userpriority"); 122 if (val) { 123 s->user_priority_set = 1; 124 s->user_priority = atoi(val); 125 } 126 127 val = get_param(cmd, "tagName"); 128 if (val) { 129 strncpy(s->test_name, val, sizeof(s->test_name)); 130 s->test_name[sizeof(s->test_name) - 1] = '\0'; 131 sigma_dut_print(dut, DUT_MSG_DEBUG, 132 "Traffic agent: U-APSD console tagname %s", 133 s->test_name); 134 } 135 136 if (dut->throughput_pktsize && s->frame_rate == 0 && s->sender && 137 dut->throughput_pktsize != s->payload_size && 138 (s->profile == SIGMA_PROFILE_FILE_TRANSFER || 139 s->profile == SIGMA_PROFILE_IPTV || 140 s->profile == SIGMA_PROFILE_UAPSD)) { 141 sigma_dut_print(dut, DUT_MSG_INFO, "Traffic agent: Override " 142 "throughput test payload size %d -> %d", 143 s->payload_size, dut->throughput_pktsize); 144 s->payload_size = dut->throughput_pktsize; 145 } 146 147 val = get_param(cmd, "transProtoType"); 148 if (val) { 149 if (strcmp(val, "1") == 0) 150 s->trans_proto = IPPROTO_TCP; 151 else if (strcmp(val, "0") == 0) 152 s->trans_proto = IPPROTO_UDP; 153 else 154 return -1; 155 } else { 156 s->trans_proto = IPPROTO_UDP; 157 } 158 159 if (s->profile == SIGMA_PROFILE_IPTV && !s->sender && !s->no_timestamps) 160 { 161 s->stats = calloc(MAX_SIGMA_STATS, 162 sizeof(struct sigma_frame_stats)); 163 if (s->stats == NULL) 164 return -1; 165 } 166 167 dut->stream_id++; 168 dut->num_streams++; 169 170 s->stream_id = dut->stream_id; 171 snprintf(buf, sizeof(buf), "streamID,%d", s->stream_id); 172 send_resp(dut, conn, SIGMA_COMPLETE, buf); 173 return 0; 174 } 175 176 177 static void stop_stream(struct sigma_stream *s) 178 { 179 if (s && s->started) { 180 pthread_join(s->thr, NULL); 181 if (s->sock != -1) { 182 close(s->sock); 183 s->sock = -1; 184 } 185 186 s->started = 0; 187 } 188 } 189 190 191 static int cmd_traffic_agent_reset(struct sigma_dut *dut, 192 struct sigma_conn *conn, 193 struct sigma_cmd *cmd) 194 { 195 int i; 196 for (i = 0; i < dut->num_streams; i++) { 197 struct sigma_stream *s = &dut->streams[i]; 198 s->stop = 1; 199 stop_stream(s); 200 } 201 dut->num_streams = 0; 202 memset(&dut->streams, 0, sizeof(dut->streams)); 203 return 1; 204 } 205 206 207 static int get_stream_id(const char *str, int streams[MAX_SIGMA_STREAMS]) 208 { 209 int count; 210 211 count = 0; 212 for (;;) { 213 if (count == MAX_SIGMA_STREAMS) 214 return -1; 215 streams[count] = atoi(str); 216 if (streams[count] == 0) 217 return -1; 218 count++; 219 str = strchr(str, ' '); 220 if (str == NULL) 221 break; 222 while (*str == ' ') 223 str++; 224 } 225 226 return count; 227 } 228 229 230 static int open_socket_file_transfer(struct sigma_dut *dut, 231 struct sigma_stream *s) 232 { 233 struct sockaddr_in addr; 234 int sock_opt_val = 1; 235 236 s->sock = socket(PF_INET, IPPROTO_UDP == s->trans_proto ? SOCK_DGRAM : 237 SOCK_STREAM, s->trans_proto); 238 if (s->sock < 0) { 239 perror("socket"); 240 return -1; 241 } 242 243 if (setsockopt(s->sock, SOL_SOCKET, SO_REUSEADDR, &sock_opt_val, 244 sizeof(sock_opt_val)) < 0) { 245 perror("setsockopt"); 246 close(s->sock); 247 s->sock = -1; 248 return -1; 249 } 250 251 memset(&addr, 0, sizeof(addr)); 252 addr.sin_family = AF_INET; 253 addr.sin_port = htons(s->sender ? s->src_port : s->dst_port); 254 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: sender=%d " 255 "bind port %d", s->sender, ntohs(addr.sin_port)); 256 if (bind(s->sock, (struct sockaddr *) &addr, sizeof(addr)) < 0) { 257 perror("bind"); 258 close(s->sock); 259 s->sock = -1; 260 return -1; 261 } 262 263 if (s->profile == SIGMA_PROFILE_MULTICAST && !s->sender) 264 return 0; 265 266 if (s->trans_proto == IPPROTO_TCP && s->sender == 0) { 267 if (listen(s->sock, TG_MAX_CLIENTS_CONNECTIONS ) < 0) { 268 sigma_dut_print(dut, DUT_MSG_INFO, 269 "Listen failed with error %d: %s", 270 errno, strerror(errno)); 271 close(s->sock); 272 s->sock = -1; 273 return -1; 274 } 275 } else { 276 memset(&addr, 0, sizeof(addr)); 277 addr.sin_family = AF_INET; 278 addr.sin_addr.s_addr = s->sender ? s->dst.s_addr : 279 s->src.s_addr; 280 addr.sin_port = htons(s->sender ? s->dst_port : s->src_port); 281 sigma_dut_print(dut, DUT_MSG_DEBUG, 282 "Traffic agent: connect %s:%d", 283 inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); 284 if (connect(s->sock, (struct sockaddr *) &addr, sizeof(addr)) < 285 0) { 286 perror("connect"); 287 close(s->sock); 288 s->sock = -1; 289 return -1; 290 } 291 } 292 293 return 0; 294 } 295 296 297 static int open_socket_multicast(struct sigma_dut *dut, struct sigma_stream *s) 298 { 299 if (open_socket_file_transfer(dut, s) < 0) 300 return -1; 301 302 if (!s->sender) { 303 struct ip_mreq mr; 304 memset(&mr, 0, sizeof(mr)); 305 mr.imr_multiaddr.s_addr = s->dst.s_addr; 306 mr.imr_interface.s_addr = htonl(INADDR_ANY); 307 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: " 308 "IP_ADD_MEMBERSHIP %s", inet_ntoa(s->dst)); 309 if (setsockopt(s->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, 310 (void *) &mr, sizeof(mr)) < 0) { 311 sigma_dut_print(dut, DUT_MSG_INFO, 312 "setsockopt[IP_ADD_MEMBERSHIP]: %s", 313 strerror(errno)); 314 /* 315 * Continue anyway since this can happen, e.g., if the 316 * default route is missing. This is not critical for 317 * multicast RX testing. 318 */ 319 } 320 } 321 322 return 0; 323 } 324 325 326 static int set_socket_prio(struct sigma_stream *s) 327 { 328 int tos = 0x00; 329 330 switch (s->tc) { 331 case SIGMA_TC_VOICE: 332 if (s->user_priority_set) { 333 if (s->user_priority == 6) 334 tos = 48 << 2; 335 else if (s->user_priority == 7) 336 tos = 56 << 2; 337 else 338 return -1; 339 } else 340 tos = 0xe0; /* DSCP = 56 */ 341 break; 342 case SIGMA_TC_VIDEO: 343 if (s->user_priority_set) { 344 if (s->user_priority == 4) 345 tos = 32 << 2; 346 else if (s->user_priority == 5) 347 tos = 40 << 2; 348 else 349 return -1; 350 } else 351 tos = 0xa0; /* DSCP = 40 */ 352 break; 353 case SIGMA_TC_BACKGROUND: 354 if (s->user_priority_set) { 355 if (s->user_priority == 1) 356 tos = 8 << 2; 357 else if (s->user_priority == 2) 358 tos = 16 << 2; 359 else 360 return -1; 361 } else 362 tos = 0x20; /* DSCP = 8 */ 363 break; 364 case SIGMA_TC_BEST_EFFORT: 365 if (s->user_priority_set) { 366 if (s->user_priority == 0) 367 tos = 0 << 2; 368 else if (s->user_priority == 3) 369 tos = 20 << 2; 370 else 371 return -1; 372 } else 373 tos = 0x00; /* DSCP = 0 */ 374 break; 375 } 376 377 if (setsockopt(s->sock, IPPROTO_IP, IP_TOS, &tos, sizeof(tos)) < 0) { 378 perror("setsockopt"); 379 return -1; 380 } 381 382 return 0; 383 } 384 385 386 static int open_socket(struct sigma_dut *dut, struct sigma_stream *s) 387 { 388 switch (s->profile) { 389 case SIGMA_PROFILE_FILE_TRANSFER: 390 return open_socket_file_transfer(dut, s); 391 case SIGMA_PROFILE_MULTICAST: 392 return open_socket_multicast(dut, s); 393 case SIGMA_PROFILE_IPTV: 394 if (open_socket_file_transfer(dut, s) < 0) 395 return -1; 396 return set_socket_prio(s); 397 case SIGMA_PROFILE_TRANSACTION: 398 return open_socket_file_transfer(dut, s); 399 case SIGMA_PROFILE_UAPSD: 400 return open_socket_file_transfer(dut, s); 401 case SIGMA_PROFILE_START_SYNC: 402 sigma_dut_print(dut, DUT_MSG_INFO, "Traffic stream profile %d " 403 "not yet supported", s->profile); 404 /* TODO */ 405 break; 406 } 407 408 return -1; 409 } 410 411 412 static void send_file_fast(struct sigma_stream *s, char *pkt) 413 { 414 struct timeval stop, now; 415 int res; 416 unsigned int counter = 0; 417 418 gettimeofday(&stop, NULL); 419 stop.tv_sec += s->duration; 420 421 while (!s->stop) { 422 counter++; 423 WPA_PUT_BE32(&pkt[8], counter); 424 425 if ((counter & 0xf) == 0) { 426 gettimeofday(&now, NULL); 427 if (now.tv_sec > stop.tv_sec || 428 (now.tv_sec == stop.tv_sec && 429 now.tv_usec >= stop.tv_usec)) 430 break; 431 } 432 433 s->tx_act_frames++; 434 res = send(s->sock, pkt, s->payload_size, 0); 435 if (res >= 0) { 436 s->tx_frames++; 437 s->tx_payload_bytes += res; 438 } else { 439 switch (errno) { 440 case EAGAIN: 441 case ENOBUFS: 442 usleep(1000); 443 break; 444 case ECONNRESET: 445 case EPIPE: 446 s->stop = 1; 447 break; 448 default: 449 perror("send"); 450 break; 451 } 452 } 453 } 454 } 455 456 457 static void send_file(struct sigma_stream *s) 458 { 459 char *pkt; 460 struct timeval stop, now, start; 461 int res; 462 unsigned int counter = 0; 463 int sleep_time, sleep_usec, pkt_spacing, duration = 0; 464 465 if (s->duration <= 0 || s->frame_rate < 0 || s->payload_size < 20) 466 return; 467 468 pkt = malloc(s->payload_size); 469 if (pkt == NULL) 470 return; 471 memset(pkt, 1, s->payload_size); 472 strncpy(pkt, "1345678", s->payload_size); 473 474 if (s->frame_rate == 0 && s->no_timestamps) { 475 send_file_fast(s, pkt); 476 free(pkt); 477 return; 478 } 479 480 gettimeofday(&stop, NULL); 481 stop.tv_sec += s->duration; 482 483 if (s->frame_rate == 0) { 484 sleep_time = 0; 485 pkt_spacing = 0; 486 } else { 487 sleep_time = 1000000 / s->frame_rate; 488 pkt_spacing = sleep_time; 489 sleep_time -= 100; 490 if (sleep_time < 0) 491 sleep_time = 0; 492 } 493 sleep_usec = sleep_time; 494 495 gettimeofday(&start, NULL); 496 497 while (!s->stop) { 498 counter++; 499 WPA_PUT_BE32(&pkt[8], counter); 500 501 if (sleep_usec) 502 usleep(sleep_usec); 503 504 gettimeofday(&now, NULL); 505 if (now.tv_sec > stop.tv_sec || 506 (now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec)) 507 break; 508 509 if (sleep_time) { 510 /* Update sleep time based on current time */ 511 struct timeval tmp; 512 int diff; 513 514 duration += pkt_spacing; 515 timersub(&now, &start, &tmp); 516 diff = tmp.tv_sec * 1000000 + tmp.tv_usec; 517 518 if (diff > duration) { 519 sleep_usec = pkt_spacing - (diff - duration); 520 sleep_usec -= 100; 521 } else { 522 sleep_usec = pkt_spacing - 100; 523 } 524 if (sleep_usec < 0) 525 sleep_usec = 0; 526 } 527 528 WPA_PUT_BE32(&pkt[12], now.tv_sec); 529 WPA_PUT_BE32(&pkt[16], now.tv_usec); 530 531 s->tx_act_frames++; 532 res = send(s->sock, pkt, s->payload_size, 0); 533 if (res >= 0) { 534 s->tx_frames++; 535 s->tx_payload_bytes += res; 536 } else { 537 switch (errno) { 538 case EAGAIN: 539 case ENOBUFS: 540 usleep(1000); 541 break; 542 case ECONNRESET: 543 case EPIPE: 544 s->stop = 1; 545 break; 546 default: 547 perror("send"); 548 break; 549 } 550 } 551 } 552 553 free(pkt); 554 } 555 556 557 static void send_transaction(struct sigma_stream *s) 558 { 559 char *pkt, *rpkt; 560 struct timeval stop, now; 561 int res; 562 unsigned int counter = 0, rcounter; 563 int wait_time; 564 fd_set rfds; 565 struct timeval tv; 566 567 if (s->duration <= 0 || s->frame_rate <= 0 || s->payload_size < 20) 568 return; 569 570 pkt = malloc(s->payload_size); 571 if (pkt == NULL) 572 return; 573 rpkt = malloc(s->payload_size); 574 if (rpkt == NULL) { 575 free(pkt); 576 return; 577 } 578 memset(pkt, 1, s->payload_size); 579 strncpy(pkt, "1345678", s->payload_size); 580 581 gettimeofday(&stop, NULL); 582 stop.tv_sec += s->duration; 583 584 wait_time = 1000000 / s->frame_rate; 585 586 while (!s->stop) { 587 counter++; 588 if (s->max_cnt && (int) counter > s->max_cnt) 589 break; 590 WPA_PUT_BE32(&pkt[8], counter); 591 592 gettimeofday(&now, NULL); 593 if (now.tv_sec > stop.tv_sec || 594 (now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec)) 595 break; 596 WPA_PUT_BE32(&pkt[12], now.tv_sec); 597 WPA_PUT_BE32(&pkt[16], now.tv_usec); 598 599 res = send(s->sock, pkt, s->payload_size, 0); 600 if (res >= 0) { 601 s->tx_frames++; 602 s->tx_payload_bytes += res; 603 } else { 604 switch (errno) { 605 case EAGAIN: 606 case ENOBUFS: 607 usleep(1000); 608 break; 609 case ECONNRESET: 610 case EPIPE: 611 s->stop = 1; 612 break; 613 default: 614 perror("send"); 615 break; 616 } 617 } 618 619 /* Wait for response */ 620 tv.tv_sec = 0; 621 tv.tv_usec = wait_time; 622 FD_ZERO(&rfds); 623 FD_SET(s->sock, &rfds); 624 res = select(s->sock + 1, &rfds, NULL, NULL, &tv); 625 if (res < 0) { 626 if (errno == EINTR) 627 continue; 628 perror("select"); 629 break; 630 } 631 632 if (res == 0) { 633 /* timeout */ 634 continue; 635 } 636 637 if (FD_ISSET(s->sock, &rfds)) { 638 /* response received */ 639 res = recv(s->sock, rpkt, s->payload_size, 0); 640 if (res < 0) { 641 perror("recv"); 642 break; 643 } 644 rcounter = WPA_GET_BE32(&rpkt[8]); 645 if (rcounter != counter) 646 s->out_of_seq_frames++; 647 s->rx_frames++; 648 s->rx_payload_bytes += res; 649 } 650 } 651 652 free(pkt); 653 free(rpkt); 654 } 655 656 657 static void * send_thread(void *ctx) 658 { 659 struct sigma_stream *s = ctx; 660 661 sleep(s->start_delay); 662 663 switch (s->profile) { 664 case SIGMA_PROFILE_FILE_TRANSFER: 665 send_file(s); 666 break; 667 case SIGMA_PROFILE_MULTICAST: 668 send_file(s); 669 break; 670 case SIGMA_PROFILE_IPTV: 671 send_file(s); 672 break; 673 case SIGMA_PROFILE_TRANSACTION: 674 send_transaction(s); 675 break; 676 case SIGMA_PROFILE_START_SYNC: 677 break; 678 case SIGMA_PROFILE_UAPSD: 679 send_uapsd_console(s); 680 break; 681 } 682 683 return NULL; 684 } 685 686 687 struct traffic_agent_send_data { 688 struct sigma_dut *dut; 689 struct sigma_conn *conn; 690 int streams[MAX_SIGMA_STREAMS]; 691 int count; 692 }; 693 694 695 static struct sigma_stream * get_stream(struct sigma_dut *dut, int id) 696 { 697 int i; 698 699 for (i = 0; i < dut->num_streams; i++) { 700 if ((unsigned int) id == dut->streams[i].stream_id) 701 return &dut->streams[i]; 702 } 703 704 return NULL; 705 } 706 707 708 static void * send_report_thread(void *ctx) 709 { 710 struct traffic_agent_send_data *data = ctx; 711 struct sigma_dut *dut = data->dut; 712 struct sigma_conn *conn = data->conn; 713 int i, ret; 714 char buf[100 + MAX_SIGMA_STREAMS * 60], *pos; 715 716 for (i = 0; i < data->count; i++) { 717 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: waiting " 718 "for stream %d send to complete", 719 data->streams[i]); 720 stop_stream(get_stream(dut, data->streams[i])); 721 } 722 723 buf[0] = '\0'; 724 pos = buf; 725 726 pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,"); 727 for (i = 0; i < data->count; i++) { 728 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", 729 i > 0 ? " " : "", data->streams[i]); 730 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 731 break; 732 pos += ret; 733 } 734 735 if (dut->program == PROGRAM_60GHZ) { 736 sigma_dut_print(dut, DUT_MSG_INFO, "reporting tx_act_frames"); 737 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,"); 738 for (i = 0; i < data->count; i++) { 739 struct sigma_stream *s; 740 741 s = get_stream(dut, data->streams[i]); 742 if (!s) 743 continue; 744 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", 745 i > 0 ? " " : "", s->tx_act_frames); 746 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 747 break; 748 pos += ret; 749 } 750 } 751 752 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,"); 753 for (i = 0; i < data->count; i++) { 754 struct sigma_stream *s = get_stream(dut, data->streams[i]); 755 756 if (!s) 757 continue; 758 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", 759 i > 0 ? " " : "", s->tx_frames); 760 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 761 break; 762 pos += ret; 763 } 764 765 pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,"); 766 for (i = 0; i < data->count; i++) { 767 struct sigma_stream *s = get_stream(dut, data->streams[i]); 768 769 if (!s) 770 continue; 771 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", 772 i > 0 ? " " : "", s->rx_frames); 773 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 774 break; 775 pos += ret; 776 } 777 778 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,"); 779 for (i = 0; i < data->count; i++) { 780 struct sigma_stream *s = get_stream(dut, data->streams[i]); 781 782 if (!s) 783 continue; 784 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu", 785 i > 0 ? " " : "", s->tx_payload_bytes); 786 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 787 break; 788 pos += ret; 789 } 790 791 pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,"); 792 for (i = 0; i < data->count; i++) { 793 struct sigma_stream *s = get_stream(dut, data->streams[i]); 794 795 if (!s) 796 continue; 797 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu", 798 i > 0 ? " " : "", s->rx_payload_bytes); 799 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 800 break; 801 pos += ret; 802 } 803 804 pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,"); 805 for (i = 0; i < data->count; i++) { 806 struct sigma_stream *s = get_stream(dut, data->streams[i]); 807 808 if (!s) 809 continue; 810 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", 811 i > 0 ? " " : "", s->out_of_seq_frames); 812 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 813 break; 814 pos += ret; 815 } 816 817 for (i = 0; i < data->count; i++) { 818 struct sigma_stream *s = get_stream(dut, data->streams[i]); 819 if (!s) 820 continue; 821 s->ta_send_in_progress = 0; 822 if (s->trans_proto == IPPROTO_TCP) { 823 /* 824 * Close the socket to make sure client side close the 825 * network before the server. Otherwise, the server 826 * might get "Address already in use" when trying to 827 * reuse the port. 828 */ 829 close(s->sock); 830 s->sock = -1; 831 sigma_dut_print(dut, DUT_MSG_DEBUG, 832 "Closed the sender socket"); 833 } 834 } 835 836 buf[sizeof(buf) - 1] = '\0'; 837 838 if (conn->s < 0) 839 sigma_dut_print(dut, DUT_MSG_INFO, "Cannot send traffic_agent response since control socket has already been closed"); 840 else 841 send_resp(dut, conn, SIGMA_COMPLETE, buf); 842 conn->waiting_completion = 0; 843 844 free(data); 845 846 return NULL; 847 } 848 849 850 static int cmd_traffic_agent_send(struct sigma_dut *dut, 851 struct sigma_conn *conn, 852 struct sigma_cmd *cmd) 853 { 854 const char *val; 855 int i, j, res; 856 char buf[100]; 857 struct traffic_agent_send_data *data; 858 859 val = get_param(cmd, "streamID"); 860 if (val == NULL) 861 return -1; 862 863 data = calloc(1, sizeof(*data)); 864 if (data == NULL) 865 return -1; 866 data->dut = dut; 867 data->conn = conn; 868 869 data->count = get_stream_id(val, data->streams); 870 if (data->count < 0) { 871 free(data); 872 return -1; 873 } 874 for (i = 0; i < data->count; i++) { 875 struct sigma_stream *s = get_stream(dut, data->streams[i]); 876 877 if (!s) { 878 snprintf(buf, sizeof(buf), "errorCode,StreamID %d " 879 "not configured", data->streams[i]); 880 send_resp(dut, conn, SIGMA_INVALID, buf); 881 free(data); 882 return 0; 883 } 884 for (j = 0; j < i; j++) 885 if (data->streams[i] == data->streams[j]) 886 return -1; 887 if (!s->sender) { 888 snprintf(buf, sizeof(buf), "errorCode,Not configured " 889 "as sender for streamID %d", data->streams[i]); 890 send_resp(dut, conn, SIGMA_INVALID, buf); 891 free(data); 892 return 0; 893 } 894 if (s->ta_send_in_progress) { 895 send_resp(dut, conn, SIGMA_ERROR, 896 "errorCode,Multiple concurrent send cmds on same streamID not supported"); 897 free(data); 898 return 0; 899 } 900 } 901 902 for (i = 0; i < data->count; i++) { 903 struct sigma_stream *s = get_stream(dut, data->streams[i]); 904 905 if (!s) 906 continue; 907 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: open " 908 "socket for send stream %d", data->streams[i]); 909 if (open_socket(dut, s) < 0) { 910 free(data); 911 return -2; 912 } 913 } 914 915 for (i = 0; i < data->count; i++) { 916 struct sigma_stream *s = get_stream(dut, data->streams[i]); 917 918 if (!s) 919 continue; 920 921 /* 922 * Provide dut context to the thread to support debugging and 923 * returning of error messages. 924 */ 925 s->dut = dut; 926 927 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start " 928 "send for stream %d", data->streams[i]); 929 res = pthread_create(&s->thr, NULL, send_thread, s); 930 if (res) { 931 sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create " 932 "failed: %d", res); 933 free(data); 934 return -2; 935 } 936 s->started = 1; 937 } 938 939 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start a thread to track sending streams"); 940 conn->waiting_completion = 1; 941 res = pthread_create(&dut->thr, NULL, send_report_thread, data); 942 if (res) { 943 sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create failed: %d", 944 res); 945 free(data); 946 conn->waiting_completion = 0; 947 return -2; 948 } 949 950 for (i = 0; i < data->count; i++) { 951 struct sigma_stream *s = get_stream(dut, data->streams[i]); 952 953 if (s) 954 s->ta_send_in_progress = 1; 955 } 956 957 /* Command will be completed in send_report_thread() */ 958 959 return 0; 960 } 961 962 963 static void receive_file(struct sigma_stream *s) 964 { 965 struct timeval tv, now; 966 fd_set rfds; 967 int res; 968 char *pkt; 969 int pktlen; 970 unsigned int last_rx = 0, counter; 971 972 pktlen = 65536 + 1; 973 pkt = malloc(pktlen); 974 if (pkt == NULL) 975 return; 976 977 while (!s->stop) { 978 FD_ZERO(&rfds); 979 FD_SET(s->sock, &rfds); 980 tv.tv_sec = 0; 981 tv.tv_usec = 300000; 982 res = select(s->sock + 1, &rfds, NULL, NULL, &tv); 983 if (res < 0) { 984 perror("select"); 985 usleep(10000); 986 } else if (FD_ISSET(s->sock, &rfds)) { 987 res = recv(s->sock, pkt, pktlen, 0); 988 if (res >= 0) { 989 s->rx_frames++; 990 s->rx_payload_bytes += res; 991 992 counter = WPA_GET_BE32(&pkt[8]); 993 if (counter < last_rx) 994 s->out_of_seq_frames++; 995 last_rx = counter; 996 } else { 997 perror("recv"); 998 break; 999 } 1000 1001 if (res >= 20 && s->stats && 1002 s->num_stats < MAX_SIGMA_STATS) { 1003 struct sigma_frame_stats *stats; 1004 stats = &s->stats[s->num_stats]; 1005 s->num_stats++; 1006 gettimeofday(&now, NULL); 1007 stats->seqnum = counter; 1008 stats->local_sec = now.tv_sec; 1009 stats->local_usec = now.tv_usec; 1010 stats->remote_sec = WPA_GET_BE32(&pkt[12]); 1011 stats->remote_usec = WPA_GET_BE32(&pkt[16]); 1012 } 1013 } 1014 } 1015 1016 free(pkt); 1017 } 1018 1019 1020 static void receive_transaction(struct sigma_stream *s) 1021 { 1022 struct timeval tv; 1023 fd_set rfds; 1024 int res; 1025 char *pkt; 1026 int pktlen; 1027 unsigned int last_rx = 0, counter; 1028 struct sockaddr_in addr; 1029 socklen_t addrlen; 1030 1031 if (s->payload_size) 1032 pktlen = s->payload_size; 1033 else 1034 pktlen = 65536 + 1; 1035 pkt = malloc(pktlen); 1036 if (pkt == NULL) 1037 return; 1038 1039 while (!s->stop) { 1040 FD_ZERO(&rfds); 1041 FD_SET(s->sock, &rfds); 1042 tv.tv_sec = 0; 1043 tv.tv_usec = 300000; 1044 res = select(s->sock + 1, &rfds, NULL, NULL, &tv); 1045 if (res < 0) { 1046 perror("select"); 1047 usleep(10000); 1048 } else if (FD_ISSET(s->sock, &rfds)) { 1049 addrlen = sizeof(addr); 1050 res = recvfrom(s->sock, pkt, pktlen, 0, 1051 (struct sockaddr *) &addr, &addrlen); 1052 if (res < 0) { 1053 perror("recv"); 1054 break; 1055 } 1056 1057 s->rx_frames++; 1058 s->rx_payload_bytes += res; 1059 1060 counter = WPA_GET_BE32(&pkt[8]); 1061 if (counter < last_rx) 1062 s->out_of_seq_frames++; 1063 last_rx = counter; 1064 1065 /* send response */ 1066 res = sendto(s->sock, pkt, pktlen, 0, 1067 (struct sockaddr *) &addr, addrlen); 1068 if (res < 0) { 1069 perror("sendto"); 1070 } else { 1071 s->tx_frames++; 1072 s->tx_payload_bytes += res; 1073 } 1074 } 1075 } 1076 1077 free(pkt); 1078 } 1079 1080 1081 static void * receive_thread(void *ctx) 1082 { 1083 struct sigma_stream *s = ctx; 1084 1085 if (s->trans_proto == IPPROTO_TCP) { 1086 /* Wait for socket to be accepted */ 1087 struct sockaddr_in connected_addr; 1088 int connected_sock; /* returned from accept on sock */ 1089 socklen_t connected_addr_len = sizeof(connected_addr); 1090 1091 sigma_dut_print(s->dut, DUT_MSG_DEBUG, 1092 "Traffic agent: Waiting on accept"); 1093 connected_sock = accept(s->sock, 1094 (struct sockaddr *) &connected_addr, 1095 &connected_addr_len); 1096 if (connected_sock < 0) { 1097 sigma_dut_print(s->dut, DUT_MSG_ERROR, 1098 "Traffic agent: Failed to accept: %s", 1099 strerror(errno)); 1100 return NULL; 1101 } 1102 1103 sigma_dut_print(s->dut, DUT_MSG_DEBUG, 1104 "Traffic agent: Accepted client closing parent socket and talk over connected sock."); 1105 close(s->sock); 1106 s->sock = connected_sock; 1107 } 1108 1109 switch (s->profile) { 1110 case SIGMA_PROFILE_FILE_TRANSFER: 1111 receive_file(s); 1112 break; 1113 case SIGMA_PROFILE_MULTICAST: 1114 receive_file(s); 1115 break; 1116 case SIGMA_PROFILE_IPTV: 1117 receive_file(s); 1118 break; 1119 case SIGMA_PROFILE_TRANSACTION: 1120 receive_transaction(s); 1121 break; 1122 case SIGMA_PROFILE_START_SYNC: 1123 break; 1124 case SIGMA_PROFILE_UAPSD: 1125 receive_uapsd(s); 1126 break; 1127 } 1128 1129 return NULL; 1130 } 1131 1132 1133 static int cmd_traffic_agent_receive_start(struct sigma_dut *dut, 1134 struct sigma_conn *conn, 1135 struct sigma_cmd *cmd) 1136 { 1137 const char *val; 1138 int streams[MAX_SIGMA_STREAMS]; 1139 int i, j, count; 1140 char buf[100]; 1141 1142 val = get_param(cmd, "streamID"); 1143 if (val == NULL) 1144 return -1; 1145 count = get_stream_id(val, streams); 1146 if (count < 0) 1147 return -1; 1148 for (i = 0; i < count; i++) { 1149 struct sigma_stream *s = get_stream(dut, streams[i]); 1150 1151 if (!s) { 1152 snprintf(buf, sizeof(buf), "errorCode,StreamID %d " 1153 "not configured", streams[i]); 1154 send_resp(dut, conn, SIGMA_INVALID, buf); 1155 return 0; 1156 } 1157 for (j = 0; j < i; j++) 1158 if (streams[i] == streams[j]) 1159 return -1; 1160 if (s->sender) { 1161 snprintf(buf, sizeof(buf), "errorCode,Not configured " 1162 "as receiver for streamID %d", streams[i]); 1163 send_resp(dut, conn, SIGMA_INVALID, buf); 1164 return 0; 1165 } 1166 } 1167 1168 for (i = 0; i < count; i++) { 1169 struct sigma_stream *s = get_stream(dut, streams[i]); 1170 1171 if (!s) 1172 continue; 1173 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: open " 1174 "receive socket for stream %d", streams[i]); 1175 if (open_socket(dut, s) < 0) 1176 return -2; 1177 } 1178 1179 for (i = 0; i < count; i++) { 1180 struct sigma_stream *s = get_stream(dut, streams[i]); 1181 int res; 1182 1183 if (!s) 1184 continue; 1185 /* 1186 * Provide dut context to the thread to support debugging and 1187 * returning of error messages. Similarly, provide interface 1188 * information to the thread. If the Interface parameter is not 1189 * passed, get it from get_station_ifname() since the interface 1190 * name is needed for power save mode configuration for Uapsd 1191 * cases. 1192 */ 1193 s->dut = dut; 1194 val = get_param(cmd, "Interface"); 1195 strncpy(s->ifname, (val ? val : get_station_ifname()), 1196 sizeof(s->ifname)); 1197 s->ifname[sizeof(s->ifname) - 1] = '\0'; 1198 1199 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start " 1200 "receive for stream %d", streams[i]); 1201 res = pthread_create(&s->thr, NULL, receive_thread, s); 1202 if (res) { 1203 sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create " 1204 "failed: %d", res); 1205 return -2; 1206 } 1207 s->started = 1; 1208 } 1209 1210 return 1; 1211 } 1212 1213 1214 static void write_frame_stats(struct sigma_dut *dut, struct sigma_stream *s, 1215 int id) 1216 { 1217 char fname[128]; 1218 FILE *f; 1219 unsigned int i; 1220 1221 snprintf(fname, sizeof(fname), SIGMA_TMPDIR "/e2e%u-%d.txt", 1222 (unsigned int) time(NULL), id); 1223 f = fopen(fname, "w"); 1224 if (f == NULL) { 1225 sigma_dut_print(dut, DUT_MSG_INFO, "Could not write %s", 1226 fname); 1227 return; 1228 } 1229 fprintf(f, "seqnum:local_sec:local_usec:remote_sec:remote_usec\n"); 1230 1231 sigma_dut_print(dut, DUT_MSG_DEBUG, "Writing frame stats to %s", 1232 fname); 1233 1234 for (i = 0; i < s->num_stats; i++) { 1235 struct sigma_frame_stats *stats = &s->stats[i]; 1236 fprintf(f, "%u:%u:%u:%u:%u\n", stats->seqnum, 1237 stats->local_sec, stats->local_usec, 1238 stats->remote_sec, stats->remote_usec); 1239 } 1240 1241 fclose(f); 1242 } 1243 1244 1245 static int cmd_traffic_agent_receive_stop(struct sigma_dut *dut, 1246 struct sigma_conn *conn, 1247 struct sigma_cmd *cmd) 1248 { 1249 const char *val; 1250 int streams[MAX_SIGMA_STREAMS]; 1251 int i, j, ret, count; 1252 char buf[100 + MAX_SIGMA_STREAMS * 60], *pos; 1253 1254 val = get_param(cmd, "streamID"); 1255 if (val == NULL) 1256 return -1; 1257 count = get_stream_id(val, streams); 1258 if (count < 0) 1259 return -1; 1260 for (i = 0; i < count; i++) { 1261 struct sigma_stream *s = get_stream(dut, streams[i]); 1262 1263 if (!s) { 1264 snprintf(buf, sizeof(buf), "errorCode,StreamID %d " 1265 "not configured", streams[i]); 1266 send_resp(dut, conn, SIGMA_INVALID, buf); 1267 return 0; 1268 } 1269 for (j = 0; j < i; j++) 1270 if (streams[i] == streams[j]) 1271 return -1; 1272 if (!s->started) { 1273 snprintf(buf, sizeof(buf), "errorCode,Receive not " 1274 "started for streamID %d", streams[i]); 1275 send_resp(dut, conn, SIGMA_INVALID, buf); 1276 return 0; 1277 } 1278 } 1279 1280 for (i = 0; i < count; i++) { 1281 struct sigma_stream *s = get_stream(dut, streams[i]); 1282 1283 if (s) 1284 s->stop = 1; 1285 } 1286 1287 for (i = 0; i < count; i++) { 1288 struct sigma_stream *s = get_stream(dut, streams[i]); 1289 1290 if (!s) 1291 continue; 1292 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: stop " 1293 "receive for stream %d", streams[i]); 1294 stop_stream(s); 1295 } 1296 1297 buf[0] = '\0'; 1298 pos = buf; 1299 1300 pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,"); 1301 for (i = 0; i < count; i++) { 1302 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", 1303 i > 0 ? " " : "", streams[i]); 1304 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 1305 break; 1306 pos += ret; 1307 } 1308 1309 if (dut->program == PROGRAM_60GHZ) { 1310 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,"); 1311 for (i = 0; i < count; i++) { 1312 struct sigma_stream *s = get_stream(dut, streams[i]); 1313 1314 if (!s) 1315 continue; 1316 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", 1317 i > 0 ? " " : "", s->tx_act_frames); 1318 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 1319 break; 1320 pos += ret; 1321 } 1322 } 1323 1324 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,"); 1325 for (i = 0; i < count; i++) { 1326 struct sigma_stream *s = get_stream(dut, streams[i]); 1327 1328 if (!s) 1329 continue; 1330 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", 1331 i > 0 ? " " : "", s->tx_frames); 1332 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 1333 break; 1334 pos += ret; 1335 } 1336 1337 pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,"); 1338 for (i = 0; i < count; i++) { 1339 struct sigma_stream *s = get_stream(dut, streams[i]); 1340 1341 if (!s) 1342 continue; 1343 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", 1344 i > 0 ? " " : "", s->rx_frames); 1345 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 1346 break; 1347 pos += ret; 1348 } 1349 1350 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,"); 1351 for (i = 0; i < count; i++) { 1352 struct sigma_stream *s = get_stream(dut, streams[i]); 1353 1354 if (!s) 1355 continue; 1356 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu", 1357 i > 0 ? " " : "", s->tx_payload_bytes); 1358 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 1359 break; 1360 pos += ret; 1361 } 1362 1363 pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,"); 1364 for (i = 0; i < count; i++) { 1365 struct sigma_stream *s = get_stream(dut, streams[i]); 1366 1367 if (!s) 1368 continue; 1369 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu", 1370 i > 0 ? " " : "", s->rx_payload_bytes); 1371 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 1372 break; 1373 pos += ret; 1374 } 1375 1376 pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,"); 1377 for (i = 0; i < count; i++) { 1378 struct sigma_stream *s = get_stream(dut, streams[i]); 1379 1380 if (!s) 1381 continue; 1382 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", 1383 i > 0 ? " " : "", s->out_of_seq_frames); 1384 if (ret < 0 || ret >= buf + sizeof(buf) - pos) 1385 break; 1386 pos += ret; 1387 } 1388 1389 buf[sizeof(buf) - 1] = '\0'; 1390 1391 send_resp(dut, conn, SIGMA_COMPLETE, buf); 1392 1393 for (i = 0; i < count; i++) { 1394 struct sigma_stream *s = get_stream(dut, streams[i]); 1395 1396 if (!s) 1397 continue; 1398 if (s->profile == SIGMA_PROFILE_IPTV && s->num_stats > 0 && 1399 dut->write_stats) 1400 write_frame_stats(dut, s, streams[i]); 1401 free(s->stats); 1402 s->stats = NULL; 1403 s->num_stats = 0; 1404 } 1405 1406 return 0; 1407 } 1408 1409 1410 static int cmd_traffic_agent_version(struct sigma_dut *dut, 1411 struct sigma_conn *conn, 1412 struct sigma_cmd *cmd) 1413 { 1414 send_resp(dut, conn, SIGMA_COMPLETE, "version,1.0"); 1415 return 0; 1416 } 1417 1418 1419 void traffic_agent_register_cmds(void) 1420 { 1421 sigma_dut_reg_cmd("traffic_agent_config", NULL, 1422 cmd_traffic_agent_config); 1423 sigma_dut_reg_cmd("traffic_agent_reset", NULL, 1424 cmd_traffic_agent_reset); 1425 sigma_dut_reg_cmd("traffic_agent_send", NULL, 1426 cmd_traffic_agent_send); 1427 sigma_dut_reg_cmd("traffic_agent_receive_start", NULL, 1428 cmd_traffic_agent_receive_start); 1429 sigma_dut_reg_cmd("traffic_agent_receive_stop", NULL, 1430 cmd_traffic_agent_receive_stop); 1431 sigma_dut_reg_cmd("traffic_agent_version", NULL, 1432 cmd_traffic_agent_version); 1433 } 1434