403Webshell
Server IP : 104.21.93.65  /  Your IP : 104.23.197.32
Web Server : Apache
System : Linux server.localhost.com 6.8.0-85-generic #85-Ubuntu SMP PREEMPT_DYNAMIC Thu Sep 18 15:26:59 UTC 2025 x86_64
User : pahana ( 1029)
PHP Version : 7.4.33
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/firewall/core/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3/dist-packages/firewall/core/rich.py
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright (C) 2013-2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <[email protected]>

from firewall import functions
from firewall.core.ipset import check_ipset_name
from firewall.core.base import REJECT_TYPES
from firewall import errors
from firewall.errors import FirewallError


# Dummy class for EOL singleton instance. It's a class, so we
# can overwrite __repr__().
class _EOLType:
    def __repr__(self):
        # The string representation is the full name of the instance.
        return "firewall.core.rich.EOL"


# An end-of-line singleton instance used to indicate the EOL token.
EOL = _EOLType()


class _Rich_Entry:
    def check(self, family=None):
        pass


class _Rich_EntryWithLimit(_Rich_Entry):
    """Rich rule entry that can carry an optional limit object."""

    def __init__(self, limit=None):
        # None means that no limit is attached to this entry.
        self.limit = limit

    def check(self, family=None):
        """Delegate validation to the attached limit, if there is one."""
        attached = self.limit
        if attached is None:
            return
        attached.check(family=family)


class _Rich_Element(_Rich_Entry):
    """Marker base class for the element entries of a rich rule."""


class _Rich_Action(_Rich_EntryWithLimit):
    """Marker base class for the action entries of a rich rule."""


class _Rich_Log(_Rich_EntryWithLimit):
    """Marker base class for the logging entries of a rich rule."""


class Rich_Source(_Rich_Entry):
    """Source match of a rich rule: an address, a MAC address or an ipset name."""

    def __init__(self, addr, mac, ipset, invert=False):
        """Store the source selectors.

        Empty strings are stored as None and the MAC address is upper-cased.
        Raises FirewallError(INVALID_RULE) when no selector is given at all.
        """
        self.addr = None if addr == "" else addr
        if mac == "" or mac is None:
            self.mac = None
        else:
            self.mac = mac.upper()
        self.ipset = None if ipset == "" else ipset
        self.invert = invert
        if all(item is None for item in (self.addr, self.mac, self.ipset)):
            raise FirewallError(errors.INVALID_RULE, "no address, mac and ipset")

    def __str__(self):
        """Return the rich language text; the first selector that is set wins."""
        prefix = "source NOT " if self.invert else "source "
        candidates = (
            ('address="%s"', self.addr),
            ('mac="%s"', self.mac),
            ('ipset="%s"', self.ipset),
        )
        for template, selector in candidates:
            if selector is not None:
                return prefix + template % selector
        raise FirewallError(errors.INVALID_RULE, "no address, mac and ipset")

    def check(self, family=None):
        """Validate the source; exactly one of address, mac and ipset may be set.

        An address additionally requires *family* to be given.
        Raises FirewallError on any violation.
        """
        addr, mac, ipset = self.addr, self.mac, self.ipset

        if addr is not None:
            if family is None:
                raise FirewallError(errors.INVALID_FAMILY)
            if mac is not None:
                raise FirewallError(errors.INVALID_RULE, "address and mac")
            if ipset is not None:
                raise FirewallError(errors.INVALID_RULE, "address and ipset")
            if not functions.check_address(family, addr):
                raise FirewallError(errors.INVALID_ADDR, str(addr))
            return

        if mac is not None:
            if ipset is not None:
                raise FirewallError(errors.INVALID_RULE, "mac and ipset")
            if not functions.check_mac(mac):
                raise FirewallError(errors.INVALID_MAC, str(mac))
            return

        if ipset is None:
            raise FirewallError(errors.INVALID_RULE, "invalid source")
        if not check_ipset_name(ipset):
            raise FirewallError(errors.INVALID_IPSET, str(ipset))


class Rich_Destination(_Rich_Entry):
    """Destination match of a rich rule: an address or an ipset name."""

    def __init__(self, addr, ipset, invert=False):
        """Store the destination selectors; empty strings are stored as None.

        Raises FirewallError(INVALID_RULE) when neither selector is given.
        """
        self.addr = None if addr == "" else addr
        self.ipset = None if ipset == "" else ipset
        self.invert = invert
        if all(item is None for item in (self.addr, self.ipset)):
            raise FirewallError(errors.INVALID_RULE, "no address and ipset")

    def __str__(self):
        """Return the rich language text; the address is preferred over the ipset."""
        prefix = "destination NOT " if self.invert else "destination "
        candidates = (
            ('address="%s"', self.addr),
            ('ipset="%s"', self.ipset),
        )
        for template, selector in candidates:
            if selector is not None:
                return prefix + template % selector
        raise FirewallError(errors.INVALID_RULE, "no address and ipset")

    def check(self, family=None):
        """Validate the destination; address and ipset are mutually exclusive.

        An address additionally requires *family* to be given.
        Raises FirewallError on any violation.
        """
        addr, ipset = self.addr, self.ipset

        if addr is not None:
            if family is None:
                raise FirewallError(errors.INVALID_FAMILY)
            if ipset is not None:
                raise FirewallError(errors.INVALID_DESTINATION, "address and ipset")
            if not functions.check_address(family, addr):
                raise FirewallError(errors.INVALID_ADDR, str(addr))
            return

        if ipset is None:
            raise FirewallError(errors.INVALID_RULE, "invalid destination")
        if not check_ipset_name(ipset):
            raise FirewallError(errors.INVALID_IPSET, str(ipset))


class Rich_Service(_Rich_Element):
    """Service element of a rich rule, identified by name."""

    def __init__(self, name):
        self.name = name

    def __str__(self):
        """Return the rich language text of the element."""
        return "service " + 'name="%s"' % (self.name)

    def check(self, family=None):
        """Raise FirewallError(INVALID_SERVICE) if the name is None or empty."""
        name = self.name
        if name is None or len(name) == 0:
            raise FirewallError(errors.INVALID_SERVICE, str(name))


class Rich_Port(_Rich_Element):
    """Port element of a rich rule: a port specification plus a protocol."""

    def __init__(self, port, protocol):
        self.port, self.protocol = port, protocol

    def __str__(self):
        """Return the rich language text of the element."""
        pieces = (
            "port",
            'port="%s"' % (self.port,),
            'protocol="%s"' % (self.protocol,),
        )
        return " ".join(pieces)

    def check(self, family=None):
        """Validate port and protocol; raises FirewallError on invalid values."""
        if not functions.check_port(self.port):
            raise FirewallError(errors.INVALID_PORT, self.port)
        # Only these transport protocols are accepted.
        if self.protocol not in ("tcp", "udp", "sctp", "dccp"):
            raise FirewallError(errors.INVALID_PROTOCOL, self.protocol)


class Rich_SourcePort(_Rich_Element):
    """Source-port element of a rich rule: a port specification plus a protocol."""

    def __init__(self, port, protocol):
        self.port, self.protocol = port, protocol

    def __str__(self):
        """Return the rich language text of the element."""
        pieces = (
            "source-port",
            'port="%s"' % (self.port,),
            'protocol="%s"' % (self.protocol,),
        )
        return " ".join(pieces)

    def check(self, family=None):
        """Validate port and protocol; raises FirewallError on invalid values."""
        if not functions.check_port(self.port):
            raise FirewallError(errors.INVALID_PORT, self.port)
        # Only these transport protocols are accepted.
        if self.protocol not in ("tcp", "udp", "sctp", "dccp"):
            raise FirewallError(errors.INVALID_PROTOCOL, self.protocol)


class Rich_Protocol(_Rich_Element):
    """Protocol element of a rich rule."""

    def __init__(self, value):
        self.value = value

    def __str__(self):
        """Return the rich language text of the element."""
        return "protocol " + 'value="%s"' % (self.value)

    def check(self, family=None):
        """Raise FirewallError(INVALID_PROTOCOL) if functions.checkProtocol rejects the value."""
        accepted = functions.checkProtocol(self.value)
        if not accepted:
            raise FirewallError(errors.INVALID_PROTOCOL, self.value)


class Rich_Masquerade(_Rich_Element):
    """Masquerade element of a rich rule; it takes no attributes."""

    def __str__(self):
        """Return the rich language text of the element."""
        text = "masquerade"
        return text


class Rich_IcmpBlock(_Rich_Element):
    """Icmp-block element of a rich rule, identified by name."""

    def __init__(self, name):
        self.name = name

    def __str__(self):
        """Return the rich language text of the element."""
        return "icmp-block " + 'name="%s"' % (self.name)

    def check(self, family=None):
        """Raise FirewallError(INVALID_ICMPTYPE) if the name is None or empty."""
        name = self.name
        if name is None or len(name) == 0:
            raise FirewallError(errors.INVALID_ICMPTYPE, str(name))


class Rich_IcmpType(_Rich_Element):
    """Icmp-type element of a rich rule, identified by name."""

    def __init__(self, name):
        self.name = name

    def __str__(self):
        """Return the rich language text of the element."""
        return "icmp-type " + 'name="%s"' % (self.name)

    def check(self, family=None):
        """Raise FirewallError(INVALID_ICMPTYPE) if the name is None or empty."""
        name = self.name
        if name is None or len(name) == 0:
            raise FirewallError(errors.INVALID_ICMPTYPE, str(name))


class Rich_Tcp_Mss_Clamp(_Rich_Element):
    """Tcp-mss-clamp element of a rich rule with an optional value."""

    def __init__(self, value):
        self.value = value

    def __str__(self):
        """Return the rich language text; the value is omitted when it is falsy."""
        if not self.value:
            return "tcp-mss-clamp"
        return 'tcp-mss-clamp value="%s"' % (self.value)

    def check(self, family=None):
        """Validate a given value with functions.checkTcpMssClamp.

        Raises FirewallError(INVALID_RULE) if the value is set but rejected.
        """
        if self.value and not functions.checkTcpMssClamp(self.value):
            raise FirewallError(errors.INVALID_RULE, self.value)


class Rich_ForwardPort(_Rich_Element):
    """Forward-port element of a rich rule.

    ``to_port`` and ``to_address`` are stored as empty strings when not given.
    """

    def __init__(self, port, protocol, to_port, to_address):
        self.port = port
        self.protocol = protocol
        # Normalise missing targets to "" so later comparisons are uniform.
        self.to_port = "" if to_port is None else to_port
        self.to_address = "" if to_address is None else to_address

    def __str__(self):
        """Return the rich language text; empty targets are left out."""
        pieces = ['forward-port port="%s" protocol="%s"' % (self.port, self.protocol)]
        if self.to_port != "":
            pieces.append(' to-port="%s"' % self.to_port)
        if self.to_address != "":
            pieces.append(' to-addr="%s"' % self.to_address)
        return "".join(pieces)

    def check(self, family=None):
        """Validate the forward-port element for *family*.

        At least one of to-port and to-addr has to be set. The family
        requirement is checked last, after the address validation.
        Raises FirewallError on any violation.
        """
        if not functions.check_port(self.port):
            raise FirewallError(errors.INVALID_PORT, self.port)
        if self.protocol not in ("tcp", "udp", "sctp", "dccp"):
            raise FirewallError(errors.INVALID_PROTOCOL, self.protocol)

        has_port = self.to_port != ""
        has_address = self.to_address != ""
        if not has_port and not has_address:
            raise FirewallError(errors.INVALID_PORT, self.to_port)
        if has_port and not functions.check_port(self.to_port):
            raise FirewallError(errors.INVALID_PORT, self.to_port)
        if has_address:
            if not functions.check_single_address(family, self.to_address):
                raise FirewallError(errors.INVALID_ADDR, self.to_address)
        if family is None:
            raise FirewallError(errors.INVALID_FAMILY)


class Rich_Log(_Rich_Log):
    """Log entry of a rich rule with optional prefix, level and limit."""

    def __init__(self, prefix=None, level=None, limit=None):
        super().__init__(limit=limit)
        # TODO check default level in iptables
        self.prefix = prefix
        self.level = level

    def __str__(self):
        """Return the rich language text; falsy attributes are left out."""
        pieces = ["log"]
        if self.prefix:
            pieces.append(' prefix="%s"' % (self.prefix))
        if self.level:
            pieces.append(' level="%s"' % (self.level))
        if self.limit:
            pieces.append(" %s" % self.limit)
        return "".join(pieces)

    def check(self, family=None):
        """Validate prefix length and level, then the limit via the base class.

        Raises FirewallError with INVALID_LOG_PREFIX or INVALID_LOG_LEVEL.
        """
        prefix = self.prefix
        if prefix and len(prefix) > 127:
            raise FirewallError(
                errors.INVALID_LOG_PREFIX, "maximum accepted length of 'prefix' is 127."
            )

        known_levels = (
            "emerg",
            "alert",
            "crit",
            "error",
            "warning",
            "notice",
            "info",
            "debug",
        )
        if self.level and self.level not in known_levels:
            raise FirewallError(errors.INVALID_LOG_LEVEL, self.level)

        super().check(family=family)


class Rich_NFLog(_Rich_Log):
    """Nflog entry of a rich rule with optional group, prefix, queue size and limit."""

    def __init__(self, group=None, prefix=None, queue_size=None, limit=None):
        super().__init__(limit=limit)
        self.group = group
        self.prefix = prefix
        # The queue size is kept under the attribute name "threshold".
        self.threshold = queue_size

    def __str__(self):
        """Return the rich language text; falsy attributes are left out."""
        pieces = ["nflog"]
        if self.group:
            pieces.append(' group="%s"' % (self.group))
        if self.prefix:
            pieces.append(' prefix="%s"' % (self.prefix))
        if self.threshold:
            pieces.append(' queue-size="%s"' % (self.threshold))
        if self.limit:
            pieces.append(" %s" % self.limit)
        return "".join(pieces)

    def check(self, family=None):
        """Validate group, prefix length and queue size, then the limit.

        Raises FirewallError with INVALID_NFLOG_GROUP, INVALID_LOG_PREFIX or
        INVALID_NFLOG_QUEUE.
        """
        group, prefix, queue_size = self.group, self.prefix, self.threshold

        if group and not functions.checkUINT16(group):
            raise FirewallError(
                errors.INVALID_NFLOG_GROUP,
                "nflog 'group' must be an integer between 0 and 65535.",
            )

        if prefix and len(prefix) > 127:
            raise FirewallError(
                errors.INVALID_LOG_PREFIX, "maximum accepted length of 'prefix' is 127."
            )

        if queue_size and not functions.checkUINT16(queue_size):
            raise FirewallError(
                errors.INVALID_NFLOG_QUEUE,
                "nflog 'queue-size' must be an integer between 0 and 65535.",
            )

        super().check(family=family)


class Rich_Audit(_Rich_EntryWithLimit):
    """Audit entry of a rich rule with an optional limit."""

    def __init__(self, limit=None):
        # TODO check default level in iptables
        super().__init__(limit=limit)

    def __str__(self):
        """Return the rich language text, with the limit appended when set."""
        if self.limit:
            return "audit %s" % self.limit
        return "audit"


class Rich_Accept(_Rich_Action):
    """Accept action of a rich rule with an optional limit."""

    def __init__(self, limit=None):
        super().__init__(limit=limit)

    def __str__(self):
        """Return the rich language text, with the limit appended when set."""
        if self.limit:
            return "accept %s" % self.limit
        return "accept"


class Rich_Reject(_Rich_Action):
    """Reject action of a rich rule with an optional reject type and limit."""

    def __init__(self, _type=None, limit=None):
        super().__init__(limit=limit)
        self.type = _type

    def __str__(self):
        """Return the rich language text; falsy attributes are left out."""
        pieces = ["reject"]
        if self.type:
            pieces.append(' type="%s"' % self.type)
        if self.limit:
            pieces.append(" %s" % self.limit)
        return "".join(pieces)

    def check(self, family=None):
        """Validate the reject type against REJECT_TYPES for *family*.

        A reject type requires the family to be "ipv4" or "ipv6". The limit
        is validated by the base class afterwards.
        Raises FirewallError(INVALID_RULE) on violation.
        """
        reject_type = self.type
        if reject_type:
            if family not in ("ipv4", "ipv6"):
                raise FirewallError(
                    errors.INVALID_RULE,
                    "When using reject type you must specify also rule family.",
                )
            allowed = REJECT_TYPES[family]
            if reject_type not in allowed:
                valid_types = ", ".join(REJECT_TYPES[family])
                raise FirewallError(
                    errors.INVALID_RULE,
                    "Wrong reject type %s.\nUse one of: %s." % (reject_type, valid_types),
                )

        super().check(family=family)


class Rich_Drop(_Rich_Action):
    """Drop action of a rich rule with an optional limit."""

    def __init__(self, limit=None):
        super().__init__(limit=limit)

    def __str__(self):
        """Return the rich language text, with the limit appended when set."""
        if self.limit:
            return "drop %s" % self.limit
        return "drop"


class Rich_Mark(_Rich_Action):
    """Mark action of a rich rule: a mark value, optionally with "/mask", and a limit."""

    def __init__(self, _set, limit=None):
        super().__init__(limit=limit)
        self.set = _set

    def __str__(self):
        """Return the rich language text; the set value is emitted unquoted."""
        suffix = " %s" % self.limit if self.limit else ""
        return "mark set=%s" % self.set + suffix

    def check(self, family=None):
        """Validate the mark as "value" or "value/mask" using functions.checkUINT32.

        The limit is validated by the base class afterwards.
        Raises FirewallError(INVALID_MARK) on violation.
        """
        mark = self.set
        if mark is None:
            raise FirewallError(errors.INVALID_MARK, "no value set")

        if "/" in mark:
            pieces = mark.split("/")
            if len(pieces) != 2:
                raise FirewallError(errors.INVALID_MARK, mark)
            value, mask = pieces
            # value and mask are uint32
            if not (functions.checkUINT32(value) and functions.checkUINT32(mask)):
                raise FirewallError(errors.INVALID_MARK, mark)
        elif not functions.checkUINT32(mark):
            # value is uint32
            raise FirewallError(errors.INVALID_MARK, mark)

        super().check(family=family)


class Rich_Limit(_Rich_Entry):
    """Rate limit value of the form ``<rate>/<duration>``.

    The duration may be given as "second", "minute", "hour" or "day"; these
    long forms are shortened to their first letter on construction.
    """

    def __init__(self, value):
        """Store *value*, shortening a long duration name to its first letter."""
        self.value = value
        if "/" in self.value:
            splits = self.value.split("/")
            if len(splits) == 2 and splits[1] in ["second", "minute", "hour", "day"]:
                self.value = "%s/%s" % (splits[0], splits[1][:1])

    def check(self, family=None):
        """Validate the limit value.

        The value must be ``<rate>/<duration>`` with an integer rate >= 1 and
        a duration of "s", "m", "h" or "d".

        Raises:
            FirewallError(INVALID_LIMIT): malformed value, a rate too fast to
                be represented, or the rate "1/d" which is too slow.
        """
        rate_text, separator, duration = self.value.partition("/")
        if not separator:
            raise FirewallError(errors.INVALID_LIMIT, self.value)

        try:
            rate = int(rate_text)
        except ValueError:
            raise FirewallError(errors.INVALID_LIMIT, self.value)

        # Seconds per duration unit; an extra "/" leaves an unknown duration
        # here and is therefore rejected as well.
        seconds = {"s": 1, "m": 60, "h": 60 * 60, "d": 24 * 60 * 60}.get(duration)
        if rate < 1 or seconds is None:
            raise FirewallError(errors.INVALID_LIMIT, self.value)

        # Floor division is required: true division never yields 0 for a
        # positive rate, which silently disabled this check.
        # NOTE(review): presumably mirrors the rate scaling of the iptables
        # limit match - confirm.
        if 10000 * seconds // rate == 0:
            raise FirewallError(errors.INVALID_LIMIT, "%s too fast" % self.value)

        if rate == 1 and duration == "d":
            # iptables (v1.4.21) doesn't accept 1/d
            raise FirewallError(errors.INVALID_LIMIT, "%s too slow" % self.value)

    def __str__(self):
        """Return the rich language text of the limit."""
        return 'limit value="%s"' % (self.value)

    def command(self):
        """Return an empty string."""
        return ""


class Rich_Rule:
    priority_min = -32768
    priority_max = 32767

    def __init__(self, family=None, rule_str=None, priority=None):
        self.family = None
        self.priority = 0
        self.source = None
        self.destination = None
        self.element = None
        self.log = None
        self.audit = None
        self.action = None

        if rule_str is not None:
            self._import_from_string(rule_str)

        if priority is not None:
            self.priority = priority

        if family is not None:
            family = str(family)
            if self.family is None:
                self.family = family
            elif self.family != family:
                raise FirewallError(errors.INVALID_FAMILY, family)

        if rule_str is not None:
            self.check()

    @staticmethod
    def _lexer(rule_str):
        """Split *rule_str* into a list of token dicts.

        Words containing "=" become {"attr_name": ..., "attr_value": ...};
        all other words become {"element": ...}. A final {"element": EOL}
        token terminates the list.

        Raises FirewallError(INVALID_RULE) for a word that does not split
        into exactly one non-empty name and one non-empty value.
        """
        tokens = []

        for word in functions.splitArgs(rule_str):
            if "=" not in word:
                tokens.append({"element": word})
                continue

            pieces = word.split("=")
            if len(pieces) != 2 or not pieces[0] or not pieces[1]:
                raise FirewallError(
                    errors.INVALID_RULE, "internal error in _lexer(): %s" % word
                )
            name, value = pieces
            tokens.append({"attr_name": name, "attr_value": value})

        tokens.append({"element": EOL})
        return tokens

    def _import_from_string(self, rule_str):
        """Parse *rule_str* and fill in the attributes of this rule.

        The string is passed through functions.stripNonPrintableCharacters()
        and tokenized with _lexer(). The tokens are then processed by a small
        state machine: ``in_elements`` is a stack of the elements currently
        being parsed and ``attrs`` collects the attributes of the innermost
        one. Most elements are completed when the first token that does not
        belong to them is seen; that token is then handed back to the input
        by decrementing ``index``.

        Depending on the rule, this sets self.family, self.priority,
        self.source, self.destination, self.element, self.log, self.audit and
        self.action.

        Raises:
            FirewallError: INVALID_RULE for empty or malformed rules,
                INVALID_PRIORITY for a non-integer priority value.
        """
        if not rule_str:
            raise FirewallError(errors.INVALID_RULE, "empty rule")

        rule_str = functions.stripNonPrintableCharacters(rule_str)

        tokens = self._lexer(rule_str)
        # The lexer always appends EOL, so a leading EOL means there were no
        # tokens at all.
        if tokens and tokens[0].get("element") is EOL:
            raise FirewallError(errors.INVALID_RULE, "empty rule")

        attrs = {}  # attributes of elements
        in_elements = []  # stack with elements we are in
        index = 0  # index into tokens
        # Parsing is finished once EOL is reached with only "rule" left on
        # the stack.
        while not (tokens[index].get("element") is EOL and in_elements == ["rule"]):
            element = tokens[index].get("element")
            attr_name = tokens[index].get("attr_name")
            attr_value = tokens[index].get("attr_value")
            # print ("in_elements: ", in_elements)
            # print ("index: %s, element: %s, attribute: %s=%s" % (index, element, attr_name, attr_value))
            # First stage: reject unknown attribute/element names and
            # duplicates of entries that have already been parsed.
            if attr_name:  # attribute
                if attr_name not in [
                    "priority",
                    "family",
                    "address",
                    "mac",
                    "ipset",
                    "invert",
                    "value",
                    "port",
                    "protocol",
                    "to-port",
                    "to-addr",
                    "name",
                    "group",
                    "prefix",
                    "level",
                    "queue-size",
                    "type",
                    "set",
                ]:
                    raise FirewallError(
                        errors.INVALID_RULE, "bad attribute '%s'" % attr_name
                    )
            else:  # element
                if element in [
                    "rule",
                    "source",
                    "destination",
                    "protocol",
                    "service",
                    "port",
                    "icmp-block",
                    "icmp-type",
                    "masquerade",
                    "forward-port",
                    "source-port",
                    "log",
                    "nflog",
                    "audit",
                    "accept",
                    "drop",
                    "reject",
                    "mark",
                    "limit",
                    "not",
                    "NOT",
                    EOL,
                    "tcp-mss-clamp",
                ]:
                    if element == "source" and self.source:
                        raise FirewallError(
                            errors.INVALID_RULE, "more than one 'source' element"
                        )
                    elif element == "destination" and self.destination:
                        raise FirewallError(
                            errors.INVALID_RULE, "more than one 'destination' element"
                        )
                    elif (
                        element
                        in [
                            "protocol",
                            "service",
                            "port",
                            "icmp-block",
                            "icmp-type",
                            "masquerade",
                            "forward-port",
                            "source-port",
                        ]
                        and self.element
                    ):
                        raise FirewallError(
                            errors.INVALID_RULE,
                            "more than one element. There cannot be both '%s' and '%s' in one rule."
                            % (element, self.element),
                        )
                    elif element in ["log", "nflog"] and self.log:
                        raise FirewallError(
                            errors.INVALID_RULE, "more than one logging element"
                        )
                    elif element == "audit" and self.audit:
                        raise FirewallError(
                            errors.INVALID_RULE, "more than one 'audit' element"
                        )
                    elif (
                        element in ["accept", "drop", "reject", "mark"] and self.action
                    ):
                        raise FirewallError(
                            errors.INVALID_RULE,
                            "more than one 'action' element. There cannot be both '%s' and '%s' in one rule."
                            % (element, self.action),
                        )
                else:
                    raise FirewallError(
                        errors.INVALID_RULE, "unknown element %s" % element
                    )

            # Second stage: dispatch on the innermost element on the stack
            # ("" when nothing has been pushed yet).
            in_element = (
                in_elements[len(in_elements) - 1] if len(in_elements) > 0 else ""
            )

            if in_element == "":
                # Nothing parsed yet: the first token has to be "rule".
                if not element and attr_name:
                    if attr_name == "family":
                        raise FirewallError(
                            errors.INVALID_RULE,
                            "'family' outside of rule. Use 'rule family=...'.",
                        )
                    elif attr_name == "priority":
                        raise FirewallError(
                            errors.INVALID_RULE,
                            "'priority' outside of rule. Use 'rule priority=...'.",
                        )
                    else:
                        raise FirewallError(
                            errors.INVALID_RULE,
                            "'%s' outside of any element. Use 'rule <element> %s= ...'."
                            % (attr_name, attr_name),
                        )
                elif "rule" not in element:
                    raise FirewallError(
                        errors.INVALID_RULE,
                        "'%s' outside of rule. Use 'rule ... %s ...'."
                        % (element, element),
                    )
                else:
                    in_elements.append("rule")  # push into stack
            elif in_element == "rule":
                # Directly inside "rule": only family and priority attributes
                # are allowed; any element opens a new nesting level.
                if attr_name == "family":
                    if attr_value not in ["ipv4", "ipv6"]:
                        raise FirewallError(
                            errors.INVALID_RULE,
                            "'family' attribute cannot have '%s' value. Use 'ipv4' or 'ipv6' instead."
                            % attr_value,
                        )
                    self.family = attr_value
                elif attr_name == "priority":
                    try:
                        self.priority = int(attr_value)
                    except ValueError:
                        raise FirewallError(
                            errors.INVALID_PRIORITY,
                            "invalid 'priority' attribute value '%s'." % attr_value,
                        )
                elif attr_name:
                    if attr_name == "protocol":
                        err_msg = "wrong 'protocol' usage. Use either 'rule protocol value=...' or  'rule [forward-]port protocol=...'."
                    else:
                        err_msg = (
                            "attribute '%s' outside of any element. Use 'rule <element> %s= ...'."
                            % (attr_name, attr_name)
                        )
                    raise FirewallError(errors.INVALID_RULE, err_msg)
                else:
                    in_elements.append(element)  # push into stack
            elif in_element == "source":
                if attr_name in ["address", "mac", "ipset", "invert"]:
                    attrs[attr_name] = attr_value
                elif element in ["not", "NOT"]:
                    attrs["invert"] = True
                else:
                    # Any other token ends the source element.
                    self.source = Rich_Source(
                        attrs.get("address"),
                        attrs.get("mac"),
                        attrs.get("ipset"),
                        attrs.get("invert", False),
                    )
                    in_elements.pop()  # source
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "destination":
                if attr_name in ["address", "ipset", "invert"]:
                    attrs[attr_name] = attr_value
                elif element in ["not", "NOT"]:
                    attrs["invert"] = True
                else:
                    # Any other token ends the destination element.
                    self.destination = Rich_Destination(
                        attrs.get("address"),
                        attrs.get("ipset"),
                        attrs.get("invert", False),
                    )
                    in_elements.pop()  # destination
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "protocol":
                # Completed by its single mandatory "value" attribute.
                if attr_name == "value":
                    self.element = Rich_Protocol(attr_value)
                    in_elements.pop()  # protocol
                else:
                    raise FirewallError(
                        errors.INVALID_RULE, "invalid 'protocol' element"
                    )
            elif in_element == "tcp-mss-clamp":
                # The "value" attribute is optional here.
                if attr_name == "value":
                    attrs[attr_name] = attr_value
                else:
                    self.element = Rich_Tcp_Mss_Clamp(attrs.get("value"))
                    in_elements.pop()  # tcp-mss-clamp
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "service":
                # Completed by its single mandatory "name" attribute.
                if attr_name == "name":
                    self.element = Rich_Service(attr_value)
                    in_elements.pop()  # service
                else:
                    raise FirewallError(
                        errors.INVALID_RULE, "invalid 'service' element"
                    )
            elif in_element == "port":
                if attr_name in ["port", "protocol"]:
                    attrs[attr_name] = attr_value
                else:
                    self.element = Rich_Port(attrs.get("port"), attrs.get("protocol"))
                    in_elements.pop()  # port
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "icmp-block":
                # Completed by its single mandatory "name" attribute.
                if attr_name == "name":
                    self.element = Rich_IcmpBlock(attr_value)
                    in_elements.pop()  # icmp-block
                else:
                    raise FirewallError(
                        errors.INVALID_RULE, "invalid 'icmp-block' element"
                    )
            elif in_element == "icmp-type":
                # Completed by its single mandatory "name" attribute.
                if attr_name == "name":
                    self.element = Rich_IcmpType(attr_value)
                    in_elements.pop()  # icmp-type
                else:
                    raise FirewallError(
                        errors.INVALID_RULE, "invalid 'icmp-type' element"
                    )
            elif in_element == "masquerade":
                # masquerade takes no attributes; the current token always
                # belongs to whatever follows it.
                self.element = Rich_Masquerade()
                in_elements.pop()  # masquerade
                attrs.clear()
                index = index - 1  # return token to input
            elif in_element == "forward-port":
                if attr_name in ["port", "protocol", "to-port", "to-addr"]:
                    attrs[attr_name] = attr_value
                else:
                    self.element = Rich_ForwardPort(
                        attrs.get("port"),
                        attrs.get("protocol"),
                        attrs.get("to-port"),
                        attrs.get("to-addr"),
                    )
                    in_elements.pop()  # forward-port
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "source-port":
                if attr_name in ["port", "protocol"]:
                    attrs[attr_name] = attr_value
                else:
                    self.element = Rich_SourcePort(
                        attrs.get("port"), attrs.get("protocol")
                    )
                    in_elements.pop()  # source-port
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "log":
                if attr_name in ["prefix", "level"]:
                    attrs[attr_name] = attr_value
                elif element == "limit":
                    # A nested limit stores its result in attrs["limit"].
                    in_elements.append("limit")
                else:
                    self.log = Rich_Log(
                        attrs.get("prefix"), attrs.get("level"), attrs.get("limit")
                    )
                    in_elements.pop()  # log
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "nflog":
                if attr_name in ["group", "prefix", "queue-size"]:
                    attrs[attr_name] = attr_value
                elif element == "limit":
                    in_elements.append("limit")
                else:
                    self.log = Rich_NFLog(
                        attrs.get("group"),
                        attrs.get("prefix"),
                        attrs.get("queue-size"),
                        attrs.get("limit"),
                    )
                    in_elements.pop()  # nflog
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "audit":
                if element == "limit":
                    in_elements.append("limit")
                else:
                    self.audit = Rich_Audit(attrs.get("limit"))
                    in_elements.pop()  # audit
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "accept":
                if element == "limit":
                    in_elements.append("limit")
                else:
                    self.action = Rich_Accept(attrs.get("limit"))
                    in_elements.pop()  # accept
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "drop":
                if element == "limit":
                    in_elements.append("limit")
                else:
                    self.action = Rich_Drop(attrs.get("limit"))
                    in_elements.pop()  # drop
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "reject":
                if attr_name == "type":
                    attrs[attr_name] = attr_value
                elif element == "limit":
                    in_elements.append("limit")
                else:
                    self.action = Rich_Reject(attrs.get("type"), attrs.get("limit"))
                    in_elements.pop()  # reject
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "mark":
                if attr_name == "set":
                    attrs[attr_name] = attr_value
                elif element == "limit":
                    in_elements.append("limit")
                else:
                    self.action = Rich_Mark(attrs.get("set"), attrs.get("limit"))
                    in_elements.pop()  # mark
                    attrs.clear()
                    index = index - 1  # return token to input
            elif in_element == "limit":
                # The limit is stored in attrs so that the enclosing element
                # picks it up when it is completed.
                if attr_name == "value":
                    attrs["limit"] = Rich_Limit(attr_value)
                    in_elements.pop()  # limit
                else:
                    raise FirewallError(errors.INVALID_RULE, "invalid 'limit' element")

            index = index + 1

    def _check_entry(self, entry):
        if entry is not None:
            entry.check(family=self.family)

    def check(self):
        """Validate the rule, raising FirewallError on the first problem found.

        The checks run in a fixed order: family, priority range, presence of
        an element/action/log/audit, the individual entries (source,
        destination, element), element/action combinations, the log entry,
        the audit constraint and finally the audit and action entries.

        Raises:
            FirewallError: with INVALID_FAMILY, MISSING_FAMILY,
                INVALID_PRIORITY, INVALID_RULE or INVALID_AUDIT_TYPE, or
                whatever the individual entries' own ``check()`` raises.
        """
        rule_element = self.element
        rule_action = self.action

        if self.family is None:
            # A source address, a destination or a forward-port element all
            # require an explicit family.
            src_has_addr = self.source is not None and self.source.addr is not None
            if (
                src_has_addr
                or self.destination is not None
                or isinstance(rule_element, Rich_ForwardPort)
            ):
                raise FirewallError(errors.MISSING_FAMILY)
        elif self.family not in ("ipv4", "ipv6"):
            raise FirewallError(errors.INVALID_FAMILY, self.family)

        out_of_range = (
            self.priority < self.priority_min or self.priority > self.priority_max
        )
        if out_of_range:
            raise FirewallError(
                errors.INVALID_PRIORITY,
                "'priority' attribute must be between %d and %d."
                % (self.priority_min, self.priority_max),
            )

        # Element-less rules: unless a log is combined with a non-zero
        # priority, an action is mandatory, and with priority 0 there must
        # also be a source or a destination.
        if rule_element is None and (self.log is None or self.priority == 0):
            if rule_action is None:
                raise FirewallError(errors.INVALID_RULE, "no element, no action")
            no_endpoints = self.source is None and self.destination is None
            if no_endpoints and self.priority == 0:
                raise FirewallError(
                    errors.INVALID_RULE, "no element, no source, no destination"
                )

        # Exact type match (not isinstance): these element types are allowed
        # to appear without any action, log or audit.
        exempt_types = (
            Rich_IcmpBlock,
            Rich_ForwardPort,
            Rich_Masquerade,
            Rich_Tcp_Mss_Clamp,
        )
        if (
            type(rule_element) not in exempt_types
            and self.log is None
            and self.audit is None
            and rule_action is None
        ):
            raise FirewallError(errors.INVALID_RULE, "no action, no log, no audit")

        for entry in (self.source, self.destination, rule_element):
            self._check_entry(entry)

        # Element types that must not be combined with an action.
        if isinstance(rule_element, Rich_Masquerade):
            if rule_action is not None:
                raise FirewallError(errors.INVALID_RULE, "masquerade and action")
            if self.source is not None and self.source.mac is not None:
                raise FirewallError(errors.INVALID_RULE, "masquerade and mac source")
        elif isinstance(rule_element, Rich_IcmpBlock):
            if rule_action:
                raise FirewallError(errors.INVALID_RULE, "icmp-block and action")
        elif isinstance(rule_element, Rich_ForwardPort):
            if rule_action is not None:
                raise FirewallError(errors.INVALID_RULE, "forward-port and action")
        elif isinstance(rule_element, Rich_Tcp_Mss_Clamp):
            if rule_action is not None:
                raise FirewallError(
                    errors.INVALID_RULE,
                    "tcp-mss-clamp and %s are mutually exclusive" % rule_action,
                )

        self._check_entry(self.log)

        # Audit is only accepted together with an accept, reject or drop action.
        if self.audit is not None and type(rule_action) not in (
            Rich_Accept,
            Rich_Reject,
            Rich_Drop,
        ):
            raise FirewallError(errors.INVALID_AUDIT_TYPE, type(rule_action))

        self._check_entry(self.audit)
        self._check_entry(rule_action)

    def __str__(self):
        ret = "rule"
        if self.priority:
            ret += ' priority="%d"' % self.priority
        if self.family:
            ret += ' family="%s"' % self.family
        if self.source:
            ret += " %s" % self.source
        if self.destination:
            ret += " %s" % self.destination
        if self.element:
            ret += " %s" % self.element
        if self.log:
            ret += " %s" % self.log
        if self.audit:
            ret += " %s" % self.audit
        if self.action:
            ret += " %s" % self.action

        return ret


# class Rich_RawRule:
# class Rich_RuleSet:
# class Rich_AddressList:

Youez - 2016 - github.com/yon3zu
LinuXploit