[ Avaa Bypassed ]




Upload:

Command:

hmhc3928@18.117.231.160: ~ $
# -*- coding: utf-8 -*-
#
# Copyright (C) 2010-2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

# Names exported by "from <module> import *": only the Firewall_test class.
__all__ = [ "Firewall_test" ]

import os.path
import sys
import copy
from firewall import config
from firewall import functions
from firewall.core.fw_icmptype import FirewallIcmpType
from firewall.core.fw_service import FirewallService
from firewall.core.fw_zone import FirewallZone
from firewall.core.fw_direct import FirewallDirect
from firewall.core.fw_config import FirewallConfig
from firewall.core.fw_policies import FirewallPolicies
from firewall.core.fw_ipset import FirewallIPSet
from firewall.core.fw_helper import FirewallHelper
from firewall.core.logger import log
from firewall.core.io.firewalld_conf import firewalld_conf
from firewall.core.io.direct import Direct
from firewall.core.io.service import service_reader
from firewall.core.io.icmptype import icmptype_reader
from firewall.core.io.zone import zone_reader, Zone
from firewall.core.io.ipset import ipset_reader
from firewall.core.ipset import IPSET_TYPES
from firewall.core.io.helper import helper_reader
from firewall import errors
from firewall.errors import FirewallError

############################################################################
#
# class Firewall_test
#
############################################################################

class Firewall_test(object):
    def __init__(self):
        """Create the firewalld.conf wrapper, the flags and all sub-managers.

        The configuration file itself is read later, in _start().
        """
        self._firewalld_conf = firewalld_conf(config.FIREWALLD_CONF)

        # feature flags, all initialised to False
        self.ip4tables_enabled = False
        self.ip6tables_enabled = False
        self.ebtables_enabled = False
        self.ipset_enabled = False
        self.ipset_supported_types = IPSET_TYPES

        # sub-managers; every one except FirewallPolicies receives this
        # object, so the construction order is kept as is
        self.icmptype = FirewallIcmpType(self)
        self.service = FirewallService(self)
        self.zone = FirewallZone(self)
        self.direct = FirewallDirect(self)
        self.config = FirewallConfig(self)
        self.policies = FirewallPolicies()
        self.ipset = FirewallIPSet(self)
        self.helper = FirewallHelper(self)

        # runtime state and fallback settings
        self.__init_vars()

    def __repr__(self):
        return '%s(%r, %r, %r, %r, %r, %r, %r, %r, %r, %r, %r, %r, %r, %r, %r)' % \
            (self.__class__, self.ip4tables_enabled, self.ip6tables_enabled,
             self.ebtables_enabled, self._state, self._panic,
             self._default_zone, self._module_refcount, self._marks,
             self._min_mark, self.cleanup_on_exit, self.ipv6_rpfilter_enabled,
             self.ipset_enabled, self._individual_calls, self._log_denied,
             self._automatic_helpers)

    def __init_vars(self):
        """Reset the runtime state and restore the fallback settings."""
        # runtime state
        self._state = "INIT"
        self._panic = False
        self._default_zone = ""
        self._module_refcount = dict()
        self._marks = list()

        # fallback settings will be overloaded by firewalld.conf
        fallback_settings = (
            ("_min_mark", config.FALLBACK_MINIMAL_MARK),
            ("cleanup_on_exit", config.FALLBACK_CLEANUP_ON_EXIT),
            ("ipv6_rpfilter_enabled", config.FALLBACK_IPV6_RPFILTER),
            ("_individual_calls", config.FALLBACK_INDIVIDUAL_CALLS),
            ("_log_denied", config.FALLBACK_LOG_DENIED),
            ("_automatic_helpers", config.FALLBACK_AUTOMATIC_HELPERS),
        )
        for attr_name, fallback in fallback_settings:
            setattr(self, attr_name, fallback)

    def individual_calls(self):
        """Return the current IndividualCalls setting."""
        return self._individual_calls

    def _start(self, reload=False, complete_reload=False):
        """Load the configuration and bring the object into RUNNING state.

        Reads firewalld.conf (keeping the fallback settings when reading
        fails), the lockdown whitelist and the ipset, icmptype, helper,
        service and zone files, then validates the zones, loads the direct
        rules file and finally sets the state to "RUNNING".

        The ``reload`` and ``complete_reload`` arguments are accepted but
        are not used anywhere in this method.

        Calls sys.exit(1) when no zone was loaded or when one of the zones
        "block", "drop" or "trusted" is missing.  The final check_zone()
        call raises FirewallError if the chosen default zone is unknown.
        """
        # initialize firewall
        default_zone = config.FALLBACK_ZONE

        # load firewalld config
        log.debug1("Loading firewalld config file '%s'", config.FIREWALLD_CONF)
        try:
            self._firewalld_conf.read()
        except Exception:
            log.warning("Using fallback firewalld configuration settings.")
        else:
            # the settings below are only evaluated when read() succeeded
            if self._firewalld_conf.get("DefaultZone"):
                default_zone = self._firewalld_conf.get("DefaultZone")

            if self._firewalld_conf.get("MinimalMark"):
                self._min_mark = int(self._firewalld_conf.get("MinimalMark"))

            # only "no"/"false" changes the value; anything else keeps it
            if self._firewalld_conf.get("CleanupOnExit"):
                value = self._firewalld_conf.get("CleanupOnExit")
                if value is not None and value.lower() in [ "no", "false" ]:
                    self.cleanup_on_exit = False

            if self._firewalld_conf.get("Lockdown"):
                value = self._firewalld_conf.get("Lockdown")
                if value is not None and value.lower() in [ "yes", "true" ]:
                    log.debug1("Lockdown is enabled")
                    try:
                        self.policies.enable_lockdown()
                    except FirewallError:
                        # already enabled, this is probably reload
                        pass

            if self._firewalld_conf.get("IPv6_rpfilter"):
                value = self._firewalld_conf.get("IPv6_rpfilter")
                if value is not None:
                    if value.lower() in [ "no", "false" ]:
                        self.ipv6_rpfilter_enabled = False
                    if value.lower() in [ "yes", "true" ]:
                        self.ipv6_rpfilter_enabled = True
            # logged regardless of whether the key was present in the file
            if self.ipv6_rpfilter_enabled:
                log.debug1("IPv6 rpfilter is enabled")
            else:
                log.debug1("IPV6 rpfilter is disabled")

            if self._firewalld_conf.get("IndividualCalls"):
                value = self._firewalld_conf.get("IndividualCalls")
                if value is not None and value.lower() in [ "yes", "true" ]:
                    log.debug1("IndividualCalls is enabled")
                    self._individual_calls = True

            # "no" is stored as "off"; any other value is stored lower-cased
            if self._firewalld_conf.get("LogDenied"):
                value = self._firewalld_conf.get("LogDenied")
                if value is None or value.lower() == "no":
                    self._log_denied = "off"
                else:
                    self._log_denied = value.lower()
                    log.debug1("LogDenied is set to '%s'", self._log_denied)

            # "false"/"true" are normalised to "no"/"yes"; any other value
            # is stored lower-cased
            if self._firewalld_conf.get("AutomaticHelpers"):
                value = self._firewalld_conf.get("AutomaticHelpers")
                if value is not None:
                    if value.lower() in [ "no", "false" ]:
                        self._automatic_helpers = "no"
                    elif value.lower() in [ "yes", "true" ]:
                        self._automatic_helpers = "yes"
                    else:
                        self._automatic_helpers = value.lower()
                    log.debug1("AutomaticHelpers is set to '%s'",
                               self._automatic_helpers)

        # hand a deep copy of the configuration object to the config
        # interface
        self.config.set_firewalld_conf(copy.deepcopy(self._firewalld_conf))

        # load lockdown whitelist
        log.debug1("Loading lockdown whitelist")
        try:
            self.policies.lockdown_whitelist.read()
        except Exception as msg:
            # a failure is reported as an error only while lockdown is on
            if self.policies.query_lockdown():
                log.error("Failed to load lockdown whitelist '%s': %s",
                      self.policies.lockdown_whitelist.filename, msg)
            else:
                log.debug1("Failed to load lockdown whitelist '%s': %s",
                      self.policies.lockdown_whitelist.filename, msg)

        # copy policies to config interface
        self.config.set_policies(copy.deepcopy(self.policies))

        # for every object type the config.FIREWALLD_* directory is loaded
        # first and the config.ETC_FIREWALLD_* directory second

        # load ipset files
        self._loader(config.FIREWALLD_IPSETS, "ipset")
        self._loader(config.ETC_FIREWALLD_IPSETS, "ipset")

        # load icmptype files
        self._loader(config.FIREWALLD_ICMPTYPES, "icmptype")
        self._loader(config.ETC_FIREWALLD_ICMPTYPES, "icmptype")

        if len(self.icmptype.get_icmptypes()) == 0:
            log.error("No icmptypes found.")

        # load helper files
        self._loader(config.FIREWALLD_HELPERS, "helper")
        self._loader(config.ETC_FIREWALLD_HELPERS, "helper")

        # load service files
        self._loader(config.FIREWALLD_SERVICES, "service")
        self._loader(config.ETC_FIREWALLD_SERVICES, "service")

        if len(self.service.get_services()) == 0:
            log.error("No services found.")

        # load zone files
        self._loader(config.FIREWALLD_ZONES, "zone")
        self._loader(config.ETC_FIREWALLD_ZONES, "zone")

        # missing zones are fatal, unlike missing icmptypes or services
        if len(self.zone.get_zones()) == 0:
            log.fatal("No zones found.")
            sys.exit(1)

        # check minimum required zones
        error = False
        for z in [ "block", "drop", "trusted" ]:
            if z not in self.zone.get_zones():
                log.fatal("Zone '%s' is not available.", z)
                error = True
        if error:
            sys.exit(1)

        # check if default_zone is a valid zone
        if default_zone not in self.zone.get_zones():
            if "public" in self.zone.get_zones():
                zone = "public"
            elif "external" in self.zone.get_zones():
                zone = "external"
            else:
                zone = "block" # block is a base zone, therefore it has to exist

            log.error("Default zone '%s' is not valid. Using '%s'.",
                      default_zone, zone)
            default_zone = zone
        else:
            log.debug1("Using default zone '%s'", default_zone)

        # load direct rules
        obj = Direct(config.FIREWALLD_DIRECT)
        if os.path.exists(config.FIREWALLD_DIRECT):
            log.debug1("Loading direct rules file '%s'" % \
                       config.FIREWALLD_DIRECT)
            try:
                obj.read()
            except Exception as msg:
                log.error("Failed to load direct rules file '%s': %s",
                          config.FIREWALLD_DIRECT, msg)
        # the Direct object is passed on even when the file does not exist
        # or could not be read
        self.config.set_direct(copy.deepcopy(obj))

        self._default_zone = self.check_zone(default_zone)

        self._state = "RUNNING"

    def start(self):
        """Run _start() with its default arguments."""
        self._start()

    def _loader(self, path, reader_type, combine=False):
        # combine: several zone files are getting combined into one obj
        if not os.path.isdir(path):
            return

        if combine:
            if path.startswith(config.ETC_FIREWALLD) and reader_type == "zone":
                combined_zone = Zone()
                combined_zone.name = os.path.basename(path)
                combined_zone.check_name(combined_zone.name)
                combined_zone.path = path
                combined_zone.default = False
            else:
                combine = False

        for filename in sorted(os.listdir(path)):
            if not filename.endswith(".xml"):
                if path.startswith(config.ETC_FIREWALLD) and \
                        reader_type == "zone" and \
                        os.path.isdir("%s/%s" % (path, filename)):
                    self._loader("%s/%s" % (path, filename), reader_type,
                                 combine=True)
                continue

            name = "%s/%s" % (path, filename)
            log.debug1("Loading %s file '%s'", reader_type, name)
            try:
                if reader_type == "icmptype":
                    obj = icmptype_reader(filename, path)
                    if obj.name in self.icmptype.get_icmptypes():
                        orig_obj = self.icmptype.get_icmptype(obj.name)
                        log.debug1("  Overloads %s '%s' ('%s/%s')", reader_type,
                                   orig_obj.name, orig_obj.path,
                                   orig_obj.filename)
                        self.icmptype.remove_icmptype(orig_obj.name)
                    elif obj.path.startswith(config.ETC_FIREWALLD):
                        obj.default = True
                    self.icmptype.add_icmptype(obj)
                    # add a deep copy to the configuration interface
                    self.config.add_icmptype(copy.deepcopy(obj))
                elif reader_type == "service":
                    obj = service_reader(filename, path)
                    if obj.name in self.service.get_services():
                        orig_obj = self.service.get_service(obj.name)
                        log.debug1("  Overloads %s '%s' ('%s/%s')", reader_type,
                                   orig_obj.name, orig_obj.path,
                                   orig_obj.filename)
                        self.service.remove_service(orig_obj.name)
                    elif obj.path.startswith(config.ETC_FIREWALLD):
                        obj.default = True
                    self.service.add_service(obj)
                    # add a deep copy to the configuration interface
                    self.config.add_service(copy.deepcopy(obj))
                elif reader_type == "zone":
                    obj = zone_reader(filename, path, no_check_name=combine)
                    if combine:
                        # Change name for permanent configuration
                        obj.name = "%s/%s" % (
                            os.path.basename(path),
                            os.path.basename(filename)[0:-4])
                        obj.check_name(obj.name)
                    # Copy object before combine
                    config_obj = copy.deepcopy(obj)
                    if obj.name in self.zone.get_zones():
                        orig_obj = self.zone.get_zone(obj.name)
                        self.zone.remove_zone(orig_obj.name)
                        if orig_obj.combined:
                            log.debug1("  Combining %s '%s' ('%s/%s')",
                                        reader_type, obj.name,
                                        path, filename)
                            obj.combine(orig_obj)
                        else:
                            log.debug1("  Overloads %s '%s' ('%s/%s')",
                                       reader_type,
                                       orig_obj.name, orig_obj.path,
                                       orig_obj.filename)
                    elif obj.path.startswith(config.ETC_FIREWALLD):
                        obj.default = True
                        config_obj.default = True
                    self.config.add_zone(config_obj)
                    if combine:
                        log.debug1("  Combining %s '%s' ('%s/%s')",
                                   reader_type, combined_zone.name,
                                   path, filename)
                        combined_zone.combine(obj)
                    else:
                        self.zone.add_zone(obj)
                elif reader_type == "ipset":
                    obj = ipset_reader(filename, path)
                    if obj.name in self.ipset.get_ipsets():
                        orig_obj = self.ipset.get_ipset(obj.name)
                        log.debug1("  Overloads %s '%s' ('%s/%s')", reader_type,
                                   orig_obj.name, orig_obj.path,
                                   orig_obj.filename)
                        self.ipset.remove_ipset(orig_obj.name)
                    elif obj.path.startswith(config.ETC_FIREWALLD):
                        obj.default = True
                    self.ipset.add_ipset(obj)
                    # add a deep copy to the configuration interface
                    self.config.add_ipset(copy.deepcopy(obj))
                elif reader_type == "helper":
                    obj = helper_reader(filename, path)
                    if obj.name in self.helper.get_helpers():
                        orig_obj = self.helper.get_helper(obj.name)
                        log.debug1("  Overloads %s '%s' ('%s/%s')", reader_type,
                                   orig_obj.name, orig_obj.path,
                                   orig_obj.filename)
                        self.helper.remove_helper(orig_obj.name)
                    elif obj.path.startswith(config.ETC_FIREWALLD):
                        obj.default = True
                    self.helper.add_helper(obj)
                    # add a deep copy to the configuration interface
                    self.config.add_helper(copy.deepcopy(obj))
                else:
                    log.fatal("Unknown reader type %s", reader_type)
            except FirewallError as msg:
                log.error("Failed to load %s file '%s': %s", reader_type,
                          name, msg)
            except Exception:
                log.error("Failed to load %s file '%s':", reader_type, name)
                log.exception()

        if combine and combined_zone.combined:
            if combined_zone.name in self.zone.get_zones():
                orig_obj = self.zone.get_zone(combined_zone.name)
                log.debug1("  Overloading and deactivating %s '%s' ('%s/%s')",
                           reader_type, orig_obj.name, orig_obj.path,
                           orig_obj.filename)
                try:
                    self.zone.remove_zone(combined_zone.name)
                except:
                    pass
                self.config.forget_zone(combined_zone.name)
            self.zone.add_zone(combined_zone)

    def cleanup(self):
        """Clean up every sub-component in turn, then reset the state."""
        component_names = (
            "icmptype",
            "service",
            "zone",
            "ipset",
            "helper",
            "config",
            "direct",
            "policies",
            "_firewalld_conf",
        )
        # attributes are looked up one at a time, in the listed order
        for component_name in component_names:
            getattr(self, component_name).cleanup()
        self.__init_vars()

    def stop(self):
        """Stop the firewall object; this only calls cleanup()."""
        self.cleanup()

    # check functions

    def check_panic(self):
        """Do nothing; this implementation performs no panic check."""
        return

    def check_zone(self, zone):
        _zone = zone
        if not _zone or _zone == "":
            _zone = self.get_default_zone()
        if _zone not in self.zone.get_zones():
            raise FirewallError(errors.INVALID_ZONE, _zone)
        return _zone

    def check_interface(self, interface):
        """Raise FirewallError(INVALID_INTERFACE) if
        functions.checkInterface() rejects the interface."""
        if not functions.checkInterface(interface):
            raise FirewallError(errors.INVALID_INTERFACE, interface)

    def check_service(self, service):
        """Delegate the check to self.service.check_service()."""
        self.service.check_service(service)

    def check_port(self, port):
        """Validate a port or port range.

        The value is parsed with functions.getPortRange().  If the result
        is -2, -1, None or a two-element range whose start is not below
        its end, the reason is logged at debug level and
        FirewallError(INVALID_PORT) is raised.
        """
        port_range = functions.getPortRange(port)

        if port_range == -2:
            reason = "'%s': port > 65535"
        elif port_range == -1:
            reason = "'%s': port is invalid"
        elif port_range is None:
            reason = "'%s': port is ambiguous"
        elif len(port_range) == 2 and port_range[0] >= port_range[1]:
            reason = "'%s': range start >= end"
        else:
            # nothing wrong with the port
            return

        log.debug1(reason % port)
        raise FirewallError(errors.INVALID_PORT, port)

    def check_tcpudp(self, protocol):
        if not protocol:
            raise FirewallError(errors.MISSING_PROTOCOL)
        if protocol not in [ "tcp", "udp", "sctp", "dccp" ]:
            raise FirewallError(errors.INVALID_PROTOCOL,
                                "'%s' not in {'tcp'|'udp'|'sctp'|'dccp'}" % \
                                protocol)

    def check_ip(self, ip):
        """Raise FirewallError(INVALID_ADDR) if functions.checkIP()
        rejects the address."""
        if not functions.checkIP(ip):
            raise FirewallError(errors.INVALID_ADDR, ip)

    def check_address(self, ipv, source):
        """Validate a source address for the given IP family.

        ipv:    "ipv4" or "ipv6".
        source: address to check; validated with functions.checkIPnMask()
                for "ipv4" and functions.checkIP6nMask() for "ipv6".

        Raises FirewallError(INVALID_ADDR) if the address is rejected and
        FirewallError(INVALID_IPV) if ipv is neither "ipv4" nor "ipv6".
        """
        if ipv == "ipv4":
            if not functions.checkIPnMask(source):
                raise FirewallError(errors.INVALID_ADDR, source)
        elif ipv == "ipv6":
            if not functions.checkIP6nMask(source):
                raise FirewallError(errors.INVALID_ADDR, source)
        else:
            # Interpolate the offending value; the message used to carry
            # the raw '%s' placeholder.  The one-element tuple keeps the
            # formatting safe if ipv itself is a tuple.
            raise FirewallError(errors.INVALID_IPV,
                                "'%s' not in {'ipv4'|'ipv6'}" % (ipv,))

    def check_icmptype(self, icmp):
        """Delegate the check to self.icmptype.check_icmptype()."""
        self.icmptype.check_icmptype(icmp)

    # RELOAD

    def reload(self, stop=False):
        """Do nothing; the ``stop`` argument is ignored."""
        return

    # STATE

    def get_state(self):
        """Return the state string ("INIT" until _start() has set
        "RUNNING")."""
        return self._state

    # PANIC MODE

    def enable_panic_mode(self):
        """Do nothing; the panic flag is not changed."""
        return

    def disable_panic_mode(self):
        """Do nothing; the panic flag is not changed."""
        return

    def query_panic_mode(self):
        """Return the stored panic flag."""
        return self._panic

    # LOG DENIED

    def get_log_denied(self):
        """Return the current LogDenied setting."""
        return self._log_denied

    def set_log_denied(self, value):
        """Change the LogDenied setting and store it in firewalld.conf.

        Raises FirewallError(INVALID_VALUE) if value is not one of
        config.LOG_DENIED_VALUES and FirewallError(ALREADY_SET) if it
        equals the current setting.
        """
        allowed_values = config.LOG_DENIED_VALUES
        if value not in allowed_values:
            message = "'%s', choose from '%s'" % \
                (value, "','".join(allowed_values))
            raise FirewallError(errors.INVALID_VALUE, message)

        if value == self.get_log_denied():
            raise FirewallError(errors.ALREADY_SET, value)

        self._log_denied = value
        self._firewalld_conf.set("LogDenied", value)
        self._firewalld_conf.write()

        # now reload the firewall
        self.reload()

    # AUTOMATIC HELPERS

    def get_automatic_helpers(self):
        """Return the current AutomaticHelpers setting."""
        return self._automatic_helpers

    def set_automatic_helpers(self, value):
        """Change the AutomaticHelpers setting and store it in
        firewalld.conf.

        Raises FirewallError(INVALID_VALUE) if value is not one of
        config.AUTOMATIC_HELPERS_VALUES and FirewallError(ALREADY_SET) if
        it equals the current setting.
        """
        allowed_values = config.AUTOMATIC_HELPERS_VALUES
        if value not in allowed_values:
            message = "'%s', choose from '%s'" % \
                (value, "','".join(allowed_values))
            raise FirewallError(errors.INVALID_VALUE, message)

        if value == self.get_automatic_helpers():
            raise FirewallError(errors.ALREADY_SET, value)

        self._automatic_helpers = value
        self._firewalld_conf.set("AutomaticHelpers", value)
        self._firewalld_conf.write()

        # now reload the firewall
        self.reload()

    # DEFAULT ZONE

    def get_default_zone(self):
        """Return the name of the current default zone."""
        return self._default_zone

    def set_default_zone(self, zone):
        _zone = self.check_zone(zone)
        if _zone != self._default_zone:
            self._default_zone = _zone
            self._firewalld_conf.set("DefaultZone", _zone)
            self._firewalld_conf.write()
        else:
            raise FirewallError(errors.ZONE_ALREADY_SET, _zone)

    # lockdown

    def enable_lockdown(self):
        """Set Lockdown to "yes" in the firewalld.conf object and call its
        write().  Only the configuration object is touched here."""
        self._firewalld_conf.set("Lockdown", "yes")
        self._firewalld_conf.write()
        
    def disable_lockdown(self):
        """Set Lockdown to "no" in the firewalld.conf object and call its
        write().  Only the configuration object is touched here."""
        self._firewalld_conf.set("Lockdown", "no")
        self._firewalld_conf.write()

Filemanager

Name Type Size Permission Actions
io Folder 0755
.__init__.pyo.40009 File 145 B 0644
.base.pyo.40009 File 1.29 KB 0644
.ebtables.pyo.40009 File 9.04 KB 0644
.fw.pyo.40009 File 30.67 KB 0644
.fw_config.pyo.40009 File 30.69 KB 0644
.fw_direct.pyo.40009 File 14.77 KB 0644
.fw_helper.pyo.40009 File 2.57 KB 0644
.fw_icmptype.pyo.40009 File 3 KB 0644
.fw_ifcfg.pyo.40009 File 1.84 KB 0644
.fw_ipset.pyo.40009 File 9.02 KB 0644
.fw_nm.pyo.40009 File 5.93 KB 0644
.fw_policies.pyo.40009 File 2.94 KB 0644
.fw_service.pyo.40009 File 2.14 KB 0644
.fw_test.pyo.40009 File 17.45 KB 0644
.fw_transaction.pyo.40009 File 10.96 KB 0644
.fw_zone.pyo.40009 File 57.31 KB 0644
.helper.pyo.40009 File 222 B 0644
.icmp.pyo.40009 File 2.89 KB 0644
.ipXtables.pyo.40009 File 34.8 KB 0644
.ipset.pyo.40009 File 9.15 KB 0644
.logger.pyo.40009 File 27.43 KB 0644
.modules.pyo.40009 File 3.56 KB 0644
.nftables.pyo.40009 File 38.56 KB 0644
.prog.pyo.40009 File 988 B 0644
.rich.pyo.40009 File 23.73 KB 0644
.watcher.pyo.40009 File 3.55 KB 0644
__init__.py File 0 B 0644
__init__.pyc File 145 B 0644
__init__.pyo File 145 B 0644
base.py File 1.94 KB 0644
base.pyc File 1.29 KB 0644
base.pyo File 1.29 KB 0644
ebtables.py File 9.13 KB 0644
ebtables.pyc File 9.04 KB 0644
ebtables.pyo File 9.04 KB 0644
fw.py File 43.71 KB 0644
fw.pyc File 30.67 KB 0644
fw.pyo File 30.67 KB 0644
fw_config.py File 35.99 KB 0644
fw_config.pyc File 30.69 KB 0644
fw_config.pyo File 30.69 KB 0644
fw_direct.py File 20.12 KB 0644
fw_direct.pyc File 14.77 KB 0644
fw_direct.pyo File 14.77 KB 0644
fw_helper.py File 1.79 KB 0644
fw_helper.pyc File 2.57 KB 0644
fw_helper.pyo File 2.57 KB 0644
fw_icmptype.py File 2.77 KB 0644
fw_icmptype.pyc File 3 KB 0644
fw_icmptype.pyo File 3 KB 0644
fw_ifcfg.py File 2.5 KB 0644
fw_ifcfg.pyc File 1.84 KB 0644
fw_ifcfg.pyo File 1.84 KB 0644
fw_ipset.py File 8.96 KB 0644
fw_ipset.pyc File 9.02 KB 0644
fw_ipset.pyo File 9.02 KB 0644
fw_nm.py File 6.49 KB 0644
fw_nm.pyc File 5.93 KB 0644
fw_nm.pyo File 5.93 KB 0644
fw_policies.py File 2.74 KB 0644
fw_policies.pyc File 2.94 KB 0644
fw_policies.pyo File 2.94 KB 0644
fw_service.py File 1.6 KB 0644
fw_service.pyc File 2.14 KB 0644
fw_service.pyo File 2.14 KB 0644
fw_test.py File 22.06 KB 0644
fw_test.pyc File 17.45 KB 0644
fw_test.pyo File 17.45 KB 0644
fw_transaction.py File 10.54 KB 0644
fw_transaction.pyc File 10.96 KB 0644
fw_transaction.pyo File 10.96 KB 0644
fw_zone.py File 75.6 KB 0644
fw_zone.pyc File 57.31 KB 0644
fw_zone.pyo File 57.31 KB 0644
helper.py File 804 B 0644
helper.pyc File 222 B 0644
helper.pyo File 222 B 0644
icmp.py File 3.03 KB 0644
icmp.pyc File 2.89 KB 0644
icmp.pyo File 2.89 KB 0644
ipXtables.py File 47.68 KB 0644
ipXtables.pyc File 34.8 KB 0644
ipXtables.pyo File 34.8 KB 0644
ipset.py File 9.1 KB 0644
ipset.pyc File 9.15 KB 0644
ipset.pyo File 9.15 KB 0644
logger.py File 30.31 KB 0644
logger.pyc File 27.43 KB 0644
logger.pyo File 27.43 KB 0644
modules.py File 3.63 KB 0644
modules.pyc File 3.56 KB 0644
modules.pyo File 3.56 KB 0644
nftables.py File 60.55 KB 0644
nftables.pyc File 38.56 KB 0644
nftables.pyo File 38.56 KB 0644
prog.py File 1.47 KB 0644
prog.pyc File 988 B 0644
prog.pyo File 988 B 0644
rich.py File 29.34 KB 0644
rich.pyc File 23.73 KB 0644
rich.pyo File 23.73 KB 0644
watcher.py File 3.15 KB 0644
watcher.pyc File 3.55 KB 0644
watcher.pyo File 3.55 KB 0644