REDROOM
PHP 8.2.31
Path:
Logout
Edit File
Size: 5.32 KB
Close
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/hooks/execute.py
Text
Base64
import asyncio import json import os import stat import tempfile from defence360agent.contracts.config import Core from defence360agent.hooks import native as native_hooks from defence360agent.internals.logger import EventHookLogger from defence360agent.model.event_hook import EventHook from defence360agent.model.instance import db from defence360agent.utils import run, snake_case event_hook_logger = EventHookLogger() def get_hooks(event): # if database is not available (i.e. direct RPC call), do not try to # load hooks if db.deferred: return [] hooks = EventHook.select().where(EventHook.event == event) return list(hooks) def _validate_hook_path(path, native=False): """Raise ValueError if path is not a safe hook file. The original check rejected any path under /tmp, /var/tmp, /dev/shm on the grounds that those dirs are world-writable. That blanket- by-prefix rule was too coarse: pytest's tmp_path lives under /tmp/pytest-of-<user>/... and the agent's own integration fixtures legitimately put hook files there. The real threats are (a) an attacker-owned file (DB row points at a path the attacker controls) and (b) a hook whose immediate parent is world-writable so the file can be swapped between this check and the exec. The required permission bit differs between branches: subprocess hooks are exec'd by the kernel (needs X_OK), but native hooks are loaded via importlib's open()+exec_module path which only needs R_OK. A standard Python file in mode 0o644 is loadable but not executable, so requiring +x for native hooks would silently break the typical native-hook deployment (the `hook add-native` RPC has never required or documented an executable bit). """ if not os.path.isfile(path): raise ValueError( "Hook path does not exist or is not a file: {}".format(path) ) if native: if not os.access(path, os.R_OK): raise ValueError("Hook path is not readable: {}".format(path)) else: if not os.access(path, os.X_OK): raise ValueError("Hook path is not executable: {}".format(path)) real = os.path.realpath(path) try: st = os.stat(real) except OSError as exc: raise ValueError("Hook path stat failed: {}: {}".format(path, exc)) # Reject world-writable files: any unprivileged user could rewrite # them between this check and the subprocess/importlib load. if st.st_mode & stat.S_IWOTH: raise ValueError("Hook path is world-writable: {}".format(path)) parent = os.path.dirname(real) if parent and parent != "/": try: pst = os.stat(parent) except OSError as exc: raise ValueError( "Hook parent stat failed: {}: {}".format(parent, exc) ) # A world-writable parent without the sticky bit means an # attacker can replace our hook by deleting+recreating the # file. /tmp itself has the sticky bit so renames are owner- # only, which is safe; pytest's tmp_path subdirs are mode 700. if (pst.st_mode & stat.S_IWOTH) and not (pst.st_mode & stat.S_ISVTX): raise ValueError( "Hook path has world-writable parent without sticky bit" " {}: {}".format(parent, path) ) async def execute_hook(path, data, native=False): try: # Path validation runs filesystem syscalls (isfile/access/realpath) # which can block the event loop on slow/NFS storage; defer to a # threadpool executor. The same checks apply to native hooks # because native_hooks.execute_hook imports the file via importlib # straight in the agent's root process — a DB-sourced /tmp path # there is at least as dangerous as a subprocess fork. loop = asyncio.get_event_loop() await loop.run_in_executor(None, _validate_hook_path, path, native) if native: native_hooks.execute_hook(path, data) exit_code, err = 0, None else: data = json.dumps(data).encode() cwd = os.path.dirname(path) exit_code, _, err = await run( [path], shell=False, input=data, cwd=cwd ) except Exception as e: exit_code, err = None, repr(e) return exit_code, err async def execute_hooks(event, tempdir=Core.TMPDIR): dump = event.get("DUMP") params = dict(event) hooks = get_hooks(event.event) if not hooks: return with event_hook_logger(event.event, event.subtype) as event_logger: if dump: prefix = snake_case(event.__class__.__name__) + "_" tmp = tempfile.NamedTemporaryFile( mode="w+", prefix=prefix, suffix=".json", dir=tempdir ) json.dump(dump, tmp) tmp.flush() os.fsync(tmp.fileno()) params["tmp_filename"] = tmp.name data = { "event": event.event, "subtype": event.subtype, "params": params, } for hook in hooks: with event_logger(hook.path, native=hook.native) as hook_logger: hook_logger.begin() exit_code, err = await execute_hook( hook.path, data, native=hook.native ) hook_logger.finish(exit_code, err)
Save
Close
Exit & Reset
Text mode: syntax highlighting auto-detects file type.
Directory Contents
Dirs: 1 × Files: 3
Delete Selected
Select All
Select None
Sort:
Name
Size
Modified
Enable drag-to-move
Name
Size
Perms
Modified
Actions
__pycache__
DIR
-
drwxr-xr-x
2026-06-08 20:24:30
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
execute.py
5.32 KB
lrw-r--r--
2026-05-26 21:20:45
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
native.py
968 B
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__init__.py
0 B
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
Zip Selected
If ZipArchive is unavailable, a
.tar
will be created (no compression).