Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions nodescraper/plugins/inband/sys_settings/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
from .analyzer_args import SysfsCheck, SysSettingsAnalyzerArgs
from .sys_settings_plugin import SysSettingsPlugin

__all__ = ["SysSettingsPlugin", "SysSettingsAnalyzerArgs", "SysfsCheck"]
68 changes: 68 additions & 0 deletions nodescraper/plugins/inband/sys_settings/analyzer_args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
from typing import Optional

from pydantic import BaseModel

from nodescraper.models import AnalyzerArgs


class SysfsCheck(BaseModel):
"""One sysfs check: path to read, acceptable values, and display name.

If expected is an empty list, the check is treated as passing (no constraint).
"""

path: str
expected: list[str]
name: str


class SysSettingsAnalyzerArgs(AnalyzerArgs):
"""Sysfs settings for analysis via a list of checks (path, expected values, name).

The path in each check is the sysfs path to read; the collector uses these paths
when collection_args is derived from analysis_args (e.g. by the plugin).
"""

checks: Optional[list[SysfsCheck]] = None

def paths_to_collect(self) -> list[str]:
"""Return the unique sysfs paths from checks, for use by the collector.

Returns:
List of unique path strings from self.checks, preserving order of first occurrence.
"""
if not self.checks:
return []
seen = set()
out = []
for c in self.checks:
p = c.path.rstrip("/")
if p not in seen:
seen.add(p)
out.append(c.path)
return out
32 changes: 32 additions & 0 deletions nodescraper/plugins/inband/sys_settings/collector_args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
from pydantic import BaseModel


class SysSettingsCollectorArgs(BaseModel):
"""Collection args for SysSettingsCollector: list of sysfs paths to read."""

paths: list[str] = []
127 changes: 127 additions & 0 deletions nodescraper/plugins/inband/sys_settings/sys_settings_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
from typing import Optional, cast

from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus
from nodescraper.interfaces import DataAnalyzer
from nodescraper.models import TaskResult

from .analyzer_args import SysSettingsAnalyzerArgs
from .sys_settings_data import SysSettingsDataModel


def _get_actual_for_path(data: SysSettingsDataModel, path: str) -> Optional[str]:
"""Return the actual value from the data model for the given sysfs path.

Args:
data: Collected sysfs readings (path -> value).
path: Sysfs path (with or without trailing slash).

Returns:
Normalized value for that path, or None if not present.
"""
value = data.readings.get(path) or data.readings.get(path.rstrip("/"))
return (value or "").strip().lower() if value is not None else None


class SysSettingsAnalyzer(DataAnalyzer[SysSettingsDataModel, SysSettingsAnalyzerArgs]):
"""Check sysfs settings against expected values from the checks list."""

DATA_MODEL = SysSettingsDataModel

def analyze_data(
self, data: SysSettingsDataModel, args: Optional[SysSettingsAnalyzerArgs] = None
) -> TaskResult:
"""Compare sysfs data to expected settings from args.checks.

Args:
data: Collected sysfs readings to check.
args: Analyzer args with checks (path, expected, name). If None or no checks, returns OK.

Returns:
TaskResult with status OK if all checks pass, ERROR if any mismatch or missing path.
"""
mismatches = {}

if not args or not args.checks:
self.result.status = ExecutionStatus.OK
self.result.message = "No checks configured."
return self.result

for check in args.checks:
actual = _get_actual_for_path(data, check.path)
if actual is None:
mismatches[check.name] = {
"path": check.path,
"expected": check.expected,
"actual": None,
"reason": "path not collected by this plugin",
}
continue

if not check.expected:
continue
expected_normalized = [e.strip().lower() for e in check.expected]
if actual not in expected_normalized:
raw = data.readings.get(check.path) or data.readings.get(check.path.rstrip("/"))
mismatches[check.name] = {
"path": check.path,
"expected": check.expected,
"actual": raw,
}

if mismatches:
self.result.status = ExecutionStatus.ERROR
parts = []
for name, info in mismatches.items():
path = info.get("path", "")
expected = info.get("expected")
actual = cast(Optional[str], info.get("actual"))
reason = info.get("reason")
if reason:
part = f"{name} ({path})"
else:
part = f"{name} ({path}): expected one of {expected}, actual {repr(actual)}"
parts.append(part)
self.result.message = "Sysfs mismatch: " + "; ".join(parts)
self._log_event(
category=EventCategory.OS,
description="Sysfs mismatch detected",
data=mismatches,
priority=EventPriority.ERROR,
console_log=True,
)
else:
self._log_event(
category=EventCategory.OS,
description="Sysfs settings match expected",
priority=EventPriority.INFO,
console_log=True,
)
self.result.status = ExecutionStatus.OK
self.result.message = "Sysfs settings as expected."

return self.result
132 changes: 132 additions & 0 deletions nodescraper/plugins/inband/sys_settings/sys_settings_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
import re
from typing import Optional

from nodescraper.base import InBandDataCollector
from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus, OSFamily
from nodescraper.models import TaskResult

from .collector_args import SysSettingsCollectorArgs
from .sys_settings_data import SysSettingsDataModel

# Sysfs format: "[always] madvise never" -> extract bracketed value
BRACKETED_RE = re.compile(r"\[(\w+)\]")


def _parse_bracketed_setting(content: str) -> Optional[str]:
"""Extract the active setting from sysfs content (value in square brackets).

Args:
content: Raw sysfs file content (e.g. "[always] madvise never").

Returns:
The bracketed value if present, else None.
"""
if not content:
return None
match = BRACKETED_RE.search(content.strip())
return match.group(1).strip() if match else None


def _paths_from_args(args: Optional[SysSettingsCollectorArgs]) -> list[str]:
"""Extract list of sysfs paths from collection args.

Args:
args: Collector args containing paths to read, or None.

Returns:
List of sysfs paths; empty if args is None or args.paths is empty.
"""
if args is None:
return []
return list(args.paths) if args.paths else []


class SysSettingsCollector(InBandDataCollector[SysSettingsDataModel, SysSettingsCollectorArgs]):
"""Collect sysfs settings from user-specified paths (paths come from config/args)."""

DATA_MODEL = SysSettingsDataModel
SUPPORTED_OS_FAMILY: set[OSFamily] = {OSFamily.LINUX}

CMD = "cat {}"

def collect_data(
self, args: Optional[SysSettingsCollectorArgs] = None
) -> tuple[TaskResult, Optional[SysSettingsDataModel]]:
"""Collect sysfs values for each path in args.paths.

Args:
args: Collector args with paths to read; if None or empty paths, returns NOT_RAN.

Returns:
Tuple of (TaskResult, SysSettingsDataModel or None). Data is None on NOT_RAN or ERROR.
"""
if self.system_info.os_family != OSFamily.LINUX:
self._log_event(
category=EventCategory.OS,
description="Sysfs collection is only supported on Linux.",
priority=EventPriority.WARNING,
console_log=True,
)
return self.result, None

paths = _paths_from_args(args)
if not paths:
self.result.message = "No paths configured for sysfs collection"
self.result.status = ExecutionStatus.NOT_RAN
return self.result, None

readings: dict[str, str] = {}
for path in paths:
res = self._run_sut_cmd(self.CMD.format(path), sudo=False)
if res.exit_code == 0 and res.stdout:
value = _parse_bracketed_setting(res.stdout) or res.stdout.strip()
readings[path] = value
else:
self._log_event(
category=EventCategory.OS,
description=f"Failed to read sysfs path: {path}",
data={"exit_code": res.exit_code},
priority=EventPriority.WARNING,
console_log=True,
)

if not readings:
self.result.message = "Sysfs settings not read"
self.result.status = ExecutionStatus.ERROR
return self.result, None

sys_settings_data = SysSettingsDataModel(readings=readings)
self._log_event(
category=EventCategory.OS,
description="Sysfs settings collected",
data=sys_settings_data.model_dump(),
priority=EventPriority.INFO,
)
self.result.message = f"Sysfs collected {len(readings)} path(s)"
self.result.status = ExecutionStatus.OK
return self.result, sys_settings_data
Loading