# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright (C) 2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <[email protected]>
"""Transaction classes for firewalld"""
import traceback
from firewall.core.logger import log
from firewall import errors
from firewall.errors import FirewallError
class FirewallTransaction:
    """Queue firewall changes and apply them together.

    A transaction collects rules grouped by backend name, a list of kernel
    modules, and three callback lists (pre, post, fail).  ``execute()``
    drives them in a fixed order.

    NOTE(review): this class does not itself undo rules on a backend that
    was applied successfully when another backend fails; recovery is left
    to the callbacks registered through ``add_fail()``.
    """

    def __init__(self, fw):
        """Create an empty transaction bound to the firewall object *fw*."""
        self.fw = fw
        self.rules = {}  # { backend.name: [ rule,.. ],.. }
        self.pre_funcs = []  # [ (func, args),.. ]
        self.post_funcs = []  # [ (func, args),.. ]
        self.fail_funcs = []  # [ (func, args),.. ]
        self.modules = []  # [ module,.. ]

    def clear(self):
        """Empty the queued rules and all three callback lists in place.

        ``self.modules`` is left untouched.
        NOTE(review): confirm with the callers that keeping the module list
        across ``clear()`` is intended.
        """
        self.rules.clear()
        self.pre_funcs.clear()
        self.post_funcs.clear()
        self.fail_funcs.clear()

    def add_rule(self, backend, rule):
        """Queue *rule* under ``backend.name``."""
        name = backend.name
        if name not in self.rules:
            self.rules[name] = []
        self.rules[name].append(rule)

    def add_rules(self, backend, rules):
        """Queue every rule of *rules* for *backend*, in order."""
        for entry in rules:
            self.add_rule(backend, entry)

    def query_rule(self, backend, rule):
        """Return True if *rule* is currently queued for *backend*."""
        return rule in self.rules.get(backend.name, ())

    def remove_rule(self, backend, rule):
        """Drop one queued occurrence of *rule* for *backend*, if any."""
        queued = self.rules.get(backend.name)
        if queued is not None and rule in queued:
            queued.remove(rule)

    def add_pre(self, func, *args):
        """Register ``func(*args)`` to run before the rules are applied."""
        self.pre_funcs.append((func, args))

    def add_post(self, func, *args):
        """Register ``func(*args)`` to run after a successful execute."""
        self.post_funcs.append((func, args))

    def add_fail(self, func, *args):
        """Register ``func(*args)`` to run when applying rules failed."""
        self.fail_funcs.append((func, args))

    def add_module(self, module):
        """Queue *module* unless it is already queued."""
        if module in self.modules:
            return
        self.modules.append(module)

    def remove_module(self, module):
        """Remove *module* from the queue if it is present."""
        if module not in self.modules:
            return
        self.modules.remove(module)

    def add_modules(self, modules):
        """Queue each entry of *modules*, skipping duplicates."""
        for entry in modules:
            self.add_module(entry)

    def remove_modules(self, modules):
        """Remove each entry of *modules* that is queued."""
        for entry in modules:
            self.remove_module(entry)

    def execute(self, enable):
        """Run the transaction: pre callbacks, rules, modules, post callbacks.

        Every backend is attempted even after an earlier one has failed.
        If any backend raised, the fail callbacks run (their own exceptions
        are logged and swallowed) and ``FirewallError`` is raised with the
        last exception caught; modules and post callbacks are skipped.

        Problems reported by ``fw.handle_modules()`` are only written to
        the debug log and never fail the transaction.  Exceptions raised by
        a pre callback, by ``handle_modules()`` or by a post callback
        propagate as-is and do not trigger the fail callbacks.

        Args:
            enable: forwarded unchanged to ``fw.handle_modules()``
                (presumably load vs. unload; confirm against that method).

        Raises:
            FirewallError: ``errors.COMMAND_FAILED`` when at least one
                backend failed to apply its rules.
        """
        log.debug4("%s.execute(%s)" % (type(self), enable))

        # Captured before the pre callbacks run, so a callback that rebinds
        # self.rules or self.modules does not change what this run applies.
        queued_rules = self.rules
        queued_modules = self.modules

        self.pre()

        # Stage 1: hand each backend its rules; keep going after a failure.
        failed = False
        last_error = ""
        for backend_name, backend_rules in queued_rules.items():
            try:
                self.fw.rules(backend_name, backend_rules)
            except Exception as exc:
                failed = True
                last_error = exc
                log.debug1(traceback.format_exc())
                log.error(exc)

        if failed:
            # A broken fail callback must not prevent the remaining ones
            # from running, so each call is guarded on its own.
            for callback, params in self.fail_funcs:
                try:
                    callback(*params)
                except Exception as exc:
                    log.debug1(traceback.format_exc())
                    log.error(
                        "Calling fail func %s(%s) failed: %s"
                        % (callback, params, exc)
                    )
            raise FirewallError(errors.COMMAND_FAILED, last_error)

        # Stage 2: modules.  A failure is only debug-logged: the module may
        # be builtin, CONFIG_MODULES=n may make modprobe fail, or the
        # process may run in a container without sufficient privileges, and
        # these cases cannot be told apart here.
        outcome = self.fw.handle_modules(queued_modules, enable)
        if outcome:
            status, detail = outcome
            if status:
                log.debug1(detail)

        self.post()

    def pre(self):
        """Call the registered pre callbacks in registration order."""
        log.debug4("%s.pre()" % type(self))
        for callback, params in self.pre_funcs:
            callback(*params)

    def post(self):
        """Call the registered post callbacks in registration order."""
        log.debug4("%s.post()" % type(self))
        for callback, params in self.post_funcs:
            callback(*params)