xref: /wlan-dirver/utils/sigma-dut/traffic_agent.c (revision 6d7c88cf50c3135d2320d3bb9666c61974de1ea7)
1 /*
2  * Sigma Control API DUT (station/AP)
3  * Copyright (c) 2010, Atheros Communications, Inc.
4  * Copyright (c) 2011-2017, Qualcomm Atheros, Inc.
5  * Copyright (c) 2018-2019, The Linux Foundation
6  * All Rights Reserved.
7  * Licensed under the Clear BSD license. See README for more details.
8  */
9 
10 #include "sigma_dut.h"
11 #include "wpa_helpers.h"
12 
13 #define TG_MAX_CLIENTS_CONNECTIONS 1
14 
15 
cmd_traffic_agent_config(struct sigma_dut * dut,struct sigma_conn * conn,struct sigma_cmd * cmd)16 static enum sigma_cmd_result cmd_traffic_agent_config(struct sigma_dut *dut,
17 						      struct sigma_conn *conn,
18 						      struct sigma_cmd *cmd)
19 {
20 	struct sigma_stream *s;
21 	const char *val;
22 	char buf[100];
23 
24 	if (dut->num_streams == MAX_SIGMA_STREAMS) {
25 		send_resp(dut, conn, SIGMA_ERROR, "errorCode,No more "
26 			  "concurrent traffic streams supported");
27 		return STATUS_SENT;
28 	}
29 
30 	s = &dut->streams[dut->num_streams];
31 	free(s->stats);
32 	memset(s, 0, sizeof(*s));
33 	s->sock = -1;
34 	s->no_timestamps = dut->no_timestamps;
35 
36 	val = get_param(cmd, "profile");
37 	if (!val)
38 		return INVALID_SEND_STATUS;
39 
40 	if (strcasecmp(val, "File_Transfer") == 0)
41 		s->profile = SIGMA_PROFILE_FILE_TRANSFER;
42 	else if (strcasecmp(val, "Multicast") == 0)
43 		s->profile = SIGMA_PROFILE_MULTICAST;
44 	else if (strcasecmp(val, "IPTV") == 0)
45 		s->profile = SIGMA_PROFILE_IPTV;
46 	else if (strcasecmp(val, "Transaction") == 0)
47 		s->profile = SIGMA_PROFILE_TRANSACTION;
48 	else if (strcasecmp(val, "Start_Sync") == 0)
49 		s->profile = SIGMA_PROFILE_START_SYNC;
50 	else if (strcasecmp(val, "Uapsd") == 0)
51 		s->profile = SIGMA_PROFILE_UAPSD;
52 	else {
53 		send_resp(dut, conn, SIGMA_INVALID, "errorCode,Unsupported "
54 			  "profile");
55 		return STATUS_SENT;
56 	}
57 
58 	val = get_param(cmd, "direction");
59 	if (!val)
60 		return INVALID_SEND_STATUS;
61 	if (strcasecmp(val, "send") == 0)
62 		s->sender = 1;
63 	else if (strcasecmp(val, "receive") == 0)
64 		s->sender = 0;
65 	else
66 		return INVALID_SEND_STATUS;
67 
68 	val = get_param(cmd, "destination");
69 	if (val) {
70 		if (!is_ipv6_addr(val)) {
71 			if (inet_aton(val, &s->dst) == 0)
72 				return INVALID_SEND_STATUS;
73 		} else {
74 			if (inet_pton(AF_INET6, val, &s->dst) != 1)
75 				return INVALID_SEND_STATUS;
76 		}
77 	}
78 
79 	val = get_param(cmd, "source");
80 	if (val) {
81 		if (!is_ipv6_addr(val)) {
82 			if (inet_aton(val, &s->src) == 0)
83 				return INVALID_SEND_STATUS;
84 		} else {
85 			if (inet_pton(AF_INET6, val, &s->src) != 1)
86 				return INVALID_SEND_STATUS;
87 		}
88 	}
89 
90 	val = get_param(cmd, "destinationPort");
91 	if (val)
92 		s->dst_port = atoi(val);
93 
94 	val = get_param(cmd, "sourcePort");
95 	if (val)
96 		s->src_port = atoi(val);
97 
98 	val = get_param(cmd, "frameRate");
99 	if (val)
100 		s->frame_rate = atoi(val);
101 
102 	val = get_param(cmd, "duration");
103 	if (val)
104 		s->duration = atoi(val);
105 
106 	val = get_param(cmd, "payloadSize");
107 	if (val)
108 		s->payload_size = atoi(val);
109 
110 	val = get_param(cmd, "startDelay");
111 	if (val)
112 		s->start_delay = atoi(val);
113 
114 	val = get_param(cmd, "maxCnt");
115 	if (val)
116 		s->max_cnt = atoi(val);
117 
118 	val = get_param(cmd, "trafficClass");
119 	if (val) {
120 		if (strcasecmp(val, "Voice") == 0)
121 			s->tc = SIGMA_TC_VOICE;
122 		else if (strcasecmp(val, "Video") == 0)
123 			s->tc = SIGMA_TC_VIDEO;
124 		else if (strcasecmp(val, "Background") == 0)
125 			s->tc = SIGMA_TC_BACKGROUND;
126 		else if (strcasecmp(val, "BestEffort") == 0)
127 			s->tc = SIGMA_TC_BEST_EFFORT;
128 		else
129 			return INVALID_SEND_STATUS;
130 	}
131 
132 	val = get_param(cmd, "userpriority");
133 	if (val) {
134 		s->user_priority_set = 1;
135 		s->user_priority = atoi(val);
136 	}
137 
138 	val = get_param(cmd, "tagName");
139 	if (val) {
140 		strlcpy(s->test_name, val, sizeof(s->test_name));
141 		sigma_dut_print(dut, DUT_MSG_DEBUG,
142 				"Traffic agent: U-APSD console tagname %s",
143 				s->test_name);
144 	}
145 
146 	if (dut->throughput_pktsize && s->frame_rate == 0 && s->sender &&
147 	    dut->throughput_pktsize != s->payload_size &&
148 	    (s->profile == SIGMA_PROFILE_FILE_TRANSFER ||
149 	     s->profile == SIGMA_PROFILE_IPTV ||
150 	     s->profile == SIGMA_PROFILE_UAPSD)) {
151 		sigma_dut_print(dut, DUT_MSG_INFO,
152 				"Traffic agent: Override throughput test payload size %u -> %u",
153 				s->payload_size, dut->throughput_pktsize);
154 		s->payload_size = dut->throughput_pktsize;
155 	}
156 
157 	val = get_param(cmd, "transProtoType");
158 	if (val) {
159 		if (strcmp(val, "1") == 0)
160 			s->trans_proto = IPPROTO_TCP;
161 		else if (strcmp(val, "0") == 0)
162 			s->trans_proto = IPPROTO_UDP;
163 		else
164 			return INVALID_SEND_STATUS;
165 	} else {
166 		s->trans_proto = IPPROTO_UDP;
167 	}
168 
169 	if (s->profile == SIGMA_PROFILE_IPTV && !s->sender && !s->no_timestamps)
170 	{
171 		s->stats = calloc(MAX_SIGMA_STATS,
172 				  sizeof(struct sigma_frame_stats));
173 		if (s->stats == NULL)
174 			return ERROR_SEND_STATUS;
175 	}
176 
177 	dut->stream_id++;
178 	dut->num_streams++;
179 
180 	s->stream_id = dut->stream_id;
181 	snprintf(buf, sizeof(buf), "streamID,%d", s->stream_id);
182 	send_resp(dut, conn, SIGMA_COMPLETE, buf);
183 	return STATUS_SENT;
184 }
185 
186 
stop_stream(struct sigma_stream * s)187 static void stop_stream(struct sigma_stream *s)
188 {
189 	if (s && s->started) {
190 		pthread_join(s->thr, NULL);
191 		if (s->sock != -1) {
192 			close(s->sock);
193 			s->sock = -1;
194 		}
195 
196 		s->started = 0;
197 	}
198 }
199 
200 
cmd_traffic_agent_reset(struct sigma_dut * dut,struct sigma_conn * conn,struct sigma_cmd * cmd)201 static enum sigma_cmd_result cmd_traffic_agent_reset(struct sigma_dut *dut,
202 						     struct sigma_conn *conn,
203 						     struct sigma_cmd *cmd)
204 {
205 	int i;
206 	for (i = 0; i < dut->num_streams; i++) {
207 		struct sigma_stream *s = &dut->streams[i];
208 		s->stop = 1;
209 		stop_stream(s);
210 	}
211 	dut->num_streams = 0;
212 	memset(&dut->streams, 0, sizeof(dut->streams));
213 	return SUCCESS_SEND_STATUS;
214 }
215 
216 
get_stream_id(const char * str,int streams[MAX_SIGMA_STREAMS])217 static int get_stream_id(const char *str, int streams[MAX_SIGMA_STREAMS])
218 {
219 	int count;
220 
221 	count = 0;
222 	for (;;) {
223 		if (count == MAX_SIGMA_STREAMS)
224 			return -1;
225 		streams[count] = atoi(str);
226 		if (streams[count] == 0)
227 			return -1;
228 		count++;
229 		str = strchr(str, ' ');
230 		if (str == NULL)
231 			break;
232 		while (*str == ' ')
233 			str++;
234 	}
235 
236 	return count;
237 }
238 
239 
open_socket_file_transfer(struct sigma_dut * dut,struct sigma_stream * s)240 static int open_socket_file_transfer(struct sigma_dut *dut,
241 				     struct sigma_stream *s)
242 {
243 	struct sockaddr_in addr;
244 	int sock_opt_val = 1;
245 
246 	s->sock = socket(PF_INET, IPPROTO_UDP == s->trans_proto ? SOCK_DGRAM :
247 			 SOCK_STREAM, s->trans_proto);
248 	if (s->sock < 0) {
249 		perror("socket");
250 		return -1;
251 	}
252 
253 	if (setsockopt(s->sock, SOL_SOCKET, SO_REUSEADDR, &sock_opt_val,
254 		       sizeof(sock_opt_val)) < 0) {
255 		perror("setsockopt");
256 		close(s->sock);
257 		s->sock = -1;
258 		return -1;
259 	}
260 
261 	memset(&addr, 0, sizeof(addr));
262 	addr.sin_family = AF_INET;
263 	addr.sin_port = htons(s->sender ? s->src_port : s->dst_port);
264 	sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: sender=%d "
265 			"bind port %d", s->sender, ntohs(addr.sin_port));
266 	if (bind(s->sock, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
267 		perror("bind");
268 		close(s->sock);
269 		s->sock = -1;
270 		return -1;
271 	}
272 
273 	if (s->profile == SIGMA_PROFILE_MULTICAST && !s->sender)
274 		return 0;
275 
276 	if (s->trans_proto == IPPROTO_TCP && s->sender == 0) {
277 		if (listen(s->sock, TG_MAX_CLIENTS_CONNECTIONS ) < 0) {
278 			sigma_dut_print(dut, DUT_MSG_INFO,
279 					"Listen failed with error %d: %s",
280 					errno, strerror(errno));
281 			close(s->sock);
282 			s->sock = -1;
283 			return -1;
284 		}
285 	} else {
286 		memset(&addr, 0, sizeof(addr));
287 		addr.sin_family = AF_INET;
288 		addr.sin_addr.s_addr = s->sender ? s->dst.s_addr :
289 			s->src.s_addr;
290 		addr.sin_port = htons(s->sender ? s->dst_port : s->src_port);
291 		sigma_dut_print(dut, DUT_MSG_DEBUG,
292 				"Traffic agent: connect %s:%d",
293 				inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
294 		if (connect(s->sock, (struct sockaddr *) &addr, sizeof(addr)) <
295 		    0) {
296 			perror("connect");
297 			close(s->sock);
298 			s->sock = -1;
299 			return -1;
300 		}
301 	}
302 
303 	return 0;
304 }
305 
306 
open_socket_multicast(struct sigma_dut * dut,struct sigma_stream * s)307 static int open_socket_multicast(struct sigma_dut *dut, struct sigma_stream *s)
308 {
309 	if (open_socket_file_transfer(dut, s) < 0)
310 		return -1;
311 
312 	if (!s->sender) {
313 		struct ip_mreq mr;
314 		memset(&mr, 0, sizeof(mr));
315 		mr.imr_multiaddr.s_addr = s->dst.s_addr;
316 		mr.imr_interface.s_addr = htonl(INADDR_ANY);
317 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: "
318 				"IP_ADD_MEMBERSHIP %s", inet_ntoa(s->dst));
319 		if (setsockopt(s->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
320 			       (void *) &mr, sizeof(mr)) < 0) {
321 			sigma_dut_print(dut, DUT_MSG_INFO,
322 					"setsockopt[IP_ADD_MEMBERSHIP]: %s",
323 					strerror(errno));
324 			/*
325 			 * Continue anyway since this can happen, e.g., if the
326 			 * default route is missing. This is not critical for
327 			 * multicast RX testing.
328 			 */
329 		}
330 	}
331 
332 	return 0;
333 }
334 
335 
set_socket_prio(struct sigma_stream * s)336 static int set_socket_prio(struct sigma_stream *s)
337 {
338 	int tos = 0x00;
339 
340 	switch (s->tc) {
341 	case SIGMA_TC_VOICE:
342 		if (s->user_priority_set) {
343 			if (s->user_priority == 6)
344 				tos = 46 << 2;
345 			else if (s->user_priority == 7)
346 				tos = 56 << 2;
347 			else
348 				return -1;
349 		} else
350 			tos = 0xe0; /* DSCP = 56 */
351 		break;
352 	case SIGMA_TC_VIDEO:
353 		if (s->user_priority_set) {
354 			if (s->user_priority == 4)
355 				tos = 32 << 2;
356 			else if (s->user_priority == 5)
357 				tos = 40 << 2;
358 			else
359 				return -1;
360 		} else
361 			tos = 0xa0; /* DSCP = 40 */
362 		break;
363 	case SIGMA_TC_BACKGROUND:
364 		if (s->user_priority_set) {
365 			if (s->user_priority == 1)
366 				tos = 8 << 2;
367 			else if (s->user_priority == 2)
368 				tos = 16 << 2;
369 			else
370 				return -1;
371 		} else
372 			tos = 0x20; /* DSCP = 8 */
373 		break;
374 	case SIGMA_TC_BEST_EFFORT:
375 		if (s->user_priority_set) {
376 			if (s->user_priority == 0)
377 				tos = 0 << 2;
378 			else if (s->user_priority == 3)
379 				tos = 20 << 2;
380 			else
381 				return -1;
382 		} else
383 			tos = 0x00; /* DSCP = 0 */
384 		break;
385 	}
386 
387 	if (setsockopt(s->sock, IPPROTO_IP, IP_TOS, &tos, sizeof(tos)) < 0) {
388 		perror("setsockopt");
389 		return -1;
390 	}
391 
392 	return 0;
393 }
394 
395 
open_socket(struct sigma_dut * dut,struct sigma_stream * s)396 static int open_socket(struct sigma_dut *dut, struct sigma_stream *s)
397 {
398 	switch (s->profile) {
399 	case SIGMA_PROFILE_FILE_TRANSFER:
400 		return open_socket_file_transfer(dut, s);
401 	case SIGMA_PROFILE_MULTICAST:
402 		return open_socket_multicast(dut, s);
403 	case SIGMA_PROFILE_IPTV:
404 		if (open_socket_file_transfer(dut, s) < 0)
405 			return -1;
406 		return set_socket_prio(s);
407 	case SIGMA_PROFILE_TRANSACTION:
408 		return open_socket_file_transfer(dut, s);
409 	case SIGMA_PROFILE_UAPSD:
410 		return open_socket_file_transfer(dut, s);
411 	case SIGMA_PROFILE_START_SYNC:
412 		sigma_dut_print(dut, DUT_MSG_INFO, "Traffic stream profile %d "
413 				"not yet supported", s->profile);
414 		/* TODO */
415 		break;
416 	}
417 
418 	return -1;
419 }
420 
421 
send_file_fast(struct sigma_stream * s,char * pkt)422 static void send_file_fast(struct sigma_stream *s, char *pkt)
423 {
424 	struct timeval stop, now;
425 	int res;
426 	unsigned int counter = 0;
427 
428 	gettimeofday(&stop, NULL);
429 	stop.tv_sec += s->duration;
430 
431 	while (!s->stop) {
432 		counter++;
433 		WPA_PUT_BE32(&pkt[8], counter);
434 
435 		if ((counter & 0xf) == 0) {
436 			gettimeofday(&now, NULL);
437 			if (now.tv_sec > stop.tv_sec ||
438 			    (now.tv_sec == stop.tv_sec &&
439 			     now.tv_usec >= stop.tv_usec))
440 				break;
441 		}
442 
443 		s->tx_act_frames++;
444 		res = send(s->sock, pkt, s->payload_size, MSG_DONTWAIT);
445 		if (res >= 0) {
446 			s->tx_frames++;
447 			s->tx_payload_bytes += res;
448 		} else {
449 			switch (errno) {
450 			case EAGAIN:
451 			case ENOBUFS:
452 				usleep(1000);
453 				break;
454 			case ECONNRESET:
455 			case EPIPE:
456 				s->stop = 1;
457 				break;
458 			default:
459 				perror("send");
460 				break;
461 			}
462 		}
463 	}
464 }
465 
466 
send_file(struct sigma_stream * s)467 static void send_file(struct sigma_stream *s)
468 {
469 	char *pkt;
470 	struct timeval stop, now, start;
471 	int res;
472 	unsigned int counter = 0, total_sleep_usec = 0, total_pkts;
473 	int sleep_usec = 0;
474 
475 	if (s->duration <= 0 || s->frame_rate < 0 || s->payload_size < 20)
476 		return;
477 
478 	pkt = malloc(s->payload_size);
479 	if (pkt == NULL)
480 		return;
481 	memset(pkt, 1, s->payload_size);
482 	strlcpy(pkt, "1345678", s->payload_size);
483 
484 	if (s->frame_rate == 0 && s->no_timestamps) {
485 		send_file_fast(s, pkt);
486 		free(pkt);
487 		return;
488 	}
489 
490 	gettimeofday(&stop, NULL);
491 	stop.tv_sec += s->duration;
492 
493 	total_pkts = s->duration * s ->frame_rate;
494 
495 	gettimeofday(&start, NULL);
496 
497 	while (!s->stop) {
498 		counter++;
499 		WPA_PUT_BE32(&pkt[8], counter);
500 
501 		if (sleep_usec) {
502 			usleep(sleep_usec);
503 			total_sleep_usec += sleep_usec;
504 		}
505 
506 		gettimeofday(&now, NULL);
507 		if (now.tv_sec > stop.tv_sec ||
508 		    (now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec))
509 			break;
510 
511 		if (s->frame_rate && (unsigned int) s->tx_frames >= total_pkts)
512 			break;
513 
514 		if (s->frame_rate == 0 || s->tx_frames == 0)
515 			sleep_usec = 0;
516 		else if (sleep_usec || s->frame_rate < 10 ||
517 			 counter % (s->frame_rate / 10) == 0) {
518 			/* Recalculate sleep_usec for every 100 ms approximately
519 			 */
520 			struct timeval tmp;
521 			int diff, duration;
522 
523 			timersub(&now, &start, &tmp);
524 
525 			diff = tmp.tv_sec * 1000000 + tmp.tv_usec;
526 			duration = (1000000 / s->frame_rate) * s->tx_frames;
527 
528 			if (duration > diff)
529 				sleep_usec = (total_sleep_usec +
530 					      (duration - diff)) / s->tx_frames;
531 			else
532 				sleep_usec = 0;
533 		}
534 
535 		WPA_PUT_BE32(&pkt[12], now.tv_sec);
536 		WPA_PUT_BE32(&pkt[16], now.tv_usec);
537 
538 		s->tx_act_frames++;
539 		res = send(s->sock, pkt, s->payload_size, MSG_DONTWAIT);
540 		if (res >= 0) {
541 			s->tx_frames++;
542 			s->tx_payload_bytes += res;
543 		} else {
544 			switch (errno) {
545 			case EAGAIN:
546 			case ENOBUFS:
547 				usleep(1000);
548 				break;
549 			case ECONNRESET:
550 			case EPIPE:
551 				s->stop = 1;
552 				break;
553 			default:
554 				perror("send");
555 				break;
556 			}
557 		}
558 	}
559 
560 	sigma_dut_print(s->dut, DUT_MSG_DEBUG,
561 			"send_file: counter %u s->tx_frames %d total_sleep_usec %u",
562 			counter, s->tx_frames, total_sleep_usec);
563 
564 	free(pkt);
565 }
566 
567 
send_transaction(struct sigma_stream * s)568 static void send_transaction(struct sigma_stream *s)
569 {
570 	char *pkt, *rpkt;
571 	struct timeval stop, now;
572 	int res;
573 	unsigned int counter = 0, rcounter;
574 	int wait_time;
575 	fd_set rfds;
576 	struct timeval tv;
577 
578 	if (s->duration <= 0 || s->frame_rate <= 0 || s->payload_size < 20)
579 		return;
580 
581 	pkt = malloc(s->payload_size);
582 	if (pkt == NULL)
583 		return;
584 	rpkt = malloc(s->payload_size);
585 	if (rpkt == NULL) {
586 		free(pkt);
587 		return;
588 	}
589 	memset(pkt, 1, s->payload_size);
590 	strlcpy(pkt, "1345678", s->payload_size);
591 
592 	gettimeofday(&stop, NULL);
593 	stop.tv_sec += s->duration;
594 
595 	wait_time = 1000000 / s->frame_rate;
596 
597 	while (!s->stop) {
598 		counter++;
599 		if (s->max_cnt && (int) counter > s->max_cnt)
600 			break;
601 		WPA_PUT_BE32(&pkt[8], counter);
602 
603 		gettimeofday(&now, NULL);
604 		if (now.tv_sec > stop.tv_sec ||
605 		    (now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec))
606 			break;
607 		WPA_PUT_BE32(&pkt[12], now.tv_sec);
608 		WPA_PUT_BE32(&pkt[16], now.tv_usec);
609 
610 		res = send(s->sock, pkt, s->payload_size, MSG_DONTWAIT);
611 		if (res >= 0) {
612 			s->tx_frames++;
613 			s->tx_payload_bytes += res;
614 		} else {
615 			switch (errno) {
616 			case EAGAIN:
617 			case ENOBUFS:
618 				usleep(1000);
619 				break;
620 			case ECONNRESET:
621 			case EPIPE:
622 				s->stop = 1;
623 				break;
624 			default:
625 				perror("send");
626 				break;
627 			}
628 		}
629 
630 		/* Wait for response */
631 		tv.tv_sec = 0;
632 		tv.tv_usec = wait_time;
633 		FD_ZERO(&rfds);
634 		FD_SET(s->sock, &rfds);
635 		res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
636 		if (res < 0) {
637 			if (errno == EINTR)
638 				continue;
639 			perror("select");
640 			break;
641 		}
642 
643 		if (res == 0) {
644 			/* timeout */
645 			continue;
646 		}
647 
648 		if (FD_ISSET(s->sock, &rfds)) {
649 			/* response received */
650 			res = recv(s->sock, rpkt, s->payload_size, 0);
651 			if (res < 0) {
652 				perror("recv");
653 				break;
654 			}
655 			rcounter = WPA_GET_BE32(&rpkt[8]);
656 			if (rcounter != counter)
657 				s->out_of_seq_frames++;
658 			s->rx_frames++;
659 			s->rx_payload_bytes += res;
660 		}
661 	}
662 
663 	free(pkt);
664 	free(rpkt);
665 }
666 
667 
send_thread(void * ctx)668 static void * send_thread(void *ctx)
669 {
670 	struct sigma_stream *s = ctx;
671 
672 	sleep(s->start_delay);
673 
674 	switch (s->profile) {
675 	case SIGMA_PROFILE_FILE_TRANSFER:
676 		send_file(s);
677 		break;
678 	case SIGMA_PROFILE_MULTICAST:
679 		send_file(s);
680 		break;
681 	case SIGMA_PROFILE_IPTV:
682 		send_file(s);
683 		break;
684 	case SIGMA_PROFILE_TRANSACTION:
685 		send_transaction(s);
686 		break;
687 	case SIGMA_PROFILE_START_SYNC:
688 		break;
689 	case SIGMA_PROFILE_UAPSD:
690 		send_uapsd_console(s);
691 		break;
692 	}
693 
694 	return NULL;
695 }
696 
697 
698 struct traffic_agent_send_data {
699 	struct sigma_dut *dut;
700 	struct sigma_conn *conn;
701 	int streams[MAX_SIGMA_STREAMS];
702 	int count;
703 };
704 
705 
get_stream(struct sigma_dut * dut,int id)706 static struct sigma_stream * get_stream(struct sigma_dut *dut, int id)
707 {
708 	int i;
709 
710 	for (i = 0; i < dut->num_streams; i++) {
711 		if ((unsigned int) id == dut->streams[i].stream_id)
712 			return &dut->streams[i];
713 	}
714 
715 	return NULL;
716 }
717 
718 
send_report_thread(void * ctx)719 static void * send_report_thread(void *ctx)
720 {
721 	struct traffic_agent_send_data *data = ctx;
722 	struct sigma_dut *dut = data->dut;
723 	struct sigma_conn *conn = data->conn;
724 	int i, ret;
725 	char buf[100 + MAX_SIGMA_STREAMS * 60], *pos;
726 
727 	for (i = 0; i < data->count; i++) {
728 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: waiting "
729 				"for stream %d send to complete",
730 				data->streams[i]);
731 		stop_stream(get_stream(dut, data->streams[i]));
732 	}
733 
734 	buf[0] = '\0';
735 	pos = buf;
736 
737 	pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,");
738 	for (i = 0; i < data->count; i++) {
739 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
740 			       i > 0 ? " " : "", data->streams[i]);
741 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
742 			break;
743 		pos += ret;
744 	}
745 
746 	if (dut->program == PROGRAM_60GHZ) {
747 		sigma_dut_print(dut, DUT_MSG_INFO, "reporting tx_act_frames");
748 		pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,");
749 		for (i = 0; i < data->count; i++) {
750 			struct sigma_stream *s;
751 
752 			s = get_stream(dut, data->streams[i]);
753 			if (!s)
754 				continue;
755 			ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
756 				       i > 0 ? " " : "", s->tx_act_frames);
757 			if (ret < 0 || ret >= buf + sizeof(buf) - pos)
758 				break;
759 			pos += ret;
760 		}
761 	}
762 
763 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,");
764 	for (i = 0; i < data->count; i++) {
765 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
766 
767 		if (!s)
768 			continue;
769 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
770 			       i > 0 ? " " : "", s->tx_frames);
771 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
772 			break;
773 		pos += ret;
774 	}
775 
776 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,");
777 	for (i = 0; i < data->count; i++) {
778 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
779 
780 		if (!s)
781 			continue;
782 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
783 			       i > 0 ? " " : "", s->rx_frames);
784 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
785 			break;
786 		pos += ret;
787 	}
788 
789 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,");
790 	for (i = 0; i < data->count; i++) {
791 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
792 
793 		if (!s)
794 			continue;
795 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
796 			       i > 0 ? " " : "", s->tx_payload_bytes);
797 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
798 			break;
799 		pos += ret;
800 	}
801 
802 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,");
803 	for (i = 0; i < data->count; i++) {
804 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
805 
806 		if (!s)
807 			continue;
808 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
809 			       i > 0 ? " " : "", s->rx_payload_bytes);
810 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
811 			break;
812 		pos += ret;
813 	}
814 
815 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,");
816 	for (i = 0; i < data->count; i++) {
817 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
818 
819 		if (!s)
820 			continue;
821 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
822 			       i > 0 ? " " : "", s->out_of_seq_frames);
823 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
824 			break;
825 		pos += ret;
826 	}
827 
828 	for (i = 0; i < data->count; i++) {
829 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
830 		if (!s)
831 			continue;
832 		s->ta_send_in_progress = 0;
833 		if (s->trans_proto == IPPROTO_TCP) {
834 			/*
835 			 * Close the socket to make sure client side close the
836 			 * network before the server. Otherwise, the server
837 			 * might get "Address already in use" when trying to
838 			 * reuse the port.
839 			 */
840 			close(s->sock);
841 			s->sock = -1;
842 			sigma_dut_print(dut, DUT_MSG_DEBUG,
843 					"Closed the sender socket");
844 		}
845 	}
846 
847 	buf[sizeof(buf) - 1] = '\0';
848 
849 	if (conn->s < 0)
850 		sigma_dut_print(dut, DUT_MSG_INFO, "Cannot send traffic_agent response since control socket has already been closed");
851 	else
852 		send_resp(dut, conn, SIGMA_COMPLETE, buf);
853 	conn->waiting_completion = 0;
854 
855 	free(data);
856 
857 	return NULL;
858 }
859 
860 
cmd_traffic_agent_send(struct sigma_dut * dut,struct sigma_conn * conn,struct sigma_cmd * cmd)861 static enum sigma_cmd_result cmd_traffic_agent_send(struct sigma_dut *dut,
862 						    struct sigma_conn *conn,
863 						    struct sigma_cmd *cmd)
864 {
865 	const char *val;
866 	int i, j, res;
867 	char buf[100];
868 	struct traffic_agent_send_data *data;
869 
870 	val = get_param(cmd, "streamID");
871 	if (val == NULL)
872 		return INVALID_SEND_STATUS;
873 
874 	data = calloc(1, sizeof(*data));
875 	if (data == NULL)
876 		return ERROR_SEND_STATUS;
877 	data->dut = dut;
878 	data->conn = conn;
879 
880 	data->count = get_stream_id(val, data->streams);
881 	if (data->count < 0) {
882 		free(data);
883 		return ERROR_SEND_STATUS;
884 	}
885 	for (i = 0; i < data->count; i++) {
886 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
887 
888 		if (!s) {
889 			snprintf(buf, sizeof(buf), "errorCode,StreamID %d "
890 				 "not configured", data->streams[i]);
891 			send_resp(dut, conn, SIGMA_INVALID, buf);
892 			free(data);
893 			return STATUS_SENT;
894 		}
895 		for (j = 0; j < i; j++)
896 			if (data->streams[i] == data->streams[j]) {
897 				free(data);
898 				return ERROR_SEND_STATUS;
899 			}
900 		if (!s->sender) {
901 			snprintf(buf, sizeof(buf), "errorCode,Not configured "
902 				 "as sender for streamID %d", data->streams[i]);
903 			send_resp(dut, conn, SIGMA_INVALID, buf);
904 			free(data);
905 			return STATUS_SENT;
906 		}
907 		if (s->ta_send_in_progress) {
908 			send_resp(dut, conn, SIGMA_ERROR,
909 				  "errorCode,Multiple concurrent send cmds on same streamID not supported");
910 			free(data);
911 			return STATUS_SENT;
912 		}
913 	}
914 
915 	for (i = 0; i < data->count; i++) {
916 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
917 
918 		if (!s)
919 			continue;
920 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: open "
921 				"socket for send stream %d", data->streams[i]);
922 		if (open_socket(dut, s) < 0) {
923 			free(data);
924 			return ERROR_SEND_STATUS;
925 		}
926 	}
927 
928 	for (i = 0; i < data->count; i++) {
929 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
930 
931 		if (!s)
932 			continue;
933 
934 		/*
935 		 * Provide dut context to the thread to support debugging and
936 		 * returning of error messages.
937 		 */
938 		s->dut = dut;
939 
940 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start "
941 				"send for stream %d", data->streams[i]);
942 		res = pthread_create(&s->thr, NULL, send_thread, s);
943 		if (res) {
944 			sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create "
945 					"failed: %d", res);
946 			free(data);
947 			return ERROR_SEND_STATUS;
948 		}
949 		s->started = 1;
950 	}
951 
952 	sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start a thread to track sending streams");
953 	conn->waiting_completion = 1;
954 	res = pthread_create(&dut->thr, NULL, send_report_thread, data);
955 	if (res) {
956 		sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create failed: %d",
957 				res);
958 		free(data);
959 		conn->waiting_completion = 0;
960 		return ERROR_SEND_STATUS;
961 	}
962 
963 	for (i = 0; i < data->count; i++) {
964 		struct sigma_stream *s = get_stream(dut, data->streams[i]);
965 
966 		if (s)
967 			s->ta_send_in_progress = 1;
968 	}
969 
970 	/* Command will be completed in send_report_thread() */
971 
972 	return STATUS_SENT;
973 }
974 
975 
receive_file(struct sigma_stream * s)976 static void receive_file(struct sigma_stream *s)
977 {
978 	struct timeval tv, now;
979 	fd_set rfds;
980 	int res;
981 	char *pkt;
982 	int pktlen;
983 	unsigned int last_rx = 0, counter;
984 
985 	pktlen = 65536 + 1;
986 	pkt = malloc(pktlen);
987 	if (pkt == NULL)
988 		return;
989 
990 	while (!s->stop) {
991 		FD_ZERO(&rfds);
992 		FD_SET(s->sock, &rfds);
993 		tv.tv_sec = 0;
994 		tv.tv_usec = 300000;
995 		res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
996 		if (res < 0) {
997 			perror("select");
998 			usleep(10000);
999 		} else if (FD_ISSET(s->sock, &rfds)) {
1000 			res = recv(s->sock, pkt, pktlen, 0);
1001 			if (res >= 0) {
1002 				s->rx_frames++;
1003 				s->rx_payload_bytes += res;
1004 
1005 				counter = WPA_GET_BE32(&pkt[8]);
1006 				if (counter < last_rx)
1007 					s->out_of_seq_frames++;
1008 				last_rx = counter;
1009 			} else {
1010 				perror("recv");
1011 				break;
1012 			}
1013 
1014 			if (res >= 20 && s->stats &&
1015 			    s->num_stats < MAX_SIGMA_STATS) {
1016 				struct sigma_frame_stats *stats;
1017 				stats = &s->stats[s->num_stats];
1018 				s->num_stats++;
1019 				gettimeofday(&now, NULL);
1020 				stats->seqnum = counter;
1021 				stats->local_sec = now.tv_sec;
1022 				stats->local_usec = now.tv_usec;
1023 				stats->remote_sec = WPA_GET_BE32(&pkt[12]);
1024 				stats->remote_usec = WPA_GET_BE32(&pkt[16]);
1025 			}
1026 		}
1027 	}
1028 
1029 	free(pkt);
1030 }
1031 
1032 
receive_transaction(struct sigma_stream * s)1033 static void receive_transaction(struct sigma_stream *s)
1034 {
1035 	struct timeval tv;
1036 	fd_set rfds;
1037 	int res;
1038 	char *pkt;
1039 	int pktlen;
1040 	unsigned int last_rx = 0, counter;
1041 	struct sockaddr_in addr;
1042 	socklen_t addrlen;
1043 
1044 	if (s->payload_size)
1045 		pktlen = s->payload_size;
1046 	else
1047 		pktlen = 65536 + 1;
1048 	pkt = malloc(pktlen);
1049 	if (pkt == NULL)
1050 		return;
1051 
1052 	while (!s->stop) {
1053 		FD_ZERO(&rfds);
1054 		FD_SET(s->sock, &rfds);
1055 		tv.tv_sec = 0;
1056 		tv.tv_usec = 300000;
1057 		res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
1058 		if (res < 0) {
1059 			perror("select");
1060 			usleep(10000);
1061 		} else if (FD_ISSET(s->sock, &rfds)) {
1062 			addrlen = sizeof(addr);
1063 			res = recvfrom(s->sock, pkt, pktlen, 0,
1064 				       (struct sockaddr *) &addr, &addrlen);
1065 			if (res < 0) {
1066 				perror("recv");
1067 				break;
1068 			}
1069 
1070 			s->rx_frames++;
1071 			s->rx_payload_bytes += res;
1072 
1073 			counter = WPA_GET_BE32(&pkt[8]);
1074 			if (counter < last_rx)
1075 				s->out_of_seq_frames++;
1076 			last_rx = counter;
1077 
1078 			/* send response */
1079 			res = sendto(s->sock, pkt, pktlen, 0,
1080 				     (struct sockaddr *) &addr, addrlen);
1081 			if (res < 0) {
1082 				perror("sendto");
1083 			} else {
1084 				s->tx_frames++;
1085 				s->tx_payload_bytes += res;
1086 			}
1087 		}
1088 	}
1089 
1090 	free(pkt);
1091 }
1092 
1093 
receive_thread(void * ctx)1094 static void * receive_thread(void *ctx)
1095 {
1096 	struct sigma_stream *s = ctx;
1097 
1098 	if (s->trans_proto == IPPROTO_TCP) {
1099 		/* Wait for socket to be accepted */
1100 		struct sockaddr_in connected_addr;
1101 		int connected_sock; /* returned from accept on sock */
1102 		socklen_t connected_addr_len = sizeof(connected_addr);
1103 
1104 		sigma_dut_print(s->dut, DUT_MSG_DEBUG,
1105 				"Traffic agent: Waiting on accept");
1106 		connected_sock = accept(s->sock,
1107 					(struct sockaddr *) &connected_addr,
1108 					&connected_addr_len);
1109 		if (connected_sock < 0) {
1110 			sigma_dut_print(s->dut, DUT_MSG_ERROR,
1111 					"Traffic agent: Failed to accept: %s",
1112 					strerror(errno));
1113 			return NULL;
1114 		}
1115 
1116 		sigma_dut_print(s->dut, DUT_MSG_DEBUG,
1117 				"Traffic agent: Accepted client closing parent socket and talk over connected sock.");
1118 		close(s->sock);
1119 		s->sock = connected_sock;
1120 	}
1121 
1122 	switch (s->profile) {
1123 	case SIGMA_PROFILE_FILE_TRANSFER:
1124 		receive_file(s);
1125 		break;
1126 	case SIGMA_PROFILE_MULTICAST:
1127 		receive_file(s);
1128 		break;
1129 	case SIGMA_PROFILE_IPTV:
1130 		receive_file(s);
1131 		break;
1132 	case SIGMA_PROFILE_TRANSACTION:
1133 		receive_transaction(s);
1134 		break;
1135 	case SIGMA_PROFILE_START_SYNC:
1136 		break;
1137 	case SIGMA_PROFILE_UAPSD:
1138 		receive_uapsd(s);
1139 		break;
1140 	}
1141 
1142 	return NULL;
1143 }
1144 
1145 
1146 static enum sigma_cmd_result
cmd_traffic_agent_receive_start(struct sigma_dut * dut,struct sigma_conn * conn,struct sigma_cmd * cmd)1147 cmd_traffic_agent_receive_start(struct sigma_dut *dut, struct sigma_conn *conn,
1148 				struct sigma_cmd *cmd)
1149 {
1150 	const char *val;
1151 	int streams[MAX_SIGMA_STREAMS];
1152 	int i, j, count;
1153 	char buf[100];
1154 
1155 	val = get_param(cmd, "streamID");
1156 	if (val == NULL)
1157 		return INVALID_SEND_STATUS;
1158 	count = get_stream_id(val, streams);
1159 	if (count < 0)
1160 		return ERROR_SEND_STATUS;
1161 	for (i = 0; i < count; i++) {
1162 		struct sigma_stream *s = get_stream(dut, streams[i]);
1163 
1164 		if (!s) {
1165 			snprintf(buf, sizeof(buf), "errorCode,StreamID %d "
1166 				 "not configured", streams[i]);
1167 			send_resp(dut, conn, SIGMA_INVALID, buf);
1168 			return STATUS_SENT;
1169 		}
1170 		for (j = 0; j < i; j++)
1171 			if (streams[i] == streams[j])
1172 				return ERROR_SEND_STATUS;
1173 		if (s->sender) {
1174 			snprintf(buf, sizeof(buf), "errorCode,Not configured "
1175 				 "as receiver for streamID %d", streams[i]);
1176 			send_resp(dut, conn, SIGMA_INVALID, buf);
1177 			return STATUS_SENT;
1178 		}
1179 	}
1180 
1181 	for (i = 0; i < count; i++) {
1182 		struct sigma_stream *s = get_stream(dut, streams[i]);
1183 
1184 		if (!s)
1185 			continue;
1186 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: open "
1187 				"receive socket for stream %d", streams[i]);
1188 		if (open_socket(dut, s) < 0)
1189 			return ERROR_SEND_STATUS;
1190 	}
1191 
1192 	for (i = 0; i < count; i++) {
1193 		struct sigma_stream *s = get_stream(dut, streams[i]);
1194 		int res;
1195 
1196 		if (!s)
1197 			continue;
1198 		/*
1199 		 * Provide dut context to the thread to support debugging and
1200 		 * returning of error messages. Similarly, provide interface
1201 		 * information to the thread. If the Interface parameter is not
1202 		 * passed, get it from get_station_ifname() since the interface
1203 		 * name is needed for power save mode configuration for Uapsd
1204 		 * cases.
1205 		 */
1206 		s->dut = dut;
1207 		val = get_param(cmd, "Interface");
1208 		strlcpy(s->ifname, (val ? val : get_station_ifname(dut)),
1209 			sizeof(s->ifname));
1210 
1211 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start "
1212 				"receive for stream %d", streams[i]);
1213 		res = pthread_create(&s->thr, NULL, receive_thread, s);
1214 		if (res) {
1215 			sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create "
1216 					"failed: %d", res);
1217 			return ERROR_SEND_STATUS;
1218 		}
1219 		s->started = 1;
1220 	}
1221 
1222 	return SUCCESS_SEND_STATUS;
1223 }
1224 
1225 
write_frame_stats(struct sigma_dut * dut,struct sigma_stream * s,int id)1226 static void write_frame_stats(struct sigma_dut *dut, struct sigma_stream *s,
1227 			      int id)
1228 {
1229 	char fname[128];
1230 	FILE *f;
1231 	unsigned int i;
1232 
1233 	snprintf(fname, sizeof(fname), "%s/e2e%u-%d.txt",
1234 		 dut->sigma_tmpdir, (unsigned int) time(NULL), id);
1235 	f = fopen(fname, "w");
1236 	if (f == NULL) {
1237 		sigma_dut_print(dut, DUT_MSG_INFO, "Could not write %s",
1238 				fname);
1239 		return;
1240 	}
1241 	fprintf(f, "seqnum:local_sec:local_usec:remote_sec:remote_usec\n");
1242 
1243 	sigma_dut_print(dut, DUT_MSG_DEBUG, "Writing frame stats to %s",
1244 			fname);
1245 
1246 	for (i = 0; i < s->num_stats; i++) {
1247 		struct sigma_frame_stats *stats = &s->stats[i];
1248 		fprintf(f, "%u:%u:%u:%u:%u\n", stats->seqnum,
1249 			stats->local_sec, stats->local_usec,
1250 			stats->remote_sec, stats->remote_usec);
1251 	}
1252 
1253 	fclose(f);
1254 }
1255 
1256 
1257 static enum sigma_cmd_result
cmd_traffic_agent_receive_stop(struct sigma_dut * dut,struct sigma_conn * conn,struct sigma_cmd * cmd)1258 cmd_traffic_agent_receive_stop(struct sigma_dut *dut, struct sigma_conn *conn,
1259 			       struct sigma_cmd *cmd)
1260 {
1261 	const char *val;
1262 	int streams[MAX_SIGMA_STREAMS];
1263 	int i, j, ret, count;
1264 	char buf[100 + MAX_SIGMA_STREAMS * 60], *pos;
1265 
1266 	val = get_param(cmd, "streamID");
1267 	if (val == NULL)
1268 		return INVALID_SEND_STATUS;
1269 	count = get_stream_id(val, streams);
1270 	if (count < 0)
1271 		return ERROR_SEND_STATUS;
1272 	for (i = 0; i < count; i++) {
1273 		struct sigma_stream *s = get_stream(dut, streams[i]);
1274 
1275 		if (!s) {
1276 			snprintf(buf, sizeof(buf), "errorCode,StreamID %d "
1277 				 "not configured", streams[i]);
1278 			send_resp(dut, conn, SIGMA_INVALID, buf);
1279 			return STATUS_SENT;
1280 		}
1281 		for (j = 0; j < i; j++)
1282 			if (streams[i] == streams[j])
1283 				return ERROR_SEND_STATUS;
1284 		if (!s->started) {
1285 			snprintf(buf, sizeof(buf), "errorCode,Receive not "
1286 				 "started for streamID %d", streams[i]);
1287 			send_resp(dut, conn, SIGMA_INVALID, buf);
1288 			return STATUS_SENT;
1289 		}
1290 	}
1291 
1292 	for (i = 0; i < count; i++) {
1293 		struct sigma_stream *s = get_stream(dut, streams[i]);
1294 
1295 		if (s)
1296 			s->stop = 1;
1297 	}
1298 
1299 	for (i = 0; i < count; i++) {
1300 		struct sigma_stream *s = get_stream(dut, streams[i]);
1301 
1302 		if (!s)
1303 			continue;
1304 		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: stop "
1305 				"receive for stream %d", streams[i]);
1306 		stop_stream(s);
1307 	}
1308 
1309 	buf[0] = '\0';
1310 	pos = buf;
1311 
1312 	pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,");
1313 	for (i = 0; i < count; i++) {
1314 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1315 			       i > 0 ? " " : "", streams[i]);
1316 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1317 			break;
1318 		pos += ret;
1319 	}
1320 
1321 	if (dut->program == PROGRAM_60GHZ) {
1322 		pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,");
1323 		for (i = 0; i < count; i++) {
1324 			struct sigma_stream *s = get_stream(dut, streams[i]);
1325 
1326 			if (!s)
1327 				continue;
1328 			ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1329 				       i > 0 ? " " : "", s->tx_act_frames);
1330 			if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1331 				break;
1332 			pos += ret;
1333 		}
1334 	}
1335 
1336 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,");
1337 	for (i = 0; i < count; i++) {
1338 		struct sigma_stream *s = get_stream(dut, streams[i]);
1339 
1340 		if (!s)
1341 			continue;
1342 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1343 			       i > 0 ? " " : "", s->tx_frames);
1344 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1345 			break;
1346 		pos += ret;
1347 	}
1348 
1349 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,");
1350 	for (i = 0; i < count; i++) {
1351 		struct sigma_stream *s = get_stream(dut, streams[i]);
1352 
1353 		if (!s)
1354 			continue;
1355 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1356 			       i > 0 ? " " : "", s->rx_frames);
1357 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1358 			break;
1359 		pos += ret;
1360 	}
1361 
1362 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,");
1363 	for (i = 0; i < count; i++) {
1364 		struct sigma_stream *s = get_stream(dut, streams[i]);
1365 
1366 		if (!s)
1367 			continue;
1368 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
1369 			       i > 0 ? " " : "", s->tx_payload_bytes);
1370 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1371 			break;
1372 		pos += ret;
1373 	}
1374 
1375 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,");
1376 	for (i = 0; i < count; i++) {
1377 		struct sigma_stream *s = get_stream(dut, streams[i]);
1378 
1379 		if (!s)
1380 			continue;
1381 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
1382 			       i > 0 ? " " : "", s->rx_payload_bytes);
1383 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1384 			break;
1385 		pos += ret;
1386 	}
1387 
1388 	pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,");
1389 	for (i = 0; i < count; i++) {
1390 		struct sigma_stream *s = get_stream(dut, streams[i]);
1391 
1392 		if (!s)
1393 			continue;
1394 		ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1395 			       i > 0 ? " " : "", s->out_of_seq_frames);
1396 		if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1397 			break;
1398 		pos += ret;
1399 	}
1400 
1401 	buf[sizeof(buf) - 1] = '\0';
1402 
1403 	send_resp(dut, conn, SIGMA_COMPLETE, buf);
1404 
1405 	for (i = 0; i < count; i++) {
1406 		struct sigma_stream *s = get_stream(dut, streams[i]);
1407 
1408 		if (!s)
1409 			continue;
1410 		if (s->profile == SIGMA_PROFILE_IPTV && s->num_stats > 0 &&
1411 		    dut->write_stats)
1412 			write_frame_stats(dut, s, streams[i]);
1413 		free(s->stats);
1414 		s->stats = NULL;
1415 		s->num_stats = 0;
1416 	}
1417 
1418 	return STATUS_SENT;
1419 }
1420 
1421 
cmd_traffic_agent_version(struct sigma_dut * dut,struct sigma_conn * conn,struct sigma_cmd * cmd)1422 static enum sigma_cmd_result cmd_traffic_agent_version(struct sigma_dut *dut,
1423 						       struct sigma_conn *conn,
1424 						       struct sigma_cmd *cmd)
1425 {
1426 	send_resp(dut, conn, SIGMA_COMPLETE, "version,1.0");
1427 	return STATUS_SENT;
1428 }
1429 
1430 
traffic_agent_register_cmds(void)1431 void traffic_agent_register_cmds(void)
1432 {
1433 	sigma_dut_reg_cmd("traffic_agent_config", NULL,
1434 			  cmd_traffic_agent_config);
1435 	sigma_dut_reg_cmd("traffic_agent_reset", NULL,
1436 			  cmd_traffic_agent_reset);
1437 	sigma_dut_reg_cmd("traffic_agent_send", NULL,
1438 			  cmd_traffic_agent_send);
1439 	sigma_dut_reg_cmd("traffic_agent_receive_start", NULL,
1440 			  cmd_traffic_agent_receive_start);
1441 	sigma_dut_reg_cmd("traffic_agent_receive_stop", NULL,
1442 			  cmd_traffic_agent_receive_stop);
1443 	sigma_dut_reg_cmd("traffic_agent_version", NULL,
1444 			  cmd_traffic_agent_version);
1445 }
1446