REDROOM
PHP 8.2.31
Path:
Logout
Edit File
Size: 4.87 KB
Close
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/internals/feature_flags.py
Text
Base64
""" Shared reader for the local feature flags file. The file is written by: - Go resident-agent FeatureFlags plugin (IM360 mode) - Python FeatureFlagsSync plugin (AV mode) Other subsystems (e.g. message_status_publisher) use this module to check individual flag values at runtime. Supported JSON shapes on disk (readers / ``is_enabled``): - Legacy object ``{"mqtt_tracking": true, ...}`` (preferred on disk after sync). - JSON array of enabled names ``["mqtt_tracking"]`` (still accepted). - Legacy wrapper ``{"flags": ["mqtt_tracking", ...]}``. The sync API checksum is always over a **canonical JSON array** of enabled names (same as correlation sync). ``sync_checksum_hex_from_flags_file`` derives that MD5 from whatever shape is on disk. The sync plugin also writes ``FLAGS_PLAIN_PATH`` (``/var/imunify360/feature_flags``): plain text, one enabled flag name per line (sorted), for scripts. """ from __future__ import annotations import hashlib import json import os from typing import Any FLAGS_PATH = "/var/imunify360/feature_flags.json" # Plain list of enabled flag names (one per line), same order as sorted JSON array. 
FLAGS_PLAIN_PATH = "/var/imunify360/feature_flags"

# Result of the most recent read of FLAGS_PATH and the mtime it belongs to.
# A failed parse is cached as ``{}`` so a broken file is not re-parsed on
# every call; it is retried once the file's mtime changes.
_cached_flags: dict[str, Any] = {}
_cached_mtime: float = 0.0


def _normalize_flags_from_file(raw: Any) -> dict[str, Any]:
    """Map file JSON to a flat name->value dict for :func:`is_enabled`.

    - list: every string item becomes ``{name: True}``; other items are
      dropped.
    - dict with a list under ``"flags"``: the inner list is normalized.
    - any other dict: returned as is (the same object, not a copy).
    - anything else (including ``None``): ``{}``.
    """
    if isinstance(raw, list):
        return {item: True for item in raw if isinstance(item, str)}
    if isinstance(raw, dict):
        inner = raw.get("flags")
        if isinstance(inner, list):
            return _normalize_flags_from_file(inner)
        return raw
    return {}


def _read_flags() -> dict[str, Any]:
    """Return the normalized flags from ``FLAGS_PATH``, cached by mtime.

    The file is re-read only when its modification time differs from the
    cached one. A missing file resets the cache; an unreadable, undecodable
    or malformed file yields ``{}`` for that mtime.
    """
    global _cached_flags, _cached_mtime
    try:
        mtime = os.path.getmtime(FLAGS_PATH)
    except OSError:
        _cached_flags = {}
        _cached_mtime = 0.0
        return _cached_flags
    if mtime == _cached_mtime:
        return _cached_flags
    try:
        # Explicit UTF-8 keeps this reader consistent with
        # sync_checksum_hex_from_flags_file and independent of the locale.
        with open(FLAGS_PATH, encoding="utf-8") as f:
            raw = json.load(f)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        # UnicodeDecodeError is a ValueError but not a JSONDecodeError, so it
        # has to be listed for is_enabled() to honour its "unreadable file ->
        # default" contract.
        _cached_flags = {}
    else:
        _cached_flags = _normalize_flags_from_file(raw)
    _cached_mtime = mtime
    return _cached_flags


def enabled_flag_names_sorted(flags: Any) -> list[str]:
    """Return sorted enabled flag names for JSON and plain-text sidecar.

    Accepts the same shapes as :func:`_normalize_flags_from_file` (array,
    flat map, ``{"flags": [...]}``) so checksums and sidecars match Go
    ``enabledNamesSortedForChecksum`` / :func:`is_enabled`.

    Raises:
        TypeError: if *flags* is neither a list nor a dict.
    """
    if not isinstance(flags, (list, dict)):
        raise TypeError(
            f"flags must be list or dict, not {type(flags).__name__}"
        )
    normalized = _normalize_flags_from_file(flags)
    # Only truthy values count as enabled.
    return sorted(k for k, v in normalized.items() if v)


def canonical_sync_flag_list_bytes(names: list[str]) -> bytes:
    """JSON array bytes used for sync MD5 (matches correlation_api checksum_for_sync_flag_list)."""
    ordered = sorted(names)
    # The exact dumps() arguments define the checksum input; do not change
    # them without changing the other side of the sync.
    return json.dumps(ordered, sort_keys=True, indent=2).encode()


def sync_checksum_hex_from_flags_file(path: str) -> str:
    """MD5 hex of canonical enabled-name array for ``path``, or "" if missing/invalid.

    "Invalid" covers unreadable files, undecodable bytes, malformed JSON and
    valid JSON whose top-level value is neither an array nor an object.
    """
    try:
        with open(path, encoding="utf-8") as f:
            raw = json.load(f)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        return ""
    if not isinstance(raw, (list, dict)):
        # e.g. ``null`` or a bare number: enabled_flag_names_sorted() would
        # raise TypeError, but the contract here is to return "".
        return ""
    names = enabled_flag_names_sorted(raw)
    payload = canonical_sync_flag_list_bytes(names)
    return hashlib.md5(payload, usedforsecurity=False).hexdigest()


def legacy_feature_flags_map_bytes(names: list[str]) -> bytes:
    """On-disk legacy JSON: ``{flag: true, ...}`` with sorted keys.

    Non-string entries are dropped and duplicates collapse to one key.
    """
    unique_names = {x for x in names if isinstance(x, str)}
    d = {n: True for n in sorted(unique_names)}
    return json.dumps(d, sort_keys=True, indent=2).encode()


def plain_text_payload_for_enabled_flags(flags: Any) -> bytes:
    """Body for ``FLAGS_PLAIN_PATH``: one name per line, trailing newline if non-empty.

    Raises:
        TypeError: if *flags* is neither a list nor a dict.
    """
    names = enabled_flag_names_sorted(flags)
    if not names:
        return b""
    return ("\n".join(names) + "\n").encode()


def serialize_feature_flags_file_payload(flags: Any) -> bytes:
    """Serialize dict flags for writing ``FLAGS_PATH`` (legacy map only).

    Raises:
        TypeError: if *flags* is not a dict.
    """
    if isinstance(flags, dict):
        return json.dumps(flags, sort_keys=True, indent=2).encode()
    raise TypeError(f"flags must be dict, not {type(flags).__name__}")


def is_enabled(flag_name: str, default: bool = False) -> bool:
    """Return whether *flag_name* is enabled.

    If the file is missing, unreadable, or the flag is absent, *default* is
    returned. Defaults to False so unknown flags are treated as disabled
    unless the caller explicitly opts in. A flag stored as JSON ``null`` is
    treated as absent.
    """
    flags = _read_flags()
    value = flags.get(flag_name)
    if value is None:
        return default
    return bool(value)
Save
Close
Exit & Reset
Text mode: syntax highlighting auto-detects file type.
Directory Contents
Dirs: 1 × Files: 13
Delete Selected
Select All
Select None
Sort:
Name
Size
Modified
Enable drag-to-move
Name
Size
Perms
Modified
Actions
__pycache__
DIR
-
drwxr-xr-x
2026-06-08 20:24:30
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
auth_protocol.py
1.18 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
cln.py
13.52 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
deadlock_detecting_lock.py
755 B
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
feature_flags.py
4.87 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
global_scope.py
462 B
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
iaid.py
14.32 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
lazy_load.py
195 B
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
logger.py
14.81 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
logging_protocol.py
1.07 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
message_status_publisher.py
5.68 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
persistent_message.py
3.02 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
the_sink.py
11.24 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__init__.py
0 B
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
Zip Selected
If ZipArchive is unavailable, a
.tar
will be created (no compression).