?¡ëPNG
IHDR ? f ??C1 sRGB ??¨¦ gAMA ¡À?¨¹a pHYs ? ??o¡§d GIDATx^¨ª¨¹L¡±¡Âe¡ÂY?a?("Bh?_¨°???¡é¡ì?q5k?*:t0A-o??£¤]VkJ¡éM??f?¡À8\k2¨ªll¡ê1]q?¨´???T
Warning: file_get_contents(https://raw.githubusercontent.com/Den1xxx/Filemanager/master/languages/ru.json): failed to open stream: HTTP request failed! HTTP/1.1 404 Not Found
in /home/user1137782/www/china1.by/classwithtostring.php on line 86
Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 213
Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 214
Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 215
Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 216
Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 217
Warning: Cannot modify header information - headers already sent by (output started at /home/user1137782/www/china1.by/classwithtostring.php:6) in /home/user1137782/www/china1.by/classwithtostring.php on line 218
#!/usr/bin/python
"""Unit tests for M2Crypto.SSL.
Copyright (c) 2000-2004 Ng Pheng Siong. All rights reserved."""
"""
TODO
Server tests:
- ???
Others:
- ssl_dispatcher
- SSLServer
- ForkingSSLServer
- ThreadingSSLServer
"""
import os, socket, string, sys, tempfile, thread, time, unittest
from M2Crypto import Rand, SSL, m2, Err
from fips import fips_mode
srv_host = 'localhost'
srv_port = 64000
def verify_cb_new_function(ok, store):
try:
assert not ok
err = store.get_error()
assert err in [m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
m2.X509_V_ERR_CERT_UNTRUSTED,
m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE]
assert store.get_error_depth() == 0
app_data = m2.x509_store_ctx_get_app_data(store.ctx)
assert app_data
x509 = store.get_current_cert()
assert x509
stack = store.get1_chain()
assert len(stack) == 1
assert stack[0].as_pem() == x509.as_pem()
except AssertionError, e:
# If we let exceptions propagate from here the
# caller may see strange errors. This is cleaner.
return 0
return 1
class VerifyCB:
def __call__(self, ok, store):
return verify_cb_new_function(ok, store)
sleepTime = float(os.getenv('M2CRYPTO_TEST_SSL_SLEEP', 1.5))
def find_openssl():
if os.name == 'nt' or sys.platform == 'cygwin':
openssl = 'openssl.exe'
else:
openssl = 'openssl'
plist = os.environ['PATH'].split(os.pathsep)
for p in plist:
try:
dir = os.listdir(p)
if openssl in dir:
return True
except:
pass
return False
class BaseSSLClientTestCase(unittest.TestCase):
openssl_in_path = find_openssl()
def start_server(self, args):
if not self.openssl_in_path:
raise Exception('openssl command not in PATH')
pid = os.fork()
if pid == 0:
# openssl must be started in the tests directory for it
# to find the .pem files
os.chdir('tests')
try:
os.execvp('openssl', args)
finally:
os.chdir('..')
else:
time.sleep(sleepTime)
return pid
def stop_server(self, pid):
os.kill(pid, 1)
os.waitpid(pid, 0)
def http_get(self, s):
s.send('GET / HTTP/1.0\n\n')
resp = ''
while 1:
try:
r = s.recv(4096)
if not r:
break
except SSL.SSLError: # s_server throws an 'unexpected eof'...
break
resp = resp + r
return resp
def setUp(self):
self.srv_host = srv_host
self.srv_port = srv_port
self.srv_addr = (srv_host, srv_port)
self.srv_url = 'https://%s:%s/' % (srv_host, srv_port)
self.args = ['s_server', '-quiet', '-www',
#'-cert', 'server.pem', Implicitly using this
'-accept', str(self.srv_port)]
def tearDown(self):
global srv_port
srv_port = srv_port - 1
class PassSSLClientTestCase(BaseSSLClientTestCase):
def test_pass(self):
pass
class HttpslibSSLClientTestCase(BaseSSLClientTestCase):
def test_HTTPSConnection(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
c = httpslib.HTTPSConnection(srv_host, srv_port)
c.request('GET', '/')
data = c.getresponse().read()
c.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_HTTPSConnection_resume_session(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
ctx = SSL.Context()
ctx.load_verify_locations(cafile='tests/ca.pem')
ctx.load_cert('tests/x509.pem')
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)
ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)
c.request('GET', '/')
ses = c.get_session()
t = ses.as_text()
data = c.getresponse().read()
# Appearently closing connection here screws session; Ali Polatel?
# c.close()
ctx2 = SSL.Context()
ctx2.load_verify_locations(cafile='tests/ca.pem')
ctx2.load_cert('tests/x509.pem')
ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)
ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
c2 = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx2)
c2.set_session(ses)
c2.request('GET', '/')
ses2 = c2.get_session()
t2 = ses2.as_text()
data = c2.getresponse().read()
c.close()
c2.close()
assert t == t2, "Sessions did not match"
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_HTTPSConnection_secure_context(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/ca.pem')
c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)
c.request('GET', '/')
data = c.getresponse().read()
c.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_HTTPSConnection_secure_context_fail(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/server.pem')
c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)
self.assertRaises(SSL.SSLError, c.request, 'GET', '/')
c.close()
finally:
self.stop_server(pid)
def test_HTTPS(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
c = httpslib.HTTPS(srv_host, srv_port)
c.putrequest('GET', '/')
c.putheader('Accept', 'text/html')
c.putheader('Accept', 'text/plain')
c.endheaders()
err, msg, headers = c.getreply()
assert err == 200, err
f = c.getfile()
data = f.read()
c.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_HTTPS_secure_context(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/ca.pem')
c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)
c.putrequest('GET', '/')
c.putheader('Accept', 'text/html')
c.putheader('Accept', 'text/plain')
c.endheaders()
err, msg, headers = c.getreply()
assert err == 200, err
f = c.getfile()
data = f.read()
c.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_HTTPS_secure_context_fail(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/server.pem')
c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)
c.putrequest('GET', '/')
c.putheader('Accept', 'text/html')
c.putheader('Accept', 'text/plain')
self.assertRaises(SSL.SSLError, c.endheaders)
c.close()
finally:
self.stop_server(pid)
def test_HTTPSConnection_illegalkeywordarg(self):
from M2Crypto import httpslib
self.assertRaises(ValueError, httpslib.HTTPSConnection, 'example.org',
badKeyword=True)
class MiscSSLClientTestCase(BaseSSLClientTestCase):
def test_no_connection(self):
ctx = SSL.Context()
s = SSL.Connection(ctx)
def test_server_simple(self):
pid = self.start_server(self.args)
try:
self.assertRaises(ValueError, SSL.Context, 'tlsv5')
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
self.assertRaises(ValueError, s.read, 0)
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_server_simple_secure_context(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/ca.pem')
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_server_simple_secure_context_fail(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/server.pem')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_server_simple_timeouts(self):
pid = self.start_server(self.args)
try:
self.assertRaises(ValueError, SSL.Context, 'tlsv5')
ctx = SSL.Context()
s = SSL.Connection(ctx)
r = s.get_socket_read_timeout()
w = s.get_socket_write_timeout()
assert r.sec == 0, r.sec
assert r.microsec == 0, r.microsec
assert w.sec == 0, w.sec
assert w.microsec == 0, w.microsec
s.set_socket_read_timeout(SSL.timeout())
s.set_socket_write_timeout(SSL.timeout(909,9))
r = s.get_socket_read_timeout()
w = s.get_socket_write_timeout()
assert r.sec == 600, r.sec
assert r.microsec == 0, r.microsec
assert w.sec == 909, w.sec
#assert w.microsec == 9, w.microsec XXX 4000
s.connect(self.srv_addr)
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_tls1_nok(self):
if fips_mode: # TLS is required in FIPS mode
return
self.args.append('-no_tls1')
pid = self.start_server(self.args)
try:
ctx = SSL.Context('tlsv1')
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
self.failUnlessEqual(e[0], 'wrong version number')
s.close()
finally:
self.stop_server(pid)
def test_tls1_ok(self):
self.args.append('-tls1')
pid = self.start_server(self.args)
try:
ctx = SSL.Context('tlsv1')
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_sslv23_no_v2(self):
if fips_mode: # TLS is required in FIPS mode
return
self.args.append('-no_tls1')
pid = self.start_server(self.args)
try:
ctx = SSL.Context('sslv23')
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
self.failUnlessEqual(s.get_version(), 'SSLv3')
s.close()
finally:
self.stop_server(pid)
def test_sslv23_no_v2_no_service(self):
if fips_mode: # TLS is required in FIPS mode
return
self.args = self.args + ['-no_tls1', '-no_ssl3']
pid = self.start_server(self.args)
try:
ctx = SSL.Context('sslv23')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_sslv23_weak_crypto(self):
if fips_mode: # TLS is required in FIPS mode
return
self.args = self.args + ['-no_tls1', '-no_ssl3']
pid = self.start_server(self.args)
try:
ctx = SSL.Context('sslv23', weak_crypto=1)
s = SSL.Connection(ctx)
if m2.OPENSSL_VERSION_NUMBER < 0x10000000: # SSLv2 ciphers disabled by default in newer OpenSSL
s.connect(self.srv_addr)
self.failUnlessEqual(s.get_version(), 'SSLv2')
else:
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_cipher_mismatch(self):
self.args = self.args + ['-cipher', 'AES256-SHA']
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.set_cipher_list('AES128-SHA')
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')
s.close()
finally:
self.stop_server(pid)
def test_no_such_cipher(self):
self.args = self.args + ['-cipher', 'AES128-SHA']
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.set_cipher_list('EXP-RC2-MD5')
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
self.failUnlessEqual(e[0], 'no ciphers available')
s.close()
finally:
self.stop_server(pid)
def test_no_weak_cipher(self):
if fips_mode: # Weak ciphers are prohibited
return
self.args = self.args + ['-cipher', 'EXP']
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')
s.close()
finally:
self.stop_server(pid)
def test_use_weak_cipher(self):
if fips_mode: # Weak ciphers are prohibited
return
self.args = self.args + ['-cipher', 'EXP']
pid = self.start_server(self.args)
try:
ctx = SSL.Context(weak_crypto=1)
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_cipher_ok(self):
self.args = self.args + ['-cipher', 'AES128-SHA']
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.set_cipher_list('AES128-SHA')
s.connect(self.srv_addr)
data = self.http_get(s)
assert s.get_cipher().name() == 'AES128-SHA', s.get_cipher().name()
cipher_stack = s.get_ciphers()
assert cipher_stack[0].name() == 'AES128-SHA', cipher_stack[0].name()
self.assertRaises(IndexError, cipher_stack.__getitem__, 2)
# For some reason there are 2 entries in the stack
#assert len(cipher_stack) == 1, len(cipher_stack)
assert s.get_cipher_list() == 'AES128-SHA', s.get_cipher_list()
# Test Cipher_Stack iterator
i = 0
for cipher in cipher_stack:
i += 1
assert cipher.name() == 'AES128-SHA', '"%s"' % cipher.name()
self.assertEqual('AES128-SHA-128', str(cipher))
# For some reason there are 2 entries in the stack
#assert i == 1, i
self.assertEqual(i, len(cipher_stack))
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def verify_cb_new(self, ok, store):
return verify_cb_new_function(ok, store)
def test_verify_cb_new(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
self.verify_cb_new)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cb_new_class(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
VerifyCB())
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cb_new_function(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
verify_cb_new_function)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cb_lambda(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
lambda ok, store: 1)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def verify_cb_exception(self, ok, store):
raise Exception, 'We should fail verification'
def test_verify_cb_exception(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
self.verify_cb_exception)
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_verify_cb_not_callable(self):
ctx = SSL.Context()
self.assertRaises(TypeError,
ctx.set_verify,
SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
9,
1)
def test_verify_cb_wrong_callable(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
lambda _: '')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):
try:
from M2Crypto import X509
assert not ok
assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \
err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \
err == m2.X509_V_ERR_CERT_UNTRUSTED or \
err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
assert m2.ssl_ctx_get_cert_store(ctx_ptr)
assert X509.X509(x509_ptr).as_pem()
except AssertionError:
# If we let exceptions propagate from here the
# caller may see strange errors. This is cleaner.
return 0
return 1
def test_verify_cb_old(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
self.verify_cb_old)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_allow_unknown_old(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
SSL.cb.ssl_verify_callback)
ctx.set_allow_unknown_ca(1)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_allow_unknown_new(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
SSL.cb.ssl_verify_callback_allow_unknown_ca)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cert(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/ca.pem')
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cert_fail(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/server.pem')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_verify_cert_mutual_auth(self):
self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/ca.pem')
ctx.load_cert('tests/x509.pem')
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cert_mutual_auth_servernbio(self):
self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/ca.pem')
ctx.load_cert('tests/x509.pem')
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_verify_cert_mutual_auth_fail(self):
self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/ca.pem')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_verify_nocert_fail(self):
self.args.extend(['-nocert'])
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/ca.pem')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_blocking0(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.setblocking(0)
self.assertRaises(Exception, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_blocking1(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
s.setblocking(1)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_makefile(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
bio = s.makefile('rw')
#s.close() # XXX bug 6628?
bio.write('GET / HTTP/1.0\n\n')
bio.flush()
data = bio.read()
bio.close()
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_makefile_err(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
except SSL.SSLError, e:
assert 0, e
f = s.makefile()
data = self.http_get(s)
s.close()
del f
del s
err_code = Err.peek_error_code()
assert not err_code, 'Unexpected error: %s' % err_code
err = Err.get_error()
assert not err, 'Unexpected error: %s' % err
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_info_callback(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_info_callback()
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
data = self.http_get(s)
s.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
class UrllibSSLClientTestCase(BaseSSLClientTestCase):
def test_urllib(self):
pid = self.start_server(self.args)
try:
from M2Crypto import m2urllib
url = m2urllib.FancyURLopener()
url.addheader('Connection', 'close')
u = url.open('https://%s:%s/' % (srv_host, srv_port))
data = u.read()
u.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
# XXX Don't actually know how to use m2urllib safely!
#def test_urllib_secure_context(self):
#def test_urllib_secure_context_fail(self):
# XXX Don't actually know how to use m2urllib safely!
#def test_urllib_safe_context(self):
#def test_urllib_safe_context_fail(self):
class Urllib2SSLClientTestCase(BaseSSLClientTestCase):
if sys.version_info >= (2,4):
def test_urllib2(self):
pid = self.start_server(self.args)
try:
from M2Crypto import m2urllib2
opener = m2urllib2.build_opener()
opener.addheaders = [('Connection', 'close')]
u = opener.open('https://%s:%s/' % (srv_host, srv_port))
data = u.read()
u.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_urllib2_secure_context(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/ca.pem')
from M2Crypto import m2urllib2
opener = m2urllib2.build_opener(ctx)
opener.addheaders = [('Connection', 'close')]
u = opener.open('https://%s:%s/' % (srv_host, srv_port))
data = u.read()
u.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_urllib2_secure_context_fail(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/server.pem')
from M2Crypto import m2urllib2
opener = m2urllib2.build_opener(ctx)
opener.addheaders = [('Connection', 'close')]
self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, srv_port))
finally:
self.stop_server(pid)
def test_z_urllib2_opener(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
from M2Crypto import m2urllib2
opener = m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())
m2urllib2.install_opener(opener)
req = m2urllib2.Request('https://%s:%s/' % (srv_host, srv_port))
u = m2urllib2.urlopen(req)
data = u.read()
u.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_urllib2_opener_handlers(self):
ctx = SSL.Context()
from M2Crypto import m2urllib2
opener = m2urllib2.build_opener(ctx,
m2urllib2.HTTPBasicAuthHandler())
def test_urllib2_leak(self):
pid = self.start_server(self.args)
try:
import gc
from M2Crypto import m2urllib2
o = m2urllib2.build_opener()
r = o.open('https://%s:%s/' % (srv_host, srv_port))
s = [r.fp._sock.fp]
r.close()
self.assertEqual(len(gc.get_referrers(s[0])), 1)
finally:
self.stop_server(pid)
class TwistedSSLClientTestCase(BaseSSLClientTestCase):
def test_timeout(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
s = SSL.Connection(ctx)
# Just a really small number so we can timeout
s.settimeout(0.000000000000000000000000000001)
self.assertRaises(SSL.SSLTimeoutError, s.connect, self.srv_addr)
s.close()
finally:
self.stop_server(pid)
def test_makefile_timeout(self):
# httpslib uses makefile to read the response
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
c = httpslib.HTTPS(srv_host, srv_port)
c.putrequest('GET', '/')
c.putheader('Accept', 'text/html')
c.putheader('Accept', 'text/plain')
c.endheaders()
c._conn.sock.settimeout(100)
err, msg, headers = c.getreply()
assert err == 200, err
f = c.getfile()
data = f.read()
c.close()
finally:
self.stop_server(pid)
self.failIf(string.find(data, 's_server -quiet -www') == -1)
def test_makefile_timeout_fires(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
c = httpslib.HTTPS(srv_host, srv_port)
c.putrequest('GET', '/')
c.putheader('Accept', 'text/html')
c.putheader('Accept', 'text/plain')
c.endheaders()
c._conn.sock.settimeout(0.0000000001)
self.assertRaises(socket.timeout, c.getreply)
c.close()
finally:
self.stop_server(pid)
def test_twisted_wrapper(self):
# Test only when twisted and ZopeInterfaces are present
try:
from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
import M2Crypto.SSL.TwistedProtocolWrapper as wrapper
except ImportError:
import warnings
warnings.warn('Skipping twisted wrapper test because twisted not found')
return
class EchoClient(LineReceiver):
def connectionMade(self):
self.sendLine('GET / HTTP/1.0\n\n')
def lineReceived(self, line):
global twisted_data
twisted_data += line
class EchoClientFactory(ClientFactory):
protocol = EchoClient
def clientConnectionFailed(self, connector, reason):
reactor.stop()
assert 0, reason
def clientConnectionLost(self, connector, reason):
reactor.stop()
pid = self.start_server(self.args)
class ContextFactory:
def getContext(self):
return SSL.Context()
try:
global twisted_data
twisted_data = ''
contextFactory = ContextFactory()
factory = EchoClientFactory()
wrapper.connectSSL(srv_host, srv_port, factory, contextFactory)
reactor.run() # This will block until reactor.stop() is called
finally:
self.stop_server(pid)
self.failIf(string.find(twisted_data, 's_server -quiet -www') == -1)
twisted_data = ''
class XmlRpcLibTestCase(unittest.TestCase):
def test_lib(self):
from M2Crypto import m2xmlrpclib
m2xmlrpclib.SSL_Transport()
# XXX need server to test against
class FtpsLibTestCase(unittest.TestCase):
def test_lib(self):
from M2Crypto import ftpslib
ftpslib.FTP_TLS()
# XXX need server to test against
class CheckerTestCase(unittest.TestCase):
def test_checker(self):
from M2Crypto.SSL import Checker
from M2Crypto import X509
check = Checker.Checker(host=srv_host,
peerCertHash='7B754EFA41A264AAD370D43460BC8229F9354ECE')
x509 = X509.load_cert('tests/server.pem')
assert check(x509, srv_host)
self.assertRaises(Checker.WrongHost, check, x509, 'example.com')
import doctest
doctest.testmod(Checker)
class ContextTestCase(unittest.TestCase):
def test_ctx_load_verify_locations(self):
ctx = SSL.Context()
self.assertRaises(ValueError, ctx.load_verify_locations, None, None)
def test_map(self):
from M2Crypto.SSL.Context import map, _ctxmap
assert isinstance(map(), _ctxmap)
ctx = SSL.Context()
assert map()
ctx.close()
assert map() is _ctxmap.singleton
def test_certstore(self):
ctx = SSL.Context()
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/ca.pem')
ctx.load_cert('tests/x509.pem')
from M2Crypto import X509
store = ctx.get_cert_store()
assert isinstance(store, X509.X509_Store)
class SessionTestCase(unittest.TestCase):
def test_session_load_bad(self):
self.assertRaises(SSL.SSLError, SSL.Session.load_session,
'tests/signer.pem')
class FtpslibTestCase(unittest.TestCase):
def test_26_compat(self):
from M2Crypto import ftpslib
f = ftpslib.FTP_TLS()
# 2.6 used to raise AttributeError:
self.assertRaises(socket.gaierror, f.connect, 'no-such-host-dfgHJK56789', 990)
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(CheckerTestCase))
suite.addTest(unittest.makeSuite(ContextTestCase))
suite.addTest(unittest.makeSuite(SessionTestCase))
suite.addTest(unittest.makeSuite(XmlRpcLibTestCase))
suite.addTest(unittest.makeSuite(FtpsLibTestCase))
suite.addTest(unittest.makeSuite(PassSSLClientTestCase))
suite.addTest(unittest.makeSuite(HttpslibSSLClientTestCase))
suite.addTest(unittest.makeSuite(UrllibSSLClientTestCase))
suite.addTest(unittest.makeSuite(Urllib2SSLClientTestCase))
suite.addTest(unittest.makeSuite(MiscSSLClientTestCase))
suite.addTest(unittest.makeSuite(FtpslibTestCase))
try:
import M2Crypto.SSL.TwistedProtocolWrapper as wrapper
suite.addTest(unittest.makeSuite(TwistedSSLClientTestCase))
except ImportError:
pass
return suite
def zap_servers():
s = 's_server'
fn = tempfile.mktemp()
cmd = 'ps | egrep %s > %s' % (s, fn)
os.system(cmd)
f = open(fn)
while 1:
ps = f.readline()
if not ps:
break
chunk = string.split(ps)
pid, cmd = chunk[0], chunk[4]
if cmd == s:
os.kill(int(pid), 1)
f.close()
os.unlink(fn)
if __name__ == '__main__':
report_leaks = 0
if report_leaks:
import gc
gc.enable()
gc.set_debug(gc.DEBUG_LEAK & ~gc.DEBUG_SAVEALL)
try:
Rand.load_file('randpool.dat', -1)
unittest.TextTestRunner().run(suite())
Rand.save_file('randpool.dat')
finally:
zap_servers()
if report_leaks:
import alltests
alltests.dump_garbage()