--- a/src/modules/actions/signature.py Tue Mar 08 11:12:06 2016 -0800
+++ b/src/modules/actions/signature.py Wed Mar 09 11:27:23 2016 -0800
@@ -21,19 +21,24 @@
#
#
-# Copyright (c) 2009, 2015, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2009, 2016, Oracle and/or its affiliates. All rights reserved.
#
+import hashlib
import os
import shutil
import tempfile
+from cryptography.exceptions import InvalidSignature
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import serialization, hashes
+from cryptography.hazmat.primitives.asymmetric import padding
+
import generic
import pkg.actions
import pkg.client.api_errors as apx
import pkg.digest as digest
import pkg.misc as misc
-import M2Crypto as m2
valid_hash_algs = ("sha256", "sha384", "sha512")
valid_sig_algs = ("rsa",)
@@ -171,6 +176,14 @@
for attr in digest.DEFAULT_CHAIN_CHASH_ATTRS:
self.attrs[attr] = " ".join(chain_chshes[attr])
+ def __get_hash_by_name(self, name):
+ """Get the cryptopgraphy Hash() class based on the OpenSSL
+ algorithm name."""
+
+ for h in hashes.HashAlgorithm._abc_registry:
+ if h.name == name:
+ return h
+
def get_size(self):
res = generic.Action.get_size(self)
for s in self.attrs.get("chain.sizes", "").split():
@@ -418,11 +431,10 @@
# of the actions, not a signed value.
if self.hash is None:
assert self.sig_alg is None
- dgst = m2.EVP.MessageDigest(self.hash_alg)
- res = dgst.update(self.actions_to_str(acts, ver))
- assert res == 1, \
- "Res was expected to be 1, but was {0}".format(res)
- computed_hash = dgst.final()
+ h = hashlib.new(self.hash_alg)
+ h.update(misc.force_bytes(self.actions_to_str(
+ acts, ver)))
+ computed_hash = h.digest()
# The attrs value is stored in hex so that it's easy
# to read.
if misc.hex_to_binary(self.attrs["value"]) != \
@@ -456,15 +468,19 @@
e.act = self
raise
# Check that the certificate verifies against this signature.
- pub_key = cert.get_pubkey(md=self.hash_alg)
- pub_key.verify_init()
- pub_key.verify_update(self.actions_to_str(acts, ver))
- res = pub_key.verify_final(
- misc.hex_to_binary(self.attrs["value"]))
- if not res:
+ pub_key = cert.public_key()
+ hhash = self.__get_hash_by_name(self.hash_alg)
+ verifier = pub_key.verifier(
+ misc.hex_to_binary(self.attrs["value"]), padding.PKCS1v15(),
+ hhash())
+ verifier.update(self.actions_to_str(acts, ver))
+ try:
+ verifier.verify()
+ except InvalidSignature:
raise apx.UnverifiedSignature(self,
_("The signature value did not match the expected "
- "value. Res: {0}").format(res))
+ "value."))
+
return True
def set_signature(self, acts, key_path=None, chain_paths=misc.EmptyI,
@@ -495,13 +511,10 @@
# If no private key is set, then no certificate should
# have been given.
assert self.data is None
- dgst = m2.EVP.MessageDigest(self.hash_alg)
- res = dgst.update(self.actions_to_str(acts,
- generic.Action.sig_version))
- assert res == 1, \
- "Res was expected to be 1, it was {0}".format(res)
- self.attrs["value"] = \
- misc.binary_to_hex(dgst.final())
+ h = hashlib.new(self.hash_alg)
+ h.update(misc.force_bytes(self.actions_to_str(acts,
+ generic.Action.sig_version)))
+ self.attrs["value"] = h.hexdigest()
else:
# If a private key is used, then the certificate it's
# paired with must be provided.
@@ -509,20 +522,21 @@
self.__set_chain_certs_data(chain_paths, chash_dir)
try:
- priv_key = m2.RSA.load_key(key_path)
- except m2.RSA.RSAError:
+ with open(key_path, "rb") as f:
+ priv_key = serialization.load_pem_private_key(
+ f.read(), password=None,
+ backend=default_backend())
+ except ValueError:
raise apx.BadFileFormat(_("{0} was expected to "
"be a RSA key but could not be read "
"correctly.").format(key_path))
- signer = m2.EVP.PKey(md=self.hash_alg)
- signer.assign_rsa(priv_key, 1)
- del priv_key
- signer.sign_init()
- signer.sign_update(self.actions_to_str(acts,
- generic.Action.sig_version))
+ hhash = self.__get_hash_by_name(self.hash_alg)
+ signer = priv_key.signer(padding.PKCS1v15(), hhash())
+ signer.update(misc.force_bytes(self.actions_to_str(acts,
+ generic.Action.sig_version)))
self.attrs["value"] = \
- misc.binary_to_hex(signer.sign_final())
+ misc.binary_to_hex(signer.finalize())
def generate_indices(self):
"""Generates the indices needed by the search dictionary. See