Preview: advice_manager.py
Size: 6.20 KB
//opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/myimunify/advice/advice_manager.py
import logging
import hashlib
from clcommon.clwpos_lib import find_wp_paths
from defence360agent.api.server.events import EventsAPI
from defence360agent.contracts import config
from defence360agent.contracts.myimunify_id import get_myimunify_users
from defence360agent.utils.whmcs import get_upgrade_url_link
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.myimunify.advice.dataclass import MyImunifyWPAdvice
from defence360agent.myimunify.model import MyImunify
logger = logging.getLogger(__name__)
ADV_TYPE = "IMUNIFY_PROTECTION"
def get_myimunify_protection_status(username):
item = [username]
response = MyImunify.select().where(MyImunify.user.in_(item)).dicts()
if response and response[0].get("protection", False):
return "active"
else:
return "no"
async def _make_advice(imunify_advice):
"""
imunify advice item:
{"id": 123,
"server_id": null,
"type": "malware_found_myimun_2",
"date": 123,
"severity": 1,
"translation_id": "1",
"parameters": {},
"description": null,
"link_text": null,
"link": null,
"dashboard": false,
"popup": false,
"snoozed_until": 0,
"popup_title": null,
"popup_description": null,
"config_action": {}, "ignore": {},
"notification": false,
"smartadvice": true,
"smartadvice_title": "Web hosting user account is infected",
"smartadvice_description": "\nImunify detected live malware on the user account hosting this website:\n\n* inf1\n\n* inf2\n",
"smartadvice_user": "isuser",
"smartadvice_domain": "isuser.com",
"smartadvice_docroot": "/",
"ts": 123,
"first_generated": 123,
"iaid": "agent-iaid-123",
"notification_body_html": null,
"notification_period_limit": 0,
"notification_subject": null,
"notification_user": null}
->
{
"created_at": "2024-10-02T01:22:11.918688+00:00",
"updated_at": "2024-10-02T01:22:11.918688+00:00",
"metadata": {
"app": "imunify"
"username": "tkcpanel",
"domain": "tk-cpanel.com",
"website": "/",
"panel_url": "https://10.193.176.2:2083/cpsess0000000000/frontend/paper_lantern/lveversion/wpos.live.pl",
},
"advice": {
"id": "287718",
"type": "CPCSS",
"status": "review",
"description": "Turn on Critical Path CSS",
"is_premium": true,
"module_name": "critical_css",
"license_status": "NOT_REQUIRED",
"subscription": {
"status": "active",
"upgrade_url": "https://whmcs.dev.cloudlinux.com?username=tkcpanel&domain=tk-cpanel.com&server_ip=10.193.176.2&m=cloudlinux_advantage&action=provisioning&suite=accelerate_wp_premium"
},
"total_stages": 0,
"completed_stages": 0,
"detailed_description": "Critical Path CSS eliminates render-blocking CSS on your website and improves browser page render performance. Your website will load much faster for your visitors.\nNote: Applying the current advice will also enable the AccelerateWP feature."
}
}
"""
advices_by_docroot = []
username = imunify_advice["smartadvice_user"]
protection_status = get_myimunify_protection_status(username)
domain = imunify_advice["smartadvice_domain"]
infected_docroot = imunify_advice["smartadvice_docroot"]
upgrade_url = imunify_advice["upgrade_url"]
description = imunify_advice["smartadvice_title"]
detailed_description = imunify_advice["smartadvice_description"]
iaid = imunify_advice["iaid"]
panel_url = await HostingPanel().panel_user_link(username)
for site in find_wp_paths(infected_docroot):
website = f"/{site}"
# for analytics: iaid-hash(username-domain-website)
hashed_udw = hashlib.md5(
f"{username}-{domain}-{website}".encode("utf-8")
).hexdigest()
adv_id = f"{iaid}_{hashed_udw}"
im360_protection_advice = MyImunifyWPAdvice(
username=username,
domain=domain,
website=website,
panel_url=panel_url
+ "?show_cleanup_dialog=true", # Flag tells UI to display cleanup dialog
id=adv_id,
type=ADV_TYPE,
status="review",
description=description,
detailed_description=detailed_description,
is_premium=False,
module_name="imunify",
license_status="NOT_REQUIRED",
subscription_status=protection_status,
upgrade_url=upgrade_url,
total_stages=0,
completed_stages=0,
)
advices_by_docroot.append(im360_protection_advice.to_advice())
return advices_by_docroot
async def make_advice() -> list:
advices = []
advice_list = await get_advice_notifications()
logger.info("IM360 advice list: %s", str(advice_list))
if advice_list:
for item in advice_list:
try:
advice_item = await _make_advice(item)
except KeyError as e:
logger.error(
"Unable to make advice based on item: %s, malformed"
" error: %s",
str(item),
str(e),
)
continue
advices.extend(advice_item)
return advices
async def get_advice_notifications() -> [dict]:
users_to_report = set(
[
item["username"]
for item in await get_myimunify_users()
if not item["protection"]
]
)
users_to_pop = set()
for user in users_to_report:
conf = config.ConfigFile(user)
if not conf.get("CONTROL_PANEL", "smart_advice_allowed"):
users_to_pop.add(user)
users_to_report = users_to_report - users_to_pop
response = await EventsAPI.smart_advices()
logger.info(
"Smart Advice events API response "
"with notifications: %s, users to report: %s",
str(response),
str(users_to_report),
)
data = [
event
for event in response
if event.get("smartadvice_user") in users_to_report
]
for item in data:
item["upgrade_url"] = get_upgrade_url_link(
item.get("smartadvice_user"), item.get("smartadvice_domain")
)
return data
Directory Contents
Dirs: 1 × Files: 4