from __future__ import annotations

import copy
import gzip
import hashlib
import logging
import os
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

import orjson

from vunnel.tool import fixdate
from vunnel.utils import http_wrapper as http
from vunnel.utils import vulnerability

if TYPE_CHECKING:
    from collections.abc import Generator
    from types import TracebackType

    from vunnel import workspace


class Parser:
    _release_ = "rolling"
    _secdb_dir_ = "secdb"
    _security_reference_url_ = "https://security.secureos.io"

    def __init__(  # noqa: PLR0913
        self,
        workspace: workspace.Workspace,
        url: str,
        namespace: str,
        fixdater: fixdate.Finder | None = None,
        download_timeout: int = 125,
        logger: logging.Logger | None = None,
        security_reference_url: str | None = None,
    ):
        if not fixdater:
            fixdater = fixdate.default_finder(workspace)
        self.fixdater = fixdater
        self.download_timeout = download_timeout
        self.secdb_dir_path = os.path.join(workspace.input_path, self._secdb_dir_)
        self.metadata_url = url.rstrip("/") if url else None
        self.url = url
        self.namespace = namespace
        self.security_reference_url = security_reference_url.rstrip("/") if security_reference_url else Parser._security_reference_url_
        self.urls: list[str] = []
        # Default filename for secdb (same as fixture filename)
        self._db_filename = "secdb.json"

        if not logger:
            logger = logging.getLogger(self.__class__.__name__)
        self.logger = logger

    def __enter__(self) -> Parser:
        self.fixdater.__enter__()
        return self

    def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None:
        self.fixdater.__exit__(exc_type, exc_val, exc_tb)

    @staticmethod
    def _extract_filename_from_url(url: str) -> str:
        return os.path.basename(urlparse(url).path)

    def build_reference_links(self, vulnerability_id: str) -> list[str]:
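        """Return reference URLs for the given vulnerability id, listing the SecureOS
        advisory page first for CVE ids. Illustrative output (the trailing generic
        links come from vulnerability.build_reference_links and are an assumption):

            build_reference_links("CVE-2024-12345")
            # -> ["https://security.secureos.io/CVE-2024-12345", ...]
        """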
        urls = []
        if vulnerability_id.startswith("CVE-"):
            urls.append(f"{self.security_reference_url}/{vulnerability_id}")
        links = vulnerability.build_reference_links(vulnerability_id)
        if links:
            urls.extend(links)
        return urls

    def _verify_sha256(self, file_path: str, expected_sha256: str) -> bool:
        """Verify the SHA256 hash of a file."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        calculated_hash = sha256_hash.hexdigest()
        return calculated_hash == expected_sha256

    def _download(self) -> None:
        """
        Download the SecureOS secdb: fetch the latest.json metadata, download the
        gzipped secdb it references, verify its SHA256, and extract the JSON.
        """
        if not os.path.exists(self.secdb_dir_path):
            os.makedirs(self.secdb_dir_path, exist_ok=True)

        self.fixdater.download()

        try:
            # First, fetch the latest.json metadata
            self.logger.info(f"downloading {self.namespace} metadata from {self.url}")
            r = http.get(self.url, self.logger, timeout=self.download_timeout)
            self.urls.append(self.url)

            metadata = orjson.loads(r.content)
            latest_url = metadata.get("latest_url")
            expected_sha256 = metadata.get("sha256")
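            # Expected latest.json shape (field names taken from the checks below;
            # the values are illustrative):
            #   {"latest_url": "https://.../secdb-<build>.json.gz", "sha256": "<hex digest>"}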

            if not latest_url or not expected_sha256:
                raise ValueError("latest.json must contain 'latest_url' and 'sha256' fields")

            self.logger.info(f"downloading {self.namespace} secdb from {latest_url}")
            self.urls.append(latest_url)

            # Download the gzipped secdb file
            r = http.get(latest_url, self.logger, stream=True, timeout=self.download_timeout)

            gz_filename = self._extract_filename_from_url(latest_url)
            gz_file_path = os.path.join(self.secdb_dir_path, gz_filename)

            with open(gz_file_path, "wb") as fp:
                for chunk in r.iter_content(chunk_size=65536):  # the default chunk_size of 1 byte is pathologically slow
                    fp.write(chunk)

            # Verify SHA256
            self.logger.info(f"verifying SHA256 hash of {gz_filename}")
            if not self._verify_sha256(gz_file_path, expected_sha256):
                raise ValueError(f"SHA256 verification failed for {gz_filename}")

            self.logger.info("SHA256 verification successful")

            # Extract the gzipped file
            json_filename = gz_filename.removesuffix(".gz")  # only strip the suffix, not any ".gz" mid-name
            json_file_path = os.path.join(self.secdb_dir_path, json_filename)

            self.logger.info(f"extracting {gz_filename} to {json_filename}")
            with gzip.open(gz_file_path, "rb") as f_in, open(json_file_path, "wb") as f_out:
                f_out.write(f_in.read())

            # Store the extracted filename for loading
            self._db_filename = json_filename

        except Exception:
            self.logger.exception(f"ignoring error processing secdb for {self.url}")

    def _load(self) -> Generator[tuple[str, dict[str, Any]], None, None]:
        """
        Load the extracted secdb JSON and yield it together with its release name.
        """
        try:
            db_file_path = os.path.join(self.secdb_dir_path, self._db_filename)
            self.logger.debug(f"loading secdb data from: {db_file_path}")

            with open(db_file_path, "rb") as fh:
                dbtype_data_dict = orjson.loads(fh.read())
                yield self._release_, dbtype_data_dict
        except Exception:
            self.logger.exception(f"failed to load {self.namespace} sec db data")
            raise

    def _normalize(self, release: str, data: dict[str, Any]) -> dict[str, Any]:  # noqa: C901
        """
        Normalize the secdb entries into vulnerability payload records, keyed by
        vulnerability id.
        """

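        # Shape of each record accumulated below (a sketch of only the fields populated
        # here, not the full vulnerability.vulnerability_element template):
        #   {"Vulnerability": {"Name": "CVE-...", "NamespaceName": "<namespace>:rolling",
        #                      "Link": "...", "Severity": "Unknown", "FixedIn": [...]}}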
        vuln_dict = {}

        self.logger.debug("normalizing vulnerability data")

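        # Illustrative shape of a secdb "packages" entry (inferred from the parsing
        # below; the package name and versions are made-up examples):
        #   {"pkg": {"name": "openssl",
        #            "secfixes": {"3.1.4-r0": ["CVE-2023-0001 CVE-2023-0002"]}}}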
        for el in data["packages"]:
            pkg_el = el["pkg"]

            pkg = pkg_el["name"]
            for fix_version in pkg_el["secfixes"]:
                vids = []
                if pkg_el["secfixes"][fix_version]:
                    for rawvid in pkg_el["secfixes"][fix_version]:
                        tmp = rawvid.split()
                        for newvid in tmp:
                            if newvid not in vids:
                                vids.append(newvid)

                for vid in vids:
                    if vid not in vuln_dict:
                        # create a new record
                        vuln_dict[vid] = copy.deepcopy(vulnerability.vulnerability_element)
                        vuln_record = vuln_dict[vid]
                        reference_links = self.build_reference_links(vid)

                        # populate the static information about the new vuln record
                        vuln_record["Vulnerability"]["Name"] = str(vid)
                        vuln_record["Vulnerability"]["NamespaceName"] = self.namespace + ":" + str(release)

                        if reference_links:
                            vuln_record["Vulnerability"]["Link"] = reference_links[0]

                        vuln_record["Vulnerability"]["Severity"] = "Unknown"
                    else:
                        vuln_record = vuln_dict[vid]

                    # set up fixedins
                    ecosystem = self.namespace + ":" + str(release)
                    fixed_el = {
                        "Name": pkg,
                        "Version": fix_version,
                        "VersionFormat": "apk",
                        "NamespaceName": ecosystem,
                    }

                    result = self.fixdater.best(
                        vuln_id=str(vid),
                        cpe_or_package=pkg,
                        fix_version=fix_version,
                        ecosystem=ecosystem,
                    )
                    if result and result.date:
                        fixed_el["Available"] = {
                            "Date": result.date.isoformat(),
                            "Kind": result.kind,
                        }

                    fixed_in = vuln_record["Vulnerability"]["FixedIn"]
                    if isinstance(fixed_in, list):
                        fixed_in.append(fixed_el)

        return vuln_dict

    @property
    def target_url(self) -> str:
        return self.url

    def get(self) -> Generator[tuple[str, dict[str, Any]], None, None]:
        """
        Download, load, and normalize the SecureOS secdb, yielding
        (release, vulnerability record dict) tuples.
        """
        # download the data
        self._download()

        # load the data
        for release, dbtype_data_dict in self._load():
            # normalize the loaded data
            yield release, self._normalize(release, dbtype_data_dict)
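

# Example usage (a sketch; the Workspace construction and the latest.json URL are
# illustrative assumptions, not part of this module):
#
#   ws = workspace.Workspace(root="/tmp/vunnel", name="secureos", create=True)
#   with Parser(workspace=ws, url="https://.../latest.json", namespace="secureos") as parser:
#       for release, vuln_dict in parser.get():
#           print(release, len(vuln_dict))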