PATH:
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
internals
import asyncio
import socket
import logging
import struct

logger = logging.getLogger(__name__)


class UnixSocketAuthProtocol(asyncio.Protocol):
    """
    Protocol that identifies the peer of a unix socket connection.

    It reads the SO_PEERCRED option of the underlying unix socket to
    obtain the authentication data (pid, uid, gid) of the peer.

    After the connection is made, these values are stored in the
    object's ``_pid``, ``_uid`` and ``_gid`` attributes.
    """

    # SO_PEERCRED returns a ``struct ucred``:
    #
    # struct ucred
    # {
    #   pid_t pid;    /* PID of sending process.  */
    #   uid_t uid;    /* UID of sending process.  */
    #   gid_t gid;    /* GID of sending process.  */
    # };
    #
    # pid_t is a signed int, while uid_t and gid_t are unsigned ints.
    # Decoding uid/gid as signed ("3i") would report ids >= 2**31 as
    # negative numbers, so the unsigned code "I" is used for them.
    # The size and field offsets are the same as for "3i".
    STRUCT_FORMAT = "iII"

    def connection_made(self, transport):
        """
        Store the transport and the credentials of the connected peer.

        :param transport: transport of the new connection; its "socket"
            extra info must be a unix socket supporting SO_PEERCRED
        :raises OSError: if the SO_PEERCRED option cannot be read
        :raises struct.error: if the returned buffer does not match
            STRUCT_FORMAT
        """
        self._transport = transport
        conn = self._transport.get_extra_info("socket")
        # Ask for exactly as many bytes as the ucred structure occupies.
        creds = conn.getsockopt(
            socket.SOL_SOCKET,
            socket.SO_PEERCRED,
            struct.calcsize(self.STRUCT_FORMAT),
        )
        self._pid, self._uid, self._gid = struct.unpack(
            self.STRUCT_FORMAT, creds
        )
        logger.debug(
            "New socket connection from pid=%s, uid=%s, gid=%s",
            self._pid,
            self._uid,
            self._gid,
        )
[-] persistent_message.py
[edit]
[-] global_scope.py
[edit]
[+]
..
[-] lazy_load.py
[edit]
[-] logging_protocol.py
[edit]
[-] auth_protocol.py
[edit]
[-] iaid.py
[edit]
[-] deadlock_detecting_lock.py
[edit]
[+]
__pycache__
[-] cln.py
[edit]
[-] logger.py
[edit]
[-] __init__.py
[edit]
[-] the_sink.py
[edit]