Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import enum 

2import json 

3from collections.abc import Mapping 

4from dataclasses import dataclass, fields 

5from os.path import dirname, exists, join 

6from pkgutil import iter_modules 

7from typing import Any, Callable, Dict, Iterator, List, Optional 

8 

9from config import config 

10from errors.internal import DppMetaDataInitKeyError, DppMetaDataInitTypeError, DppTaxonomyInitInternalError 

11 

12 

class CommandType(enum.IntEnum):
    """Kinds of command entries.

    Members are looked up by name (``CommandType["Docker"]``), so the member
    names must match the capitalized ``"type"`` strings used in the metadata.
    """

    Docker = 1
    Script = 2

16 

17 

@dataclass(frozen=True)
class Command:
    """An immutable record pairing a command kind with its command lines."""

    # Kind of the command.
    type: CommandType
    # The command lines, in the order they were given.
    lines: List[str]

22 

23 

@dataclass(frozen=True)
class ExtraTest(Command):
    """A ``Command`` extended with an ``is_pass`` flag.

    Field order for positional construction is ``type``, ``lines``,
    ``is_pass`` (the inherited fields come first).
    """

    # NOTE(review): presumably whether the extra test is expected to pass - confirm.
    is_pass: bool

27 

28 

class TestType(enum.IntEnum):
    """Supported test-type identifiers.

    Members are looked up by name (``TestType["Ctest"]``), so the member names
    must match the capitalized ``"test-type"`` strings used in the metadata.
    """

    Automake = 1
    Ctest = 2
    Googletest = 3
    Kyua = 4

34 

35 

@dataclass(frozen=True)
class Common:
    """Immutable bundle of build, test, coverage and gcov settings.

    The field order is part of the interface: instances are created
    positionally as well as by keyword.
    """

    # Commands used to build.
    build_command: List[Command]
    # Commands used to build with coverage enabled.
    build_coverage_command: List[Command]
    # Which test type is in use.
    test_type: TestType
    # Commands used to run tests.
    test_command: List[Command]
    # Commands used to run tests with coverage enabled.
    test_coverage_command: List[Command]
    # gcov settings; kept as a string annotation because Gcov is declared later.
    gcov: "Gcov"

44 

45 

@dataclass(frozen=True)
class Gcov:
    """Immutable gcov settings: an exclude list and the gcov commands."""

    # Entries to exclude (NOTE(review): presumably paths or patterns - confirm).
    exclude: List[str]
    # Commands associated with gcov.
    command: List[Command]

50 

51 

@dataclass(frozen=True)
class Defect:
    """Immutable description of a single defect entry.

    The field order is part of the interface: instances are created
    positionally.
    """

    hash: str
    # Patch file paths. As populated in this file, each is either the path of
    # an existing patch file or "" when that file does not exist.
    buggy_patch: str
    fixed_patch: str
    common_patch: str
    split_patch: str
    id: int
    num_cases: int
    case: List[int]
    tags: List[str]
    description: str
    # One inner list of ExtraTest per entry of the "extra_tests" metadata key;
    # empty when the key is absent. The annotation is nested because the
    # loader in this file builds a list of lists.
    extra_tests: List[List[ExtraTest]]

65 

66 

@dataclass(frozen=True)
class MetaInfo:
    """Immutable general information about a project."""

    # Project URL.
    url: str
    # Short textual description.
    description: str
    # Version control system identifier.
    vcs: str

72 

73 

def create_command(value: List[Dict[str, Any]]) -> List[Command]:
    """Convert raw command dictionaries into ``Command`` objects.

    Each entry must provide a ``"type"`` string, whose capitalized form names
    a ``CommandType`` member, and a ``"lines"`` value.

    Raises:
        KeyError: if an entry lacks ``"type"``/``"lines"`` or the type does
            not name a ``CommandType`` member.
    """
    commands: List[Command] = []
    for entry in value:
        kind = CommandType[entry["type"].capitalize()]
        commands.append(Command(kind, entry["lines"]))
    return commands

76 

77 

def create_gcov(value: Dict[str, Any]) -> Gcov:
    """Build a ``Gcov`` from its raw dictionary form.

    Reads the ``"exclude"`` key (copied into a new list) and the
    ``"commands"`` key (converted with ``create_command``).

    Raises:
        KeyError: if a required key is missing.
    """
    excluded = list(value["exclude"])
    commands = create_command(value["commands"])
    return Gcov(excluded, commands)

83 

84 

def create_common(value: Dict[str, Any]) -> Common:
    """Build a ``Common`` from its raw dictionary form.

    Reads the ``"build"``, ``"build-coverage"``, ``"test"`` and
    ``"test-coverage"`` sections (each with a ``"commands"`` list), the
    ``"test-type"`` string and the ``"gcov"`` section.

    Raises:
        KeyError: if a required key is missing or a type name is unknown.
    """
    # Sections are read in the same order as the Common fields are declared.
    build = create_command(value["build"]["commands"])
    build_coverage = create_command(value["build-coverage"]["commands"])
    test_type = TestType[value["test-type"].capitalize()]
    test = create_command(value["test"]["commands"])
    test_coverage = create_command(value["test-coverage"]["commands"])
    gcov = create_gcov(value["gcov"])
    return Common(build, build_coverage, test_type, test, test_coverage, gcov)

94 

95 

def create_info(value: Dict[str, Any]) -> MetaInfo:
    """Build a ``MetaInfo`` from the ``"url"``, ``"short-desc"`` and ``"vcs"`` keys.

    Raises:
        KeyError: if one of the keys is missing.
    """
    url = value["url"]
    short_description = value["short-desc"]
    vcs = value["vcs"]
    return MetaInfo(url, short_description, vcs)

98 

99 

100class _MetaDataVariables(Mapping): 

101 def __init__(self, *args, **kwargs): 

102 self._store = dict(*args, **kwargs) 

103 

104 def __getitem__(self, k: str) -> str: 

105 # Remove cmake export macro when DPP_CMAKE_COMPILATION_DB_TOOL is set. 

106 if k == "@DPP_CMAKE_GEN_COMPILATION_DB@" and getattr( 

107 config, "DPP_CMAKE_COMPILATION_DB_TOOL" 

108 ): 

109 return "" 

110 

111 v: Optional[str] = self._store[k] 

112 if v is None: 

113 v = getattr(config, k.strip("@")) 

114 return v 

115 

116 def __len__(self) -> int: 

117 return len(self._store) 

118 

119 def __iter__(self) -> Iterator: 

120 return iter(self._store) 

121 

122 def __eq__(self, other): 

123 return self._store == other 

124 

125 

126def _do_replace(variables: Dict, string: str) -> str: 

127 return " ".join([variables.get(w, w) for w in string.split()]) 

128 

129 

130def _do_strip(variables: Dict, string: str) -> str: 

131 return " ".join([w for w in string.split() if w not in variables]) 

132 

133 

class MetaData:
    """Lazily loaded metadata of one project directory.

    ``<path>/meta.json`` is read the first time ``info``, ``common`` (or one
    of its variants) or ``defects`` is accessed.
    """

    # Placeholders substituted textually into meta.json before it is parsed.
    # A value of None is resolved from the config attribute of the same name.
    _variables = _MetaDataVariables(
        {
            "@DPP_PARALLEL_BUILD@": None,
        }
    )
    # Placeholders handled word-by-word in command lines: either replaced by
    # their value or stripped, depending on which property is accessed.
    _common_variables = _MetaDataVariables(
        {
            "@DPP_CMAKE_GEN_COMPILATION_DB@": "-DCMAKE_EXPORT_COMPILE_COMMANDS=1",
            "@DPP_COMPILATION_DB_TOOL@": None,
            "@DPP_CMAKE_COMPILATION_DB_TOOL@": None,
            "@DPP_ADDITIONAL_GCOV_OPTIONS@": None,
        }
    )

    def __init__(self, name: str, path: str):
        """Remember the project ``name`` and directory ``path``; nothing is read yet."""
        self.name = name
        self._path: str = path
        self._info: Optional[MetaInfo] = None
        self._common: Optional[Common] = None
        self._defects: List[Defect] = []

    @property
    def dockerfile(self) -> str:
        """Path of the project's Dockerfile."""
        return f"{self._path}/Dockerfile"

    @property
    def info(self):
        """The project's ``MetaInfo``, loading meta.json on first use."""
        if not self._info:
            self._load()
        return self._info

    @property
    def common(self):
        """``Common`` settings with the common placeholders stripped from command lines."""
        if not self._common:
            self._load()
        return self._preprocess_common(self._common, False)

    @property
    def common_capture(self):
        """``Common`` settings with the common placeholders replaced by their values."""
        if not self._common:
            self._load()
        return self._preprocess_common(self._common, True)

    @property
    def common_gcov_replaced(self):
        """``Common`` settings with the common placeholders replaced by their values.

        NOTE(review): currently identical to ``common_capture`` - confirm
        whether a different treatment was intended.
        """
        if not self._common:
            self._load()
        return self._preprocess_common(self._common, True)

    @property
    def defects(self):
        """The list of ``Defect`` entries, loading meta.json on first use."""
        if not self._defects:
            self._load()
        return self._defects

    def _load(self):
        """Read meta.json, substitute ``_variables`` and populate all sections."""
        with open(f"{self._path}/meta.json", "r", encoding="utf-8") as fp:
            contents: str = fp.read()
        # Substitution happens on the raw text, before JSON parsing.
        for key, value in self._variables.items():
            contents = contents.replace(key, value)
        meta = json.loads(contents)
        self._load_info(meta)
        self._load_common(meta)
        self._load_defects(meta)

    def _load_info(self, meta: Dict):
        """Populate ``_info`` from ``meta["info"]``.

        Raises:
            DppTaxonomyInitInternalError: if a required key is missing.
        """
        try:
            self._info = create_info(meta["info"])
        except KeyError as e:
            # Report the class being built here (MetaInfo), not Defect.
            raise DppTaxonomyInitInternalError(e.args[0], MetaInfo.__name__) from e

    def _load_common(self, meta: Dict):
        """Populate ``_common`` from ``meta["common"]``.

        Raises:
            DppTaxonomyInitInternalError: if a required key is missing.
        """
        try:
            self._common = create_common(meta["common"])
        except KeyError as e:
            raise DppTaxonomyInitInternalError(e.args[0], Common.__name__) from e

    def _load_defects(self, meta: Dict):
        """Populate ``_defects`` from ``meta["defects"]``.

        Patch paths are derived from the 1-based position of each defect
        (``patch/0001-buggy.patch`` ...); a path is stored as ``""`` when the
        file does not exist.

        Raises:
            DppTaxonomyInitInternalError: if a required key is missing.
        """

        def check_path(path: str) -> str:
            return path if exists(path) else ""

        try:
            self._defects = [
                Defect(
                    defect["hash"],
                    check_path(f"{self._path}/patch/{index:04}-buggy.patch"),
                    check_path(f"{self._path}/patch/{index:04}-fixed.patch"),
                    check_path(f"{self._path}/patch/{index:04}-common.patch"),
                    check_path(f"{self._path}/patch/{index:04}-split.patch"),
                    defect["id"],
                    defect["num_cases"],
                    defect["case"],
                    defect["tags"],
                    defect["description"],
                    # One inner list per "extra_tests" entry. The raw "type"
                    # value is passed through unchanged.
                    # NOTE(review): unlike create_command, "type" is not
                    # converted to CommandType here - confirm what consumers expect.
                    [
                        [ExtraTest(e["type"], e["lines"], e["is_pass"]) for e in et]
                        for et in defect["extra_tests"]
                    ]
                    if "extra_tests" in defect
                    else [],
                )
                for index, defect in enumerate(meta["defects"], start=1)
            ]
        except KeyError as e:
            # Report the class being built here (Defect), not MetaInfo.
            raise DppTaxonomyInitInternalError(e.args[0], Defect.__name__) from e

    @staticmethod
    def _preprocess_common(common: Common, replace: bool) -> Common:
        """Return a new ``Common`` with placeholders processed in every command line.

        Args:
            common: The settings as loaded from meta.json; it is not modified.
            replace: If true, placeholders are replaced by their values;
                otherwise they are removed from the lines.
        """
        data: Dict[str, Any] = {
            "test_type": common.test_type,
            "gcov": common.gcov,
        }

        func: Callable[[Dict, str], str] = _do_replace if replace else _do_strip
        # Every field annotated as List[Command] is processed the same way.
        command_fields = [f for f in fields(common) if f.type == List[Command]]
        for command_field in command_fields:
            objs: List[Command] = getattr(common, command_field.name)
            data[command_field.name] = [
                Command(
                    obj.type,
                    [func(MetaData._common_variables, line) for line in obj.lines],
                )
                for obj in objs
            ]

        MetaData._preprocess_build_command(func, data)
        MetaData._preprocess_gcov_command(func, data)
        return Common(**data)

    @staticmethod
    def _preprocess_build_command(
        func: Callable[[Dict, str], str], data: Dict[str, Any]
    ):
        """Wrap both build command lists with the configured pre/post steps.

        ``config.DPP_BUILD_PRE_STEPS`` and ``config.DPP_BUILD_POST_STEPS`` are
        converted to ``Command`` objects and placed before/after
        ``data["build_command"]`` and ``data["build_coverage_command"]``.

        Raises:
            DppMetaDataInitKeyError: if a step lacks ``"type"`` or ``"lines"``.
            DppMetaDataInitTypeError: if a step's type is not a ``CommandType`` name.
        """
        valid_keys = tuple(e.name for e in CommandType)

        def create_commands(steps: List[Dict[str, Any]]) -> List[Command]:
            commands: List[Command] = []
            for step in steps:
                try:
                    type_name = step["type"].capitalize()
                except KeyError as e:
                    raise DppMetaDataInitKeyError(step) from e
                # Explicit check instead of `assert`, which is removed under -O.
                if type_name not in valid_keys:
                    raise DppMetaDataInitTypeError(step)
                try:
                    lines = step["lines"]
                except KeyError as e:
                    raise DppMetaDataInitKeyError(step) from e
                commands.append(
                    Command(
                        # Store the enum member, consistent with create_command.
                        CommandType[type_name],
                        [func(MetaData._common_variables, line) for line in lines],
                    )
                )
            return commands

        build_fields = ("build_command", "build_coverage_command")

        for build_field in build_fields:
            # Built per field so the two lists do not share Command objects.
            pre_steps = create_commands(config.DPP_BUILD_PRE_STEPS)
            post_steps = create_commands(config.DPP_BUILD_POST_STEPS)
            data[build_field] = pre_steps + data[build_field] + post_steps

    # TODO: extend below functions for test and gcov command.
    @staticmethod
    def _preprocess_test_command(
        func: Callable[[Dict, str], str], data: Dict[str, Any]
    ):
        """Placeholder for test-command preprocessing; currently does nothing."""
        pass

    @staticmethod
    def _preprocess_gcov_command(
        func: Callable[[Dict, str], str], data: Dict[str, Any]
    ):
        """Replace ``data["gcov"]`` with a copy whose command lines are processed.

        A new ``Gcov`` is built instead of rewriting the existing lines in
        place: ``data["gcov"]`` is the object cached on the ``MetaData``
        instance, and mutating it would make the result of later accesses
        depend on which property was read first.
        """
        gcov: Gcov = data["gcov"]
        data["gcov"] = Gcov(
            list(gcov.exclude),
            [
                Command(
                    command.type,
                    [func(MetaData._common_variables, line) for line in command.lines],
                )
                for command in gcov.command
            ],
        )

311 

312 

class _LazyTaxonomy:
    """Descriptor that builds the taxonomy mapping on first access.

    The mapping is cached as an attribute of the owner class, under the
    descriptor's attribute name prefixed with ``_``.
    """

    def __set_name__(self, owner, name):
        # Name of the class attribute used as the cache slot.
        self.name = f"_{name}"

    def __get__(self, instance, owner):
        """Return the cached mapping, creating it from ``instance.base`` if needed."""
        if not hasattr(owner, self.name):
            setattr(owner, self.name, self._load_taxonomy(instance.base))
        return getattr(owner, self.name)

    @staticmethod
    def _load_taxonomy(base: str) -> Dict[str, MetaData]:
        """Create one ``MetaData`` per package found next to this file.

        Packages are discovered in the directory of this module; each
        ``MetaData`` path is ``base`` joined with the package name.
        """
        return {
            name: MetaData(name, join(base, name))
            for _, name, is_pkg in iter_modules([dirname(__file__)])
            if is_pkg
        }

334 

335 

class Taxonomy(Mapping):
    """Read-only mapping from a project name to its ``MetaData``.

    The underlying mapping is created lazily on first use and stored on the
    class.
    """

    _lazy_taxonomy = _LazyTaxonomy()

    def __init__(self):
        # Directory containing this module; project directories live beneath it.
        self.base: str = dirname(__file__)

    @property
    def _store(self) -> Dict[str, MetaData]:
        return self._lazy_taxonomy

    def __getitem__(self, key: str) -> MetaData:
        """Return the ``MetaData`` for ``key``.

        Raises:
            AssertionError: if ``<base>/<key>/meta.json`` does not exist.
            KeyError: if ``key`` is not in the underlying mapping.
        """
        return self._store[self._keytransform(key)]

    def __iter__(self) -> Iterator:
        return iter(self._store)

    def __len__(self) -> int:
        return len(self._store)

    def _keytransform(self, key: str):
        """Validate that ``key`` names a directory containing meta.json and return it.

        Raised explicitly rather than via ``assert`` so the check is still
        performed when Python runs with ``-O``; the exception type and
        message are unchanged.
        """
        if not exists(join(self.base, key, "meta.json")):
            raise AssertionError(f"Taxonomy '{key}' does not exist")
        return key