33 import re |
33 import re |
34 import shutil |
34 import shutil |
35 import sys |
35 import sys |
36 import tempfile |
36 import tempfile |
37 import unittest |
37 import unittest |
|
38 |
|
39 from cryptography import x509 |
|
40 from cryptography.hazmat.backends import default_backend |
|
41 from cryptography.hazmat.primitives import serialization |
38 |
42 |
39 import pkg.actions as action |
43 import pkg.actions as action |
40 import pkg.actions.signature as signature |
44 import pkg.actions.signature as signature |
41 import pkg.client.api_errors as apx |
45 import pkg.client.api_errors as apx |
42 import pkg.digest as digest |
46 import pkg.digest as digest |
43 import pkg.facet as facet |
47 import pkg.facet as facet |
44 import pkg.fmri as fmri |
48 import pkg.fmri as fmri |
45 import pkg.misc as misc |
49 import pkg.misc as misc |
46 import pkg.portable as portable |
50 import pkg.portable as portable |
47 import M2Crypto as m2 |
|
48 |
51 |
49 from pkg.client.debugvalues import DebugValues |
52 from pkg.client.debugvalues import DebugValues |
50 from pkg.pkggzip import PkgGzipFile |
53 from pkg.pkggzip import PkgGzipFile |
51 |
54 |
52 try: |
55 try: |
1285 cert=os.path.join(self.cs_dir, |
1288 cert=os.path.join(self.cs_dir, |
1286 "cs5_ch1_ta3_cert.pem"), |
1289 "cs5_ch1_ta3_cert.pem"), |
1287 i1=os.path.join(self.chain_certs_dir, |
1290 i1=os.path.join(self.chain_certs_dir, |
1288 "ch1_ta3_cert.pem")) |
1291 "ch1_ta3_cert.pem")) |
1289 self.pkgsign(self.rurl1, sign_args) |
1292 self.pkgsign(self.rurl1, sign_args) |
1290 |
1293 self.pkg_image_create(self.rurl1) |
1291 self.pkg_image_create(self.rurl1) |
1294 self.seed_ta_dir("ta3") |
1292 self.seed_ta_dir("ta3") |
|
1293 |
|
1294 self.pkg("set-property signature-policy verify") |
1295 self.pkg("set-property signature-policy verify") |
1295 api_obj = self.get_img_api_obj() |
1296 api_obj = self.get_img_api_obj() |
1296 self.assertRaises(apx.UnsupportedExtensionValue, |
1297 self.assertRaises(apx.UnsupportedExtensionValue, |
1297 self._api_install, api_obj, ["example_pkg"]) |
1298 self._api_install, api_obj, ["example_pkg"]) |
1298 # Tests that the cli can handle an UnsupportedCriticalExtension. |
1299 # Tests that the cli can handle an UnsupportedExtensionValue. |
1299 self.pkg("install example_pkg", exit=1) |
1300 self.pkg("install example_pkg", exit=1) |
1300 self.pkg("set-property signature-policy ignore") |
1301 self.pkg("set-property signature-policy ignore") |
1301 self.pkg("set-publisher --set-property signature-policy=ignore " |
1302 self.pkg("set-publisher --set-property signature-policy=ignore " |
1302 "test") |
1303 "test") |
1303 api_obj = self.get_img_api_obj() |
1304 api_obj = self.get_img_api_obj() |
1306 def test_unknown_value_for_critical_extension(self): |
1307 def test_unknown_value_for_critical_extension(self): |
1307 """Test that an unknown value for a recognized critical |
1308 """Test that an unknown value for a recognized critical |
1308 extension causes an exception to be raised.""" |
1309 extension causes an exception to be raised.""" |
1309 |
1310 |
1310 plist = self.pkgsend_bulk(self.rurl1, self.example_pkg10) |
1311 plist = self.pkgsend_bulk(self.rurl1, self.example_pkg10) |
1311 sign_args = "-k {key} -c {cert} {name}".format( |
1312 sign_args = "-k {key} -c {cert} -i {i1} {name}".format( |
1312 name=plist[0], |
1313 name=plist[0], |
1313 key=os.path.join(self.keys_dir, |
1314 key=os.path.join(self.keys_dir, |
1314 "cs6_ch1_ta3_key.pem"), |
1315 "cs6_ch1_ta3_key.pem"), |
1315 cert=os.path.join(self.cs_dir, |
1316 cert=os.path.join(self.cs_dir, |
1316 "cs6_ch1_ta3_cert.pem"), |
1317 "cs6_ch1_ta3_cert.pem"), |
1323 |
1324 |
1324 self.pkg("set-property signature-policy verify") |
1325 self.pkg("set-property signature-policy verify") |
1325 api_obj = self.get_img_api_obj() |
1326 api_obj = self.get_img_api_obj() |
1326 self.assertRaises(apx.UnsupportedExtensionValue, |
1327 self.assertRaises(apx.UnsupportedExtensionValue, |
1327 self._api_install, api_obj, ["example_pkg"]) |
1328 self._api_install, api_obj, ["example_pkg"]) |
|
1329 # Tests that the cli can handle an UnsupportedExtensionValue. |
|
1330 self.pkg("install example_pkg", exit=1) |
|
1331 self.pkg("set-property signature-policy ignore") |
|
1332 self.pkg("set-publisher --set-property signature-policy=ignore " |
|
1333 "test") |
|
1334 api_obj = self.get_img_api_obj() |
|
1335 self._api_install(api_obj, ["example_pkg"]) |
|
1336 |
|
1337 def test_invalid_extension_1(self): |
|
1338 """Test that an invalid value in the extension causes an |
|
1339 exception to be raised.""" |
|
1340 |
|
1341 plist = self.pkgsend_bulk(self.rurl1, self.example_pkg10) |
|
1342 sign_args = "-k {key} -c {cert} -i {i1} {name}".format( |
|
1343 name=plist[0], |
|
1344 key=os.path.join(self.keys_dir, |
|
1345 "cs9_ch1_ta3_key.pem"), |
|
1346 cert=os.path.join(self.cs_dir, |
|
1347 "cs9_ch1_ta3_cert.pem"), |
|
1348 i1=os.path.join(self.chain_certs_dir, |
|
1349 "ch1_ta3_cert.pem")) |
|
1350 self.pkgsign(self.rurl1, sign_args) |
|
1351 |
|
1352 self.pkg_image_create(self.rurl1) |
|
1353 self.seed_ta_dir("ta3") |
|
1354 |
|
1355 self.pkg("set-property signature-policy verify") |
|
1356 api_obj = self.get_img_api_obj() |
|
1357 self.assertRaises(apx.InvalidCertificateExtensions, |
|
1358 self._api_install, api_obj, ["example_pkg"]) |
|
1359 # Tests that the cli can handle an InvalidCertificateExtensions. |
|
1360 self.pkg("install example_pkg", exit=1) |
|
1361 self.pkg("set-property signature-policy ignore") |
|
1362 self.pkg("set-publisher --set-property signature-policy=ignore " |
|
1363 "test") |
|
1364 api_obj = self.get_img_api_obj() |
|
1365 self._api_install(api_obj, ["example_pkg"]) |
|
1366 |
|
1367 def test_invalid_extension_2(self): |
|
1368 """Test that a critical extension that Cryptography can't |
|
1369 understand causes an exception to be raised.""" |
|
1370 |
|
1371 plist = self.pkgsend_bulk(self.rurl1, self.example_pkg10) |
|
1372 sign_args = "-k {key} -c {cert} {name}".format( |
|
1373 name=plist[0], |
|
1374 key=os.path.join(self.keys_dir, |
|
1375 "cust_key.pem"), |
|
1376 cert=os.path.join(self.cs_dir, |
|
1377 "cust_cert.pem")) |
|
1378 self.pkgsign(self.rurl1, sign_args) |
|
1379 |
|
1380 self.pkg_image_create(self.rurl1) |
|
1381 self.seed_ta_dir("cust") |
|
1382 |
|
1383 self.pkg("set-property signature-policy verify") |
|
1384 api_obj = self.get_img_api_obj() |
|
1385 self.assertRaises(apx.InvalidCertificateExtensions, |
|
1386 self._api_install, api_obj, ["example_pkg"]) |
|
1387 # Tests that the cli can handle an InvalidCertificateExtensions. |
|
1388 self.pkg("install example_pkg", exit=1) |
1328 self.pkg("set-property signature-policy ignore") |
1389 self.pkg("set-property signature-policy ignore") |
1329 self.pkg("set-publisher --set-property signature-policy=ignore " |
1390 self.pkg("set-publisher --set-property signature-policy=ignore " |
1330 "test") |
1391 "test") |
1331 api_obj = self.get_img_api_obj() |
1392 api_obj = self.get_img_api_obj() |
1332 self._api_install(api_obj, ["example_pkg"]) |
1393 self._api_install(api_obj, ["example_pkg"]) |
1470 self._api_install(api_obj, ["example_pkg"]) |
1531 self._api_install(api_obj, ["example_pkg"]) |
1471 |
1532 |
1472 def test_crl_0(self): |
1533 def test_crl_0(self): |
1473 """Test that the X509 CRL revocation works correctly.""" |
1534 """Test that the X509 CRL revocation works correctly.""" |
1474 |
1535 |
1475 crl = m2.X509.load_crl(os.path.join(self.crl_dir, |
1536 with open(os.path.join(self.crl_dir, "ch1_ta4_crl.pem"), |
1476 "ch1_ta4_crl.pem")) |
1537 "rb") as f: |
1477 revoked_cert = m2.X509.load_cert(os.path.join(self.cs_dir, |
1538 crl = x509.load_pem_x509_crl( |
1478 "cs1_ch1_ta4_cert.pem")) |
1539 f.read(), default_backend()) |
1479 assert crl.is_revoked(revoked_cert)[0] |
1540 |
|
1541 with open(os.path.join(self.cs_dir, |
|
1542 "cs1_ch1_ta4_cert.pem"), "rb") as f: |
|
1543 cert = x509.load_pem_x509_certificate( |
|
1544 f.read(), default_backend()) |
|
1545 |
|
1546 self.assertTrue(crl.issuer == cert.issuer) |
|
1547 for rev in crl: |
|
1548 if rev.serial_number == cert.serial: |
|
1549 break |
|
1550 else: |
|
1551 self.assertTrue(False, "Can not find revoked " |
|
1552 "certificate in CRL!") |
1480 |
1553 |
1481 def test_bogus_inter_certs(self): |
1554 def test_bogus_inter_certs(self): |
1482 """Test that if SignatureAction.set_signature is given invalid |
1555 """Test that if SignatureAction.set_signature is given invalid |
1483 paths to intermediate certs, it errors as expected. This |
1556 paths to intermediate certs, it errors as expected. This |
1484 cannot be tested from the command line because the command |
1557 cannot be tested from the command line because the command |
2264 api_obj = self.get_img_api_obj() |
2337 api_obj = self.get_img_api_obj() |
2265 self._api_install(api_obj, ["example_pkg"]) |
2338 self._api_install(api_obj, ["example_pkg"]) |
2266 |
2339 |
2267 def test_small_pathlen(self): |
2340 def test_small_pathlen(self): |
2268 """Test that a chain cert which has a smaller pathlen value than |
2341 """Test that a chain cert which has a smaller pathlen value than |
2269 is needed is allowed.""" |
2342 is needed is disallowed.""" |
2270 |
2343 |
2271 plist = self.pkgsend_bulk(self.rurl1, self.example_pkg10) |
2344 plist = self.pkgsend_bulk(self.rurl1, self.example_pkg10) |
2272 sign_args = "-k {key} -c {cert} -i {i1} -i {i2} " \ |
2345 sign_args = "-k {key} -c {cert} -i {i1} -i {i2} " \ |
2273 "-i {i3} -i {i4} -i {i5} {pkg}".format(**{ |
2346 "-i {i3} -i {i4} -i {i5} {pkg}".format(**{ |
2274 "key": os.path.join(self.keys_dir, |
2347 "key": os.path.join(self.keys_dir, |
2932 # of the transformed certificate file, as if pkgrecv had put it |
3005 # of the transformed certificate file, as if pkgrecv had put it |
2933 # there itself. |
3006 # there itself. |
2934 repo_location = self.dcs[1].get_repodir() |
3007 repo_location = self.dcs[1].get_repodir() |
2935 cache_dir = os.path.join(self.test_root, "cache") |
3008 cache_dir = os.path.join(self.test_root, "cache") |
2936 os.mkdir(cache_dir) |
3009 os.mkdir(cache_dir) |
2937 cert = m2.X509.load_cert(cert_path) |
3010 |
|
3011 with open(cert_path, "rb") as f: |
|
3012 cert = x509.load_pem_x509_certificate( |
|
3013 f.read(), default_backend()) |
|
3014 |
2938 fd, new_cert = tempfile.mkstemp(dir=self.test_root) |
3015 fd, new_cert = tempfile.mkstemp(dir=self.test_root) |
2939 with os.fdopen(fd, "wb") as fh: |
3016 with os.fdopen(fd, "wb") as fh: |
2940 fh.write(cert.as_pem()) |
3017 fh.write(cert.public_bytes( |
|
3018 serialization.Encoding.PEM)) |
2941 |
3019 |
2942 # the file-store uses the least-preferred hash when storing |
3020 # the file-store uses the least-preferred hash when storing |
2943 # content |
3021 # content |
2944 alg = digest.HASH_ALGS[digest.REVERSE_RANKED_HASH_ATTRS[0]] |
3022 alg = digest.HASH_ALGS[digest.REVERSE_RANKED_HASH_ATTRS[0]] |
2945 file_name = misc.get_data_digest(new_cert, |
3023 file_name = misc.get_data_digest(new_cert, |
2946 hash_func=alg)[0] |
3024 hash_func=alg)[0] |
2947 subdir = os.path.join(cache_dir, file_name[:2]) |
3025 subdir = os.path.join(cache_dir, file_name[:2]) |
2948 os.mkdir(subdir) |
3026 os.mkdir(subdir) |
2949 fp = os.path.join(subdir, file_name) |
3027 fp = os.path.join(subdir, file_name) |
2950 fh = PkgGzipFile(fp, "wb") |
3028 fh = PkgGzipFile(fp, "wb") |
2951 fh.write(cert.as_pem()) |
3029 fh.write(cert.public_bytes(serialization.Encoding.PEM)) |
2952 fh.close() |
3030 fh.close() |
2953 |
3031 |
2954 self.pkgrecv(self.rurl2, "-c {0} -d {1} '*'".format( |
3032 self.pkgrecv(self.rurl2, "-c {0} -d {1} '*'".format( |
2955 cache_dir, self.rurl1)) |
3033 cache_dir, self.rurl1)) |
2956 |
3034 |
2980 # of the transformed certificate file, as if pkgrecv had put it |
3058 # of the transformed certificate file, as if pkgrecv had put it |
2981 # there itself. |
3059 # there itself. |
2982 repo_location = self.dcs[1].get_repodir() |
3060 repo_location = self.dcs[1].get_repodir() |
2983 cache_dir = os.path.join(self.test_root, "cache") |
3061 cache_dir = os.path.join(self.test_root, "cache") |
2984 os.mkdir(cache_dir) |
3062 os.mkdir(cache_dir) |
2985 cert = m2.X509.load_cert(ta_path) |
3063 |
|
3064 with open(ta_path, "rb") as f: |
|
3065 cert = x509.load_pem_x509_certificate( |
|
3066 f.read(), default_backend()) |
|
3067 |
2986 fd, new_cert = tempfile.mkstemp(dir=self.test_root) |
3068 fd, new_cert = tempfile.mkstemp(dir=self.test_root) |
2987 with os.fdopen(fd, "wb") as fh: |
3069 with os.fdopen(fd, "wb") as fh: |
2988 fh.write(cert.as_pem()) |
3070 fh.write(cert.public_bytes( |
|
3071 serialization.Encoding.PEM)) |
|
3072 |
2989 for attr in digest.DEFAULT_HASH_ATTRS: |
3073 for attr in digest.DEFAULT_HASH_ATTRS: |
2990 alg = digest.HASH_ALGS[attr] |
3074 alg = digest.HASH_ALGS[attr] |
2991 file_name = misc.get_data_digest(new_cert, |
3075 file_name = misc.get_data_digest(new_cert, |
2992 hash_func=alg)[0] |
3076 hash_func=alg)[0] |
2993 subdir = os.path.join(cache_dir, file_name[:2]) |
3077 subdir = os.path.join(cache_dir, file_name[:2]) |
2994 os.mkdir(subdir) |
3078 os.mkdir(subdir) |
2995 fp = os.path.join(subdir, file_name) |
3079 fp = os.path.join(subdir, file_name) |
2996 fh = PkgGzipFile(fp, "wb") |
3080 fh = PkgGzipFile(fp, "wb") |
2997 fh.write(cert.as_pem()) |
3081 fh.write(cert.public_bytes( |
|
3082 serialization.Encoding.PEM)) |
2998 fh.close() |
3083 fh.close() |
2999 |
3084 |
3000 self.pkgrecv(self.rurl2, "-c {0} -d {1} '*'".format( |
3085 self.pkgrecv(self.rurl2, "-c {0} -d {1} '*'".format( |
3001 cache_dir, self.rurl1)) |
3086 cache_dir, self.rurl1)) |
3002 |
3087 |