xref: /wlan-dirver/utils/sigma-dut/traffic_agent.c (revision 3f8c0b3fc4ea8dfe23a1f02ef357cfcae03b2cfd)
1 /*
2  * Sigma Control API DUT (station/AP)
3  * Copyright (c) 2010, Atheros Communications, Inc.
4  * Copyright (c) 2011-2015, Qualcomm Atheros, Inc.
5  * All Rights Reserved.
6  * Licensed under the Clear BSD license. See README for more details.
7  */
8 
9 #include "sigma_dut.h"
10 #include "wpa_helpers.h"
11 
12 #define TG_MAX_CLIENTS_CONNECTIONS 1
13 
14 
15 static int cmd_traffic_agent_config(struct sigma_dut *dut,
16 				    struct sigma_conn *conn,
17 				    struct sigma_cmd *cmd)
18 {
19 	struct sigma_stream *s;
20 	const char *val;
21 	char buf[100];
22 
23 	if (dut->num_streams == MAX_SIGMA_STREAMS) {
24 		send_resp(dut, conn, SIGMA_ERROR, "errorCode,No more "
25 			  "concurrent traffic streams supported");
26 		return 0;
27 	}
28 
29 	s = &dut->streams[dut->num_streams];
30 	free(s->stats);
31 	memset(s, 0, sizeof(*s));
32 	s->sock = -1;
33 	s->no_timestamps = dut->no_timestamps;
34 
35 	val = get_param(cmd, "profile");
36 	if (!val)
37 		return -1;
38 
39 	if (strcasecmp(val, "File_Transfer") == 0)
40 		s->profile = SIGMA_PROFILE_FILE_TRANSFER;
41 	else if (strcasecmp(val, "Multicast") == 0)
42 		s->profile = SIGMA_PROFILE_MULTICAST;
43 	else if (strcasecmp(val, "IPTV") == 0)
44 		s->profile = SIGMA_PROFILE_IPTV;
45 	else if (strcasecmp(val, "Transaction") == 0)
46 		s->profile = SIGMA_PROFILE_TRANSACTION;
47 	else if (strcasecmp(val, "Start_Sync") == 0)
48 		s->profile = SIGMA_PROFILE_START_SYNC;
49 	else if (strcasecmp(val, "Uapsd") == 0)
50 		s->profile = SIGMA_PROFILE_UAPSD;
51 	else {
52 		send_resp(dut, conn, SIGMA_INVALID, "errorCode,Unsupported "
53 			  "profile");
54 		return 0;
55 	}
56 
57 	val = get_param(cmd, "direction");
58 	if (!val)
59 		return -1;
60 	if (strcasecmp(val, "send") == 0)
61 		s->sender = 1;
62 	else if (strcasecmp(val, "receive") == 0)
63 		s->sender = 0;
64 	else
65 		return -1;
66 
67 	val = get_param(cmd, "destination");
68 	if (val) {
69 		if (inet_aton(val, &s->dst) == 0)
70 			return -1;
71 	}
72 
73 	val = get_param(cmd, "source");
74 	if (val) {
75 		if (inet_aton(val, &s->src) == 0)
76 			return -1;
77 	}
78 
79 	val = get_param(cmd, "destinationPort");
80 	if (val)
81 		s->dst_port = atoi(val);
82 
83 	val = get_param(cmd, "sourcePort");
84 	if (val)
85 		s->src_port = atoi(val);
86 
87 	val = get_param(cmd, "frameRate");
88 	if (val)
89 		s->frame_rate = atoi(val);
90 
91 	val = get_param(cmd, "duration");
92 	if (val)
93 		s->duration = atoi(val);
94 
95 	val = get_param(cmd, "payloadSize");
96 	if (val)
97 		s->payload_size = atoi(val);
98 
99 	val = get_param(cmd, "startDelay");
100 	if (val)
101 		s->start_delay = atoi(val);
102 
103 	val = get_param(cmd, "maxCnt");
104 	if (val)
105 		s->max_cnt = atoi(val);
106 
107 	val = get_param(cmd, "trafficClass");
108 	if (val) {
109 		if (strcasecmp(val, "Voice") == 0)
110 			s->tc = SIGMA_TC_VOICE;
111 		else if (strcasecmp(val, "Video") == 0)
112 			s->tc = SIGMA_TC_VIDEO;
113 		else if (strcasecmp(val, "Background") == 0)
114 			s->tc = SIGMA_TC_BACKGROUND;
115 		else if (strcasecmp(val, "BestEffort") == 0)
116 			s->tc = SIGMA_TC_BEST_EFFORT;
117 		else
118 			return -1;
119 	}
120 
121 	val = get_param(cmd, "userpriority");
122 	if (val) {
123 		s->user_priority_set = 1;
124 		s->user_priority = atoi(val);
125 	}
126 
127 	val = get_param(cmd, "tagName");
128 	if (val) {
129 		strncpy(s->test_name, val, sizeof(s->test_name));
130 		s->test_name[sizeof(s->test_name) - 1] = '\0';
131 		sigma_dut_print(dut, DUT_MSG_DEBUG,
132 				"Traffic agent: U-APSD console tagname %s",
133 				s->test_name);
134 	}
135 
136 	if (dut->throughput_pktsize && s->frame_rate == 0 && s->sender &&
137 	    dut->throughput_pktsize != s->payload_size &&
138 	    (s->profile == SIGMA_PROFILE_FILE_TRANSFER ||
139 	     s->profile == SIGMA_PROFILE_IPTV ||
140 	     s->profile == SIGMA_PROFILE_UAPSD)) {
141 		sigma_dut_print(dut, DUT_MSG_INFO, "Traffic agent: Override "
142 				"throughput test payload size %d -> %d",
143 				s->payload_size, dut->throughput_pktsize);
144 		s->payload_size = dut->throughput_pktsize;
145 	}
146 
147 	val = get_param(cmd, "transProtoType");
148 	if (val) {
149 		if (strcmp(val, "1") == 0)
150 			s->trans_proto = IPPROTO_TCP;
151 		else if (strcmp(val, "0") == 0)
152 			s->trans_proto = IPPROTO_UDP;
153 		else
154 			return -1;
155 	} else {
156 		s->trans_proto = IPPROTO_UDP;
157 	}
158 
159 	if (s->profile == SIGMA_PROFILE_IPTV && !s->sender && !s->no_timestamps)
160 	{
161 		s->stats = calloc(MAX_SIGMA_STATS,
162 				  sizeof(struct sigma_frame_stats));
163 		if (s->stats == NULL)
164 			return -1;
165 	}
166 
167 	dut->stream_id++;
168 	dut->num_streams++;
169 
170 	s->stream_id = dut->stream_id;
171 	snprintf(buf, sizeof(buf), "streamID,%d", s->stream_id);
172 	send_resp(dut, conn, SIGMA_COMPLETE, buf);
173 	return 0;
174 }
175 
176 
177 static void stop_stream(struct sigma_stream *s)
178 {
179 	if (s && s->started) {
180 		pthread_join(s->thr, NULL);
181 		if (s->sock != -1) {
182 			close(s->sock);
183 			s->sock = -1;
184 		}
185 
186 		s->started = 0;
187 	}
188 }
189 
190 
191 static int cmd_traffic_agent_reset(struct sigma_dut *dut,
192 				   struct sigma_conn *conn,
193 				   struct sigma_cmd *cmd)
194 {
195 	int i;
196 	for (i = 0; i < dut->num_streams; i++) {
197 		struct sigma_stream *s = &dut->streams[i];
198 		s->stop = 1;
199 		stop_stream(s);
200 	}
201 	dut->num_streams = 0;
202 	memset(&dut->streams, 0, sizeof(dut->streams));
203 	return 1;
204 }
205 
206 
207 static int get_stream_id(const char *str, int streams[MAX_SIGMA_STREAMS])
208 {
209 	int count;
210 
211 	count = 0;
212 	for (;;) {
213 		if (count == MAX_SIGMA_STREAMS)
214 			return -1;
215 		streams[count] = atoi(str);
216 		if (streams[count] == 0)
217 			return -1;
218 		count++;
219 		str = strchr(str, ' ');
220 		if (str == NULL)
221 			break;
222 		while (*str == ' ')
223 			str++;
224 	}
225 
226 	return count;
227 }
228 
229 
230 static int open_socket_file_transfer(struct sigma_dut *dut,
231 				     struct sigma_stream *s)
232 {
233 	struct sockaddr_in addr;
234 	int sock_opt_val = 1;
235 
236 	s->sock = socket(PF_INET, IPPROTO_UDP == s->trans_proto ? SOCK_DGRAM :
237 			 SOCK_STREAM, s->trans_proto);
238 	if (s->sock < 0) {
239 		perror("socket");
240 		return -1;
241 	}
242 
243 	if (setsockopt(s->sock, SOL_SOCKET, SO_REUSEADDR, &sock_opt_val,
244 		       sizeof(sock_opt_val)) < 0) {
245 		perror("setsockopt");
246 		close(s->sock);
247 		s->sock = -1;
248 		return -1;
249 	}
250 
251 	memset(&addr, 0, sizeof(addr));
252 	addr.sin_family = AF_INET;
253 	addr.sin_port = htons(s->sender ? s->src_port : s->dst_port);
254 	sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: sender=%d "
255 			"bind port %d", s->sender, ntohs(addr.sin_port));
256 	if (bind(s->sock, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
257 		perror("bind");
258 		close(s->sock);
259 		s->sock = -1;
260 		return -1;
261 	}
262 
263 	if (s->profile == SIGMA_PROFILE_MULTICAST && !s->sender)
264 		return 0;
265 
266 	if (s->trans_proto == IPPROTO_TCP && s->sender == 0) {
267 		if (listen(s->sock, TG_MAX_CLIENTS_CONNECTIONS ) < 0) {
268 			sigma_dut_print(dut, DUT_MSG_INFO,
269 					"Listen failed with error %d: %s",
270 					errno, strerror(errno));
271 			close(s->sock);
272 			s->sock = -1;
273 			return -1;
274 		}
275 	} else {
276 		memset(&addr, 0, sizeof(addr));
277 		addr.sin_family = AF_INET;
278 		addr.sin_addr.s_addr = s->sender ? s->dst.s_addr :
279 			s->src.s_addr;
280 		addr.sin_port = htons(s->sender ? s->dst_port : s->src_port);
281 		sigma_dut_print(dut, DUT_MSG_DEBUG,
282 				"Traffic agent: connect %s:%d",
283 				inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
284 		if (connect(s->sock, (struct sockaddr *) &addr, sizeof(addr)) <
285 		    0) {
286 			perror("connect");
287 			close(s->sock);
288 			s->sock = -1;
289 			return -1;
290 		}
291 	}
292 
293 	return 0;
294 }
295 
296 
297 static int open_socket_multicast(struct sigma_dut *dut, struct sigma_stream *s)
298 {
299 	if (open_socket_file_transfer(dut, s) < 0)
300 		return -1;
301 
302 	if (!s->sender) {
303 		struct ip_mreq mr;
304 		memset(&mr, 0, sizeof(mr));
305 		mr.imr_multiaddr.s_addr = s->dst.s_addr;
306 		mr.imr_interface.s_addr = htonl(INADDR_ANY);
307 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: "
308 				"IP_ADD_MEMBERSHIP %s", inet_ntoa(s->dst));
309 		if (setsockopt(s->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
310 			       (void *) &mr, sizeof(mr)) < 0) {
311 			sigma_dut_print(dut, DUT_MSG_INFO,
312 					"setsockopt[IP_ADD_MEMBERSHIP]: %s",
313 					strerror(errno));
314 			/*
315 			 * Continue anyway since this can happen, e.g., if the
316 			 * default route is missing. This is not critical for
317 			 * multicast RX testing.
318 			 */
319 		}
320 	}
321 
322 	return 0;
323 }
324 
325 
326 static int set_socket_prio(struct sigma_stream *s)
327 {
328 	int tos = 0x00;
329 
330 	switch (s->tc) {
331 	case SIGMA_TC_VOICE:
332 		if (s->user_priority_set) {
333 			if (s->user_priority == 6)
334 				tos = 48 << 2;
335 			else if (s->user_priority == 7)
336 				tos = 56 << 2;
337 			else
338 				return -1;
339 		} else
340 			tos = 0xe0; /* DSCP = 56 */
341 		break;
342 	case SIGMA_TC_VIDEO:
343 		if (s->user_priority_set) {
344 			if (s->user_priority == 4)
345 				tos = 32 << 2;
346 			else if (s->user_priority == 5)
347 				tos = 40 << 2;
348 			else
349 				return -1;
350 		} else
351 			tos = 0xa0; /* DSCP = 40 */
352 		break;
353 	case SIGMA_TC_BACKGROUND:
354 		if (s->user_priority_set) {
355 			if (s->user_priority == 1)
356 				tos = 8 << 2;
357 			else if (s->user_priority == 2)
358 				tos = 16 << 2;
359 			else
360 				return -1;
361 		} else
362 			tos = 0x20; /* DSCP = 8 */
363 		break;
364 	case SIGMA_TC_BEST_EFFORT:
365 		if (s->user_priority_set) {
366 			if (s->user_priority == 0)
367 				tos = 0 << 2;
368 			else if (s->user_priority == 3)
369 				tos = 20 << 2;
370 			else
371 				return -1;
372 		} else
373 			tos = 0x00; /* DSCP = 0 */
374 		break;
375 	}
376 
377 	if (setsockopt(s->sock, IPPROTO_IP, IP_TOS, &tos, sizeof(tos)) < 0) {
378 		perror("setsockopt");
379 		return -1;
380 	}
381 
382 	return 0;
383 }
384 
385 
386 static int open_socket(struct sigma_dut *dut, struct sigma_stream *s)
387 {
388 	switch (s->profile) {
389 	case SIGMA_PROFILE_FILE_TRANSFER:
390 		return open_socket_file_transfer(dut, s);
391 	case SIGMA_PROFILE_MULTICAST:
392 		return open_socket_multicast(dut, s);
393 	case SIGMA_PROFILE_IPTV:
394 		if (open_socket_file_transfer(dut, s) < 0)
395 			return -1;
396 		return set_socket_prio(s);
397 	case SIGMA_PROFILE_TRANSACTION:
398 		return open_socket_file_transfer(dut, s);
399 	case SIGMA_PROFILE_UAPSD:
400 		return open_socket_file_transfer(dut, s);
401 	case SIGMA_PROFILE_START_SYNC:
402 		sigma_dut_print(dut, DUT_MSG_INFO, "Traffic stream profile %d "
403 				"not yet supported", s->profile);
404 		/* TODO */
405 		break;
406 	}
407 
408 	return -1;
409 }
410 
411 
412 static void send_file_fast(struct sigma_stream *s, char *pkt)
413 {
414 	struct timeval stop, now;
415 	int res;
416 	unsigned int counter = 0;
417 
418 	gettimeofday(&stop, NULL);
419 	stop.tv_sec += s->duration;
420 
421 	while (!s->stop) {
422 		counter++;
423 		WPA_PUT_BE32(&pkt[8], counter);
424 
425 		if ((counter & 0xf) == 0) {
426 			gettimeofday(&now, NULL);
427 			if (now.tv_sec > stop.tv_sec ||
428 			    (now.tv_sec == stop.tv_sec &&
429 			     now.tv_usec >= stop.tv_usec))
430 				break;
431 		}
432 
433 		s->tx_act_frames++;
434 		res = send(s->sock, pkt, s->payload_size, 0);
435 		if (res >= 0) {
436 			s->tx_frames++;
437 			s->tx_payload_bytes += res;
438 		} else {
439 			switch (errno) {
440 			case EAGAIN:
441 			case ENOBUFS:
442 				usleep(1000);
443 				break;
444 			case ECONNRESET:
445 			case EPIPE:
446 				s->stop = 1;
447 				break;
448 			default:
449 				perror("send");
450 				break;
451 			}
452 		}
453 	}
454 }
455 
456 
457 static void send_file(struct sigma_stream *s)
458 {
459 	char *pkt;
460 	struct timeval stop, now, start;
461 	int res;
462 	unsigned int counter = 0;
463 	int sleep_time, sleep_usec, pkt_spacing, duration = 0;
464 
465 	if (s->duration <= 0 || s->frame_rate < 0 || s->payload_size < 20)
466 		return;
467 
468 	pkt = malloc(s->payload_size);
469 	if (pkt == NULL)
470 		return;
471 	memset(pkt, 1, s->payload_size);
472 	strncpy(pkt, "1345678", s->payload_size);
473 
474 	if (s->frame_rate == 0 && s->no_timestamps) {
475 		send_file_fast(s, pkt);
476 		free(pkt);
477 		return;
478 	}
479 
480 	gettimeofday(&stop, NULL);
481 	stop.tv_sec += s->duration;
482 
483 	if (s->frame_rate == 0) {
484 		sleep_time = 0;
485 		pkt_spacing = 0;
486 	} else {
487 		sleep_time = 1000000 / s->frame_rate;
488 		pkt_spacing = sleep_time;
489 		sleep_time -= 100;
490 		if (sleep_time < 0)
491 			sleep_time = 0;
492 	}
493 	sleep_usec = sleep_time;
494 
495 	gettimeofday(&start, NULL);
496 
497 	while (!s->stop) {
498 		counter++;
499 		WPA_PUT_BE32(&pkt[8], counter);
500 
501 		if (sleep_usec)
502 			usleep(sleep_usec);
503 
504 		gettimeofday(&now, NULL);
505 		if (now.tv_sec > stop.tv_sec ||
506 		    (now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec))
507 			break;
508 
509 		if (sleep_time) {
510 			/* Update sleep time based on current time */
511 			struct timeval tmp;
512 			int diff;
513 
514 			duration += pkt_spacing;
515 			timersub(&now, &start, &tmp);
516 			diff = tmp.tv_sec * 1000000 + tmp.tv_usec;
517 
518 			if (diff > duration) {
519 				sleep_usec = pkt_spacing - (diff - duration);
520 				sleep_usec -= 100;
521 			} else {
522 				sleep_usec = pkt_spacing - 100;
523 			}
524 			if (sleep_usec < 0)
525 				sleep_usec = 0;
526 		}
527 
528 		WPA_PUT_BE32(&pkt[12], now.tv_sec);
529 		WPA_PUT_BE32(&pkt[16], now.tv_usec);
530 
531 		s->tx_act_frames++;
532 		res = send(s->sock, pkt, s->payload_size, 0);
533 		if (res >= 0) {
534 			s->tx_frames++;
535 			s->tx_payload_bytes += res;
536 		} else {
537 			switch (errno) {
538 			case EAGAIN:
539 			case ENOBUFS:
540 				usleep(1000);
541 				break;
542 			case ECONNRESET:
543 			case EPIPE:
544 				s->stop = 1;
545 				break;
546 			default:
547 				perror("send");
548 				break;
549 			}
550 		}
551 	}
552 
553 	free(pkt);
554 }
555 
556 
557 static void send_transaction(struct sigma_stream *s)
558 {
559 	char *pkt, *rpkt;
560 	struct timeval stop, now;
561 	int res;
562 	unsigned int counter = 0, rcounter;
563 	int wait_time;
564 	fd_set rfds;
565 	struct timeval tv;
566 
567 	if (s->duration <= 0 || s->frame_rate <= 0 || s->payload_size < 20)
568 		return;
569 
570 	pkt = malloc(s->payload_size);
571 	if (pkt == NULL)
572 		return;
573 	rpkt = malloc(s->payload_size);
574 	if (rpkt == NULL) {
575 		free(pkt);
576 		return;
577 	}
578 	memset(pkt, 1, s->payload_size);
579 	strncpy(pkt, "1345678", s->payload_size);
580 
581 	gettimeofday(&stop, NULL);
582 	stop.tv_sec += s->duration;
583 
584 	wait_time = 1000000 / s->frame_rate;
585 
586 	while (!s->stop) {
587 		counter++;
588 		if (s->max_cnt && (int) counter > s->max_cnt)
589 			break;
590 		WPA_PUT_BE32(&pkt[8], counter);
591 
592 		gettimeofday(&now, NULL);
593 		if (now.tv_sec > stop.tv_sec ||
594 		    (now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec))
595 			break;
596 		WPA_PUT_BE32(&pkt[12], now.tv_sec);
597 		WPA_PUT_BE32(&pkt[16], now.tv_usec);
598 
599 		res = send(s->sock, pkt, s->payload_size, 0);
600 		if (res >= 0) {
601 			s->tx_frames++;
602 			s->tx_payload_bytes += res;
603 		} else {
604 			switch (errno) {
605 			case EAGAIN:
606 			case ENOBUFS:
607 				usleep(1000);
608 				break;
609 			case ECONNRESET:
610 			case EPIPE:
611 				s->stop = 1;
612 				break;
613 			default:
614 				perror("send");
615 				break;
616 			}
617 		}
618 
619 		/* Wait for response */
620 		tv.tv_sec = 0;
621 		tv.tv_usec = wait_time;
622 		FD_ZERO(&rfds);
623 		FD_SET(s->sock, &rfds);
624 		res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
625 		if (res < 0) {
626 			if (errno == EINTR)
627 				continue;
628 			perror("select");
629 			break;
630 		}
631 
632 		if (res == 0) {
633 			/* timeout */
634 			continue;
635 		}
636 
637 		if (FD_ISSET(s->sock, &rfds)) {
638 			/* response received */
639 			res = recv(s->sock, rpkt, s->payload_size, 0);
640 			if (res < 0) {
641 				perror("recv");
642 				break;
643 			}
644 			rcounter = WPA_GET_BE32(&rpkt[8]);
645 			if (rcounter != counter)
646 				s->out_of_seq_frames++;
647 			s->rx_frames++;
648 			s->rx_payload_bytes += res;
649 		}
650 	}
651 
652 	free(pkt);
653 	free(rpkt);
654 }
655 
656 
657 static void * send_thread(void *ctx)
658 {
659 	struct sigma_stream *s = ctx;
660 
661 	sleep(s->start_delay);
662 
663 	switch (s->profile) {
664 	case SIGMA_PROFILE_FILE_TRANSFER:
665 		send_file(s);
666 		break;
667 	case SIGMA_PROFILE_MULTICAST:
668 		send_file(s);
669 		break;
670 	case SIGMA_PROFILE_IPTV:
671 		send_file(s);
672 		break;
673 	case SIGMA_PROFILE_TRANSACTION:
674 		send_transaction(s);
675 		break;
676 	case SIGMA_PROFILE_START_SYNC:
677 		break;
678 	case SIGMA_PROFILE_UAPSD:
679 		send_uapsd_console(s);
680 		break;
681 	}
682 
683 	return NULL;
684 }
685 
686 
687 struct traffic_agent_send_data {
688 	struct sigma_dut *dut;
689 	struct sigma_conn *conn;
690 	int streams[MAX_SIGMA_STREAMS];
691 	int count;
692 };
693 
694 
695 static struct sigma_stream * get_stream(struct sigma_dut *dut, int id)
696 {
697 	int i;
698 
699 	for (i = 0; i < dut->num_streams; i++) {
700 		if ((unsigned int) id == dut->streams[i].stream_id)
701 			return &dut->streams[i];
702 	}
703 
704 	return NULL;
705 }
706 
707 
708 static void * send_report_thread(void *ctx)
709 {
710 	struct traffic_agent_send_data *data = ctx;
711 	struct sigma_dut *dut = data->dut;
712 	struct sigma_conn *conn = data->conn;
713 	int i, ret;
714 	char buf[100 + MAX_SIGMA_STREAMS * 60], *pos;
715 
716 	for (i = 0; i < data->count; i++) {
717 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: waiting "
718 				"for stream %d send to complete",
719 				data->streams[i]);
720 		stop_stream(get_stream(dut, data->streams[i]));
721 	}
722 
723 	buf[0] = '\0';
724 	pos = buf;
725 
726 	pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,");
727 	for (i = 0; i < data->count; i++) {
728 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
729 			       i > 0 ? " " : "", data->streams[i]);
730 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
731 			break;
732 		pos += ret;
733 	}
734 
735 	if (dut->program == PROGRAM_60GHZ) {
736 		sigma_dut_print(dut, DUT_MSG_INFO, "reporting tx_act_frames");
737 		pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,");
738 		for (i = 0; i < data->count; i++) {
739 			struct sigma_stream *s;
740 
741 			s = get_stream(dut, data->streams[i]);
742 			if (!s)
743 				continue;
744 			ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
745 				       i > 0 ? " " : "", s->tx_act_frames);
746 			if (ret < 0 || ret >= buf + sizeof(buf) - pos)
747 				break;
748 			pos += ret;
749 		}
750 	}
751 
752 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,");
753 	for (i = 0; i < data->count; i++) {
754 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
755 
756 		if (!s)
757 			continue;
758 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
759 			       i > 0 ? " " : "", s->tx_frames);
760 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
761 			break;
762 		pos += ret;
763 	}
764 
765 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,");
766 	for (i = 0; i < data->count; i++) {
767 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
768 
769 		if (!s)
770 			continue;
771 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
772 			       i > 0 ? " " : "", s->rx_frames);
773 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
774 			break;
775 		pos += ret;
776 	}
777 
778 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,");
779 	for (i = 0; i < data->count; i++) {
780 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
781 
782 		if (!s)
783 			continue;
784 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
785 			       i > 0 ? " " : "", s->tx_payload_bytes);
786 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
787 			break;
788 		pos += ret;
789 	}
790 
791 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,");
792 	for (i = 0; i < data->count; i++) {
793 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
794 
795 		if (!s)
796 			continue;
797 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
798 			       i > 0 ? " " : "", s->rx_payload_bytes);
799 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
800 			break;
801 		pos += ret;
802 	}
803 
804 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,");
805 	for (i = 0; i < data->count; i++) {
806 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
807 
808 		if (!s)
809 			continue;
810 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
811 			       i > 0 ? " " : "", s->out_of_seq_frames);
812 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
813 			break;
814 		pos += ret;
815 	}
816 
817 	for (i = 0; i < data->count; i++) {
818 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
819 		if (!s)
820 			continue;
821 		s->ta_send_in_progress = 0;
822 		if (s->trans_proto == IPPROTO_TCP) {
823 			/*
824 			 * Close the socket to make sure client side close the
825 			 * network before the server. Otherwise, the server
826 			 * might get "Address already in use" when trying to
827 			 * reuse the port.
828 			 */
829 			close(s->sock);
830 			s->sock = -1;
831 			sigma_dut_print(dut, DUT_MSG_DEBUG,
832 					"Closed the sender socket");
833 		}
834 	}
835 
836 	buf[sizeof(buf) - 1] = '\0';
837 
838 	if (conn->s < 0)
839 		sigma_dut_print(dut, DUT_MSG_INFO, "Cannot send traffic_agent response since control socket has already been closed");
840 	else
841 		send_resp(dut, conn, SIGMA_COMPLETE, buf);
842 	conn->waiting_completion = 0;
843 
844 	free(data);
845 
846 	return NULL;
847 }
848 
849 
850 static int cmd_traffic_agent_send(struct sigma_dut *dut,
851 				  struct sigma_conn *conn,
852 				  struct sigma_cmd *cmd)
853 {
854 	const char *val;
855 	int i, j, res;
856 	char buf[100];
857 	struct traffic_agent_send_data *data;
858 
859 	val = get_param(cmd, "streamID");
860 	if (val == NULL)
861 		return -1;
862 
863 	data = calloc(1, sizeof(*data));
864 	if (data == NULL)
865 		return -1;
866 	data->dut = dut;
867 	data->conn = conn;
868 
869 	data->count = get_stream_id(val, data->streams);
870 	if (data->count < 0) {
871 		free(data);
872 		return -1;
873 	}
874 	for (i = 0; i < data->count; i++) {
875 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
876 
877 		if (!s) {
878 			snprintf(buf, sizeof(buf), "errorCode,StreamID %d "
879 				 "not configured", data->streams[i]);
880 			send_resp(dut, conn, SIGMA_INVALID, buf);
881 			free(data);
882 			return 0;
883 		}
884 		for (j = 0; j < i; j++)
885 			if (data->streams[i] == data->streams[j])
886 				return -1;
887 		if (!s->sender) {
888 			snprintf(buf, sizeof(buf), "errorCode,Not configured "
889 				 "as sender for streamID %d", data->streams[i]);
890 			send_resp(dut, conn, SIGMA_INVALID, buf);
891 			free(data);
892 			return 0;
893 		}
894 		if (s->ta_send_in_progress) {
895 			send_resp(dut, conn, SIGMA_ERROR,
896 				  "errorCode,Multiple concurrent send cmds on same streamID not supported");
897 			free(data);
898 			return 0;
899 		}
900 	}
901 
902 	for (i = 0; i < data->count; i++) {
903 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
904 
905 		if (!s)
906 			continue;
907 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: open "
908 				"socket for send stream %d", data->streams[i]);
909 		if (open_socket(dut, s) < 0) {
910 			free(data);
911 			return -2;
912 		}
913 	}
914 
915 	for (i = 0; i < data->count; i++) {
916 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
917 
918 		if (!s)
919 			continue;
920 
921 		/*
922 		 * Provide dut context to the thread to support debugging and
923 		 * returning of error messages.
924 		 */
925 		s->dut = dut;
926 
927 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start "
928 				"send for stream %d", data->streams[i]);
929 		res = pthread_create(&s->thr, NULL, send_thread, s);
930 		if (res) {
931 			sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create "
932 					"failed: %d", res);
933 			free(data);
934 			return -2;
935 		}
936 		s->started = 1;
937 	}
938 
939 	sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start a thread to track sending streams");
940 	conn->waiting_completion = 1;
941 	res = pthread_create(&dut->thr, NULL, send_report_thread, data);
942 	if (res) {
943 		sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create failed: %d",
944 				res);
945 		free(data);
946 		conn->waiting_completion = 0;
947 		return -2;
948 	}
949 
950 	for (i = 0; i < data->count; i++) {
951 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
952 
953 		if (s)
954 			s->ta_send_in_progress = 1;
955 	}
956 
957 	/* Command will be completed in send_report_thread() */
958 
959 	return 0;
960 }
961 
962 
963 static void receive_file(struct sigma_stream *s)
964 {
965 	struct timeval tv, now;
966 	fd_set rfds;
967 	int res;
968 	char *pkt;
969 	int pktlen;
970 	unsigned int last_rx = 0, counter;
971 
972 	pktlen = 65536 + 1;
973 	pkt = malloc(pktlen);
974 	if (pkt == NULL)
975 		return;
976 
977 	while (!s->stop) {
978 		FD_ZERO(&rfds);
979 		FD_SET(s->sock, &rfds);
980 		tv.tv_sec = 0;
981 		tv.tv_usec = 300000;
982 		res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
983 		if (res < 0) {
984 			perror("select");
985 			usleep(10000);
986 		} else if (FD_ISSET(s->sock, &rfds)) {
987 			res = recv(s->sock, pkt, pktlen, 0);
988 			if (res >= 0) {
989 				s->rx_frames++;
990 				s->rx_payload_bytes += res;
991 
992 				counter = WPA_GET_BE32(&pkt[8]);
993 				if (counter < last_rx)
994 					s->out_of_seq_frames++;
995 				last_rx = counter;
996 			} else {
997 				perror("recv");
998 				break;
999 			}
1000 
1001 			if (res >= 20 && s->stats &&
1002 			    s->num_stats < MAX_SIGMA_STATS) {
1003 				struct sigma_frame_stats *stats;
1004 				stats = &s->stats[s->num_stats];
1005 				s->num_stats++;
1006 				gettimeofday(&now, NULL);
1007 				stats->seqnum = counter;
1008 				stats->local_sec = now.tv_sec;
1009 				stats->local_usec = now.tv_usec;
1010 				stats->remote_sec = WPA_GET_BE32(&pkt[12]);
1011 				stats->remote_usec = WPA_GET_BE32(&pkt[16]);
1012 			}
1013 		}
1014 	}
1015 
1016 	free(pkt);
1017 }
1018 
1019 
1020 static void receive_transaction(struct sigma_stream *s)
1021 {
1022 	struct timeval tv;
1023 	fd_set rfds;
1024 	int res;
1025 	char *pkt;
1026 	int pktlen;
1027 	unsigned int last_rx = 0, counter;
1028 	struct sockaddr_in addr;
1029 	socklen_t addrlen;
1030 
1031 	if (s->payload_size)
1032 		pktlen = s->payload_size;
1033 	else
1034 		pktlen = 65536 + 1;
1035 	pkt = malloc(pktlen);
1036 	if (pkt == NULL)
1037 		return;
1038 
1039 	while (!s->stop) {
1040 		FD_ZERO(&rfds);
1041 		FD_SET(s->sock, &rfds);
1042 		tv.tv_sec = 0;
1043 		tv.tv_usec = 300000;
1044 		res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
1045 		if (res < 0) {
1046 			perror("select");
1047 			usleep(10000);
1048 		} else if (FD_ISSET(s->sock, &rfds)) {
1049 			addrlen = sizeof(addr);
1050 			res = recvfrom(s->sock, pkt, pktlen, 0,
1051 				       (struct sockaddr *) &addr, &addrlen);
1052 			if (res < 0) {
1053 				perror("recv");
1054 				break;
1055 			}
1056 
1057 			s->rx_frames++;
1058 			s->rx_payload_bytes += res;
1059 
1060 			counter = WPA_GET_BE32(&pkt[8]);
1061 			if (counter < last_rx)
1062 				s->out_of_seq_frames++;
1063 			last_rx = counter;
1064 
1065 			/* send response */
1066 			res = sendto(s->sock, pkt, pktlen, 0,
1067 				     (struct sockaddr *) &addr, addrlen);
1068 			if (res < 0) {
1069 				perror("sendto");
1070 			} else {
1071 				s->tx_frames++;
1072 				s->tx_payload_bytes += res;
1073 			}
1074 		}
1075 	}
1076 
1077 	free(pkt);
1078 }
1079 
1080 
1081 static void * receive_thread(void *ctx)
1082 {
1083 	struct sigma_stream *s = ctx;
1084 
1085 	if (s->trans_proto == IPPROTO_TCP) {
1086 		/* Wait for socket to be accepted */
1087 		struct sockaddr_in connected_addr;
1088 		int connected_sock; /* returned from accept on sock */
1089 		socklen_t connected_addr_len = sizeof(connected_addr);
1090 
1091 		sigma_dut_print(s->dut, DUT_MSG_DEBUG,
1092 				"Traffic agent: Waiting on accept");
1093 		connected_sock = accept(s->sock,
1094 					(struct sockaddr *) &connected_addr,
1095 					&connected_addr_len);
1096 		if (connected_sock < 0) {
1097 			sigma_dut_print(s->dut, DUT_MSG_ERROR,
1098 					"Traffic agent: Failed to accept: %s",
1099 					strerror(errno));
1100 			return NULL;
1101 		}
1102 
1103 		sigma_dut_print(s->dut, DUT_MSG_DEBUG,
1104 				"Traffic agent: Accepted client closing parent socket and talk over connected sock.");
1105 		close(s->sock);
1106 		s->sock = connected_sock;
1107 	}
1108 
1109 	switch (s->profile) {
1110 	case SIGMA_PROFILE_FILE_TRANSFER:
1111 		receive_file(s);
1112 		break;
1113 	case SIGMA_PROFILE_MULTICAST:
1114 		receive_file(s);
1115 		break;
1116 	case SIGMA_PROFILE_IPTV:
1117 		receive_file(s);
1118 		break;
1119 	case SIGMA_PROFILE_TRANSACTION:
1120 		receive_transaction(s);
1121 		break;
1122 	case SIGMA_PROFILE_START_SYNC:
1123 		break;
1124 	case SIGMA_PROFILE_UAPSD:
1125 		receive_uapsd(s);
1126 		break;
1127 	}
1128 
1129 	return NULL;
1130 }
1131 
1132 
1133 static int cmd_traffic_agent_receive_start(struct sigma_dut *dut,
1134 					   struct sigma_conn *conn,
1135 					   struct sigma_cmd *cmd)
1136 {
1137 	const char *val;
1138 	int streams[MAX_SIGMA_STREAMS];
1139 	int i, j, count;
1140 	char buf[100];
1141 
1142 	val = get_param(cmd, "streamID");
1143 	if (val == NULL)
1144 		return -1;
1145 	count = get_stream_id(val, streams);
1146 	if (count < 0)
1147 		return -1;
1148 	for (i = 0; i < count; i++) {
1149 		struct sigma_stream *s = get_stream(dut, streams[i]);
1150 
1151 		if (!s) {
1152 			snprintf(buf, sizeof(buf), "errorCode,StreamID %d "
1153 				 "not configured", streams[i]);
1154 			send_resp(dut, conn, SIGMA_INVALID, buf);
1155 			return 0;
1156 		}
1157 		for (j = 0; j < i; j++)
1158 			if (streams[i] == streams[j])
1159 				return -1;
1160 		if (s->sender) {
1161 			snprintf(buf, sizeof(buf), "errorCode,Not configured "
1162 				 "as receiver for streamID %d", streams[i]);
1163 			send_resp(dut, conn, SIGMA_INVALID, buf);
1164 			return 0;
1165 		}
1166 	}
1167 
1168 	for (i = 0; i < count; i++) {
1169 		struct sigma_stream *s = get_stream(dut, streams[i]);
1170 
1171 		if (!s)
1172 			continue;
1173 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: open "
1174 				"receive socket for stream %d", streams[i]);
1175 		if (open_socket(dut, s) < 0)
1176 			return -2;
1177 	}
1178 
1179 	for (i = 0; i < count; i++) {
1180 		struct sigma_stream *s = get_stream(dut, streams[i]);
1181 		int res;
1182 
1183 		if (!s)
1184 			continue;
1185 		/*
1186 		 * Provide dut context to the thread to support debugging and
1187 		 * returning of error messages. Similarly, provide interface
1188 		 * information to the thread. If the Interface parameter is not
1189 		 * passed, get it from get_station_ifname() since the interface
1190 		 * name is needed for power save mode configuration for Uapsd
1191 		 * cases.
1192 		 */
1193 		s->dut = dut;
1194 		val = get_param(cmd, "Interface");
1195 		strncpy(s->ifname, (val ? val : get_station_ifname()),
1196 			sizeof(s->ifname));
1197 		s->ifname[sizeof(s->ifname) - 1] = '\0';
1198 
1199 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start "
1200 				"receive for stream %d", streams[i]);
1201 		res = pthread_create(&s->thr, NULL, receive_thread, s);
1202 		if (res) {
1203 			sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create "
1204 					"failed: %d", res);
1205 			return -2;
1206 		}
1207 		s->started = 1;
1208 	}
1209 
1210 	return 1;
1211 }
1212 
1213 
1214 static void write_frame_stats(struct sigma_dut *dut, struct sigma_stream *s,
1215 			      int id)
1216 {
1217 	char fname[128];
1218 	FILE *f;
1219 	unsigned int i;
1220 
1221 	snprintf(fname, sizeof(fname), SIGMA_TMPDIR "/e2e%u-%d.txt",
1222 		 (unsigned int) time(NULL), id);
1223 	f = fopen(fname, "w");
1224 	if (f == NULL) {
1225 		sigma_dut_print(dut, DUT_MSG_INFO, "Could not write %s",
1226 				fname);
1227 		return;
1228 	}
1229 	fprintf(f, "seqnum:local_sec:local_usec:remote_sec:remote_usec\n");
1230 
1231 	sigma_dut_print(dut, DUT_MSG_DEBUG, "Writing frame stats to %s",
1232 			fname);
1233 
1234 	for (i = 0; i < s->num_stats; i++) {
1235 		struct sigma_frame_stats *stats = &s->stats[i];
1236 		fprintf(f, "%u:%u:%u:%u:%u\n", stats->seqnum,
1237 			stats->local_sec, stats->local_usec,
1238 			stats->remote_sec, stats->remote_usec);
1239 	}
1240 
1241 	fclose(f);
1242 }
1243 
1244 
1245 static int cmd_traffic_agent_receive_stop(struct sigma_dut *dut,
1246 					  struct sigma_conn *conn,
1247 					  struct sigma_cmd *cmd)
1248 {
1249 	const char *val;
1250 	int streams[MAX_SIGMA_STREAMS];
1251 	int i, j, ret, count;
1252 	char buf[100 + MAX_SIGMA_STREAMS * 60], *pos;
1253 
1254 	val = get_param(cmd, "streamID");
1255 	if (val == NULL)
1256 		return -1;
1257 	count = get_stream_id(val, streams);
1258 	if (count < 0)
1259 		return -1;
1260 	for (i = 0; i < count; i++) {
1261 		struct sigma_stream *s = get_stream(dut, streams[i]);
1262 
1263 		if (!s) {
1264 			snprintf(buf, sizeof(buf), "errorCode,StreamID %d "
1265 				 "not configured", streams[i]);
1266 			send_resp(dut, conn, SIGMA_INVALID, buf);
1267 			return 0;
1268 		}
1269 		for (j = 0; j < i; j++)
1270 			if (streams[i] == streams[j])
1271 				return -1;
1272 		if (!s->started) {
1273 			snprintf(buf, sizeof(buf), "errorCode,Receive not "
1274 				 "started for streamID %d", streams[i]);
1275 			send_resp(dut, conn, SIGMA_INVALID, buf);
1276 			return 0;
1277 		}
1278 	}
1279 
1280 	for (i = 0; i < count; i++) {
1281 		struct sigma_stream *s = get_stream(dut, streams[i]);
1282 
1283 		if (s)
1284 			s->stop = 1;
1285 	}
1286 
1287 	for (i = 0; i < count; i++) {
1288 		struct sigma_stream *s = get_stream(dut, streams[i]);
1289 
1290 		if (!s)
1291 			continue;
1292 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: stop "
1293 				"receive for stream %d", streams[i]);
1294 		stop_stream(s);
1295 	}
1296 
1297 	buf[0] = '\0';
1298 	pos = buf;
1299 
1300 	pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,");
1301 	for (i = 0; i < count; i++) {
1302 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1303 			       i > 0 ? " " : "", streams[i]);
1304 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1305 			break;
1306 		pos += ret;
1307 	}
1308 
1309 	if (dut->program == PROGRAM_60GHZ) {
1310 		pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,");
1311 		for (i = 0; i < count; i++) {
1312 			struct sigma_stream *s = get_stream(dut, streams[i]);
1313 
1314 			if (!s)
1315 				continue;
1316 			ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1317 				       i > 0 ? " " : "", s->tx_act_frames);
1318 			if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1319 				break;
1320 			pos += ret;
1321 		}
1322 	}
1323 
1324 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,");
1325 	for (i = 0; i < count; i++) {
1326 		struct sigma_stream *s = get_stream(dut, streams[i]);
1327 
1328 		if (!s)
1329 			continue;
1330 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1331 			       i > 0 ? " " : "", s->tx_frames);
1332 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1333 			break;
1334 		pos += ret;
1335 	}
1336 
1337 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,");
1338 	for (i = 0; i < count; i++) {
1339 		struct sigma_stream *s = get_stream(dut, streams[i]);
1340 
1341 		if (!s)
1342 			continue;
1343 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1344 			       i > 0 ? " " : "", s->rx_frames);
1345 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1346 			break;
1347 		pos += ret;
1348 	}
1349 
1350 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,");
1351 	for (i = 0; i < count; i++) {
1352 		struct sigma_stream *s = get_stream(dut, streams[i]);
1353 
1354 		if (!s)
1355 			continue;
1356 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
1357 			       i > 0 ? " " : "", s->tx_payload_bytes);
1358 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1359 			break;
1360 		pos += ret;
1361 	}
1362 
1363 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,");
1364 	for (i = 0; i < count; i++) {
1365 		struct sigma_stream *s = get_stream(dut, streams[i]);
1366 
1367 		if (!s)
1368 			continue;
1369 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
1370 			       i > 0 ? " " : "", s->rx_payload_bytes);
1371 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1372 			break;
1373 		pos += ret;
1374 	}
1375 
1376 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,");
1377 	for (i = 0; i < count; i++) {
1378 		struct sigma_stream *s = get_stream(dut, streams[i]);
1379 
1380 		if (!s)
1381 			continue;
1382 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1383 			       i > 0 ? " " : "", s->out_of_seq_frames);
1384 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1385 			break;
1386 		pos += ret;
1387 	}
1388 
1389 	buf[sizeof(buf) - 1] = '\0';
1390 
1391 	send_resp(dut, conn, SIGMA_COMPLETE, buf);
1392 
1393 	for (i = 0; i < count; i++) {
1394 		struct sigma_stream *s = get_stream(dut, streams[i]);
1395 
1396 		if (!s)
1397 			continue;
1398 		if (s->profile == SIGMA_PROFILE_IPTV && s->num_stats > 0 &&
1399 		    dut->write_stats)
1400 			write_frame_stats(dut, s, streams[i]);
1401 		free(s->stats);
1402 		s->stats = NULL;
1403 		s->num_stats = 0;
1404 	}
1405 
1406 	return 0;
1407 }
1408 
1409 
1410 static int cmd_traffic_agent_version(struct sigma_dut *dut,
1411 				     struct sigma_conn *conn,
1412 				     struct sigma_cmd *cmd)
1413 {
1414 	send_resp(dut, conn, SIGMA_COMPLETE, "version,1.0");
1415 	return 0;
1416 }
1417 
1418 
1419 void traffic_agent_register_cmds(void)
1420 {
1421 	sigma_dut_reg_cmd("traffic_agent_config", NULL,
1422 			  cmd_traffic_agent_config);
1423 	sigma_dut_reg_cmd("traffic_agent_reset", NULL,
1424 			  cmd_traffic_agent_reset);
1425 	sigma_dut_reg_cmd("traffic_agent_send", NULL,
1426 			  cmd_traffic_agent_send);
1427 	sigma_dut_reg_cmd("traffic_agent_receive_start", NULL,
1428 			  cmd_traffic_agent_receive_start);
1429 	sigma_dut_reg_cmd("traffic_agent_receive_stop", NULL,
1430 			  cmd_traffic_agent_receive_stop);
1431 	sigma_dut_reg_cmd("traffic_agent_version", NULL,
1432 			  cmd_traffic_agent_version);
1433 }
1434