Coverage for bugscpp/taxonomy/taxonomy.py : 94%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import enum
2import json
3from collections.abc import Mapping
4from dataclasses import dataclass, fields
5from os.path import dirname, exists, join
6from pkgutil import iter_modules
7from typing import Any, Callable, Dict, Iterator, List, Optional
9from config import config
10from errors.internal import DppMetaDataInitKeyError, DppMetaDataInitTypeError, DppTaxonomyInitInternalError
13class CommandType(enum.IntEnum):
14 Docker = 1
15 Script = 2
18@dataclass(frozen=True)
19class Command:
20 type: CommandType
21 lines: List[str]
24@dataclass(frozen=True)
25class ExtraTest(Command):
26 is_pass: bool
29class TestType(enum.IntEnum):
30 Automake = 1
31 Ctest = 2
32 Googletest = 3
33 Kyua = 4
36@dataclass(frozen=True)
37class Common:
38 build_command: List[Command]
39 build_coverage_command: List[Command]
40 test_type: TestType
41 test_command: List[Command]
42 test_coverage_command: List[Command]
43 gcov: "Gcov"
46@dataclass(frozen=True)
47class Gcov:
48 exclude: List[str]
49 command: List[Command]
52@dataclass(frozen=True)
53class Defect:
54 hash: str
55 buggy_patch: str
56 fixed_patch: str
57 common_patch: str
58 split_patch: str
59 id: int
60 num_cases: int
61 case: List[int]
62 tags: List[str]
63 description: str
64 extra_tests: List[ExtraTest]
67@dataclass(frozen=True)
68class MetaInfo:
69 url: str
70 description: str
71 vcs: str
74def create_command(value: List[Dict[str, Any]]) -> List[Command]:
75 return [Command(CommandType[v["type"].capitalize()], v["lines"]) for v in value]
78def create_gcov(value: Dict[str, Any]) -> Gcov:
79 return Gcov(
80 [d for d in value["exclude"]],
81 create_command(value["commands"]),
82 )
85def create_common(value: Dict[str, Any]) -> Common:
86 return Common(
87 create_command(value["build"]["commands"]),
88 create_command(value["build-coverage"]["commands"]),
89 TestType[value["test-type"].capitalize()],
90 create_command(value["test"]["commands"]),
91 create_command(value["test-coverage"]["commands"]),
92 create_gcov(value["gcov"]),
93 )
96def create_info(value: Dict[str, Any]) -> MetaInfo:
97 return MetaInfo(value["url"], value["short-desc"], value["vcs"])
100class _MetaDataVariables(Mapping):
101 def __init__(self, *args, **kwargs):
102 self._store = dict(*args, **kwargs)
104 def __getitem__(self, k: str) -> str:
105 # Remove cmake export macro when DPP_CMAKE_COMPILATION_DB_TOOL is set.
106 if k == "@DPP_CMAKE_GEN_COMPILATION_DB@" and getattr(
107 config, "DPP_CMAKE_COMPILATION_DB_TOOL"
108 ):
109 return ""
111 v: Optional[str] = self._store[k]
112 if v is None:
113 v = getattr(config, k.strip("@"))
114 return v
116 def __len__(self) -> int:
117 return len(self._store)
119 def __iter__(self) -> Iterator:
120 return iter(self._store)
122 def __eq__(self, other):
123 return self._store == other
126def _do_replace(variables: Dict, string: str) -> str:
127 return " ".join([variables.get(w, w) for w in string.split()])
130def _do_strip(variables: Dict, string: str) -> str:
131 return " ".join([w for w in string.split() if w not in variables])
134class MetaData:
135 _variables = _MetaDataVariables(
136 {
137 "@DPP_PARALLEL_BUILD@": None,
138 }
139 )
140 _common_variables = _MetaDataVariables(
141 {
142 "@DPP_CMAKE_GEN_COMPILATION_DB@": "-DCMAKE_EXPORT_COMPILE_COMMANDS=1",
143 "@DPP_COMPILATION_DB_TOOL@": None,
144 "@DPP_CMAKE_COMPILATION_DB_TOOL@": None,
145 "@DPP_ADDITIONAL_GCOV_OPTIONS@": None,
146 }
147 )
149 def __init__(self, name: str, path: str):
150 self.name = name
151 self._path: str = path
152 self._info: Optional[MetaInfo] = None
153 self._common: Optional[Common] = None
154 self._defects: List[Defect] = []
156 @property
157 def dockerfile(self) -> str:
158 return f"{self._path}/Dockerfile"
160 @property
161 def info(self):
162 if not self._info:
163 self._load()
164 return self._info
166 @property
167 def common(self):
168 if not self._common:
169 self._load()
170 return self._preprocess_common(self._common, False)
172 @property
173 def common_capture(self):
174 if not self._common:
175 self._load()
176 return self._preprocess_common(self._common, True)
178 @property
179 def common_gcov_replaced(self):
180 if not self._common:
181 self._load()
182 return self._preprocess_common(self._common, True)
184 @property
185 def defects(self):
186 if not self._defects:
187 self._load()
188 return self._defects
190 def _load(self):
191 with open(f"{self._path}/meta.json", "r", encoding="utf-8") as fp:
192 contents: str = fp.read()
193 for key, value in self._variables.items():
194 contents = contents.replace(key, value)
195 meta = json.loads(contents)
196 self._load_info(meta)
197 self._load_common(meta)
198 self._load_defects(meta)
200 def _load_info(self, meta: Dict):
201 try:
202 self._info = create_info(meta["info"])
203 except KeyError as e:
204 raise DppTaxonomyInitInternalError(e.args[0], Defect.__name__)
206 def _load_common(self, meta: Dict):
207 try:
208 self._common = create_common(meta["common"])
209 except KeyError as e:
210 raise DppTaxonomyInitInternalError(e.args[0], Common.__name__)
212 def _load_defects(self, meta: Dict):
213 def check_path(path: str) -> str:
214 return path if exists(path) else ""
216 try:
217 self._defects = [
218 Defect(
219 defect["hash"],
220 check_path(f"{self._path}/patch/{index:04}-buggy.patch"),
221 check_path(f"{self._path}/patch/{index:04}-fixed.patch"),
222 check_path(f"{self._path}/patch/{index:04}-common.patch"),
223 check_path(f"{self._path}/patch/{index:04}-split.patch"),
224 defect["id"],
225 defect["num_cases"],
226 defect["case"],
227 defect["tags"],
228 defect["description"],
229 [
230 [ExtraTest(e["type"], e["lines"], e["is_pass"]) for e in et]
231 for et in defect["extra_tests"]
232 ]
233 if "extra_tests" in defect
234 else [],
235 )
236 for index, defect in enumerate(meta["defects"], start=1)
237 ]
238 except KeyError as e:
239 raise DppTaxonomyInitInternalError(e.args[0], MetaInfo.__name__)
241 @staticmethod
242 def _preprocess_common(common: Common, replace: bool) -> Common:
243 data: Dict = {
244 "test_type": common.test_type,
245 "gcov": common.gcov,
246 }
248 func: Callable[[Dict, str], str] = _do_replace if replace else _do_strip
249 command_fields = [f for f in fields(common) if f.type == List[Command]]
250 for command_field in command_fields:
251 objs: List[Command] = getattr(common, command_field.name)
252 data[command_field.name] = [
253 Command(
254 obj.type,
255 [func(MetaData._common_variables, line) for line in obj.lines],
256 )
257 for obj in objs
258 ]
260 MetaData._preprocess_build_command(func, data)
261 MetaData._preprocess_gcov_command(func, data)
262 return Common(**data)
264 @staticmethod
265 def _preprocess_build_command(
266 func: Callable[[Dict, str], str], data: Dict[str, Any]
267 ):
268 def create_commands(steps: List[Dict[str, Any]]) -> List[Command]:
269 commands: List[Command] = []
270 for step in steps:
271 try:
272 assert step["type"].capitalize() in valid_keys
273 commands.append(
274 Command(
275 step["type"].capitalize(),
276 [
277 func(MetaData._common_variables, line)
278 for line in step["lines"]
279 ],
280 )
281 )
282 except KeyError:
283 raise DppMetaDataInitKeyError(step)
284 except AssertionError:
285 raise DppMetaDataInitTypeError(step)
286 return commands
288 valid_keys = tuple(e.name for e in CommandType)
289 build_fields = ("build_command", "build_coverage_command")
291 for build_field in build_fields:
292 pre_steps = create_commands(config.DPP_BUILD_PRE_STEPS)
293 post_steps = create_commands(config.DPP_BUILD_POST_STEPS)
294 data[build_field] = pre_steps + data[build_field] + post_steps
296 # TODO: extend below functions for test and gcov command.
297 @staticmethod
298 def _preprocess_test_command(
299 func: Callable[[Dict, str], str], data: Dict[str, Any]
300 ):
301 pass
303 @staticmethod
304 def _preprocess_gcov_command(
305 func: Callable[[Dict, str], str], data: Dict[str, Any]
306 ):
307 for command_index, command in enumerate(data["gcov"].command):
308 for line_index, line in enumerate(command.lines):
309 command.lines[line_index] = func(MetaData._common_variables, line)
310 data["gcov"].command[command_index] = command
313class _LazyTaxonomy:
314 def __set_name__(self, owner, name):
315 self.name = f"_{name}"
317 def __get__(self, instance, owner):
318 try:
319 return getattr(owner, self.name)
320 except AttributeError:
321 setattr(owner, self.name, self._load_taxonomy(instance.base))
322 return getattr(owner, self.name)
324 @staticmethod
325 def _load_taxonomy(base: str) -> Dict[str, MetaData]:
326 d = dict(
327 [
328 (name, MetaData(name, f"{join(base, name)}"))
329 for _, name, is_pkg in iter_modules([dirname(__file__)])
330 if is_pkg
331 ]
332 )
333 return d
336class Taxonomy(Mapping):
337 _lazy_taxonomy = _LazyTaxonomy()
339 def __init__(self):
340 self.base: str = dirname(__file__)
342 @property
343 def _store(self) -> Dict[str, MetaData]:
344 return self._lazy_taxonomy
346 def __getitem__(self, key: str) -> MetaData:
347 return self._store[self._keytransform(key)]
349 def __iter__(self) -> Iterator:
350 return iter(self._store)
352 def __len__(self) -> int:
353 return len(self._store)
355 def _keytransform(self, key: str):
356 assert exists(
357 join(self.base, key, "meta.json")
358 ), f"Taxonomy '{key}' does not exist"
359 return key