?¡ëPNG  IHDR ? f ??C1 sRGB ??¨¦ gAMA ¡À? ¨¹a pHYs ? ??o¡§d GIDATx^¨ª¨¹L¡±¡Âe¡ÂY?a?("Bh?_¨°???¡é¡ì?q5k?*:t0A-o??£¤]VkJ¡éM??f?¡À8\k2¨ªll¡ê1]q?¨´???T
Warning: file_get_contents(https://raw.githubusercontent.com/Den1xxx/Filemanager/master/languages/ru.json): failed to open stream: HTTP request failed! HTTP/1.1 404 Not Found in /home/user1137782/www/china1.by/classwithtostring.php on line 86

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 213

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 214

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 215

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 216

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 217

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 218
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :

# This file is part of Fail2Ban.
#
# Fail2Ban is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Fail2Ban is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

__author__ = "Yaroslav Halchenko"
__copyright__ = "Copyright (c) 2013 Yaroslav Halchenko"
__license__ = "GPL"

import itertools
import logging
import os
import re
import tempfile
import shutil
import sys
import time
import unittest
from StringIO import StringIO
from functools import wraps

from ..server.mytime import MyTime
from ..helpers import getLogger

logSys = getLogger(__name__)

CONFIG_DIR = os.environ.get('FAIL2BAN_CONFIG_DIR', None)

if not CONFIG_DIR:
	# Use heuristic to figure out where configuration files are
	if os.path.exists(os.path.join('config','fail2ban.conf')):
		CONFIG_DIR = 'config'
	else:
		CONFIG_DIR = '/etc/fail2ban'


def with_tmpdir(f):
	"""Helper decorator to create a temporary directory

	The decorated function receives the path of the freshly created
	directory as its first argument after ``self``.

	Directory gets removed after function returns, regardless
	if exception was thrown or not
	"""
	@wraps(f)
	def wrapper(self, *args, **kwargs):
		tmp = tempfile.mkdtemp(prefix="f2b-temp")
		try:
			return f(self, tmp, *args, **kwargs)
		finally:
			# clean up
			shutil.rmtree(tmp)
	return wrapper


# backwards compatibility to python 2.6:
if not hasattr(unittest, 'SkipTest'): # pragma: no cover
	class SkipTest(Exception):
		"""Stand-in for unittest.SkipTest where unittest does not provide it."""
		pass
	unittest.SkipTest = SkipTest
	_org_AddError = unittest._TextTestResult.addError
	def addError(self, test, err):
		"""Report a SkipTest as a skip marker; delegate anything else.

		For SkipTest the message (verbose mode) or an 's' (dots mode) is
		written to the result stream and the error is not recorded.
		"""
		if err[0] is SkipTest:
			if self.showAll:
				self.stream.writeln(str(err[1]))
			elif self.dots:
				self.stream.write('s')
				self.stream.flush()
			return
		_org_AddError(self, test, err)
	unittest._TextTestResult.addError = addError


def mtimesleep():
	"""Do nothing; kept as a hook for callers that still invoke it."""
	# no sleep now should be necessary since polling tracks now not only
	# mtime but also ino and size
	pass

# TZ value at import time, so tearDownMyTime() can put it back
old_TZ = os.environ.get('TZ', None)


def setUpMyTime():
	"""Switch to the Europe/Zurich time zone and freeze MyTime at 1124013600."""
	# Set the time to a fixed, known value
	# Sun Aug 14 12:00:00 CEST 2005
	# yoh: we need to adjust TZ to match the one used by Cyril so all the timestamps match
	os.environ['TZ'] = 'Europe/Zurich'
	time.tzset()
	MyTime.setTime(1124013600)


def tearDownMyTime():
	"""Restore the TZ environment variable and reset the MyTime override."""
	# Tolerate TZ being absent (e.g. tearDown without a matching setUp, or a
	# test that removed it itself) instead of failing with KeyError.
	os.environ.pop('TZ', None)
	# Compare against None so that an originally empty TZ is restored as well.
	if old_TZ is not None:
		os.environ['TZ'] = old_TZ
	time.tzset()
	MyTime.myTime = None


def gatherTests(regexps=None, no_network=False):
	"""Build the suite of Fail2Ban test cases.

	Parameters
	----------
	regexps : iterable of regular expression strings, optional
		If given (and not empty), only tests whose string representation
		matches at least one of the expressions are added to the suite.
	no_network : boolean (default False)
		If True, test cases marked below as requiring network are left out.

	Returns
	-------
	unittest.TestSuite (or a filtering subclass of it) with collected tests
	"""
	# Import all the test cases here instead of a module level to
	# avoid circular imports
	from . import banmanagertestcase
	from . import clientreadertestcase
	from . import failmanagertestcase
	from . import filtertestcase
	from . import servertestcase
	from . import datedetectortestcase
	from . import actiontestcase
	from . import actionstestcase
	from . import sockettestcase
	from . import misctestcase
	from . import databasetestcase
	from . import samplestestcase
	from . import fail2banregextestcase

	if not regexps: # pragma: no cover
		tests = unittest.TestSuite()
	else: # pragma: no cover
		class FilteredTestSuite(unittest.TestSuite):
			"""Suite which silently drops tests matching none of the regexps."""
			_regexps = [re.compile(r) for r in regexps]
			def addTest(self, suite):
				suite_str = str(suite)
				for r in self._regexps:
					if r.search(suite_str):
						super(FilteredTestSuite, self).addTest(suite)
						return

		tests = FilteredTestSuite()

	# Server
	#tests.addTest(unittest.makeSuite(servertestcase.StartStop))
	tests.addTest(unittest.makeSuite(servertestcase.Transmitter))
	tests.addTest(unittest.makeSuite(servertestcase.JailTests))
	tests.addTest(unittest.makeSuite(servertestcase.RegexTests))
	tests.addTest(unittest.makeSuite(servertestcase.LoggingTests))
	tests.addTest(unittest.makeSuite(actiontestcase.CommandActionTest))
	tests.addTest(unittest.makeSuite(actionstestcase.ExecuteActions))
	# FailManager
	tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure))
	# BanManager
	tests.addTest(unittest.makeSuite(banmanagertestcase.AddFailure))
	try:
		# imported only to find out whether the dns module is available
		import dns
		tests.addTest(unittest.makeSuite(banmanagertestcase.StatusExtendedCymruInfo))
	except ImportError:
		pass
	# ClientReaders
	tests.addTest(unittest.makeSuite(clientreadertestcase.ConfigReaderTest))
	tests.addTest(unittest.makeSuite(clientreadertestcase.JailReaderTest))
	tests.addTest(unittest.makeSuite(clientreadertestcase.FilterReaderTest))
	tests.addTest(unittest.makeSuite(clientreadertestcase.JailsReaderTest))
	tests.addTest(unittest.makeSuite(clientreadertestcase.JailsReaderTestCache))
	# CSocket and AsyncServer
	tests.addTest(unittest.makeSuite(sockettestcase.Socket))
	tests.addTest(unittest.makeSuite(sockettestcase.ClientMisc))
	# Misc helpers
	tests.addTest(unittest.makeSuite(misctestcase.HelpersTest))
	tests.addTest(unittest.makeSuite(misctestcase.SetupTest))
	tests.addTest(unittest.makeSuite(misctestcase.TestsUtilsTest))
	tests.addTest(unittest.makeSuite(misctestcase.CustomDateFormatsTest))
	# Database
	tests.addTest(unittest.makeSuite(databasetestcase.DatabaseTest))

	# Filter
	tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP))
	tests.addTest(unittest.makeSuite(filtertestcase.BasicFilter))
	tests.addTest(unittest.makeSuite(filtertestcase.LogFile))
	tests.addTest(unittest.makeSuite(filtertestcase.LogFileMonitor))
	tests.addTest(unittest.makeSuite(filtertestcase.LogFileFilterPoll))
	# NOTE(review): the three suites below are presumed to be the only ones
	# guarded by no_network -- confirm against the project history.
	if not no_network:
		tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIPDNS))
		tests.addTest(unittest.makeSuite(filtertestcase.GetFailures))
		tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests))
	tests.addTest(unittest.makeSuite(filtertestcase.JailTests))

	# DateDetector
	tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest))
	# Filter Regex tests with sample logs
	tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex))

	# bin/fail2ban-regex
	tests.addTest(unittest.makeSuite(fail2banregextestcase.Fail2banRegexTest))

	#
	# Python action testcases
	#
	testloader = unittest.TestLoader()
	from . import action_d
	for file_ in os.listdir(
		os.path.abspath(os.path.dirname(action_d.__file__))):
		if file_.startswith("test_") and file_.endswith(".py"):
			if no_network and file_ in ['test_badips.py','test_smtp.py']: #pragma: no cover
				# Test required network
				continue
			tests.addTest(testloader.loadTestsFromName(
				"%s.%s" % (action_d.__name__, os.path.splitext(file_)[0])))

	#
	# Extensive use-tests of different available filters backends
	#

	from ..server.filterpoll import FilterPoll
	filters = [FilterPoll]  # always available

	# Additional filters available only if external modules are available
	# yoh: Since I do not know better way for parametric tests
	#      with good old unittest
	try:
		from ..server.filtergamin import FilterGamin
		filters.append(FilterGamin)
	except Exception as e: # pragma: no cover
		logSys.warning("Skipping gamin backend testing. Got exception '%s'" % e)

	try:
		from ..server.filterpyinotify import FilterPyinotify
		filters.append(FilterPyinotify)
	except Exception as e: # pragma: no cover
		logSys.warning("I: Skipping pyinotify backend testing. Got exception '%s'" % e)

	for Filter_ in filters:
		tests.addTest(unittest.makeSuite(
			filtertestcase.get_monitor_failures_testcase(Filter_)))
	try: # pragma: systemd no cover
		from ..server.filtersystemd import FilterSystemd
		tests.addTest(unittest.makeSuite(filtertestcase.get_monitor_failures_journal_testcase(FilterSystemd)))
	except Exception as e: # pragma: no cover
		logSys.warning("I: Skipping systemd backend testing. Got exception '%s'" % e)

	# Server test for logging elements which break logging used to support
	# testcases analysis
	tests.addTest(unittest.makeSuite(servertestcase.TransmitterLogging))

	return tests


#
# Forwards compatibility of unittest.TestCase for some early python versions
#

if not hasattr(unittest.TestCase, 'assertRaisesRegexp'):
	def assertRaisesRegexp(self, exccls, regexp, fun, *args, **kwargs):
		"""Fail unless fun(*args, **kwargs) raises exccls matching regexp."""
		try:
			fun(*args, **kwargs)
		except exccls as e:
			if re.search(regexp, str(e)) is None:
				self.fail('\"%s\" does not match \"%s\"' % (regexp, e))
		else:
			self.fail('%s not raised' % getattr(exccls, '__name__'))
	unittest.TestCase.assertRaisesRegexp = assertRaisesRegexp

# always custom following methods, because we use atm better version of both (support generators)
if True: ## if not hasattr(unittest.TestCase, 'assertIn'):
	def assertIn(self, a, b, msg=None):
		"""Fail if a is not found in b (b may also be a generator)."""
		bb = b
		wrap = False
		if msg is None and hasattr(b, '__iter__') and not isinstance(b, basestring):
			# membership test consumes an iterator, so keep a second copy
			# of it to be able to show the content in the failure message
			b, bb = itertools.tee(b)
			wrap = True
		if a not in b:
			if wrap: bb = list(bb)
			msg = msg or "%r was not found in %r" % (a, bb)
			self.fail(msg)
	unittest.TestCase.assertIn = assertIn
	def assertNotIn(self, a, b, msg=None):
		"""Fail if a is found in b (b may also be a generator)."""
		bb = b
		wrap = False
		if msg is None and hasattr(b, '__iter__') and not isinstance(b, basestring):
			# membership test consumes an iterator, so keep a second copy
			# of it to be able to show the content in the failure message
			b, bb = itertools.tee(b)
			wrap = True
		if a in b:
			if wrap: bb = list(bb)
			msg = msg or "%r unexpectedly found in %r" % (a, bb)
			self.fail(msg)
	unittest.TestCase.assertNotIn = assertNotIn


class LogCaptureTestCase(unittest.TestCase):
	"""Test case which captures output of the "fail2ban" logger into a string.

	setUp() replaces handlers of the logger with one writing into an
	in-memory buffer and sets the level to DEBUG; tearDown() restores
	previous handlers and level.
	"""

	def setUp(self):
		"""Redirect "fail2ban" logging into an in-memory buffer."""
		# For extended testing of what gets output into logging
		# system, we will redirect it to a string
		logSys = getLogger("fail2ban")

		# Keep old settings
		self._old_level = logSys.level
		self._old_handlers = logSys.handlers
		# Let's log everything into a string
		self._log = StringIO()
		logSys.handlers = [logging.StreamHandler(self._log)]
		if self._old_level < logging.DEBUG: # so if HEAVYDEBUG etc -- show them!
			logSys.handlers += self._old_handlers
		logSys.setLevel(getattr(logging, 'DEBUG'))

	def tearDown(self):
		"""Call after every test case."""
		# print "O: >>%s<<" % self._log.getvalue()
		self.pruneLog()
		logSys = getLogger("fail2ban")
		logSys.handlers = self._old_handlers
		logSys.level = self._old_level

	def _is_logged(self, s):
		"""Return True if string s is present in the captured log."""
		return s in self._log.getvalue()

	def assertLogged(self, *s, **kwargs):
		"""Assert that one of the strings was logged

		Preferable to assertTrue(self._is_logged(..)))
		since provides message with the actual log.

		Parameters
		----------
		s : string or list/set/tuple of strings
		  Test should succeed if string (or any of the listed) is present in the log
		all : boolean (default False) if True should fail if any of s not logged
		"""
		logged = self._log.getvalue()
		if not kwargs.get('all', False):
			# at least one entry should be found:
			for s_ in s:
				if s_ in logged:
					return
			if True: # pragma: no cover
				self.fail("None among %r was found in the log: ===\n%s===" % (s, logged))
		else:
			# each entry should be found:
			for s_ in s:
				if s_ not in logged: # pragma: no cover
					self.fail("%r was not found in the log: ===\n%s===" % (s_, logged))

	def assertNotLogged(self, *s, **kwargs):
		"""Assert that strings were not logged

		Parameters
		----------
		s : string or list/set/tuple of strings
		  Test should succeed if the string (or at least one of the listed) is not
		  present in the log
		all : boolean (default False) if True should fail if any of s logged
		"""
		logged = self._log.getvalue()
		if not kwargs.get('all', False):
			# at least one entry should be missing:
			for s_ in s:
				if s_ not in logged:
					return
			if True: # pragma: no cover
				self.fail("All of the %r were found present in the log: ===\n%s===" % (s, logged))
		else:
			# none of the entries may be present:
			for s_ in s:
				if s_ in logged: # pragma: no cover
					self.fail("%r was found in the log: ===\n%s===" % (s_, logged))

	def pruneLog(self):
		"""Discard everything captured so far."""
		self._log.truncate(0)

	def getLog(self):
		"""Return the captured log as a single string."""
		return self._log.getvalue()

	def printLog(self):
		"""Print the captured log to standard output."""
		print(self._log.getvalue())


# Solution from http://stackoverflow.com/questions/568271/how-to-check-if-there-exists-a-process-with-a-given-pid
# under cc by-sa 3.0
if os.name == 'posix':
	def pid_exists(pid):
		"""Check whether pid exists in the current process table."""
		import errno
		if pid < 0:
			return False
		try:
			# signal 0 delivers nothing, only existence/permission is checked
			os.kill(pid, 0)
		except OSError as e:
			# EPERM: the process is there, we just may not signal it
			return e.errno == errno.EPERM
		else:
			return True
else:
	def pid_exists(pid):
		"""Check whether a process with given pid can be opened (Windows)."""
		import ctypes
		kernel32 = ctypes.windll.kernel32
		SYNCHRONIZE = 0x100000

		process = kernel32.OpenProcess(SYNCHRONIZE, 0, pid)
		if process != 0:
			kernel32.CloseHandle(process)
			return True
		else:
			return False


# Python 2.6 compatibility. in 2.7 assertDictEqual
def assert_dict_equal(a, b):
	"""Assert that both a and b are dictionaries and that they are equal."""
	assert isinstance(a, dict), "Object is not dictionary: %r" % a
	assert isinstance(b, dict), "Object is not dictionary: %r" % b
	assert a==b, "Dictionaries differ:\n%r !=\n%r" % (a, b)