-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
227 lines (211 loc) · 7.66 KB
/
cli.py
File metadata and controls
227 lines (211 loc) · 7.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import argparse
import os
import sys
import yaml
from datetime import datetime
from runner import launch_test
from device import get_available_vendors, load_vendor_class
from tee import Tee
from utils.file_tree import print_directory_tree
from tests.yp_tests import YPTest, YPTestSequence
from utils.check_required_executables import check_executables
def main():
check_executables([
"ssh",
"sshpass",
"pyang",
"yanglint",
"netgauze-pcap-decoder",
])
parser = argparse.ArgumentParser(
prog="yp_test",
description="YANG Push Test Suite (modular, multivendor version)",
epilog="enjoy",
)
available_vendors = get_available_vendors()
parser.add_argument(
"--vendor",
required=True,
choices=available_vendors,
help=f"Vendor name (choices: {', '.join(available_vendors)})",
)
parser.add_argument(
"-c", "--config", type=str, required=True, help="YAML config file"
)
parser.add_argument(
"-u",
"--receiver-username",
type=str,
help="user used for SSH connection (overrides config file)",
)
parser.add_argument(
"-i",
"--receiver-instance-name",
type=str,
help="YANG-Push receiver-instance-name (overrides config file)",
)
parser.add_argument(
"-r",
"--rpc",
type=str,
help="netconf rpc",
)
parser.add_argument("--get_schema_to_file", type=str, help="get YANG schema to file")
parser.add_argument("--backup_subscriptions", type=str, help="backup YANG Push configuration to file")
parser.add_argument("--restore_subscriptions", type=str, help="restore YANG Push configuration from file")
parser.add_argument("--all", action="store_true")
parser.add_argument("--override_result_dir", type=str)
parser.add_argument("--override_timer")
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("-vv", "--very_verbose", action="store_true")
test_dict = {test.func.__name__: test for test in YPTest.registered_tests}
sequence_dict = {seq.name: seq for seq in YPTestSequence.registered_sequences}
all_choices = list(test_dict.keys()) + list(sequence_dict.keys())
def test_choice(value):
if value not in all_choices:
raise argparse.ArgumentTypeError(f"Invalid test: {value}")
return value
parser.add_argument(
"--tests",
nargs="+",
type=test_choice,
help="Select one or more tests or test sequences to run in order. \
Use --list-tests to see the choices.",
)
parser.add_argument(
"--list-tests",
action="store_true",
help="List all available tests and sequences, then exit.",
)
args, unknown = parser.parse_known_args()
if args.list_tests:
print("Available tests:")
for name in test_dict.keys():
print(f" {name}")
if sequence_dict:
print("\nAvailable test sequences:")
for name in sequence_dict.keys():
print(f" {name}")
sys.exit(0)
args = parser.parse_args()
with open(args.config) as f:
config = yaml.safe_load(f)
if args.receiver_username:
config["RECEIVER"]["username"] = args.receiver_username
if args.receiver_instance_name:
config["RECEIVER"]["instance_name"] = args.receiver_instance_name
vendors = config["INPUT"].get("vendors", "./vendors/")
vendor_xml_dir = os.path.join(vendors, args.vendor.lower())
vendor = load_vendor_class(args.vendor, vendor_xml_dir, config)
dut = config["DUTS"][args.vendor]
hostname = dut["hostname"]
username = dut["username"]
password = dut["password"]
host = dut["ip"]
if args.override_result_dir:
results_dir = args.override_result_dir
else:
results_dir = f"{config['OUTPUT_DIR']}{hostname}_{args.vendor.lower()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
os.makedirs(results_dir, exist_ok=True)
stdout_file = open(os.path.join(results_dir, "stdout.log"), "w")
sys.stdout = Tee(sys.__stdout__, stdout_file)
pcap_file = os.path.join(results_dir, "udp_notif_capture.pcap")
log_file = os.path.join(results_dir, "yp_test.log")
kafka_file = os.path.join(results_dir, "kafka_output.json")
import logging
if args.very_verbose:
logging.basicConfig(
level=logging.DEBUG,
format="%(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler(log_file, delay=True),
],
)
elif args.verbose:
logging.basicConfig(
level=logging.WARNING,
format="%(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler(log_file, delay=True),
],
)
else:
logging.basicConfig(
level=logging.ERROR,
format="%(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler(log_file, delay=True),
],
)
selected_tests = args.tests
selected_individual_tests = []
selected_test_sequences = []
if args.all:
selected_tests = list(test_dict.keys())
elif args.get_schema_to_file or args.backup_subscriptions or args.restore_subscriptions:
selected_tests = []
elif not selected_tests:
print("Available tests:")
test_names = list(test_dict.keys())
for idx, name in enumerate(test_names, 1):
print(f"{idx}: {name}")
# List available sequences with letters
if sequence_dict:
print("\nAvailable test sequences:")
seq_names = list(sequence_dict.keys())
for idx, name in enumerate(seq_names):
print(f"{chr(ord('A') + idx)}: {name}")
print(
"\nEnter test numbers (for tests) and/or letters (for sequences) \
separated by spaces (e.g. 1 3 B):"
)
choices = input("> ").strip().split()
# Validate input and build the list
selected_tests = []
for c in choices:
if c.isdigit():
idx = int(c) - 1
if 0 <= idx < len(test_names):
selected_tests.append(test_names[idx])
selected_individual_tests.append(test_names[idx])
else:
print(f"Invalid selection: {c}")
elif c.isalpha():
idx = ord(c.upper()) - ord("A")
seq_names = list(sequence_dict.keys())
if 0 <= idx < len(seq_names):
# Expand sequence into its test names
selected_tests.extend(sequence_dict[seq_names[idx]].test_names)
selected_test_sequences.append(seq_names[idx])
else:
print(f"Invalid sequence selection: {c}")
else:
print(f"Invalid input: {c}")
final_selected_tests = []
for name in selected_tests:
if name in sequence_dict:
final_selected_tests.extend(sequence_dict[name].test_names)
else:
final_selected_tests.append(name)
launch_test(
host,
username,
password,
args,
vendor,
config,
test_dict,
final_selected_tests,
results_dir,
pcap_file,
kafka_file,
log_file,
)
if not len(selected_individual_tests) > 0:
print(f"Selected test sequences: {selected_test_sequences}")
print(f"Results saved in {results_dir}")
print("\nResults directory tree:")
print_directory_tree(results_dir)