PATH:
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
application
import json
import logging
import subprocess
from pathlib import Path

from defence360agent.contracts import sentry
from defence360agent.contracts.config import Core as Config
from defence360agent.contracts.license import LicenseCLN
from defence360agent.internals.iaid import IndependentAgentIDAPI
from defence360agent.subsys.panels import hosting_panel
from defence360agent.utils import (
    stub_unexpected_error,
    is_root_user,
)
from defence360agent.utils.ipecho import IPEchoAPI

logger = logging.getLogger(__name__)

# JSON file used to persist ``sentry._TAGS`` between runs.
SENTRY_TAGS_CACHE_PATH = Path("/var/imunify360/.sentry_tags")


def dump_sentry_tags():
    """Write ``sentry._TAGS`` as JSON to ``SENTRY_TAGS_CACHE_PATH``."""
    SENTRY_TAGS_CACHE_PATH.write_text(json.dumps(sentry._TAGS))


def cached_fill():
    """Load Sentry tags from the cache file, or compute them if unavailable.

    When the cache file exists and contains valid JSON, its content replaces
    ``sentry._TAGS`` and nothing else is done. In every other case (no cache
    file, unreadable or malformed cache) the tags are computed with
    ``fill(dump=False)``, i.e. without rewriting the cache.
    """
    try:
        # The existence check lives inside the ``try`` on purpose: on
        # Python < 3.12 ``Path.exists()`` re-raises ``PermissionError`` when
        # the parent directory cannot be traversed, which used to escape
        # this function for unprivileged callers.
        if SENTRY_TAGS_CACHE_PATH.exists():
            sentry._TAGS = json.loads(SENTRY_TAGS_CACHE_PATH.read_text())
            return
    except (json.JSONDecodeError, FileNotFoundError) as e:
        # FileNotFoundError covers the file disappearing between the
        # existence check and the read.
        logger.warning(
            "Sentry cache file %s is malformed: %s",
            SENTRY_TAGS_CACHE_PATH,
            e,
        )
    except PermissionError:
        # Cache is not accessible to this user; fall back silently.
        pass
    fill(dump=False)


def _set_additional_tags():
    """Detect the active firewall and set the firewall/strategy Sentry tags.

    The strategy tag is ``"CSF_COOP"`` when CSF is detected and
    ``"PRIMARY_IDS"`` for any other detection result.
    """
    PRIMARY_IDS_STRATEGY = "PRIMARY_IDS"
    CSF_COOP_STRATEGY = "CSF_COOP"

    def _is_firewalld_running() -> bool:
        """Return True if ``firewall-cmd --state`` exits successfully.

        A missing binary (or other I/O error), a non-zero exit status, or
        no answer within 5 seconds all count as "not running".
        """
        try:
            subprocess.check_output(
                ["firewall-cmd", "--state"],
                timeout=5,
                stderr=subprocess.DEVNULL,
            )
            return True
        except (
            IOError,
            subprocess.CalledProcessError,
            subprocess.TimeoutExpired,
        ):
            return False

    def _is_csf_running() -> bool:
        """Return True if ``/usr/sbin/csf --status`` reports a working CSF.

        A missing binary or a non-zero exit status counts as "not running".
        Otherwise CSF is considered running unless the output contains one
        of the "disabled" / "unresolved error" marker messages.
        """
        # NOTE(review): unlike the firewalld probe this call has no timeout,
        # and only FileNotFoundError (not every OSError) is handled here —
        # confirm whether that asymmetry is intentional.
        try:
            out = subprocess.check_output(
                ["/usr/sbin/csf", "--status"], stderr=subprocess.DEVNULL
            )
        except (FileNotFoundError, subprocess.CalledProcessError):
            return False
        return (b"have been disabled" not in out) and (
            b"You have an unresolved error when starting csf:" not in out
        )

    # NOTE(review): stub_unexpected_error presumably replaces unexpected
    # exceptions with a stub value — confirm in defence360agent.utils.
    @stub_unexpected_error
    def _get_current_firewall():
        """Return "csf", "firewalld" or, when neither is found, "iptables"."""
        # CSF is probed first, so it wins when both are active.
        if _is_csf_running():
            return "csf"
        if _is_firewalld_running():
            return "firewalld"
        return "iptables"

    fw = _get_current_firewall()
    sentry.set_firewall_type(fw)
    if fw == "csf":
        sentry.set_strategy(CSF_COOP_STRATEGY)
    else:
        sentry.set_strategy(PRIMARY_IDS_STRATEGY)


def fill(dump=True) -> None:
    """Populate the Sentry tags from the current system state.

    The AV version, agent version and product name are always set. If
    ``is_root_user()`` is false the function stops there; otherwise it also
    sets the server id, IAID, server IP, hosting panel name, test-env flag
    and the firewall/strategy tags.

    :param dump: when true (and running as root), persist the resulting
        tags with ``dump_sentry_tags()`` after they have been set.
    """

    @stub_unexpected_error
    def _get_hosting_panel():
        """Return the ``NAME`` of the detected hosting panel."""
        return hosting_panel.HostingPanel().NAME

    sentry.set_av_version(Config.AV_VERSION)
    sentry.set_version(Config.VERSION)
    sentry.set_product_name(LicenseCLN.get_product_name())
    if not is_root_user():
        # Everything below is only collected when running as root.
        return
    sentry.set_server_id(LicenseCLN.get_server_id())
    sentry.set_iaid(IndependentAgentIDAPI.get_iaid())
    sentry.set_ip(stub_unexpected_error(IPEchoAPI.server_ip)())
    sentry.set_hosting_panel(_get_hosting_panel())
    sentry.set_test_env()
    _set_additional_tags()
    if dump:
        dump_sentry_tags()
[+]
..
[+]
__pycache__
[-] determine_hosting_panel.py
[edit]
[-] tags.py
[edit]
[-] __init__.py
[edit]
[-] settings.py
[edit]