403Webshell
Server IP : 172.67.206.42  /  Your IP : 104.23.243.51
Web Server : Apache
System : Linux server.localhost.com 6.8.0-85-generic #85-Ubuntu SMP PREEMPT_DYNAMIC Thu Sep 18 15:26:59 UTC 2025 x86_64
User : pahana ( 1029)
PHP Version : 7.4.33
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/firewall/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3/dist-packages/firewall/dbus_utils.py
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright (C) 2011-2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>

import dbus
import pwd
import xml.etree.ElementTree as ET

from firewall.core.logger import log


def command_of_pid(pid):
    """Return the command line of process *pid* as read from /proc.

    Only the first line of ``/proc/<pid>/cmdline`` is used.  The NUL
    characters in it are replaced by spaces and surrounding whitespace is
    stripped.

    Returns None if the command line cannot be obtained: any exception
    raised while formatting the path, opening or reading the file, or an
    empty file.
    """
    try:
        with open("/proc/%d/cmdline" % pid, "r") as cmdline_file:
            # Indexing raises IndexError for an empty file, which is
            # handled below like any other failure.
            first_line = cmdline_file.readlines()[0]
    except Exception:
        return None
    return first_line.replace("\0", " ").strip()


def pid_of_sender(bus, sender):
    """Return the process id of the D-Bus connection named *sender*.

    The id is queried with org.freedesktop.DBus.GetConnectionUnixProcessID
    on the given *bus*.  Returns None if the reply cannot be converted to
    an int (ValueError); other exceptions are not handled here.
    """
    proxy = bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus")
    daemon = dbus.Interface(proxy, "org.freedesktop.DBus")

    try:
        return int(daemon.GetConnectionUnixProcessID(sender))
    except ValueError:
        return None


def uid_of_sender(bus, sender):
    """Return the user id of the D-Bus connection named *sender*.

    The id is queried with org.freedesktop.DBus.GetConnectionUnixUser on
    the given *bus*.  Returns None if the reply cannot be converted to an
    int (ValueError); other exceptions are not handled here.
    """
    proxy = bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus")
    daemon = dbus.Interface(proxy, "org.freedesktop.DBus")

    try:
        return int(daemon.GetConnectionUnixUser(sender))
    except ValueError:
        return None


def user_of_uid(uid):
    """Return the login name for *uid* from the password database.

    Returns None if the lookup raises any exception (for example an
    unknown uid, or a *uid* that is not an integer such as None).
    """
    try:
        entry = pwd.getpwuid(uid)
    except Exception:
        return None
    else:
        # The first field of a passwd entry is the login name.
        return entry[0]


def context_of_sender(bus, sender):
    """Return the SELinux context of the D-Bus connection named *sender*.

    The context is queried with
    org.freedesktop.DBus.GetConnectionSELinuxSecurityContext on the given
    *bus*.  The reply is converted with dbus_to_python() and each resulting
    element is turned into a character with chr().

    Returns None if the query raises any exception.
    """
    proxy = bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus")
    daemon = dbus.Interface(proxy, "org.freedesktop.DBus")

    try:
        raw_context = daemon.GetConnectionSELinuxSecurityContext(sender)
    except Exception:
        return None

    return "".join(chr(code) for code in dbus_to_python(raw_context))


def command_of_sender(bus, sender):
    """Return the command line of the process behind D-Bus *sender*.

    Combines pid_of_sender() and command_of_pid(); the result is None
    whenever command_of_pid() returns None.
    """
    sender_pid = pid_of_sender(bus, sender)
    return command_of_pid(sender_pid)


def user_of_sender(bus, sender):
    """Return the user name of the process behind D-Bus *sender*.

    Combines uid_of_sender() and user_of_uid(); the result is None
    whenever user_of_uid() returns None.
    """
    sender_uid = uid_of_sender(bus, sender)
    return user_of_uid(sender_uid)


def dbus_to_python(obj, expected_type=None):
    """Convert a dbus-python value into a plain Python value.

    dbus booleans, strings, object paths, integers and doubles are converted
    to bool, str, str, int and float.  dbus arrays, structs and dictionaries
    are converted recursively to list, tuple and dict.  None and values that
    already are bool, str, bytes, int, float, list, tuple or dict are
    returned unchanged.

    If *expected_type* is given, either the converted value or the original
    *obj* must be an instance of it.

    Raises:
        TypeError: if *obj* has an unhandled type, or if the
            *expected_type* check fails.
    """
    # The order of the checks is significant and must be kept: the dbus
    # types are tested before the builtin types.
    if obj is None:
        converted = None
    elif isinstance(obj, dbus.Boolean):
        converted = bool(obj)
    elif isinstance(obj, (dbus.String, dbus.ObjectPath)):
        converted = str(obj)
    elif isinstance(
        obj,
        (
            dbus.Byte,
            dbus.Int16,
            dbus.Int32,
            dbus.Int64,
            dbus.UInt16,
            dbus.UInt32,
            dbus.UInt64,
        ),
    ):
        converted = int(obj)
    elif isinstance(obj, dbus.Double):
        converted = float(obj)
    elif isinstance(obj, dbus.Array):
        converted = list(map(dbus_to_python, obj))
    elif isinstance(obj, dbus.Struct):
        converted = tuple(map(dbus_to_python, obj))
    elif isinstance(obj, dbus.Dictionary):
        converted = {}
        for dbus_key, dbus_value in obj.items():
            # Convert the key before the value, one pair at a time.
            plain_key = dbus_to_python(dbus_key)
            converted[plain_key] = dbus_to_python(dbus_value)
    elif isinstance(obj, (bool, str, bytes, int, float, list, tuple, dict)):
        converted = obj
    else:
        raise TypeError("Unhandled %s" % repr(obj))

    if expected_type is not None:
        # The check passes if the result has the requested type (for example
        # expected_type is "str"), or if the original object has it.  The
        # latter covers an expected_type like dbus.ObjectPath: the caller
        # asserts that the value is an "o" type even though the result is
        # just a plain "str".  This allows checking that a value really is a
        # dbus.ObjectPath and not merely a string (which provides additional
        # consistency guarantees, like not being empty and well-formed).
        type_matches = isinstance(converted, expected_type) or isinstance(
            obj, expected_type
        )
        if not type_matches:
            raise TypeError(
                "%s is %s, expected %s" % (converted, type(converted), expected_type)
            )

    return converted


def dbus_to_python_args(dbus_args, *expected_types):
    """Convert a sequence of D-Bus arguments to a tuple of Python values.

    Each argument is converted with dbus_to_python() and checked against
    the expected type at the same position.

    Raises:
        TypeError: if the number of arguments differs from the number of
            expected types, or if dbus_to_python() rejects an argument.
    """
    arguments = tuple(dbus_args)
    # The number of arguments must match the number of expected types.
    if len(arguments) != len(expected_types):
        raise TypeError("Unexpected number of arguments")
    return tuple(
        dbus_to_python(argument, wanted)
        for argument, wanted in zip(arguments, expected_types)
    )


def dbus_signature(obj):
    """Return the D-Bus type signature string for a dbus-python value.

    Basic types map to their single-character type code.  Container
    signatures are built from the object's ``signature`` attribute:
    ``a<element>`` for arrays, ``(<members>)`` for structs and
    ``a{<key><value>}`` for dictionaries.

    Raises:
        TypeError: if *obj* is not one of the handled dbus types, or if it
            is a dbus.Array without an element signature.
    """
    if isinstance(obj, dbus.Boolean):
        return "b"
    elif isinstance(obj, dbus.String):
        return "s"
    elif isinstance(obj, dbus.ObjectPath):
        return "o"
    elif isinstance(obj, dbus.Byte):
        return "y"
    elif isinstance(obj, dbus.Int16):
        return "n"
    elif isinstance(obj, dbus.Int32):
        return "i"
    elif isinstance(obj, dbus.Int64):
        return "x"
    elif isinstance(obj, dbus.UInt16):
        return "q"
    elif isinstance(obj, dbus.UInt32):
        return "u"
    elif isinstance(obj, dbus.UInt64):
        return "t"
    elif isinstance(obj, dbus.Double):
        return "d"
    elif isinstance(obj, dbus.Array):
        element_signature = obj.signature
        if element_signature is None:
            # Without an element signature the array type cannot be
            # described; keep failing with a TypeError as before.
            raise TypeError("Unhandled %s" % repr(obj))
        # An array signature is "a" followed by the single complete type of
        # its elements.  For container elements that type already carries
        # its own delimiters (e.g. "(ss)", "as", "a{sv}"), so it must not
        # be wrapped in parentheses again: "a((ss))" would describe an
        # array of structs that each contain one struct.
        return "a%s" % element_signature
    elif isinstance(obj, dbus.Struct):
        return "(%s)" % obj.signature
    elif isinstance(obj, dbus.Dictionary):
        return "a{%s}" % obj.signature
    else:
        raise TypeError("Unhandled %s" % repr(obj))


def dbus_introspection_prepare_properties(obj, interface, access=None):
    """Record type and access information for the properties of *interface*.

    The information is stored on *obj* in the ``_fw_dbus_properties``
    dictionary (created if missing) under the key *interface*, replacing
    any previous entry for that interface.  The properties are obtained
    from ``obj.GetAll(interface)``; if that call raises, no properties are
    recorded.  Each property gets a "type" (its D-Bus signature) and an
    "access" value taken from *access*, defaulting to "read".
    """
    access_modes = {} if access is None else access

    if not hasattr(obj, "_fw_dbus_properties"):
        obj._fw_dbus_properties = {}
    registry = obj._fw_dbus_properties
    # Start from an empty description, dropping any previous one.
    described = registry[interface] = {}

    try:
        current_values = obj.GetAll(interface)
    except Exception:
        current_values = {}

    for name, value in current_values.items():
        entry = {"type": dbus_signature(value)}
        described[name] = entry
        entry["access"] = access_modes[name] if name in access_modes else "read"


def dbus_introspection_add_properties(obj, data, interface):
    """Add recorded property elements to introspection XML.

    *data* is the introspection XML as a string.  If *obj* carries a
    ``_fw_dbus_properties`` dictionary with an entry for *interface*, a
    ``<property>`` element (name, type, access) is appended for every
    recorded property to each ``<interface>`` element whose name equals
    *interface*.

    Returns the modified XML, which is also passed to log.debug10(), or
    *data* unchanged if nothing was added.
    """
    registry = getattr(obj, "_fw_dbus_properties", None)
    if not isinstance(registry, dict) or interface not in registry:
        return data

    tree = ET.fromstring(data)
    added = False
    for iface_node in tree.iter("interface"):
        attributes = iface_node.attrib
        if "name" not in attributes or attributes["name"] != interface:
            continue
        for prop_name, prop_info in registry[interface].items():
            ET.SubElement(
                iface_node,
                "property",
                {
                    "name": prop_name,
                    "type": prop_info["type"],
                    "access": prop_info["access"],
                },
            )
            added = True

    if not added:
        return data

    data = ET.tostring(tree, encoding="unicode")
    log.debug10(data)
    return data


def dbus_introspection_add_deprecated(
    obj, data, interface, deprecated_methods, deprecated_signals
):
    """Mark deprecated methods and signals in introspection XML.

    *data* is the introspection XML as a string.  *deprecated_methods* and
    *deprecated_signals* map interface names to collections of member
    names.  In every ``<interface>`` element whose name equals *interface*,
    each ``<method>`` / ``<signal>`` element listed for that interface gets
    an ``org.freedesktop.DBus.Deprecated`` annotation with value "true".

    *obj* is not used.  Returns the modified XML, which is also passed to
    log.debug10(), or *data* unchanged if nothing was annotated.
    """
    methods_listed = interface in deprecated_methods
    signals_listed = interface in deprecated_signals
    if not (methods_listed or signals_listed):
        return data

    annotation = {
        "name": "org.freedesktop.DBus.Deprecated",
        "value": "true",
    }
    tree = ET.fromstring(data)

    # Element tag, whether to process it, and the per-interface name table;
    # methods are handled before signals.
    member_kinds = (
        ("method", methods_listed, deprecated_methods),
        ("signal", signals_listed, deprecated_signals),
    )

    annotated = False
    for iface_node in tree.iter("interface"):
        attributes = iface_node.attrib
        if "name" not in attributes or attributes["name"] != interface:
            continue
        for tag, listed, deprecated in member_kinds:
            if not listed:
                continue
            for member in iface_node.iter(tag):
                if (
                    "name" in member.attrib
                    and member.attrib["name"] in deprecated[interface]
                ):
                    ET.SubElement(member, "annotation", annotation)
                    annotated = True

    if not annotated:
        return data

    data = ET.tostring(tree, encoding="unicode")
    log.debug10(data)
    return data

Youez - 2016 - github.com/yon3zu
LinuXploit