Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2Test command. 

3 

4Run tests of the project inside a container. 

5""" 

6import argparse 

7import shutil 

8from dataclasses import dataclass 

9from os import getcwd, system 

10from pathlib import Path 

11from textwrap import dedent 

12from typing import Callable, Generator, List, Optional, Set, Union, cast 

13 

14from config import config 

15from errors import (DppAdditionalGcovOptionsWithoutCoverage, DppArgparseFileNotFoundError, 

16 DppCaseExpressionInternalError) 

17from errors.argparser import DppArgparseInvalidCaseExpressionError 

18from message import message 

19from processor.core.argparser import create_common_project_parser 

20from processor.core.command import DockerCommand, DockerCommandScript, DockerCommandScriptGenerator 

21from processor.core.data import Worktree 

22from taxonomy import Command, CommandType, Defect, MetaData 

23 

24 

25class AdditionalGcovOptions(argparse.Action): 

26 """ 

27 Additional options for gcov command. 

28 """ 

29 

30 def __call__(self, parser, namespace, values, option_string=None): 

31 config.DPP_ADDITIONAL_GCOV_OPTIONS = values 

32 setattr(namespace, self.dest, values) 

33 

34 

35class ValidateCase(argparse.Action): 

36 def __call__(self, parser, namespace, values, option_string=None): 

37 """ 

38 case_expression == INCLUDE[:EXCLUDE] 

39 INCLUDE | EXCLUDE 

40 * select: ',' 

41 * range: '-' 

42 e.g. 

43 1-100:3,6,7 (to 100 from 1 except 3, 6 and 7) 

44 20-30,40-88:47-52 (to 30 from 20 and to 88 from 40 except to 62 from 47) 

45 """ 

46 

47 def expr2set(expr: str) -> Set[int]: 

48 if not expr: 

49 return set() 

50 val: Set[int] = set() 

51 partitions = expr.split(",") 

52 for partition in partitions: 

53 tokens = partition.split("-") 

54 if len(tokens) == 1: 

55 val.add(int(tokens[0])) 

56 else: 

57 val.update(range(int(tokens[0]), int(tokens[1]) + 1)) 

58 return val 

59 

60 def validate_each_case(max_num_cases: int, case_set: Set[int]) -> Set[int]: 

61 if all(0 < case <= max_num_cases for case in case_set): 

62 return case_set 

63 raise DppArgparseInvalidCaseExpressionError( 

64 index, metadata.name, max_num_cases, values 

65 ) 

66 

67 try: 

68 metadata: MetaData = namespace.metadata 

69 index: int = namespace.worktree.index 

70 except AttributeError: 

71 raise DppCaseExpressionInternalError(namespace) 

72 

73 num_cases = metadata.defects[index - 1].num_cases 

74 expr_tokens = values.split(":") 

75 included_cases = validate_each_case(num_cases, expr2set(expr_tokens[0])) 

76 excluded_cases = validate_each_case( 

77 num_cases, expr2set(expr_tokens[1]) if len(expr_tokens) > 1 else set() 

78 ) 

79 setattr(namespace, self.dest, (included_cases, excluded_cases)) 

80 

81 

82class ValidateOutputDirectory(argparse.Action): 

83 def __call__(self, parser, namespace, values, option_string=None): 

84 Path(values).mkdir(parents=True, exist_ok=True) 

85 setattr(namespace, self.dest, values) 

86 

87 

88class ObservableAttributeMeta(type): 

89 def __new__(mcs, name, bases, attr, methods=None): 

90 if methods is None: 

91 methods = [] 

92 for method in methods: 

93 attr[method] = mcs.wrap(attr[method]) 

94 return super().__new__(mcs, name, bases, attr) 

95 

96 @classmethod 

97 def wrap(mcs, fn): 

98 def update(obj, *args, **kwargs): 

99 output = fn(obj, *args, **kwargs) 

100 for callback in obj.callbacks: 

101 callback(*args, **kwargs) 

102 return output 

103 

104 return update 

105 

106 

107class ObservableAttribute(metaclass=ObservableAttributeMeta): 

108 def __init__(self, callbacks: List[Callable]): 

109 self._callbacks = callbacks 

110 

111 @property 

112 def callbacks(self): 

113 return self._callbacks 

114 

115 

116class ManagedAttribute(ObservableAttribute, methods=["__set__"]): 

117 def __set_name__(self, owner, name): 

118 self.name = f"_{name}" 

119 

120 def __get__(self, instance, owner): 

121 return getattr(instance, self.name) 

122 

123 def __set__(self, instance, value): 

124 setattr(instance, self.name, value) 

125 

126 

127@dataclass 

128class CapturedOutput: 

129 exit_code: int 

130 stream: str 

131 

132 

133class CapturedOutputAttributeMixin: 

134 _captured_output: Optional[Union[ManagedAttribute, CapturedOutput]] = None 

135 

136 @property 

137 def captured_output(self) -> CapturedOutput: 

138 return self._captured_output 

139 

140 @captured_output.setter 

141 def captured_output(self, value): 

142 self._captured_output = value 

143 

144 

145class TestCommandScript(DockerCommandScript, CapturedOutputAttributeMixin): 

146 """ 

147 Script to execute test. 

148 """ 

149 

150 __test__ = False 

151 

152 def __init__( 

153 self, 

154 case: int, 

155 command_type: CommandType, 

156 command: List[str], 

157 ): 

158 super().__init__(command_type, command) 

159 self._case = case 

160 

161 @property 

162 def case(self) -> int: 

163 return self._case 

164 

165 def before(self): 

166 message.stdout_progress_detail(f"case #{self._case}") 

167 

168 def step(self, linenr: int, line: str): 

169 pass 

170 

171 def output(self, linenr: Optional[int], exit_code: int, output: str): 

172 if linenr == len(self.lines): 

173 self.captured_output = CapturedOutput(exit_code, output) 

174 

175 def after(self): 

176 pass 

177 

178 

179class SetupTestCommandScript(TestCommandScript): 

180 """ 

181 Script to execute before running actual code. 

182 

183 It prepends an extra command which writes an index number to file addition to the given commands. 

184 It is script's responsibility to read the file and parse into appropriate filter name. 

185 For instance, if the test is automake-generated, it might convert the number into 'TESTS' value. 

186 If the test is cmake-generated, it might convert the number into regex value of '--tests-regex'. 

187 """ 

188 

189 OUTPUT_NAME = "DPP_TEST_INDEX" 

190 

191 def __init__( 

192 self, 

193 case: int, 

194 ): 

195 super().__init__( 

196 case, 

197 CommandType.Docker, 

198 [f"sh -c 'echo {case} > {SetupTestCommandScript.OUTPUT_NAME}'"], 

199 ) 

200 

201 def before(self): 

202 # Override TestCommandScript.before method to prevent echoing. 

203 pass 

204 

205 

206class CoverageTestCommandScript(TestCommandScript): 

207 """ 

208 Script to execute test with coverage. 

209 """ 

210 

211 def __init__( 

212 self, 

213 case: int, 

214 command_type: CommandType, 

215 command: List[str], 

216 ): 

217 super().__init__(case, command_type, command) 

218 

219 

220class TeardownTestCommandScript(TestCommandScript): 

221 """ 

222 Script to execute after running CoverageTestCommandScript. 

223 

224 Clear the coverage data by remove gcov directory and its contents. 

225 related to: https://github.com/Suresoft-GLaDOS/defects4cpp/issues/66 

226 """ 

227 

228 __test__ = False 

229 

230 def __init__( 

231 self, 

232 case: int, 

233 ): 

234 super().__init__( 

235 case, 

236 CommandType.Docker, 

237 ["sh -c 'rm -rf gcov'"], 

238 ) 

239 

240 def before(self): 

241 # Override TestCommandScript.before method to prevent echoing. 

242 pass 

243 

244 

245class GcovCommandScript(DockerCommandScript, CapturedOutputAttributeMixin): 

246 """ 

247 Script to execute gcov. 

248 """ 

249 

250 def __init__( 

251 self, 

252 case: int, 

253 command_type: CommandType, 

254 command: List[str], 

255 ): 

256 super().__init__(command_type, command) 

257 self._case = case 

258 

259 @property 

260 def case(self) -> int: 

261 return self._case 

262 

263 def before(self): 

264 pass 

265 

266 def step(self, linenr: int, line: str): 

267 pass 

268 

269 def output(self, linenr: Optional[int], exit_code: Optional[int], output: str): 

270 if linenr == len(self.lines): 

271 self.captured_output = CapturedOutput(exit_code, output) 

272 

273 def after(self): 

274 pass 

275 

276 

277class RunGcovrTestCommandScript(TestCommandScript): 

278 """ 

279 Script to execute gcovr to make summary.json. 

280 

281 Clear the coverage data by remove gcov directory and its contents. 

282 related to: https://github.com/Suresoft-GLaDOS/defects4cpp/issues/66 

283 """ 

284 

285 def before(self): 

286 pass 

287 

288 def __init__( 

289 self, 

290 case: int, 

291 exclude: List[str], 

292 ): 

293 exclude_flags = " ".join( 

294 [f"--gcov-exclude {excluded_gcov}" for excluded_gcov in exclude] 

295 ) 

296 super().__init__( 

297 case, 

298 CommandType.Docker, 

299 [ 

300 f"gcovr {exclude_flags} --keep --use-gcov-files --json-pretty --json gcov/summary.json gcov", 

301 f"gcovr {exclude_flags} --keep --use-gcov-files --html gcov/summary.html gcov", 

302 ], 

303 ) 

304 

305 

306class TestCommandScriptGenerator(DockerCommandScriptGenerator): 

307 """ 

308 Factory class of CommandScript 

309 """ 

310 

311 __test__ = False 

312 

313 def __init__( 

314 self, 

315 defect: Defect, 

316 coverage: bool, 

317 test_command: List[Command], 

318 test_cases: Set[int], 

319 callbacks: List[Callable], 

320 metadata: MetaData, 

321 worktree: Worktree, 

322 stream: bool, 

323 ): 

324 super().__init__(metadata, worktree, stream) 

325 self._defect = defect 

326 self._coverage = coverage 

327 self._test_command = test_command 

328 self._test_cases = test_cases 

329 self._callbacks = callbacks 

330 self._extra_tests = defect.extra_tests 

331 self._gcov = metadata.common_gcov_replaced.gcov 

332 

333 def create(self) -> Generator[TestCommandScript, None, None]: 

334 self._attach(CapturedOutputAttributeMixin, "_captured_output") 

335 

336 if self._coverage: 

337 yield from self._create_coverage_impl() 

338 else: 

339 yield from self._create_impl() 

340 

341 def _attach(self, klass, field_name: str): 

342 descriptor = ManagedAttribute(self._callbacks) 

343 setattr(klass, field_name, descriptor) 

344 descriptor.__set_name__(klass, field_name) 

345 

346 def _create_impl(self) -> Generator[TestCommandScript, None, None]: 

347 for case in sorted(self._test_cases): 

348 yield SetupTestCommandScript(case) 

349 test_cmd = ( 

350 self._test_command 

351 if case <= self._defect.num_cases - len(self._extra_tests) 

352 else self._extra_tests[ 

353 case - self._defect.num_cases + len(self._extra_tests) - 1 

354 ] 

355 ) 

356 for t in test_cmd: 

357 yield TestCommandScript(case, t.type, t.lines) 

358 

359 def _create_coverage_impl(self) -> Generator[TestCommandScript, None, None]: 

360 for case in sorted(self._test_cases): 

361 yield SetupTestCommandScript(case) 

362 test_cmd = ( 

363 self._test_command 

364 if case <= self._defect.num_cases - len(self._extra_tests) 

365 else self._extra_tests[ 

366 case - self._defect.num_cases + len(self._extra_tests) - 1 

367 ] 

368 ) 

369 for t in test_cmd: 

370 yield CoverageTestCommandScript(case, t.type, t.lines) 

371 for gcov_cmd in self._gcov.command: 

372 yield GcovCommandScript(case, gcov_cmd.type, gcov_cmd.lines) 

373 yield RunGcovrTestCommandScript(case, self._gcov.exclude) 

374 yield TeardownTestCommandScript(case) 

375 

376 

377class TestCommand(DockerCommand): 

378 """ 

379 Run test command either with or without coverage. 

380 """ 

381 

382 __test__ = False 

383 

384 def __init__(self): 

385 super().__init__(parser=create_common_project_parser()) 

386 # TODO: write argparse description in detail 

387 self.parser.add_argument( 

388 "-c", 

389 "--case", 

390 help="expression to filter cases (see `example <case-example_>`_)", 

391 type=str, 

392 dest="case", 

393 action=ValidateCase, 

394 ) 

395 self.parser.add_argument( 

396 "--output-dir", 

397 help="output directory to generate coverage data instead of the current directory.", 

398 type=str, 

399 dest="output_dir", 

400 action=ValidateOutputDirectory, 

401 ) 

402 self.parser.add_argument( 

403 "--additional-gcov-options", 

404 type=str, 

405 dest="additional_gcov_options", 

406 action=AdditionalGcovOptions, 

407 help="set additional options to gcov command", 

408 ) 

409 self.parser.usage = ( 

410 "bugcpp.py test PATH [-j|--jobs=JOBS] " 

411 "[--coverage [--additional-gcov-options=ADDITIONAL_GCOV_OPTIONS]] " 

412 "[-v|--verbose] [-c|--case=expr] [--output-dir=directory]" 

413 ) 

414 self.parser.description = dedent( 

415 """\ 

416 Run testsuite inside docker. The project must have been built previously. 

417 """ 

418 ) 

419 self.metadata: Optional[MetaData] = None 

420 self.worktree: Optional[Worktree] = None 

421 self.coverage: Optional[bool] = None 

422 self.output: str = getcwd() 

423 self.coverage_files: List[str] = [] 

424 self.failed_coverage_files: List[str] = [] 

425 

426 def create_script_generator( 

427 self, args: argparse.Namespace 

428 ) -> DockerCommandScriptGenerator: 

429 if not args.coverage and args.additional_gcov_options: 

430 raise DppAdditionalGcovOptionsWithoutCoverage() 

431 metadata = self.metadata = args.metadata 

432 worktree = self.worktree = args.worktree 

433 self.coverage = True if args.coverage else False 

434 if args.output_dir: 

435 self.output = args.output_dir 

436 

437 # FIXME: metadata's values are fixed when metadata.commonXXXX is called 

438 # When we call metadata.common after calling metadata.common_gcov_replace, 

439 # metadata.common.gcov_replace.gcov will not not replaced 

440 test_command = ( 

441 metadata.common_gcov_replaced.test_coverage_command 

442 if self.coverage 

443 else metadata.common.test_command 

444 ) 

445 index = worktree.index 

446 

447 # Select cases to run. If none is given, select all. 

448 selected_defect = metadata.defects[index - 1] 

449 

450 if not args.case: 

451 cases = set(range(1, selected_defect.num_cases + 1)) 

452 else: 

453 included_cases, excluded_cases = args.case 

454 if not included_cases: 

455 included_cases = set(range(1, selected_defect.num_cases + 1)) 

456 cases = included_cases.difference(excluded_cases) 

457 

458 return TestCommandScriptGenerator( 

459 selected_defect, 

460 self.coverage, 

461 test_command, 

462 cases, 

463 [self.script_callback], 

464 metadata, 

465 worktree, 

466 stream=True if args.verbose else False, 

467 ) 

468 

469 def setup(self, generator: DockerCommandScriptGenerator): 

470 message.info(__name__, f"'{generator.metadata.name}'") 

471 if not self.coverage: 

472 message.stdout_progress(f"[{generator.metadata.name}] running test suites") 

473 else: 

474 message.stdout_progress( 

475 f"[{generator.metadata.name}] running test suites (coverage)" 

476 ) 

477 

478 def teardown(self, generator: DockerCommandScriptGenerator): 

479 message.info(__name__, "done") 

480 message.stdout_progress(f"[{generator.metadata.name}] done") 

481 if self.coverage: 

482 if self.coverage_files: 

483 created = [f" - {c}\n" for c in self.coverage_files] 

484 message.stdout_progress_detail( 

485 f"Successfully created:\n{''.join(created)}" 

486 ) 

487 if self.failed_coverage_files: 

488 not_created = [f" - {c}\n" for c in self.failed_coverage_files] 

489 message.stdout_progress_detail( 

490 f"Could not create files:\n{''.join(not_created)}" 

491 ) 

492 

493 def summary_dir(self, case: int) -> Path: 

494 """ 

495 Return path where coverage data should be created. 

496 

497 Parameters 

498 ---------- 

499 case : int 

500 Case number. 

501 

502 Returns 

503 ------- 

504 pathlib.Path 

505 """ 

506 p = Path(self.output) / f"{self.metadata.name}-{self.worktree.suffix}-{case}" 

507 if not p.exists(): 

508 p.mkdir(parents=True, exist_ok=True) 

509 return p 

510 

511 def script_callback(self, script: TestCommandScript, *args, **kwargs): 

512 """ 

513 Callback function to register used to collect data after each command is executed. 

514 

515 Parameters 

516 ---------- 

517 script : TestCommandScript 

518 Script instance which has been executed. 

519 

520 Returns 

521 ------- 

522 None 

523 """ 

524 if ( 

525 type(script) is TestCommandScript 

526 or type(script) is CoverageTestCommandScript 

527 ): 

528 self._save_result(script) 

529 elif type(script) is RunGcovrTestCommandScript: 

530 self._save_coverage(cast(GcovCommandScript, script)) 

531 else: 

532 pass 

533 

534 def _save_result(self, script: TestCommandScript): 

535 """ 

536 Write exit code and captured stdout to file. 

537 

538 - {case}.output: contains captured output 

539 - {case}.test: either 'passed' or 'failed' string. 

540 

541 It should be invoked only after test command is executed. 

542 

543 Parameters 

544 ---------- 

545 script : TestCommandScript 

546 Script instance which has been executed. 

547 

548 Returns 

549 ------- 

550 None 

551 """ 

552 d = self.summary_dir(script.case) 

553 with open(d / f"{script.case}.output", "w+", encoding="utf-8") as output_file: 

554 output_file.write(script.captured_output.stream) 

555 with open(d / f"{script.case}.test", "w+", encoding="utf-8") as result_file: 

556 result_file.write( 

557 "passed" if script.captured_output.exit_code == 0 else "failed" 

558 ) 

559 

560 def _save_coverage(self, script: GcovCommandScript): 

561 """ 

562 Move json files to the target directory, and create summary.json file. 

563 Output format should be the following: 

564 {project-name}-{type}#{index}-{case}/summary.json 

565 

566 It should be invoked only after each coverage command is executed. 

567 

568 Parameters 

569 ---------- 

570 script : GcovCommandScript 

571 Script instance which has been executed. 

572 

573 Returns 

574 ------- 

575 None 

576 """ 

577 coverage = self.worktree.host / "gcov" 

578 coverage_dest = self.summary_dir(script.case) 

579 

580 if coverage.exists(): 

581 for file in coverage.glob("*"): 

582 # Full path should be passed to overwrite if already exists. 

583 try: 

584 shutil.move(str(file), str(coverage_dest / file.name)) 

585 except PermissionError: 

586 print(f"PermissionError: {file}") 

587 # FIXME: Is this the right way to handle this? 

588 system(f"sudo mv {str(file)} {str(coverage_dest / file.name)}") 

589 continue 

590 else: 

591 # Do not rmdir (TeardownTestCommandScript will do) 

592 # related to https://github.com/Suresoft-GLaDOS/defects4cpp/issues/66 

593 self.coverage_files.append(str(coverage_dest / coverage.name)) 

594 else: 

595 self.failed_coverage_files.append(str(coverage_dest / coverage.name)) 

596 

597 @property 

598 def help(self) -> str: 

599 return "Run test"