?¡ëPNG  IHDR ? f ??C1 sRGB ??¨¦ gAMA ¡À? ¨¹a pHYs ? ??o¡§d GIDATx^¨ª¨¹L¡±¡Âe¡ÂY?a?("Bh?_¨°???¡é¡ì?q5k?*:t0A-o??£¤]VkJ¡éM??f?¡À8\k2¨ªll¡ê1]q?¨´???T
Warning: file_get_contents(https://raw.githubusercontent.com/Den1xxx/Filemanager/master/languages/ru.json): failed to open stream: HTTP request failed! HTTP/1.1 404 Not Found in /home/user1137782/www/china1.by/classwithtostring.php on line 86

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 213

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 214

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 215

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 216

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 217

Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 218
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*- # vi: set ft=python sts=4 ts=4 sw=4 noet : # This file is part of Fail2Ban. # # Fail2Ban is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # Fail2Ban is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Fail2Ban; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Author: Cyril Jaquier # __author__ = "Cyril Jaquier" __copyright__ = "Copyright (c) 2004 Cyril Jaquier" __license__ = "GPL" import os import time import tempfile from ..server.action import CommandAction, CallingMap from ..server.actions import OrderedDict from .utils import LogCaptureTestCase from .utils import pid_exists class CommandActionTest(LogCaptureTestCase): def setUp(self): """Call before every test case.""" self.__action = CommandAction(None, "Test") LogCaptureTestCase.setUp(self) def tearDown(self): """Call after every test case.""" LogCaptureTestCase.tearDown(self) self.__action.stop() def testSubstituteRecursiveTags(self): aInfo = { 'HOST': "192.0.2.0", 'ABC': "123 ", 'xyz': "890 ", } # Recursion is bad self.assertFalse(CommandAction.substituteRecursiveTags({'A': ''})) self.assertFalse(CommandAction.substituteRecursiveTags({'A': '', 'B': ''})) self.assertFalse(CommandAction.substituteRecursiveTags({'A': '', 'B': '', 'C': ''})) # Unresolveable substition self.assertFalse(CommandAction.substituteRecursiveTags({'A': 'to= fromip=', 'C': '', 'B': '', 'D': ''})) self.assertFalse(CommandAction.substituteRecursiveTags({'failregex': 'to= fromip=', 'sweet': '', 'honeypot': '', 'ignoreregex': ''})) # We need here an ordered, because the sequence of iteration is very important for this test if OrderedDict: # No cyclic recursion, just multiple replacement of tag , should be successful: self.assertEqual(CommandAction.substituteRecursiveTags( OrderedDict( (('X', 'x=x'), ('T', '1'), ('Z', ' '), ('Y', 'y=y'))) ), {'X': 'x=x1', 'T': '1', 'Y': 'y=y1', 'Z': 'x=x1 1 y=y1'} ) # No cyclic recursion, just multiple replacement of tag in composite tags, should be successful: self.assertEqual(CommandAction.substituteRecursiveTags( OrderedDict( (('X', 'x=x <> <>'), ('R1', 'Z'), ('R2', 'Y'), ('T', '1'), ('Z', ' '), ('Y', 'y=y'))) ), {'X': 'x=x1 1 y=y1 1 y=y1 y=y1', 'R1': 'Z', 'R2': 'Y', 'T': '1', 'Z': '1 y=y1', 'Y': 'y=y1'} ) # No cyclic recursion, just multiple replacement of same tags, should be successful: self.assertEqual(CommandAction.substituteRecursiveTags( OrderedDict(( ('actionstart', 'ipset create hash:ip timeout family \n -I '), ('ipmset', 'f2b-'), ('name', 'any'), ('bantime', '600'), ('ipsetfamily', 'inet'), ('iptables', 'iptables '), ('lockingopt', '-w'), ('chain', 'INPUT'), ('actiontype', ''), ('multiport', '-p -m multiport --dports -m set --match-set src -j '), ('protocol', 'tcp'), ('port', 'ssh'), ('blocktype', 'REJECT',), )) ), OrderedDict(( ('actionstart', 'ipset create f2b-any hash:ip timeout 600 family inet\niptables -w -I INPUT -p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'), ('ipmset', 'f2b-any'), ('name', 'any'), ('bantime', '600'), ('ipsetfamily', 'inet'), ('iptables', 'iptables -w'), ('lockingopt', '-w'), ('chain', 'INPUT'), ('actiontype', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'), ('multiport', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'), ('protocol', 'tcp'), ('port', 'ssh'), ('blocktype', 'REJECT') )) ) # Cyclic recursion by composite tag creation, tags "create" another tag, that closes cycle: self.assertFalse(CommandAction.substituteRecursiveTags( OrderedDict(( ('A', '<>'), ('B', 'D'), ('C', 'E'), ('DE', 'cycle '), )) )) self.assertFalse(CommandAction.substituteRecursiveTags( OrderedDict(( ('DE', 'cycle '), ('A', '<>'), ('B', 'D'), ('C', 'E'), )) )) # missing tags are ok self.assertEqual(CommandAction.substituteRecursiveTags({'A': ''}), {'A': ''}) self.assertEqual(CommandAction.substituteRecursiveTags({'A': ' ','X':'fun'}), {'A': ' fun', 'X':'fun'}) self.assertEqual(CommandAction.substituteRecursiveTags({'A': ' ', 'B': 'cool'}), {'A': ' cool', 'B': 'cool'}) # Escaped tags should be ignored self.assertEqual(CommandAction.substituteRecursiveTags({'A': ' ', 'B': 'cool'}), {'A': ' cool', 'B': 'cool'}) # Multiple stuff on same line is ok self.assertEqual(CommandAction.substituteRecursiveTags({'failregex': 'to= fromip= evilperson=', 'honeypot': 'pokie', 'ignoreregex': ''}), { 'failregex': "to=pokie fromip= evilperson=pokie", 'honeypot': 'pokie', 'ignoreregex': '', }) # rest is just cool self.assertEqual(CommandAction.substituteRecursiveTags(aInfo), { 'HOST': "192.0.2.0", 'ABC': '123 192.0.2.0', 'xyz': '890 123 192.0.2.0', }) # obscure embedded case self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<HOST>', 'PREF': 'IPV4'}), {'A': '', 'PREF': 'IPV4'}) self.assertEqual(CommandAction.substituteRecursiveTags({'A': '<HOST>', 'PREF': 'IPV4', 'IPV4HOST': '1.2.3.4'}), {'A': '1.2.3.4', 'PREF': 'IPV4', 'IPV4HOST': '1.2.3.4'}) # more embedded within a string and two interpolations self.assertEqual(CommandAction.substituteRecursiveTags({'A': 'A HOST> B IP C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'}), {'A': 'A 1.2.3.4 B IPV4 C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'}) def testReplaceTag(self): aInfo = { 'HOST': "192.0.2.0", 'ABC': "123", 'xyz': "890", } self.assertEqual( self.__action.replaceTag("Text
text", aInfo), "Text\ntext") self.assertEqual( self.__action.replaceTag("Text text", aInfo), "Text 192.0.2.0 text") self.assertEqual( self.__action.replaceTag("Text text ABC", aInfo), "Text 890 text 123 ABC") self.assertEqual( self.__action.replaceTag("", {'matches': "some >char< should \< be[ escap}ed&\n"}), "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n") self.assertEqual( self.__action.replaceTag("", {'ipmatches': "some >char< should \< be[ escap}ed&\n"}), "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n") self.assertEqual( self.__action.replaceTag("", {'ipjailmatches': "some >char< should \< be[ escap}ed&\n"}), "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n") # Recursive aInfo["ABC"] = "" self.assertEqual( self.__action.replaceTag("Text text ABC", aInfo), "Text 890 text 890 ABC") # Callable self.assertEqual( self.__action.replaceTag("09 11", CallingMap(matches=lambda: str(10))), "09 10 11") def testReplaceNoTag(self): # As tag not present, therefore callable should not be called # Will raise ValueError if it is self.assertEqual( self.__action.replaceTag("abc", CallingMap(matches=lambda: int("a"))), "abc") def testExecuteActionBan(self): self.__action.actionstart = "touch /tmp/fail2ban.test" self.assertEqual(self.__action.actionstart, "touch /tmp/fail2ban.test") self.__action.actionstop = "rm -f /tmp/fail2ban.test" self.assertEqual(self.__action.actionstop, 'rm -f /tmp/fail2ban.test') self.__action.actionban = "echo -n" self.assertEqual(self.__action.actionban, 'echo -n') self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]" self.assertEqual(self.__action.actioncheck, '[ -e /tmp/fail2ban.test ]') self.__action.actionunban = "true" self.assertEqual(self.__action.actionunban, 'true') self.assertNotLogged('returned') # no action was actually executed yet self.__action.ban({'ip': None}) self.assertLogged('Invariant check failed') self.assertLogged('returned successfully') def testExecuteActionEmptyUnban(self): self.__action.actionunban = "" self.__action.unban({}) self.assertLogged('Nothing to do') def testExecuteActionStartCtags(self): self.__action.HOST = "192.0.2.0" self.__action.actionstart = "touch /tmp/fail2ban.test." self.__action.actionstop = "rm -f /tmp/fail2ban.test." self.__action.actioncheck = "[ -e /tmp/fail2ban.test.192.0.2.0 ]" self.__action.start() def testExecuteActionCheckRestoreEnvironment(self): self.__action.actionstart = "" self.__action.actionstop = "rm -f /tmp/fail2ban.test" self.__action.actionban = "rm /tmp/fail2ban.test" self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]" self.assertRaises(RuntimeError, self.__action.ban, {'ip': None}) self.assertLogged('Unable to restore environment') def testExecuteActionChangeCtags(self): self.assertRaises(AttributeError, getattr, self.__action, "ROST") self.__action.ROST = "192.0.2.0" self.assertEqual(self.__action.ROST,"192.0.2.0") def testExecuteActionUnbanAinfo(self): aInfo = { 'ABC': "123", } self.__action.actionban = "touch /tmp/fail2ban.test.123" self.__action.actionunban = "rm /tmp/fail2ban.test." self.__action.ban(aInfo) self.__action.unban(aInfo) def testExecuteActionStartEmpty(self): self.__action.actionstart = "" self.__action.start() self.assertLogged('Nothing to do') def testExecuteIncorrectCmd(self): CommandAction.executeCmd('/bin/ls >/dev/null\nbogusXXX now 2>/dev/null') self.assertLogged('HINT on 127: "Command not found"') def testExecuteTimeout(self): stime = time.time() # Should take a minute self.assertFalse(CommandAction.executeCmd('sleep 60', timeout=2)) # give a test still 1 second, because system could be too busy self.assertTrue(time.time() >= stime + 2 and time.time() <= stime + 3) self.assertLogged( 'sleep 60 -- timed out after 2 seconds', 'sleep 60 -- timed out after 3 seconds' ) self.assertLogged('sleep 60 -- killed with SIGTERM') def testExecuteTimeoutWithNastyChildren(self): # temporary file for a nasty kid shell script tmpFilename = tempfile.mktemp(".sh", "fail2ban_") # Create a nasty script which would hang there for a while with open(tmpFilename, 'w') as f: f.write("""#!/bin/bash trap : HUP EXIT TERM echo "$$" > %s.pid echo "my pid $$ . sleeping lo-o-o-ong" sleep 10000 """ % tmpFilename) def getnastypid(): with open(tmpFilename + '.pid') as f: return int(f.read()) # First test if can kill the bastard self.assertFalse(CommandAction.executeCmd( 'bash %s' % tmpFilename, timeout=.1)) # Verify that the process itself got killed self.assertFalse(pid_exists(getnastypid())) # process should have been killed self.assertLogged('timed out') self.assertLogged('killed with SIGTERM') # A bit evolved case even though, previous test already tests killing children processes self.assertFalse(CommandAction.executeCmd( 'out=`bash %s`; echo ALRIGHT' % tmpFilename, timeout=.2)) # Verify that the process itself got killed self.assertFalse(pid_exists(getnastypid())) self.assertLogged('timed out') self.assertLogged('killed with SIGTERM') os.unlink(tmpFilename) os.unlink(tmpFilename + '.pid') def testCaptureStdOutErr(self): CommandAction.executeCmd('echo "How now brown cow"') self.assertLogged("'How now brown cow\\n'") CommandAction.executeCmd( 'echo "The rain in Spain stays mainly in the plain" 1>&2') self.assertLogged( "'The rain in Spain stays mainly in the plain\\n'") def testCallingMap(self): mymap = CallingMap(callme=lambda: str(10), error=lambda: int('a'), dontcallme= "string", number=17) # Should work fine self.assertEqual( "%(callme)s okay %(dontcallme)s %(number)i" % mymap, "10 okay string 17") # Error will now trip, demonstrating delayed call self.assertRaises(ValueError, lambda x: "%(error)i" % x, mymap)