403Webshell
Server IP : 172.67.206.42  /  Your IP : 104.23.243.50
Web Server : Apache
System : Linux server.localhost.com 6.8.0-85-generic #85-Ubuntu SMP PREEMPT_DYNAMIC Thu Sep 18 15:26:59 UTC 2025 x86_64
User : pahana ( 1029)
PHP Version : 7.4.33
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/firewall/core/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3/dist-packages/firewall/core/ipXtables.py
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright (C) 2010-2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <[email protected]>

import os.path
import copy

from firewall.core.prog import runProg
from firewall.core.logger import log
from firewall.functions import (
    tempFile,
    readfile,
    splitArgs,
    check_mac,
    portStr,
    check_single_address,
    check_address,
    normalizeIP6,
)
from firewall import config
from firewall.errors import (
    FirewallError,
    INVALID_PASSTHROUGH,
    INVALID_RULE,
    UNKNOWN_ERROR,
    INVALID_ADDR,
)
from firewall.core.rich import (
    Rich_Accept,
    Rich_Reject,
    Rich_Drop,
    Rich_Mark,
    Rich_NFLog,
    Rich_Masquerade,
    Rich_ForwardPort,
    Rich_IcmpBlock,
    Rich_Tcp_Mss_Clamp,
)
from firewall.core.base import DEFAULT_ZONE_TARGET
import string

POLICY_CHAIN_PREFIX = ""

BUILT_IN_CHAINS = {
    "security": ["INPUT", "OUTPUT", "FORWARD"],
    "raw": ["PREROUTING", "OUTPUT"],
    "mangle": ["PREROUTING", "POSTROUTING", "INPUT", "OUTPUT", "FORWARD"],
    "nat": ["PREROUTING", "POSTROUTING", "OUTPUT"],
    "filter": ["INPUT", "OUTPUT", "FORWARD"],
}

DEFAULT_REJECT_TYPE = {
    "ipv4": "icmp-host-prohibited",
    "ipv6": "icmp6-adm-prohibited",
}

ICMP = {
    "ipv4": "icmp",
    "ipv6": "ipv6-icmp",
}


def common_reverse_passthrough(args):
    """Reverse valid passthough rule"""

    replace_args = {
        # Append
        "-A": "-D",
        "--append": "--delete",
        # Insert
        "-I": "-D",
        "--insert": "--delete",
        # New chain
        "-N": "-X",
        "--new-chain": "--delete-chain",
    }

    ret_args = args[:]

    for x in replace_args:
        try:
            idx = ret_args.index(x)
        except ValueError:
            continue

        if x in ["-I", "--insert"]:
            # With insert rulenum, then remove it if it is a number
            # Opt at position idx, chain at position idx+1, [rulenum] at
            # position idx+2
            try:
                int(ret_args[idx + 2])
            except ValueError:
                pass
            else:
                ret_args.pop(idx + 2)

        ret_args[idx] = replace_args[x]
        return ret_args

    raise FirewallError(INVALID_PASSTHROUGH, "no '-A', '-I' or '-N' arg")


# ipv ebtables also uses this
#
def common_check_passthrough(args):
    """Check if passthough rule is valid (only add, insert and new chain
    rules are allowed)"""

    args = set(args)
    not_allowed = set(
        [
            "-C",
            "--check",  # check rule
            "-D",
            "--delete",  # delete rule
            "-R",
            "--replace",  # replace rule
            "-L",
            "--list",  # list rule
            "-S",
            "--list-rules",  # print rules
            "-F",
            "--flush",  # flush rules
            "-Z",
            "--zero",  # zero rules
            "-X",
            "--delete-chain",  # delete chain
            "-P",
            "--policy",  # policy
            "-E",
            "--rename-chain",
        ]
    )  # rename chain)
    # intersection of args and not_allowed is not empty, i.e.
    # something from args is not allowed
    if len(args & not_allowed) > 0:
        raise FirewallError(
            INVALID_PASSTHROUGH, "arg '%s' is not allowed" % list(args & not_allowed)[0]
        )

    # args need to contain one of -A, -I, -N
    needed = set(["-A", "--append", "-I", "--insert", "-N", "--new-chain"])
    # empty intersection of args and needed, i.e.
    # none from args contains any needed command
    if len(args & needed) == 0:
        raise FirewallError(INVALID_PASSTHROUGH, "no '-A', '-I' or '-N' arg")


class ip4tables:
    ipv = "ipv4"
    name = "ip4tables"
    policies_supported = True

    def __init__(self, fw):
        self._fw = fw
        self._command = config.COMMANDS[self.ipv]
        self._restore_command = config.COMMANDS["%s-restore" % self.ipv]
        self.wait_option = self._detect_wait_option()
        self.restore_wait_option = self._detect_restore_wait_option()
        self.fill_exists()
        self.available_tables = []
        self.rich_rule_priority_counts = {}
        self.policy_dispatch_index_cache = {}
        self.policy_dispatch_index_cache_ref_count = {}
        self.our_chains = {}  # chains created by firewalld

    def fill_exists(self):
        self.command_exists = os.path.exists(self._command)
        self.restore_command_exists = os.path.exists(self._restore_command)

    def __run(self, args):
        # convert to string list
        if self.wait_option and self.wait_option not in args:
            _args = [self.wait_option] + ["%s" % item for item in args]
        else:
            _args = ["%s" % item for item in args]
        log.debug2("%s: %s %s", self.__class__, self._command, " ".join(_args))
        (status, ret) = runProg(self._command, _args)
        if status != 0:
            raise ValueError(
                "'%s %s' failed: %s" % (self._command, " ".join(_args), ret)
            )
        return ret

    def _rule_replace(self, rule, pattern, replacement):
        try:
            i = rule.index(pattern)
        except ValueError:
            return False
        else:
            rule[i : i + 1] = replacement
            return True

    def is_chain_builtin(self, ipv, table, chain):
        return table in BUILT_IN_CHAINS and chain in BUILT_IN_CHAINS[table]

    def build_chain_rules(self, add, table, chain):
        rule = ["-t", table]
        if add:
            rule.append("-N")
        else:
            rule.append("-X")
        rule.append(chain)
        return [rule]

    def build_rule(self, add, table, chain, index, args):
        rule = ["-t", table]
        if add:
            rule += ["-I", chain, str(index)]
        else:
            rule += ["-D", chain]
        rule += args
        return rule

    def check_passthrough(self, args):
        common_check_passthrough(args)

    def reverse_passthrough(self, args):
        return common_reverse_passthrough(args)

    def passthrough_parse_table_chain(self, args):
        table = "filter"
        try:
            i = args.index("-t")
        except ValueError:
            pass
        else:
            if len(args) >= i + 1:
                table = args[i + 1]
        chain = None
        for opt in ["-A", "--append", "-I", "--insert", "-N", "--new-chain"]:
            try:
                i = args.index(opt)
            except ValueError:
                pass
            else:
                if len(args) >= i + 1:
                    chain = args[i + 1]
        return (table, chain)

    def _set_rule_replace_priority(self, rule, priority_counts, token):
        """
        Change something like
          -t filter -I public_IN %%RICH_RULE_PRIORITY%% 123
        or
          -t filter -A public_IN %%RICH_RULE_PRIORITY%% 321
        into
          -t filter -I public_IN 4
        or
          -t filter -I public_IN
        """
        try:
            i = rule.index(token)
        except ValueError:
            pass
        else:
            rule_add = True
            insert = False
            insert_add_index = -1
            rule.pop(i)
            priority = rule.pop(i)
            if not isinstance(priority, int):
                raise FirewallError(
                    INVALID_RULE, "priority must be followed by a number"
                )

            table = "filter"
            for opt in ["-t", "--table"]:
                try:
                    j = rule.index(opt)
                except ValueError:
                    pass
                else:
                    if len(rule) >= j + 1:
                        table = rule[j + 1]
            for opt in ["-A", "--append", "-I", "--insert", "-D", "--delete"]:
                try:
                    insert_add_index = rule.index(opt)
                except ValueError:
                    pass
                else:
                    if len(rule) >= insert_add_index + 1:
                        chain = rule[insert_add_index + 1]

                    if opt in ["-I", "--insert"]:
                        insert = True
                    if opt in ["-D", "--delete"]:
                        rule_add = False

            chain = (table, chain)

            # Add the rule to the priority counts. We don't need to store the
            # rule, just bump the ref count for the priority value.
            if not rule_add:
                if (
                    chain not in priority_counts
                    or priority not in priority_counts[chain]
                    or priority_counts[chain][priority] <= 0
                ):
                    raise FirewallError(
                        UNKNOWN_ERROR, "nonexistent or underflow of priority count"
                    )

                priority_counts[chain][priority] -= 1
            else:
                if chain not in priority_counts:
                    priority_counts[chain] = {}
                if priority not in priority_counts[chain]:
                    priority_counts[chain][priority] = 0

                # calculate index of new rule
                index = 1
                for p in sorted(priority_counts[chain].keys()):
                    if p == priority and insert:
                        break
                    index += priority_counts[chain][p]
                    if p == priority:
                        break

                priority_counts[chain][priority] += 1

                rule[insert_add_index] = "-I"
                rule.insert(insert_add_index + 2, "%d" % index)

    def _set_rule_sort_policy_dispatch(
        self, rule, policy_dispatch_index_cache, policy_dispatch_index_cache_ref_count
    ):
        try:
            i = rule.index("%%POLICY_SORT_KEY%%")
        except ValueError:
            return False

        rule.pop(i)
        sort_tuple = rule.pop(i)

        delete = False
        verb_index = -1

        table = "filter"
        for opt in ["-t", "--table"]:
            try:
                j = rule.index(opt)
            except ValueError:
                continue

            if len(rule) >= j + 1:
                table = rule[j + 1]
        for opt in ["-A", "--append", "-I", "--insert", "-D", "--delete"]:
            try:
                verb_index = rule.index(opt)
            except ValueError:
                continue

            if len(rule) >= verb_index + 1:
                chain = rule[verb_index + 1]

            if opt in ["-D", "--delete"]:
                delete = True

        chain = (table, chain)

        if delete:
            if (
                chain in policy_dispatch_index_cache
                and sort_tuple in policy_dispatch_index_cache[chain]
            ):
                if (
                    chain in policy_dispatch_index_cache_ref_count
                    and sort_tuple in policy_dispatch_index_cache_ref_count[chain]
                ):
                    if policy_dispatch_index_cache_ref_count[chain][sort_tuple] == 1:
                        del policy_dispatch_index_cache_ref_count[chain][sort_tuple]
                        policy_dispatch_index_cache[chain].remove(sort_tuple)
                        return False
                    else:
                        policy_dispatch_index_cache_ref_count[chain][sort_tuple] -= 1
                        return True
                else:  # only ever one, so no ref count created
                    policy_dispatch_index_cache[chain].remove(sort_tuple)
                    return False
            return True
        else:
            if chain not in policy_dispatch_index_cache:
                policy_dispatch_index_cache[chain] = []

            # rule de-duplication
            if sort_tuple in policy_dispatch_index_cache[chain]:
                if chain not in policy_dispatch_index_cache_ref_count:
                    policy_dispatch_index_cache_ref_count[chain] = {}
                if sort_tuple not in policy_dispatch_index_cache_ref_count[chain]:
                    policy_dispatch_index_cache_ref_count[chain][sort_tuple] = 1

                if sort_tuple in policy_dispatch_index_cache_ref_count[chain]:
                    policy_dispatch_index_cache_ref_count[chain][sort_tuple] += 1

                # Tell caller to skip this rule as we already have one
                return True

            policy_dispatch_index_cache[chain].append(sort_tuple)
            policy_dispatch_index_cache[chain].sort()

            index = policy_dispatch_index_cache[chain].index(sort_tuple)

            rule[verb_index] = "-I"
            rule.insert(verb_index + 2, f"{index + 1}")

            return False

    def set_rules(self, rules, log_denied):
        temp_file = tempFile()

        table_rules = {}
        rich_rule_priority_counts = copy.deepcopy(self.rich_rule_priority_counts)
        policy_dispatch_index_cache = copy.deepcopy(self.policy_dispatch_index_cache)
        policy_dispatch_index_cache_ref_count = copy.deepcopy(
            self.policy_dispatch_index_cache_ref_count
        )
        for _rule in rules:
            rule = _rule[:]

            # replace %%REJECT%%
            self._rule_replace(
                rule,
                "%%REJECT%%",
                ["REJECT", "--reject-with", DEFAULT_REJECT_TYPE[self.ipv]],
            )

            # replace %%ICMP%%
            self._rule_replace(rule, "%%ICMP%%", [ICMP[self.ipv]])

            # replace %%LOGTYPE%%
            try:
                i = rule.index("%%LOGTYPE%%")
            except ValueError:
                pass
            else:
                if log_denied == "off":
                    continue
                if log_denied in ["unicast", "broadcast", "multicast"]:
                    rule[i : i + 1] = ["-m", "pkttype", "--pkt-type", log_denied]
                else:
                    rule.pop(i)

            self._set_rule_replace_priority(
                rule, rich_rule_priority_counts, "%%RICH_RULE_PRIORITY%%"
            )
            skip = self._set_rule_sort_policy_dispatch(
                rule, policy_dispatch_index_cache, policy_dispatch_index_cache_ref_count
            )

            if skip:
                continue

            table = "filter"
            # get table form rule
            for opt in ["-t", "--table"]:
                try:
                    i = rule.index(opt)
                except ValueError:
                    pass
                else:
                    if len(rule) >= i + 1:
                        rule.pop(i)
                        table = rule.pop(i)

            # we can not use joinArgs here, because it would use "'" instead
            # of '"' for the start and end of the string, this breaks
            # iptables-restore
            for i, element in enumerate(rule):
                for c in string.whitespace:
                    if c in element and not (
                        element.startswith('"') and element.endswith('"')
                    ):
                        rule[i] = '"%s"' % element

            table_rules.setdefault(table, []).append(rule)

        for table in table_rules:
            rules = table_rules[table]

            temp_file.write("*%s\n" % table)
            for rule in rules:
                temp_file.write(" ".join(rule) + "\n")
            temp_file.write("COMMIT\n")

        temp_file.close()

        stat = os.stat(temp_file.name)
        log.debug2(
            "%s: %s %s",
            self.__class__,
            self._restore_command,
            "%s: %d" % (temp_file.name, stat.st_size),
        )
        args = []
        if self.restore_wait_option:
            args.append(self.restore_wait_option)
        args.append("-n")

        (status, ret) = runProg(self._restore_command, args, stdin=temp_file.name)

        if log.getDebugLogLevel() > 2:
            lines = readfile(temp_file.name)
            if lines is not None:
                i = 1
                for line in lines:
                    log.debug3("%8d: %s" % (i, line), nofmt=1, nl=0)
                    if not line.endswith("\n"):
                        log.debug3("", nofmt=1)
                    i += 1

        os.unlink(temp_file.name)

        if status != 0:
            raise ValueError(
                "'%s %s' failed: %s" % (self._restore_command, " ".join(args), ret)
            )
        self.rich_rule_priority_counts = rich_rule_priority_counts
        self.policy_dispatch_index_cache = policy_dispatch_index_cache
        self.policy_dispatch_index_cache_ref_count = (
            policy_dispatch_index_cache_ref_count
        )

    def set_rule(self, rule, log_denied):
        # replace %%REJECT%%
        self._rule_replace(
            rule,
            "%%REJECT%%",
            ["REJECT", "--reject-with", DEFAULT_REJECT_TYPE[self.ipv]],
        )

        # replace %%ICMP%%
        self._rule_replace(rule, "%%ICMP%%", [ICMP[self.ipv]])

        # replace %%LOGTYPE%%
        try:
            i = rule.index("%%LOGTYPE%%")
        except ValueError:
            pass
        else:
            if log_denied == "off":
                return ""
            if log_denied in ["unicast", "broadcast", "multicast"]:
                rule[i : i + 1] = ["-m", "pkttype", "--pkt-type", log_denied]
            else:
                rule.pop(i)

        rich_rule_priority_counts = copy.deepcopy(self.rich_rule_priority_counts)
        policy_dispatch_index_cache = copy.deepcopy(self.policy_dispatch_index_cache)
        policy_dispatch_index_cache_ref_count = copy.deepcopy(
            self.policy_dispatch_index_cache_ref_count
        )
        self._set_rule_replace_priority(
            rule, rich_rule_priority_counts, "%%RICH_RULE_PRIORITY%%"
        )
        skip = self._set_rule_sort_policy_dispatch(
            rule, policy_dispatch_index_cache, policy_dispatch_index_cache_ref_count
        )

        output = ""
        if not skip:
            output = self.__run(rule)

        self.rich_rule_priority_counts = rich_rule_priority_counts
        self.policy_dispatch_index_cache = policy_dispatch_index_cache
        self.policy_dispatch_index_cache_ref_count = (
            policy_dispatch_index_cache_ref_count
        )
        return output

    def get_available_tables(self, table=None):
        ret = []
        tables = [table] if table else BUILT_IN_CHAINS.keys()
        for table in tables:
            if table in self.available_tables:
                ret.append(table)
            else:
                try:
                    self.__run(["-t", table, "-L", "-n"])
                    self.available_tables.append(table)
                    ret.append(table)
                except ValueError:
                    log.debug1(
                        "%s table '%s' does not exist (or not enough permission to check)."
                        % (self.ipv, table)
                    )

        return ret

    def _detect_wait_option(self):
        wait_option = ""
        ret = runProg(self._command, ["-w", "-L", "-n"])  # since iptables-1.4.20
        log.debug3(
            '%s: %s: probe for wait option (%s): ret=%u, output="%s"',
            self.__class__,
            self._command,
            "-w",
            ret[0],
            ret[1],
        )
        if ret[0] == 0:
            wait_option = "-w"  # wait for xtables lock
            ret = runProg(
                self._command, ["-w10", "-L", "-n"]
            )  # since iptables > 1.4.21
            log.debug3(
                '%s: %s: probe for wait option (%s): ret=%u, output="%s"',
                self.__class__,
                self._command,
                "-w10",
                ret[0],
                ret[1],
            )
            if ret[0] == 0:
                wait_option = "-w10"  # wait max 10 seconds
            log.debug2(
                "%s: %s will be using %s option.",
                self.__class__,
                self._command,
                wait_option,
            )

        return wait_option

    def _detect_restore_wait_option(self):
        temp_file = tempFile()
        temp_file.write("#foo")
        temp_file.close()

        wait_option = ""
        for test_option in ["-w", "--wait=2"]:
            ret = runProg(self._restore_command, [test_option], stdin=temp_file.name)
            log.debug3(
                '%s: %s: probe for wait option (%s): ret=%u, output="%s"',
                self.__class__,
                self._command,
                test_option,
                ret[0],
                ret[1],
            )
            if (
                ret[0] == 0
                and "invalid option" not in ret[1]
                and "unrecognized option" not in ret[1]
            ):
                wait_option = test_option
                break

        log.debug2(
            "%s: %s will be using %s option.",
            self.__class__,
            self._restore_command,
            wait_option,
        )

        os.unlink(temp_file.name)

        return wait_option

    def build_flush_rules(self):
        self.rich_rule_priority_counts = {}
        self.policy_dispatch_index_cache = {}
        self.policy_dispatch_index_cache_ref_count = {}
        rules = []
        for table in BUILT_IN_CHAINS.keys():
            if not self.get_available_tables(table):
                continue
            # Flush firewall rules: -F
            # Delete firewall chains: -X
            # Set counter to zero: -Z
            for flag in ["-F", "-X", "-Z"]:
                rules.append(["-t", table, flag])
        return rules

    def build_set_policy_rules(self, policy, policy_details):
        rules = []
        _policy = "DROP" if policy == "PANIC" else policy
        for table in BUILT_IN_CHAINS.keys():
            if not self.get_available_tables(table):
                continue
            if table == "nat":
                continue
            for chain in BUILT_IN_CHAINS[table]:
                if table == "filter":
                    p = policy_details[chain]
                    if p == "REJECT":
                        rules.append(["-t", table, "-A", chain, "-j", "REJECT"])
                        p = "DROP"
                else:
                    p = _policy
                rules.append(["-t", table, "-P", chain, p])
        return rules

    def supported_icmp_types(self, ipv=None):
        """Return ICMP types that are supported by the iptables/ip6tables command and kernel"""
        output = ""
        try:
            output = self.__run(
                ["-p", "icmp" if self.ipv == "ipv4" else "ipv6-icmp", "--help"]
            )
        except ValueError as ex:
            if self.ipv == "ipv4":
                log.debug1("iptables error: %s" % ex)
            else:
                log.debug1("ip6tables error: %s" % ex)

        return self._parse_supported_icmp_types(self.ipv, output)

    @staticmethod
    def _parse_supported_icmp_types(ipv, output):
        in_types = False
        ret = []
        for line in output.splitlines():
            if in_types:
                line = line.strip().lower()
                splits = line.split()
                for split in splits:
                    if split.startswith("(") and split.endswith(")"):
                        x = split[1:-1]
                    else:
                        x = split
                    if x not in ret:
                        ret.append(x)
            if (
                ipv == "ipv4"
                and line.startswith("Valid ICMP Types:")
                or ipv == "ipv6"
                and line.startswith("Valid ICMPv6 Types:")
            ):
                in_types = True
        return ret

    def build_default_tables(self):
        # nothing to do, they always exist
        return []

    def build_default_rules(self, log_denied="off"):
        default_rules = {}

        if self.get_available_tables("security"):
            default_rules["security"] = []
            self.our_chains["security"] = set()
            for chain in BUILT_IN_CHAINS["security"]:
                default_rules["security"].append("-N %s_direct" % chain)
                default_rules["security"].append("-A %s -j %s_direct" % (chain, chain))
                self.our_chains["security"].add("%s_direct" % chain)

        if self.get_available_tables("raw"):
            default_rules["raw"] = []
            self.our_chains["raw"] = set()
            for chain in BUILT_IN_CHAINS["raw"]:
                default_rules["raw"].append("-N %s_direct" % chain)
                default_rules["raw"].append("-A %s -j %s_direct" % (chain, chain))
                self.our_chains["raw"].add("%s_direct" % chain)

                if chain == "PREROUTING":
                    default_rules["raw"].append("-N %s_POLICIES" % (chain))
                    self.our_chains["raw"].update(set(["%s_POLICIES" % (chain)]))
                    default_rules["raw"].append("-A %s -j %s_POLICIES" % (chain, chain))

        if self.get_available_tables("mangle"):
            default_rules["mangle"] = []
            self.our_chains["mangle"] = set()
            for chain in BUILT_IN_CHAINS["mangle"]:
                default_rules["mangle"].append("-N %s_direct" % chain)
                default_rules["mangle"].append("-A %s -j %s_direct" % (chain, chain))
                self.our_chains["mangle"].add("%s_direct" % chain)

                if chain == "PREROUTING":
                    default_rules["mangle"].append("-N %s_POLICIES" % (chain))
                    self.our_chains["mangle"].update(set(["%s_POLICIES" % (chain)]))
                    default_rules["mangle"].append(
                        "-A %s -j %s_POLICIES" % (chain, chain)
                    )

        if self.get_available_tables("nat"):
            default_rules["nat"] = []
            self.our_chains["nat"] = set()
            for chain in BUILT_IN_CHAINS["nat"]:
                default_rules["nat"].append("-N %s_direct" % chain)
                default_rules["nat"].append("-A %s -j %s_direct" % (chain, chain))
                self.our_chains["nat"].add("%s_direct" % chain)

                default_rules["nat"].append("-N %s_POLICIES" % (chain))
                self.our_chains["nat"].update(set(["%s_POLICIES" % (chain)]))
                default_rules["nat"].append("-A %s -j %s_POLICIES" % (chain, chain))

        default_rules["filter"] = []
        self.our_chains["filter"] = set()
        default_rules["filter"].append(
            "-A INPUT -m conntrack --ctstate RELATED,ESTABLISHED,DNAT -j ACCEPT"
        )
        default_rules["filter"].append("-A INPUT -i lo -j ACCEPT")
        if log_denied != "off":
            default_rules["filter"].append(
                "-A INPUT -m conntrack --ctstate INVALID %%LOGTYPE%% -j LOG --log-prefix 'STATE_INVALID_DROP: '"
            )
        default_rules["filter"].append(
            "-A INPUT -m conntrack --ctstate INVALID -j DROP"
        )
        default_rules["filter"].append("-N INPUT_direct")
        default_rules["filter"].append("-A INPUT -j INPUT_direct")
        self.our_chains["filter"].update(set("INPUT_direct"))
        default_rules["filter"].append("-N INPUT_POLICIES")
        self.our_chains["filter"].update(set("INPUT_POLICIES"))
        default_rules["filter"].append("-A INPUT -j INPUT_POLICIES")
        if log_denied != "off":
            default_rules["filter"].append(
                "-A INPUT %%LOGTYPE%% -j LOG --log-prefix 'FINAL_REJECT: '"
            )
        default_rules["filter"].append("-A INPUT -j %%REJECT%%")

        default_rules["filter"].append(
            "-A FORWARD -m conntrack --ctstate RELATED,ESTABLISHED,DNAT -j ACCEPT"
        )
        default_rules["filter"].append("-A FORWARD -i lo -j ACCEPT")
        if log_denied != "off":
            default_rules["filter"].append(
                "-A FORWARD -m conntrack --ctstate INVALID %%LOGTYPE%% -j LOG --log-prefix 'STATE_INVALID_DROP: '"
            )
        default_rules["filter"].append(
            "-A FORWARD -m conntrack --ctstate INVALID -j DROP"
        )
        default_rules["filter"].append("-N FORWARD_direct")
        default_rules["filter"].append("-A FORWARD -j FORWARD_direct")
        self.our_chains["filter"].update(set("FORWARD_direct"))
        default_rules["filter"].append("-N FORWARD_POLICIES")
        self.our_chains["filter"].update(set("FORWARD_POLICIES"))
        default_rules["filter"].append("-A FORWARD -j FORWARD_POLICIES")
        if log_denied != "off":
            default_rules["filter"].append(
                "-A FORWARD %%LOGTYPE%% -j LOG --log-prefix 'FINAL_REJECT: '"
            )
        default_rules["filter"].append("-A FORWARD -j %%REJECT%%")

        default_rules["filter"] += [
            "-N OUTPUT_direct",
            "-A OUTPUT -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT",
            "-A OUTPUT -o lo -j ACCEPT",
            "-A OUTPUT -j OUTPUT_direct",
        ]
        self.our_chains["filter"].update(set("OUTPUT_direct"))
        default_rules["filter"].append("-N OUTPUT_POLICIES")
        default_rules["filter"].append("-A OUTPUT -j OUTPUT_POLICIES")
        self.our_chains["filter"].update(set("OUTPUT_POLICIES"))

        final_default_rules = []
        for table in default_rules:
            if table not in self.get_available_tables():
                continue
            for rule in default_rules[table]:
                final_default_rules.append(["-t", table] + splitArgs(rule))

        return final_default_rules

    def get_zone_table_chains(self, table):
        if table == "filter":
            return {"INPUT", "FORWARD"}
        if table == "mangle":
            if "mangle" in self.get_available_tables():
                return {"PREROUTING"}
        if table == "nat":
            if "nat" in self.get_available_tables():
                return {"PREROUTING", "POSTROUTING"}
        if table == "raw":
            if "raw" in self.get_available_tables():
                return {"PREROUTING"}

        return {}

    def _policy_dispatch_sort_key(
        self,
        policy,
        ingress_zone,
        egress_zone,
        ingress_interface,
        ingress_source,
        egress_interface,
        egress_source,
        priority,
        last=False,
        prerouting=False,
        postrouting=False,
        log_denied=False,
    ):
        p_obj = self._fw.policy.get_policy(policy)
        ingress_priority = (
            0
            if ingress_zone == "HOST"
            else self._fw.zone.get_zone(ingress_zone).ingress_priority
        )
        egress_priority = (
            0
            if egress_zone == "HOST"
            else self._fw.zone.get_zone(egress_zone).egress_priority
        )

        ingress_sort_order = 0  # 0 means output chain
        if ingress_source:
            ingress_sort_order = 1
        elif ingress_interface:
            ingress_sort_order = 2
            if postrouting:
                ingress_zone = ""
                ingress_interface = ""

        egress_sort_order = 0  # 0 means input chain
        if egress_source:
            egress_sort_order = 1
        elif egress_interface or (p_obj.derived_from_zone and prerouting):
            egress_sort_order = 2
            if prerouting:
                egress_zone = ""
                egress_interface = ""

        # default zone is always sorted to last as it's a "catch-all"
        if ingress_interface == "+":
            ingress_priority = self._fw.zone.get_zone(ingress_zone).priority_max + 1
        if egress_interface == "+":
            egress_priority = self._fw.zone.get_zone(egress_zone).priority_max + 1

        last_sort_order = 0
        if last:
            if log_denied:
                last_sort_order = 1
            else:
                last_sort_order = 2

        ingress = (
            ingress_priority,
            ingress_sort_order,
            ingress_zone,
            ingress_source,
            ingress_interface,
        )
        egress = (
            egress_priority,
            egress_sort_order,
            egress_zone,
            egress_source,
            egress_interface,
        )
        suffix = (last_sort_order, priority)

        if postrouting:
            return ["%%POLICY_SORT_KEY%%", egress + ingress + suffix + (policy,)]
        else:
            return ["%%POLICY_SORT_KEY%%", ingress + egress + suffix + (policy,)]

    def build_policy_ingress_egress_pair_rules(
        self,
        enable,
        policy,
        table,
        chain,
        ingress_zone,
        egress_zone,
        ingress_interface,
        ingress_source,
        egress_interface,
        egress_source,
        last=False,
    ):
        add_del = {True: "-I", False: "-D"}[enable]
        p_obj = self._fw.policy.get_policy(policy)
        isSNAT = True if (table == "nat" and chain == "POSTROUTING") else False
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX, isSNAT
        )
        prerouting = True if chain == "PREROUTING" else False
        postrouting = True if chain == "POSTROUTING" else False

        # iptables can not match a destination MAC
        if check_mac(egress_source) or (postrouting and check_mac(ingress_source)):
            return []

        rules = []

        rule = ["-t", table, add_del, f"{chain}_POLICIES"]

        # iptables-legacy cannot match -i in postrouting
        if ingress_interface and ingress_interface != "+" and not postrouting:
            rule.extend(["-i", ingress_interface])
        if egress_interface and egress_interface != "+" and not prerouting:
            rule.extend(["-o", egress_interface])
        if ingress_source:
            rule.extend(self._rule_addr_fragment("-s", ingress_source))
        if egress_source:
            rule.extend(self._rule_addr_fragment("-d", egress_source))

        if not last:
            rule.extend(["-j", _policy])
        elif table != "filter" or chain in ["PREROUTING", "OUTPUT"]:
            rule.extend(["-j", "RETURN"])
        elif p_obj.target in [
            DEFAULT_ZONE_TARGET,
            "ACCEPT",
            "REJECT",
            "%%REJECT%%",
            "DROP",
        ]:
            if self._fw.get_log_denied() != "off" and p_obj.target in [
                DEFAULT_ZONE_TARGET,
                "%%REJECT%%",
                "REJECT",
                "DROP",
            ]:
                _rule = rule[:]
                _log_suffix = "DROP" if p_obj.target == "DROP" else "REJECT"

                _rule.extend(
                    [
                        "%%LOGTYPE%%",
                        "-j",
                        "LOG",
                        "--log-prefix",
                        f"{_policy}_{_log_suffix}: ",
                    ]
                )
                _rule.extend(
                    self._policy_dispatch_sort_key(
                        policy,
                        ingress_zone,
                        egress_zone,
                        ingress_interface,
                        ingress_source,
                        egress_interface,
                        egress_source,
                        p_obj.priority,
                        last=True,
                        log_denied=True,
                        postrouting=postrouting,
                        prerouting=prerouting,
                    )
                )
                rules.append(_rule)

            if p_obj.target == DEFAULT_ZONE_TARGET:
                rule.extend(["-j", "REJECT"])
            else:
                rule.extend(["-j", p_obj.target])

        rule.extend(
            self._policy_dispatch_sort_key(
                policy,
                ingress_zone,
                egress_zone,
                ingress_interface,
                ingress_source,
                egress_interface,
                egress_source,
                p_obj.priority,
                last=last,
                postrouting=postrouting,
                prerouting=prerouting,
            )
        )

        rules.append(rule)

        return rules

    def _rule_addr_fragment(self, opt, address, invert=False):
        if address.startswith("ipset:"):
            name = address[6:]
            if opt == "-d":
                opt = "dst"
            else:
                opt = "src"
            flags = ",".join([opt] * self._fw.ipset.get_dimension(name))
            return ["-m", "set", "--match-set", name, flags]
        elif check_mac(address):
            # outgoing can not be set
            if opt == "-d":
                raise FirewallError(INVALID_ADDR, "Can't match a destination MAC.")
            return ["-m", "mac", "--mac-source", address.upper()]
        else:
            if check_single_address("ipv6", address):
                address = normalizeIP6(address)
            elif check_address("ipv6", address):
                addr_split = address.split("/")
                address = normalizeIP6(addr_split[0]) + "/" + addr_split[1]
            return [opt, address]

    def build_policy_chain_rules(self, enable, policy, table, chain):
        add_del_chain = {True: "-N", False: "-X"}[enable]
        add_del_rule = {True: "-A", False: "-D"}[enable]
        isSNAT = True if (table == "nat" and chain == "POSTROUTING") else False
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX, isSNAT=isSNAT
        )
        p_obj = self._fw.policy.get_policy(policy)

        self.our_chains[table].update(
            set(
                [
                    _policy,
                    "%s_log" % _policy,
                    "%s_deny" % _policy,
                    "%s_pre" % _policy,
                    "%s_post" % _policy,
                    "%s_allow" % _policy,
                ]
            )
        )

        rules = []
        rules.append([add_del_chain, _policy, "-t", table])
        rules.append([add_del_chain, "%s_pre" % _policy, "-t", table])
        rules.append([add_del_chain, "%s_log" % _policy, "-t", table])
        rules.append([add_del_chain, "%s_deny" % _policy, "-t", table])
        rules.append([add_del_chain, "%s_allow" % _policy, "-t", table])
        rules.append([add_del_chain, "%s_post" % _policy, "-t", table])
        rules.append([add_del_rule, _policy, "-t", table, "-j", "%s_pre" % _policy])
        rules.append([add_del_rule, _policy, "-t", table, "-j", "%s_log" % _policy])
        rules.append([add_del_rule, _policy, "-t", table, "-j", "%s_deny" % _policy])
        rules.append([add_del_rule, _policy, "-t", table, "-j", "%s_allow" % _policy])
        rules.append([add_del_rule, _policy, "-t", table, "-j", "%s_post" % _policy])

        if not p_obj.derived_from_zone and table == "filter":
            target = self._fw.policy._policies[policy].target

            if self._fw.get_log_denied() != "off":
                if target in ["REJECT", "%%REJECT%%"]:
                    rules.append(
                        [
                            add_del_rule,
                            _policy,
                            "-t",
                            table,
                            "%%LOGTYPE%%",
                            "-j",
                            "LOG",
                            "--log-prefix",
                            "%s_REJECT: " % _policy,
                        ]
                    )
                elif target == "DROP":
                    rules.append(
                        [
                            add_del_rule,
                            _policy,
                            "-t",
                            table,
                            "%%LOGTYPE%%",
                            "-j",
                            "LOG",
                            "--log-prefix",
                            "%s_DROP: " % _policy,
                        ]
                    )

            if target in ["ACCEPT", "REJECT", "%%REJECT%%", "DROP"]:
                rules.append([add_del_rule, _policy, "-t", table, "-j", target])

        if not enable:
            rules.reverse()

        return rules

    def _rule_limit(self, limit):
        if limit:
            return ["-m", "limit", "--limit", limit.value]
        return []

    def _rich_rule_chain_suffix(self, rich_rule):
        if type(rich_rule.element) in [
            Rich_Masquerade,
            Rich_ForwardPort,
            Rich_IcmpBlock,
            Rich_Tcp_Mss_Clamp,
        ]:
            # These are special and don't have an explicit action
            pass
        elif rich_rule.action:
            if type(rich_rule.action) not in [
                Rich_Accept,
                Rich_Reject,
                Rich_Drop,
                Rich_Mark,
            ]:
                raise FirewallError(
                    INVALID_RULE, "Unknown action %s" % type(rich_rule.action)
                )
        else:
            raise FirewallError(INVALID_RULE, "No rule action specified.")

        if rich_rule.priority == 0:
            if type(rich_rule.element) in [
                Rich_Masquerade,
                Rich_ForwardPort,
                Rich_Tcp_Mss_Clamp,
            ] or type(rich_rule.action) in [Rich_Accept, Rich_Mark]:
                return "allow"
            elif type(rich_rule.element) in [Rich_IcmpBlock] or type(
                rich_rule.action
            ) in [Rich_Reject, Rich_Drop]:
                return "deny"
        elif rich_rule.priority < 0:
            return "pre"
        else:
            return "post"

    def _rich_rule_chain_suffix_from_log(self, rich_rule):
        if not rich_rule.log and not rich_rule.audit:
            raise FirewallError(INVALID_RULE, "Not log or audit")

        if rich_rule.priority == 0:
            return "log"
        elif rich_rule.priority < 0:
            return "pre"
        else:
            return "post"

    def _rich_rule_priority_fragment(self, rich_rule):
        if rich_rule.priority == 0:
            return []
        return ["%%RICH_RULE_PRIORITY%%", rich_rule.priority]

    def _rich_rule_log(self, policy, rich_rule, enable, table, rule_fragment):
        if not rich_rule.log:
            return []

        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )

        add_del = {True: "-A", False: "-D"}[enable]

        chain_suffix = self._rich_rule_chain_suffix_from_log(rich_rule)
        rule = ["-t", table, add_del, "%s_%s" % (_policy, chain_suffix)]
        rule += self._rich_rule_priority_fragment(rich_rule)
        if isinstance(rich_rule.log, Rich_NFLog):
            rule += rule_fragment + ["-j", "NFLOG"]
            if rich_rule.log.group:
                rule += ["--nflog-group", rich_rule.log.group]
            if rich_rule.log.prefix:
                rule += ["--nflog-prefix", "%s" % rich_rule.log.prefix]
            if rich_rule.log.threshold:
                rule += ["--nflog-threshold", rich_rule.log.threshold]
        else:
            rule += rule_fragment + ["-j", "LOG"]
            if rich_rule.log.prefix:
                rule += ["--log-prefix", "%s" % rich_rule.log.prefix]
            if rich_rule.log.level:
                rule += ["--log-level", "%s" % rich_rule.log.level]
        rule += self._rule_limit(rich_rule.log.limit)

        return rule

    def _rich_rule_audit(self, policy, rich_rule, enable, table, rule_fragment):
        if not rich_rule.audit:
            return []

        add_del = {True: "-A", False: "-D"}[enable]

        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )

        chain_suffix = self._rich_rule_chain_suffix_from_log(rich_rule)
        rule = ["-t", table, add_del, "%s_%s" % (_policy, chain_suffix)]
        rule += self._rich_rule_priority_fragment(rich_rule)
        rule += rule_fragment
        if isinstance(rich_rule.action, Rich_Accept):
            _type = "accept"
        elif isinstance(rich_rule.action, Rich_Reject):
            _type = "reject"
        elif isinstance(rich_rule.action, Rich_Drop):
            _type = "drop"
        else:
            _type = "unknown"
        rule += ["-j", "AUDIT", "--type", _type]
        rule += self._rule_limit(rich_rule.audit.limit)

        return rule

    def _rich_rule_action(self, policy, rich_rule, enable, table, rule_fragment):
        if not rich_rule.action:
            return []

        add_del = {True: "-A", False: "-D"}[enable]

        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )

        chain_suffix = self._rich_rule_chain_suffix(rich_rule)
        chain = "%s_%s" % (_policy, chain_suffix)
        if isinstance(rich_rule.action, Rich_Accept):
            rule_action = ["-j", "ACCEPT"]
        elif isinstance(rich_rule.action, Rich_Reject):
            rule_action = ["-j", "REJECT"]
            if rich_rule.action.type:
                rule_action += ["--reject-with", rich_rule.action.type]
        elif isinstance(rich_rule.action, Rich_Drop):
            rule_action = ["-j", "DROP"]
        elif isinstance(rich_rule.action, Rich_Mark):
            table = "mangle"
            _policy = self._fw.policy.policy_base_chain_name(
                policy, table, POLICY_CHAIN_PREFIX
            )
            chain = "%s_%s" % (_policy, chain_suffix)
            rule_action = ["-j", "MARK", "--set-xmark", rich_rule.action.set]
        else:
            raise FirewallError(
                INVALID_RULE, "Unknown action %s" % type(rich_rule.action)
            )

        rule = ["-t", table, add_del, chain]
        rule += self._rich_rule_priority_fragment(rich_rule)
        rule += rule_fragment + rule_action
        rule += self._rule_limit(rich_rule.action.limit)

        return rule

    def _rich_rule_destination_fragment(self, rich_dest):
        if not rich_dest:
            return []

        rule_fragment = []
        if rich_dest.addr:
            if rich_dest.invert:
                rule_fragment.append("!")
            if check_single_address("ipv6", rich_dest.addr):
                rule_fragment += ["-d", normalizeIP6(rich_dest.addr)]
            elif check_address("ipv6", rich_dest.addr):
                addr_split = rich_dest.addr.split("/")
                rule_fragment += [
                    "-d",
                    normalizeIP6(addr_split[0]) + "/" + addr_split[1],
                ]
            else:
                rule_fragment += ["-d", rich_dest.addr]
        elif rich_dest.ipset:
            rule_fragment += ["-m", "set"]
            if rich_dest.invert:
                rule_fragment.append("!")
            flags = self._fw.zone._ipset_match_flags(rich_dest.ipset, "dst")
            rule_fragment += ["--match-set", rich_dest.ipset, flags]

        return rule_fragment

    def _rich_rule_source_fragment(self, rich_source):
        if not rich_source:
            return []

        rule_fragment = []
        if rich_source.addr:
            if rich_source.invert:
                rule_fragment.append("!")
            if check_single_address("ipv6", rich_source.addr):
                rule_fragment += ["-s", normalizeIP6(rich_source.addr)]
            elif check_address("ipv6", rich_source.addr):
                addr_split = rich_source.addr.split("/")
                rule_fragment += [
                    "-s",
                    normalizeIP6(addr_split[0]) + "/" + addr_split[1],
                ]
            else:
                rule_fragment += ["-s", rich_source.addr]
        elif hasattr(rich_source, "mac") and rich_source.mac:
            rule_fragment += ["-m", "mac"]
            if rich_source.invert:
                rule_fragment.append("!")
            rule_fragment += ["--mac-source", rich_source.mac]
        elif hasattr(rich_source, "ipset") and rich_source.ipset:
            rule_fragment += ["-m", "set"]
            if rich_source.invert:
                rule_fragment.append("!")
            flags = self._fw.zone._ipset_match_flags(rich_source.ipset, "src")
            rule_fragment += ["--match-set", rich_source.ipset, flags]

        return rule_fragment

    def build_policy_ports_rules(
        self, enable, policy, proto, port, destination=None, rich_rule=None
    ):
        add_del = {True: "-A", False: "-D"}[enable]
        table = "filter"
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )

        rule_fragment = ["-p", proto]
        if port:
            rule_fragment += ["--dport", "%s" % portStr(port)]
        if destination:
            rule_fragment += ["-d", destination]
        if rich_rule:
            rule_fragment += self._rich_rule_destination_fragment(rich_rule.destination)
            rule_fragment += self._rich_rule_source_fragment(rich_rule.source)

        rules = []
        if rich_rule:
            rules.append(
                self._rich_rule_log(policy, rich_rule, enable, table, rule_fragment)
            )
            rules.append(
                self._rich_rule_audit(policy, rich_rule, enable, table, rule_fragment)
            )
            rules.append(
                self._rich_rule_action(policy, rich_rule, enable, table, rule_fragment)
            )
        else:
            rules.append(
                [add_del, "%s_allow" % (_policy), "-t", table]
                + rule_fragment
                + ["-j", "ACCEPT"]
            )

        return rules

    def build_policy_protocol_rules(
        self, enable, policy, protocol, destination=None, rich_rule=None
    ):
        add_del = {True: "-A", False: "-D"}[enable]
        table = "filter"
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )

        rule_fragment = ["-p", protocol]
        if destination:
            rule_fragment += ["-d", destination]
        if rich_rule:
            rule_fragment += self._rich_rule_destination_fragment(rich_rule.destination)
            rule_fragment += self._rich_rule_source_fragment(rich_rule.source)

        rules = []
        if rich_rule:
            rules.append(
                self._rich_rule_log(policy, rich_rule, enable, table, rule_fragment)
            )
            rules.append(
                self._rich_rule_audit(policy, rich_rule, enable, table, rule_fragment)
            )
            rules.append(
                self._rich_rule_action(policy, rich_rule, enable, table, rule_fragment)
            )
        else:
            rules.append(
                [add_del, "%s_allow" % (_policy), "-t", table]
                + rule_fragment
                + ["-j", "ACCEPT"]
            )

        return rules

    def build_policy_tcp_mss_clamp_rules(
        self, enable, policy, tcp_mss_clamp_value, destination=None, rich_rule=None
    ):
        chain_suffix = "allow"
        table = "filter"
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )
        add_del = {True: "-A", False: "-D"}[enable]

        rule_fragment = ["-p", "tcp", "--tcp-flags", "SYN,RST", "SYN"]
        if rich_rule:
            chain_suffix = self._rich_rule_chain_suffix(rich_rule)
            rule_fragment += self._rich_rule_priority_fragment(rich_rule)
            rule_fragment += self._rich_rule_destination_fragment(rich_rule.destination)
            rule_fragment += self._rich_rule_source_fragment(rich_rule.source)

        if tcp_mss_clamp_value == "pmtu" or tcp_mss_clamp_value is None:
            rule_fragment += ["-j", "TCPMSS", "--clamp-mss-to-pmtu"]
        else:
            rule_fragment += ["-j", "TCPMSS", "--set-mss", tcp_mss_clamp_value]

        return [
            ["-t", "filter", add_del, "%s_%s" % (_policy, chain_suffix)] + rule_fragment
        ]

    def build_policy_source_ports_rules(
        self, enable, policy, proto, port, destination=None, rich_rule=None
    ):
        add_del = {True: "-A", False: "-D"}[enable]
        table = "filter"
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )

        rule_fragment = ["-p", proto]
        if port:
            rule_fragment += ["--sport", "%s" % portStr(port)]
        if destination:
            rule_fragment += ["-d", destination]
        if rich_rule:
            rule_fragment += self._rich_rule_destination_fragment(rich_rule.destination)
            rule_fragment += self._rich_rule_source_fragment(rich_rule.source)

        rules = []
        if rich_rule:
            rules.append(
                self._rich_rule_log(policy, rich_rule, enable, table, rule_fragment)
            )
            rules.append(
                self._rich_rule_audit(policy, rich_rule, enable, table, rule_fragment)
            )
            rules.append(
                self._rich_rule_action(policy, rich_rule, enable, table, rule_fragment)
            )
        else:
            rules.append(
                [add_del, "%s_allow" % (_policy), "-t", table]
                + rule_fragment
                + ["-j", "ACCEPT"]
            )

        return rules

    def build_policy_helper_ports_rules(
        self, enable, policy, proto, port, destination, helper_name, module_short_name
    ):
        table = "raw"
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )
        add_del = {True: "-A", False: "-D"}[enable]

        rule = [add_del, "%s_allow" % (_policy), "-t", "raw", "-p", proto]
        if port:
            rule += ["--dport", "%s" % portStr(port)]
        if destination:
            rule += ["-d", destination]
        rule += ["-j", "CT", "--helper", module_short_name]

        return [rule]

    def build_zone_forward_rules(
        self, enable, zone, policy, table, interface=None, source=None
    ):
        add_del = {True: "-A", False: "-D"}[enable]
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )

        rules = []
        if interface:
            rules.append(
                [
                    "-t",
                    "filter",
                    add_del,
                    "%s_allow" % _policy,
                    "-o",
                    interface,
                    "-j",
                    "ACCEPT",
                ]
            )
        else:  # source
            # iptables can not match destination MAC
            if check_mac(source):
                return []

            rules.append(
                ["-t", "filter", add_del, "%s_allow" % _policy]
                + self._rule_addr_fragment("-d", source)
                + ["-j", "ACCEPT"]
            )
        return rules

    def build_policy_masquerade_rules(self, enable, policy, rich_rule=None):
        table = "nat"
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX, isSNAT=True
        )

        add_del = {True: "-A", False: "-D"}[enable]

        rule_fragment = []
        if rich_rule:
            chain_suffix = self._rich_rule_chain_suffix(rich_rule)
            rule_fragment += self._rich_rule_priority_fragment(rich_rule)
            rule_fragment += self._rich_rule_destination_fragment(rich_rule.destination)
            rule_fragment += self._rich_rule_source_fragment(rich_rule.source)
        else:
            chain_suffix = "allow"

        rules = []
        rules.append(
            ["-t", "nat", add_del, "%s_%s" % (_policy, chain_suffix)]
            + rule_fragment
            + ["!", "-o", "lo", "-j", "MASQUERADE"]
        )

        return rules

    def build_policy_forward_port_rules(
        self, enable, policy, port, protocol, toport, toaddr, rich_rule=None
    ):
        table = "nat"
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )
        add_del = {True: "-A", False: "-D"}[enable]

        to = ""
        if toaddr:
            if check_single_address("ipv6", toaddr):
                to += "[%s]" % normalizeIP6(toaddr)
            else:
                to += toaddr
        if toport and toport != "":
            to += ":%s" % portStr(toport, "-")

        rules = []
        rule_fragment = []
        port_fragment = ["-p", protocol, "--dport", portStr(port)]
        if rich_rule:
            chain_suffix = self._rich_rule_chain_suffix(rich_rule)
            rule_fragment = self._rich_rule_priority_fragment(rich_rule)
            rule_fragment += self._rich_rule_destination_fragment(rich_rule.destination)
            rule_fragment += self._rich_rule_source_fragment(rich_rule.source)

            rules.append(
                self._rich_rule_log(
                    policy, rich_rule, enable, "nat", rule_fragment + port_fragment
                )
            )
        else:
            chain_suffix = "allow"

        rules.append(
            ["-t", "nat", add_del, "%s_%s" % (_policy, chain_suffix)]
            + rule_fragment
            + port_fragment
            + ["-j", "DNAT", "--to-destination", to]
        )

        return rules

    def build_policy_icmp_block_rules(self, enable, policy, ict, rich_rule=None):
        table = "filter"
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )
        add_del = {True: "-A", False: "-D"}[enable]

        if self.ipv == "ipv4":
            proto = ["-p", "icmp"]
            match = ["-m", "icmp", "--icmp-type", ict.name]
        else:
            proto = ["-p", "ipv6-icmp"]
            match = ["-m", "icmp6", "--icmpv6-type", ict.name]

        rules = []
        if self._fw.policy.query_icmp_block_inversion(policy):
            final_chain = "%s_allow" % (_policy)
            final_target = "ACCEPT"
        else:
            final_chain = "%s_deny" % (_policy)
            final_target = "%%REJECT%%"

        rule_fragment = []
        if rich_rule:
            rule_fragment += self._rich_rule_destination_fragment(rich_rule.destination)
            rule_fragment += self._rich_rule_source_fragment(rich_rule.source)
        rule_fragment += proto + match

        if rich_rule:
            rules.append(
                self._rich_rule_log(policy, rich_rule, enable, table, rule_fragment)
            )
            rules.append(
                self._rich_rule_audit(policy, rich_rule, enable, table, rule_fragment)
            )
            if rich_rule.action:
                rules.append(
                    self._rich_rule_action(
                        policy, rich_rule, enable, table, rule_fragment
                    )
                )
            else:
                chain_suffix = self._rich_rule_chain_suffix(rich_rule)
                rules.append(
                    ["-t", table, add_del, "%s_%s" % (_policy, chain_suffix)]
                    + self._rich_rule_priority_fragment(rich_rule)
                    + rule_fragment
                    + ["-j", "%%REJECT%%"]
                )
        else:
            if self._fw.get_log_denied() != "off" and final_target != "ACCEPT":
                rules.append(
                    [add_del, final_chain, "-t", table]
                    + rule_fragment
                    + [
                        "%%LOGTYPE%%",
                        "-j",
                        "LOG",
                        "--log-prefix",
                        "%s_ICMP_BLOCK: " % policy,
                    ]
                )
            rules.append(
                [add_del, final_chain, "-t", table]
                + rule_fragment
                + ["-j", final_target]
            )

        return rules

    def build_policy_icmp_block_inversion_rules(self, enable, policy):
        table = "filter"
        _policy = self._fw.policy.policy_base_chain_name(
            policy, table, POLICY_CHAIN_PREFIX
        )

        rules = []
        rule_idx = 6

        if self._fw.policy.query_icmp_block_inversion(policy):
            ibi_target = "%%REJECT%%"

            if self._fw.get_log_denied() != "off":
                if enable:
                    rule = ["-I", _policy, str(rule_idx)]
                else:
                    rule = ["-D", _policy]

                rule = rule + [
                    "-t",
                    table,
                    "-p",
                    "%%ICMP%%",
                    "%%LOGTYPE%%",
                    "-j",
                    "LOG",
                    "--log-prefix",
                    "%s_ICMP_BLOCK: " % _policy,
                ]
                rules.append(rule)
                rule_idx += 1
        else:
            ibi_target = "ACCEPT"

        if enable:
            rule = ["-I", _policy, str(rule_idx)]
        else:
            rule = ["-D", _policy]
        rule = rule + ["-t", table, "-p", "%%ICMP%%", "-j", ibi_target]
        rules.append(rule)

        return rules

    def build_policy_rich_source_destination_rules(self, enable, policy, rich_rule):
        table = "filter"

        rule_fragment = []
        rule_fragment += self._rich_rule_destination_fragment(rich_rule.destination)
        rule_fragment += self._rich_rule_source_fragment(rich_rule.source)

        rules = []
        rules.append(
            self._rich_rule_log(policy, rich_rule, enable, table, rule_fragment)
        )
        rules.append(
            self._rich_rule_audit(policy, rich_rule, enable, table, rule_fragment)
        )
        rules.append(
            self._rich_rule_action(policy, rich_rule, enable, table, rule_fragment)
        )

        return rules

    def is_ipv_supported(self, ipv):
        return ipv == self.ipv


class ip6tables(ip4tables):
    ipv = "ipv6"
    name = "ip6tables"

    def build_rpfilter_rules(self, log_denied=False):
        rules = []
        rules.append(
            [
                "-I",
                "PREROUTING",
                "-t",
                "mangle",
                "-m",
                "rpfilter",
                "--invert",
                "--validmark",
                "-j",
                "DROP",
            ]
        )
        if log_denied != "off":
            rules.append(
                [
                    "-I",
                    "PREROUTING",
                    "-t",
                    "mangle",
                    "-m",
                    "rpfilter",
                    "--invert",
                    "--validmark",
                    "-j",
                    "LOG",
                    "--log-prefix",
                    "rpfilter_DROP: ",
                ]
            )
        rules.append(
            [
                "-I",
                "PREROUTING",
                "-t",
                "mangle",
                "-p",
                "ipv6-icmp",
                "--icmpv6-type=neighbour-solicitation",
                "-j",
                "ACCEPT",
            ]
        )  # RHBZ#1575431, kernel bug in 4.16-4.17
        rules.append(
            [
                "-I",
                "PREROUTING",
                "-t",
                "mangle",
                "-p",
                "ipv6-icmp",
                "--icmpv6-type=router-advertisement",
                "-j",
                "ACCEPT",
            ]
        )  # RHBZ#1058505
        return rules

    def build_rfc3964_ipv4_rules(self):
        daddr_list = [
            "::0.0.0.0/96",  # IPv4 compatible
            "::ffff:0.0.0.0/96",  # IPv4 mapped
            "2002:0000::/24",  # 0.0.0.0/8 (the system has no address assigned yet)
            "2002:0a00::/24",  # 10.0.0.0/8 (private)
            "2002:7f00::/24",  # 127.0.0.0/8 (loopback)
            "2002:ac10::/28",  # 172.16.0.0/12 (private)
            "2002:c0a8::/32",  # 192.168.0.0/16 (private)
            "2002:a9fe::/32",  # 169.254.0.0/16 (IANA Assigned DHCP link-local)
            "2002:e000::/19",  # 224.0.0.0/4 (multicast), 240.0.0.0/4 (reserved and broadcast)
        ]

        chain_name = "RFC3964_IPv4"
        self.our_chains["filter"].add(chain_name)

        rules = []
        rules.append(["-t", "filter", "-N", chain_name])
        for daddr in daddr_list:
            rules.append(
                [
                    "-t",
                    "filter",
                    "-I",
                    chain_name,
                    "-d",
                    daddr,
                    "-j",
                    "REJECT",
                    "--reject-with",
                    "addr-unreach",
                ]
            )
            if self._fw._log_denied in ["unicast", "all"]:
                rules.append(
                    [
                        "-t",
                        "filter",
                        "-I",
                        chain_name,
                        "-d",
                        daddr,
                        "-j",
                        "LOG",
                        "--log-prefix",
                        "RFC3964_IPv4_REJECT: ",
                    ]
                )

        # Inject into FORWARD and OUTPUT chains
        rules.append(["-t", "filter", "-I", "OUTPUT", "4", "-j", chain_name])
        rules.append(
            [
                "-t",
                "filter",
                "-I",
                "FORWARD",
                "6" if self._fw.get_log_denied() != "off" else "5",
                "-j",
                chain_name,
            ]
        )
        return rules

Youez - 2016 - github.com/yon3zu
LinuXploit