From 79f85f4237c46e14716e7a0eb24f50aa5f5b0cb9 Mon Sep 17 00:00:00 2001 From: Jeremiah Mackey Date: Thu, 2 Apr 2026 16:56:51 +0000 Subject: [PATCH 1/5] Fix cipher logic and validation --- wolfcrypt/ciphers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wolfcrypt/ciphers.py b/wolfcrypt/ciphers.py index 105224e..2b6bb23 100644 --- a/wolfcrypt/ciphers.py +++ b/wolfcrypt/ciphers.py @@ -183,7 +183,7 @@ def encrypt(self, string): raise ValueError( "empty string not allowed") - if len(string) % self.block_size and not self.mode == MODE_CTR and not "ChaCha" in self._native_type: + if len(string) % self.block_size and "ChaCha" not in self._native_type and self.mode != MODE_CTR: raise ValueError( "string must be a multiple of %d in length" % self.block_size) @@ -216,7 +216,7 @@ def decrypt(self, string): raise ValueError( "empty string not allowed") - if len(string) % self.block_size and not self.mode == MODE_CTR and not "ChaCha" in self._native_type: + if len(string) % self.block_size and "ChaCha" not in self._native_type and self.mode != MODE_CTR: raise ValueError( "string must be a multiple of %d in length" % self.block_size) From 9a031dfc7fc6bcf37a49130650c9429488ec3b99 Mon Sep 17 00:00:00 2001 From: Jeremiah Mackey Date: Thu, 2 Apr 2026 16:57:29 +0000 Subject: [PATCH 2/5] Fix error handling gaps --- scripts/build_ffi.py | 2 +- wolfcrypt/__init__.py | 1 + wolfcrypt/hkdf.py | 6 +++--- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/scripts/build_ffi.py b/scripts/build_ffi.py index 4d5ad22..9ec4cd6 100644 --- a/scripts/build_ffi.py +++ b/scripts/build_ffi.py @@ -332,7 +332,7 @@ def get_features(local_wolfssl, features): for d in include_dirs: if not os.path.exists(d): - e = "Invalid wolfSSL include dir: .".format(d) + e = "Invalid wolfSSL include dir: {}.".format(d) raise FileNotFoundError(e) options = os.path.join(d, "wolfssl", "options.h") diff --git a/wolfcrypt/__init__.py b/wolfcrypt/__init__.py index 6879589..e2752a8 100644 --- a/wolfcrypt/__init__.py +++ b/wolfcrypt/__init__.py @@ -46,6 +46,7 @@ if top_level_py not in ["setup.py", "build_ffi.py"]: from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib + from wolfcrypt.exceptions import WolfCryptError if hasattr(_lib, 'WC_RNG_SEED_CB_ENABLED'): if _lib.WC_RNG_SEED_CB_ENABLED: diff --git a/wolfcrypt/hkdf.py b/wolfcrypt/hkdf.py index 6bcf27e..e8580cc 100644 --- a/wolfcrypt/hkdf.py +++ b/wolfcrypt/hkdf.py @@ -34,7 +34,7 @@ def HKDF(hash_cls, in_key, salt=None, info=None, out_len=None): Perform HKDF Extract-and-Expand in one call (wraps wc_HKDF). Parameters: - - hash_cls: hash class, see `wolfcrypt.hashes`. + - hash_cls: HMAC class, e.g. HmacSha256, see `wolfcrypt.hashes`. - in_key: input key material (IKM) as bytes or str. - salt: optional salt value (bytes or str). If None, treated as empty. - info: optional context/application info (bytes or str). If None, @@ -79,7 +79,7 @@ def HKDF_Extract(hash_cls, salt, in_key): Wraps wc_HKDF_Extract. Parameters: - - hash_cls: hash class, see `wolfcrypt.hashes`. + - hash_cls: HMAC class, e.g. HmacSha256, see `wolfcrypt.hashes`. - salt: bytes/str (can be None -> treated as empty). - in_key: input key material (IKM) as bytes/str. @@ -106,7 +106,7 @@ def HKDF_Expand(hash_cls, prk, info, out_len): Wraps wc_HKDF_Expand. Parameters: - - hash_cls: hash class, see `wolfcrypt.hashes`. + - hash_cls: HMAC class, e.g. HmacSha256, see `wolfcrypt.hashes`. - prk: pseudorandom key (output from HKDF-Extract) as bytes/str. - info: optional context/application info (bytes/str). If None, treated as empty. - out_len: length of output keying material in bytes. From c23165b59943dcd16c18a31cdd934db0b8a9a7f9 Mon Sep 17 00:00:00 2001 From: Jeremiah Mackey Date: Thu, 2 Apr 2026 16:59:56 +0000 Subject: [PATCH 3/5] Fix resource leaks --- scripts/build_ffi.py | 2 ++ wolfcrypt/asn.py | 4 +++- wolfcrypt/ciphers.py | 26 +++++++++++++++++++------- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/scripts/build_ffi.py b/scripts/build_ffi.py index 9ec4cd6..169f8fe 100644 --- a/scripts/build_ffi.py +++ b/scripts/build_ffi.py @@ -649,6 +649,7 @@ def build_ffi(local_wolfssl, features): word32 sz, const byte* authIn, word32 authInSz); int wc_AesGcmDecryptFinal(Aes* aes, const byte* authTag, word32 authTagSz); + void wc_AesFree(Aes* aes); """ if features["AES"] and features["AES_SIV"]: @@ -961,6 +962,7 @@ def build_ffi(local_wolfssl, features): int wc_PemToDer(const unsigned char* buff, long longSz, int type, DerBuffer** pDer, void* heap, EncryptedInfo* info, int* keyFormat); + void wc_FreeDer(DerBuffer** pDer); int wc_DerToPemEx(const byte* der, word32 derSz, byte* output, word32 outSz, byte *cipher_info, int type); """ diff --git a/wolfcrypt/asn.py b/wolfcrypt/asn.py index cf8b002..4d66ae4 100644 --- a/wolfcrypt/asn.py +++ b/wolfcrypt/asn.py @@ -42,7 +42,9 @@ def pem_to_der(pem, pem_type): err = "Error converting from PEM to DER. ({})".format(ret) raise WolfCryptError(err) - return _ffi.buffer(der[0][0].buffer, der[0][0].length)[:] + result = _ffi.buffer(der[0][0].buffer, der[0][0].length)[:] + _lib.wc_FreeDer(der) + return result def der_to_pem(der, pem_type): pem_length = _lib.wc_DerToPemEx(der, len(der), _ffi.NULL, 0, _ffi.NULL, diff --git a/wolfcrypt/ciphers.py b/wolfcrypt/ciphers.py index 2b6bb23..86d931b 100644 --- a/wolfcrypt/ciphers.py +++ b/wolfcrypt/ciphers.py @@ -191,6 +191,7 @@ def encrypt(self, string): self._enc = _ffi.new(self._native_type) ret = self._set_key(_ENCRYPTION) if ret < 0: # pragma: no cover + self._enc = None raise WolfCryptError("Invalid key error (%d)" % ret) result = _ffi.new("byte[%d]" % len(string)) @@ -224,6 +225,7 @@ def decrypt(self, string): self._dec = _ffi.new(self._native_type) ret = self._set_key(_DECRYPTION) if ret < 0: # pragma: no cover + self._dec = None raise WolfCryptError("Invalid key error (%d)" % ret) result = _ffi.new("byte[%d]" % len(string)) @@ -406,11 +408,16 @@ def __init__(self, key, IV, tag_bytes=16): raise ValueError("key must be %s in length, not %d" % (self._key_sizes, len(key))) self._native_object = _ffi.new(self._native_type) - _lib.wc_AesInit(self._native_object, _ffi.NULL, -2) + ret = _lib.wc_AesInit(self._native_object, _ffi.NULL, -2) + if ret < 0: + raise WolfCryptError("AES init error (%d)" % ret) ret = _lib.wc_AesGcmInit(self._native_object, key, len(key), IV, len(IV)) if ret < 0: raise WolfCryptError("Init error (%d)" % ret) + def __del__(self): + _lib.wc_AesFree(self._native_object) + def set_aad(self, data): """ Set the additional authentication data for the stream @@ -498,10 +505,11 @@ def __init__(self, key="", size=32): self._dec = None self._key = None if len(key) > 0: - if not size in self._key_sizes: - raise ValueError("Invalid key size %d" % size) self._key = t2b(key) - self.key_size = size + if len(self._key) not in self._key_sizes: + raise ValueError("key must be %s in length, not %d" % + (self._key_sizes, len(self._key))) + self.key_size = len(self._key) self._IV_nonce = [] self._IV_counter = 0 @@ -511,13 +519,13 @@ def _set_key(self, direction): if self._enc: ret = _lib.wc_Chacha_SetKey(self._enc, self._key, len(self._key)) if ret == 0: - _lib.wc_Chacha_SetIV(self._enc, self._IV_nonce, self._IV_counter) + ret = _lib.wc_Chacha_SetIV(self._enc, self._IV_nonce, self._IV_counter) if ret != 0: return ret if self._dec: ret = _lib.wc_Chacha_SetKey(self._dec, self._key, len(self._key)) if ret == 0: - _lib.wc_Chacha_SetIV(self._dec, self._IV_nonce, self._IV_counter) + ret = _lib.wc_Chacha_SetIV(self._dec, self._IV_nonce, self._IV_counter) if ret != 0: return ret return 0 @@ -628,6 +636,11 @@ class Des3(_Cipher): key_size = 24 _native_type = "Des3 *" + def __init__(self, key, mode, IV=None): + if mode != MODE_CBC: + raise ValueError("Des3 only supports MODE_CBC") + super().__init__(key, mode, IV) + def _set_key(self, direction): if direction == _ENCRYPTION: return _lib.wc_Des3_SetKey(self._enc, self._key, @@ -2024,7 +2037,6 @@ def decapsulate(self, ct): ) if ret < 0: # pragma: no cover - self.native_object = None raise WolfCryptError("wc_KyberKey_Decapsulate() error (%d)" % ret) return _ffi.buffer(ss, ss_size)[:] From 0b94b0c1234dcd75b455eda5751c80ef697086a5 Mon Sep 17 00:00:00 2001 From: Jeremiah Mackey Date: Fri, 3 Apr 2026 15:50:28 +0000 Subject: [PATCH 4/5] Fix low severity static analysis issues --- scripts/build_ffi.py | 17 +++++ wolfcrypt/asn.py | 12 ++-- wolfcrypt/ciphers.py | 155 ++++++++++++++++++++++++++++--------------- wolfcrypt/hashes.py | 30 +++++++++ 4 files changed, 154 insertions(+), 60 deletions(-) diff --git a/scripts/build_ffi.py b/scripts/build_ffi.py index 169f8fe..a2d2b57 100644 --- a/scripts/build_ffi.py +++ b/scripts/build_ffi.py @@ -496,6 +496,7 @@ def build_ffi(local_wolfssl, features): int ML_KEM_ENABLED = """ + str(features["ML_KEM"]) + """; int ML_DSA_ENABLED = """ + str(features["ML_DSA"]) + """; int HKDF_ENABLED = """ + str(features["HKDF"]) + """; + int ERROR_STRINGS_ENABLED = """ + str(features["ERROR_STRINGS"]) + """; """ ffibuilder.set_source( "wolfcrypt._ffi", init_source_string, @@ -535,6 +536,7 @@ def build_ffi(local_wolfssl, features): extern int ML_KEM_ENABLED; extern int ML_DSA_ENABLED; extern int HKDF_ENABLED; + extern int ERROR_STRINGS_ENABLED; typedef unsigned char byte; typedef unsigned int word32; @@ -558,6 +560,7 @@ def build_ffi(local_wolfssl, features): typedef struct { ...; } mp_int; int mp_init (mp_int * a); + void mp_clear (mp_int * a); int mp_to_unsigned_bin (mp_int * a, unsigned char *b); int mp_to_unsigned_bin_len (mp_int * a, unsigned char *b, int c); int mp_read_unsigned_bin (mp_int * a, const unsigned char *b, int c); @@ -569,6 +572,7 @@ def build_ffi(local_wolfssl, features): int wc_InitSha(wc_Sha*); int wc_ShaUpdate(wc_Sha*, const byte*, word32); int wc_ShaFinal(wc_Sha*, byte*); + void wc_ShaFree(wc_Sha*); """ if features["SHA256"]: @@ -577,6 +581,7 @@ def build_ffi(local_wolfssl, features): int wc_InitSha256(wc_Sha256*); int wc_Sha256Update(wc_Sha256*, const byte*, word32); int wc_Sha256Final(wc_Sha256*, byte*); + void wc_Sha256Free(wc_Sha256*); """ if features["SHA384"]: @@ -585,6 +590,7 @@ def build_ffi(local_wolfssl, features): int wc_InitSha384(wc_Sha384*); int wc_Sha384Update(wc_Sha384*, const byte*, word32); int wc_Sha384Final(wc_Sha384*, byte*); + void wc_Sha384Free(wc_Sha384*); """ if features["SHA512"]: @@ -594,6 +600,7 @@ def build_ffi(local_wolfssl, features): int wc_InitSha512(wc_Sha512*); int wc_Sha512Update(wc_Sha512*, const byte*, word32); int wc_Sha512Final(wc_Sha512*, byte*); + void wc_Sha512Free(wc_Sha512*); """ if features["SHA3"]: cdef += """ @@ -610,6 +617,10 @@ def build_ffi(local_wolfssl, features): int wc_Sha3_256_Final(wc_Sha3*, byte*); int wc_Sha3_384_Final(wc_Sha3*, byte*); int wc_Sha3_512_Final(wc_Sha3*, byte*); + int wc_Sha3_224_Free(wc_Sha3*); + int wc_Sha3_256_Free(wc_Sha3*); + int wc_Sha3_384_Free(wc_Sha3*); + int wc_Sha3_512_Free(wc_Sha3*); """ if features["DES3"]: @@ -706,6 +717,7 @@ def build_ffi(local_wolfssl, features): int wc_HmacSetKey(Hmac*, int, const byte*, word32); int wc_HmacUpdate(Hmac*, const byte*, word32); int wc_HmacFinal(Hmac*, byte*); + void wc_HmacFree(Hmac*); """ if features["RSA"]: @@ -990,6 +1002,11 @@ def build_ffi(local_wolfssl, features): int wolfCrypt_GetPrivateKeyReadEnable_fips(enum wc_KeyType); """ + if features["ERROR_STRINGS"]: + cdef += """ + const char* wc_GetErrorString(int error); + """ + if features["ML_KEM"] or features["ML_DSA"]: cdef += """ static const int INVALID_DEVID; diff --git a/wolfcrypt/asn.py b/wolfcrypt/asn.py index 4d66ae4..15d809f 100644 --- a/wolfcrypt/asn.py +++ b/wolfcrypt/asn.py @@ -20,6 +20,8 @@ # pylint: disable=no-member,no-name-in-module +import hmac as _hmac + from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib from wolfcrypt.exceptions import WolfCryptError @@ -63,13 +65,13 @@ def der_to_pem(der, pem_type): return _ffi.buffer(pem, pem_length)[:] def hash_oid_from_class(hash_cls): - if hash_cls == Sha: + if _lib.SHA_ENABLED and hash_cls == Sha: return _lib.SHAh - elif hash_cls == Sha256: + elif _lib.SHA256_ENABLED and hash_cls == Sha256: return _lib.SHA256h - elif hash_cls == Sha384: + elif _lib.SHA384_ENABLED and hash_cls == Sha384: return _lib.SHA384h - elif hash_cls == Sha512: + elif _lib.SHA512_ENABLED and hash_cls == Sha512: return _lib.SHA512h else: err = "Unknown hash class {}.".format(hash_cls.__name__) @@ -97,4 +99,4 @@ def make_signature(data, hash_cls, key=None): def check_signature(signature, data, hash_cls, pub_key): computed_signature = make_signature(data, hash_cls) decrypted_signature = pub_key.verify(signature) - return computed_signature == decrypted_signature + return _hmac.compare_digest(computed_signature, decrypted_signature) diff --git a/wolfcrypt/ciphers.py b/wolfcrypt/ciphers.py index 86d931b..42fa282 100644 --- a/wolfcrypt/ciphers.py +++ b/wolfcrypt/ciphers.py @@ -129,6 +129,10 @@ def __init__(self, key, mode, IV=None): self.mode = mode + key = t2b(key) + if IV is not None: + IV = t2b(IV) + if self.key_size: if self.key_size != len(key): raise ValueError("key must be %d in length, not %d" % @@ -147,10 +151,10 @@ def __init__(self, key, mode, IV=None): self._native_object = _ffi.new(self._native_type) self._enc = None self._dec = None - self._key = t2b(key) + self._key = key if IV: - self._IV = t2b(IV) + self._IV = IV else: # pragma: no cover self._IV = _ffi.new("byte[%d]" % self.block_size) @@ -403,6 +407,8 @@ def __init__(self, key, IV, tag_bytes=16): """ key = t2b(key) IV = t2b(IV) + if tag_bytes < 4 or tag_bytes > 16: + raise ValueError("tag_bytes must be between 4 and 16") self._tag_bytes = tag_bytes if len(key) not in self._key_sizes: raise ValueError("key must be %s in length, not %d" % @@ -443,7 +449,7 @@ def encrypt(self, data): self._buf = _ffi.new("byte[%d]" % (len(data))) ret = _lib.wc_AesGcmEncryptUpdate(self._native_object, self._buf, data, len(data), aad, len(aad)) if ret < 0: - raise WolfCryptError("Decryption error (%d)" % ret) + raise WolfCryptError("Encryption error (%d)" % ret) return bytes(self._buf) def decrypt(self, data): @@ -456,7 +462,7 @@ def decrypt(self, data): self._mode = _DECRYPTION aad = self._aad elif self._mode == _ENCRYPTION: - raise WolfCryptError("Class instance already in use for decryption") + raise WolfCryptError("Class instance already in use for encryption") self._buf = _ffi.new("byte[%d]" % (len(data))) ret = _lib.wc_AesGcmDecryptUpdate(self._native_object, self._buf, data, len(data), aad, len(aad)) if ret < 0: @@ -496,7 +502,7 @@ class ChaCha(_Cipher): key_size = None # 16, 24, 32 _key_sizes = [16, 32] _native_type = "ChaCha *" - _IV_nonce = [] + _IV_nonce = b"" _IV_counter = 0 def __init__(self, key="", size=32): @@ -510,7 +516,7 @@ def __init__(self, key="", size=32): raise ValueError("key must be %s in length, not %d" % (self._key_sizes, len(self._key))) self.key_size = len(self._key) - self._IV_nonce = [] + self._IV_nonce = b"" self._IV_counter = 0 def _set_key(self, direction): @@ -755,6 +761,8 @@ def encrypt(self, plaintext): return _ffi.buffer(ciphertext)[:] def encrypt_oaep(self, plaintext, label=""): + if not self._hash_type: + raise WolfCryptError("Hash type not set. Cannot use OAEP padding without a hash type.") plaintext = t2b(plaintext) label = t2b(label) ciphertext = _ffi.new("byte[%d]" % self.output_size) @@ -838,13 +846,13 @@ def verify_pss(self, plaintext, signature): class RsaPrivate(RsaPublic): if _lib.KEYGEN_ENABLED: @classmethod - def make_key(cls, size, rng=Random(), hash_type=None): + def make_key(cls, size, rng=None, hash_type=None): """ Generates a new key pair of desired length **size**. """ + if rng is None: + rng = Random() rsa = cls(hash_type=hash_type) - if rsa == None: # pragma: no cover - raise WolfCryptError("Invalid key error (%d)" % ret) ret = _lib.wc_MakeRsaKey(rsa.native_object, size, 65537, rng.native_object) @@ -946,6 +954,8 @@ def decrypt_oaep(self, ciphertext, label=""): Returns a string containing the plaintext. """ + if not self._hash_type: + raise WolfCryptError("Hash type not set. Cannot use OAEP padding without a hash type.") ciphertext = t2b(ciphertext) label = t2b(label) plaintext = _ffi.new("byte[%d]" % self.output_size) @@ -1176,33 +1186,39 @@ def verify_raw(self, R, S, data): raise WolfCryptError("wolfCrypt error (%d)" % ret) ret = _lib.mp_init(mpS) if ret != 0: # pragma: no cover + _lib.mp_clear(mpR) raise WolfCryptError("wolfCrypt error (%d)" % ret) - ret = _lib.mp_read_unsigned_bin(mpR, R, len(R)) - if ret != 0: # pragma: no cover - raise WolfCryptError("wolfCrypt error (%d)" % ret) - - ret = _lib.mp_read_unsigned_bin(mpS, S, len(S)) - if ret != 0: # pragma: no cover - raise WolfCryptError("wolfCrypt error (%d)" % ret) + try: + ret = _lib.mp_read_unsigned_bin(mpR, R, len(R)) + if ret != 0: # pragma: no cover + raise WolfCryptError("wolfCrypt error (%d)" % ret) + ret = _lib.mp_read_unsigned_bin(mpS, S, len(S)) + if ret != 0: # pragma: no cover + raise WolfCryptError("wolfCrypt error (%d)" % ret) - ret = _lib.wc_ecc_verify_hash_ex(mpR, mpS, - data, len(data), - status, self.native_object) + ret = _lib.wc_ecc_verify_hash_ex(mpR, mpS, + data, len(data), + status, self.native_object) - if ret < 0: - raise WolfCryptError("Verify error (%d)" % ret) + if ret < 0: + raise WolfCryptError("Verify error (%d)" % ret) - return status[0] == 1 + return status[0] == 1 + finally: + _lib.mp_clear(mpR) + _lib.mp_clear(mpS) class EccPrivate(EccPublic): @classmethod - def make_key(cls, size, rng=Random()): + def make_key(cls, size, rng=None): """ Generates a new key pair of desired length **size**. """ + if rng is None: + rng = Random() ecc = cls() ret = _lib.wc_ecc_make_key(rng.native_object, size, @@ -1305,12 +1321,14 @@ def shared_secret(self, peer): return _ffi.buffer(shared_secret, secret_size[0])[:] - def sign(self, plaintext, rng=Random()): + def sign(self, plaintext, rng=None): """ Signs **plaintext**, using the private key data in the object. Returns the signature. """ + if rng is None: + rng = Random() plaintext = t2b(plaintext) signature = _ffi.new("byte[%d]" % self.max_signature_size) @@ -1328,12 +1346,14 @@ def sign(self, plaintext, rng=Random()): return _ffi.buffer(signature, signature_size[0])[:] if _lib.MPAPI_ENABLED: - def sign_raw(self, plaintext, rng=Random()): + def sign_raw(self, plaintext, rng=None): """ Signs **plaintext**, using the private key data in the object. Returns the signature in its two raw components r, s """ + if rng is None: + rng = Random() plaintext = t2b(plaintext) R = _ffi.new("mp_int[1]"); S = _ffi.new("mp_int[1]"); @@ -1346,25 +1366,30 @@ def sign_raw(self, plaintext, rng=Random()): raise WolfCryptError("wolfCrypt error (%d)" % ret) ret = _lib.mp_init(S) if ret != 0: # pragma: no cover + _lib.mp_clear(R) raise WolfCryptError("wolfCrypt error (%d)" % ret) - ret = _lib.wc_ecc_sign_hash_ex(plaintext, len(plaintext), - rng.native_object, - self.native_object, - R, S) - if ret != 0: # pragma: no cover - raise WolfCryptError("Signature error (%d)" % ret) + try: + ret = _lib.wc_ecc_sign_hash_ex(plaintext, len(plaintext), + rng.native_object, + self.native_object, + R, S) + if ret != 0: # pragma: no cover + raise WolfCryptError("Signature error (%d)" % ret) - ret = _lib.mp_to_unsigned_bin_len(R, R_bin, self.size) - if ret != 0: # pragma: no cover - raise WolfCryptError("wolfCrypt error (%d)" % ret) + ret = _lib.mp_to_unsigned_bin_len(R, R_bin, self.size) + if ret != 0: # pragma: no cover + raise WolfCryptError("wolfCrypt error (%d)" % ret) - ret = _lib.mp_to_unsigned_bin_len(S, S_bin, self.size) - if ret != 0: # pragma: no cover - raise WolfCryptError("wolfCrypt error (%d)" % ret) + ret = _lib.mp_to_unsigned_bin_len(S, S_bin, self.size) + if ret != 0: # pragma: no cover + raise WolfCryptError("wolfCrypt error (%d)" % ret) - return _ffi.buffer(R_bin, self.size)[:], _ffi.buffer(S_bin, - self.size)[:] + return _ffi.buffer(R_bin, self.size)[:], _ffi.buffer(S_bin, + self.size)[:] + finally: + _lib.mp_clear(R) + _lib.mp_clear(S) if _lib.ED25519_ENABLED: @@ -1465,10 +1490,12 @@ def __init__(self, key=None, pub=None): self.decode_key(key,pub) @classmethod - def make_key(cls, size, rng=Random()): + def make_key(cls, size, rng=None): """ Generates a new key pair of desired length **size**. """ + if rng is None: + rng = Random() ed25519 = cls() ret = _lib.wc_ed25519_make_key(rng.native_object, size, @@ -1506,6 +1533,8 @@ def decode_key(self, key, pub = None): raise WolfCryptError("Public key generate error (%d)" % ret) ret = _lib.wc_ed25519_import_public(pubkey, self.size, self.native_object); + if ret < 0: + raise WolfCryptError("Public key import error (%d)" % ret) if self.size <= 0: # pragma: no cover raise WolfCryptError("Key decode error (%d)" % self.size) @@ -1521,20 +1550,22 @@ def encode_key(self): """ key = _ffi.new("byte[%d]" % (self.size * 4)) pubkey = _ffi.new("byte[%d]" % (self.size * 4)) - size = _ffi.new("word32[1]") + priv_size = _ffi.new("word32[1]") + pub_size = _ffi.new("word32[1]") - size[0] = _lib.wc_ed25519_priv_size(self.native_object) + priv_size[0] = _lib.wc_ed25519_priv_size(self.native_object) + pub_size[0] = _lib.wc_ed25519_pub_size(self.native_object) ret = _lib.wc_ed25519_export_private_only(self.native_object, - key, size) + key, priv_size) if ret != 0: # pragma: no cover raise WolfCryptError("Private key encode error (%d)" % ret) ret = _lib.wc_ed25519_export_public(self.native_object, pubkey, - size) + pub_size) if ret != 0: # pragma: no cover raise WolfCryptError("Public key encode error (%d)" % ret) - return _ffi.buffer(key, size[0])[:], _ffi.buffer(pubkey, size[0])[:] + return _ffi.buffer(key, priv_size[0])[:], _ffi.buffer(pubkey, pub_size[0])[:] def sign(self, plaintext): """ @@ -1661,10 +1692,12 @@ def __init__(self, key=None, pub=None): self.decode_key(key,pub) @classmethod - def make_key(cls, size, rng=Random()): + def make_key(cls, size, rng=None): """ Generates a new key pair of desired length **size**. """ + if rng is None: + rng = Random() ed448 = cls() ret = _lib.wc_ed448_make_key(rng.native_object, size, @@ -1702,6 +1735,8 @@ def decode_key(self, key, pub = None): raise WolfCryptError("Public key generate error (%d)" % ret) ret = _lib.wc_ed448_import_public(pubkey, self.size, self.native_object); + if ret < 0: + raise WolfCryptError("Public key import error (%d)" % ret) if self.size <= 0: # pragma: no cover raise WolfCryptError("Key decode error (%d)" % self.size) @@ -1717,20 +1752,22 @@ def encode_key(self): """ key = _ffi.new("byte[%d]" % (self.size * 4)) pubkey = _ffi.new("byte[%d]" % (self.size * 4)) - size = _ffi.new("word32[1]") + priv_size = _ffi.new("word32[1]") + pub_size = _ffi.new("word32[1]") - size[0] = _lib.wc_ed448_priv_size(self.native_object) + priv_size[0] = _lib.wc_ed448_priv_size(self.native_object) + pub_size[0] = _lib.wc_ed448_pub_size(self.native_object) ret = _lib.wc_ed448_export_private_only(self.native_object, - key, size) + key, priv_size) if ret != 0: # pragma: no cover raise WolfCryptError("Private key encode error (%d)" % ret) ret = _lib.wc_ed448_export_public(self.native_object, pubkey, - size) + pub_size) if ret != 0: # pragma: no cover raise WolfCryptError("Public key encode error (%d)" % ret) - return _ffi.buffer(key, size[0])[:], _ffi.buffer(pubkey, size[0])[:] + return _ffi.buffer(key, priv_size[0])[:], _ffi.buffer(pubkey, pub_size[0])[:] def sign(self, plaintext, ctx=None): """ @@ -1878,13 +1915,15 @@ def decode_key(self, pub_key): if ret < 0: # pragma: no cover raise WolfCryptError("wc_KyberKey_DecodePublicKey() error (%d)" % ret) - def encapsulate(self, rng=Random()): + def encapsulate(self, rng=None): """ :param rng: random number generator for an encupsulation :type rng: Random :return: tuple of a shared secret (first element) and the cipher text (second element) :rtype: tuple[bytes, bytes] """ + if rng is None: + rng = Random() ct_size = self.ct_size ss_size = self.ss_size ct = _ffi.new(f"unsigned char[{ct_size}]") @@ -1922,7 +1961,7 @@ def encapsulate_with_random(self, rand): class MlKemPrivate(_MlKemBase): @classmethod - def make_key(cls, mlkem_type, rng=Random()): + def make_key(cls, mlkem_type, rng=None): """ :param mlkem_type: ML-KEM type :type mlkem_type: MlKemType @@ -1931,6 +1970,8 @@ def make_key(cls, mlkem_type, rng=Random()): :return: `MlKemPrivate` object :rtype: MlKemPrivate """ + if rng is None: + rng = Random() mlkem_priv = cls(mlkem_type) ret = _lib.wc_KyberKey_MakeKey(mlkem_priv.native_object, rng.native_object) @@ -2165,7 +2206,7 @@ def verify(self, signature, message): class MlDsaPrivate(_MlDsaBase): @classmethod - def make_key(cls, mldsa_type, rng=Random()): + def make_key(cls, mldsa_type, rng=None): """ :param mldsa_type: ML-DSA type :type mldsa_type: MlDsaType @@ -2174,6 +2215,8 @@ def make_key(cls, mldsa_type, rng=Random()): :return: `MlDsaPrivate` object :rtype: MlDsaPrivate """ + if rng is None: + rng = Random() mldsa_priv = cls(mldsa_type) ret = _lib.wc_dilithium_make_key( mldsa_priv.native_object, rng.native_object @@ -2258,7 +2301,7 @@ def decode_key(self, priv_key, pub_key=None): if pub_key is not None: self._decode_pub_key(pub_key) - def sign(self, message, rng=Random()): + def sign(self, message, rng=None): """ :param message: message to be signed :type message: bytes or str @@ -2267,6 +2310,8 @@ def sign(self, message, rng=Random()): :return: signature :rtype: bytes """ + if rng is None: + rng = Random() msg_bytestype = t2b(message) in_size = self.sig_size signature = _ffi.new(f"byte[{in_size}]") diff --git a/wolfcrypt/hashes.py b/wolfcrypt/hashes.py index ef3989e..7276947 100644 --- a/wolfcrypt/hashes.py +++ b/wolfcrypt/hashes.py @@ -116,6 +116,10 @@ class Sha(_Hash): digest_size = 20 _native_type = "wc_Sha *" _native_size = _ffi.sizeof("wc_Sha") + _delete = _lib.wc_ShaFree + + def __del__(self): + self._delete(self._native_object) def _init(self): return _lib.wc_InitSha(self._native_object) @@ -138,6 +142,10 @@ class Sha256(_Hash): digest_size = 32 _native_type = "wc_Sha256 *" _native_size = _ffi.sizeof("wc_Sha256") + _delete = _lib.wc_Sha256Free + + def __del__(self): + self._delete(self._native_object) def _init(self): return _lib.wc_InitSha256(self._native_object) @@ -160,6 +168,10 @@ class Sha384(_Hash): digest_size = 48 _native_type = "wc_Sha384 *" _native_size = _ffi.sizeof("wc_Sha384") + _delete = _lib.wc_Sha384Free + + def __del__(self): + self._delete(self._native_object) def _init(self): return _lib.wc_InitSha384(self._native_object) @@ -182,6 +194,10 @@ class Sha512(_Hash): digest_size = 64 _native_type = "wc_Sha512 *" _native_size = _ffi.sizeof("wc_Sha512") + _delete = _lib.wc_Sha512Free + + def __del__(self): + self._delete(self._native_object) def _init(self): return _lib.wc_InitSha512(self._native_object) @@ -209,6 +225,16 @@ class Sha3(_Hash): SHA3_384_DIGEST_SIZE = 48 SHA3_512_DIGEST_SIZE = 64 + def __del__(self): + if self.digest_size == Sha3.SHA3_224_DIGEST_SIZE: + _lib.wc_Sha3_224_Free(self._native_object) + elif self.digest_size == Sha3.SHA3_256_DIGEST_SIZE: + _lib.wc_Sha3_256_Free(self._native_object) + elif self.digest_size == Sha3.SHA3_384_DIGEST_SIZE: + _lib.wc_Sha3_384_Free(self._native_object) + elif self.digest_size == Sha3.SHA3_512_DIGEST_SIZE: + _lib.wc_Sha3_512_Free(self._native_object) + def __init__(self, string=None, size=SHA3_384_DIGEST_SIZE): # pylint: disable=W0231 self._native_object = _ffi.new(self._native_type) self.digest_size = size @@ -285,6 +311,10 @@ class _Hmac(_Hash): digest_size = None _native_type = "Hmac *" _native_size = _ffi.sizeof("Hmac") + _delete = _lib.wc_HmacFree + + def __del__(self): + self._delete(self._native_object) def __init__(self, key, string=None): # pylint: disable=W0231 key = t2b(key) From da7a0b4935d298d8d4a0ad9323bcee4aad1fb150 Mon Sep 17 00:00:00 2001 From: Jeremiah Mackey Date: Fri, 3 Apr 2026 16:05:23 +0000 Subject: [PATCH 5/5] Add tests and fix build issues --- scripts/build_ffi.py | 27 ++++++++++++++------------- tests/test_aesgcmstream.py | 13 +++++++++++++ tests/test_ciphers.py | 26 ++++++++++++++++++++++++++ wolfcrypt/ciphers.py | 8 ++++---- wolfcrypt/hashes.py | 18 ++++++++++-------- 5 files changed, 67 insertions(+), 25 deletions(-) diff --git a/scripts/build_ffi.py b/scripts/build_ffi.py index a2d2b57..6869cd5 100644 --- a/scripts/build_ffi.py +++ b/scripts/build_ffi.py @@ -305,13 +305,14 @@ def generate_libwolfssl(fips): def get_features(local_wolfssl, features): fips = False + fips_file = None - if sys.platform == "win32": + if local_wolfssl and sys.platform == "win32": # On Windows, we assume the local_wolfssl path is to a wolfSSL source # directory where the library has been built. fips_file = os.path.join(local_wolfssl, "wolfssl", "wolfcrypt", "fips.h") - else: + elif local_wolfssl: # On non-Windows platforms, first assume local_wolfssl is an # installation directory with an include subdirectory. fips_file = os.path.join(local_wolfssl, "include", "wolfssl", @@ -321,7 +322,7 @@ def get_features(local_wolfssl, features): fips_file = os.path.join(local_wolfssl, "wolfssl", "wolfcrypt", "fips.h") - if os.path.exists(fips_file): + if fips_file and os.path.exists(fips_file): with open(fips_file, "r") as f: contents = f.read() if not contents.isspace(): @@ -617,10 +618,10 @@ def build_ffi(local_wolfssl, features): int wc_Sha3_256_Final(wc_Sha3*, byte*); int wc_Sha3_384_Final(wc_Sha3*, byte*); int wc_Sha3_512_Final(wc_Sha3*, byte*); - int wc_Sha3_224_Free(wc_Sha3*); - int wc_Sha3_256_Free(wc_Sha3*); - int wc_Sha3_384_Free(wc_Sha3*); - int wc_Sha3_512_Free(wc_Sha3*); + void wc_Sha3_224_Free(wc_Sha3*); + void wc_Sha3_256_Free(wc_Sha3*); + void wc_Sha3_384_Free(wc_Sha3*); + void wc_Sha3_512_Free(wc_Sha3*); """ if features["DES3"]: @@ -1106,17 +1107,17 @@ def main(ffibuilder): e = "Local wolfssl installation path {} doesn't exist.".format(local_wolfssl) raise FileNotFoundError(e) - get_features(local_wolfssl, features) - - if features["RSA_BLINDING"] and features["FIPS"]: - # These settings can't coexist. See settings.h. - features["RSA_BLINDING"] = 0 - if not local_wolfssl: print("Building wolfSSL...") if not get_libwolfssl(): generate_libwolfssl(features["FIPS"]) + get_features(local_wolfssl, features) + + if features["RSA_BLINDING"] and features["FIPS"]: + # These settings can't coexist. See settings.h. + features["RSA_BLINDING"] = 0 + build_ffi(local_wolfssl, features) diff --git a/tests/test_aesgcmstream.py b/tests/test_aesgcmstream.py index 12c8c04..cfa6881 100644 --- a/tests/test_aesgcmstream.py +++ b/tests/test_aesgcmstream.py @@ -123,3 +123,16 @@ def test_encrypt_aad_bad(): gcmdec.decrypt(buf) with pytest.raises(WolfCryptError): gcmdec.final(authTag) + + def test_invalid_tag_bytes(): + key = "fedcba9876543210" + iv = "0123456789abcdef" + with pytest.raises(ValueError, match="tag_bytes must be between 4 and 16"): + AesGcmStream(key, iv, tag_bytes=0) + with pytest.raises(ValueError, match="tag_bytes must be between 4 and 16"): + AesGcmStream(key, iv, tag_bytes=3) + with pytest.raises(ValueError, match="tag_bytes must be between 4 and 16"): + AesGcmStream(key, iv, tag_bytes=17) + # valid edge cases + AesGcmStream(key, iv, tag_bytes=4) + AesGcmStream(key, iv, tag_bytes=16) diff --git a/tests/test_ciphers.py b/tests/test_ciphers.py index 79092e2..7871072 100644 --- a/tests/test_ciphers.py +++ b/tests/test_ciphers.py @@ -876,3 +876,29 @@ def test_aessiv_decrypt_kat_openssl(): TEST_VECTOR_CIPHERTEXT_OPENSSL ) assert plaintext == TEST_VECTOR_PLAINTEXT_OPENSSL + + +if _lib.DES3_ENABLED: + def test_des3_rejects_mode_ctr(): + key = b"\x01\x23\x45\x67\x89\xab\xcd\xef" * 3 + iv = b"\xfe\xdc\xba\x98\x76\x54\x32\x10" + with pytest.raises(ValueError, match="Des3 only supports MODE_CBC"): + Des3(key, MODE_CTR, iv) + + +if _lib.CHACHA_ENABLED: + def test_chacha_non_block_aligned(): + key = b"\x00" * 32 + chacha = ChaCha(key) + chacha.set_iv(b"\x00" * 12) + plaintext = b"This is 25 bytes of text!" + assert len(plaintext) == 25 + ciphertext = chacha.encrypt(plaintext) + assert len(ciphertext) == 25 + chacha2 = ChaCha(key) + chacha2.set_iv(b"\x00" * 12) + assert chacha2.decrypt(ciphertext) == plaintext + + def test_chacha_invalid_key_length(): + with pytest.raises(ValueError, match="key must be"): + ChaCha(b"\x00" * 20) diff --git a/wolfcrypt/ciphers.py b/wolfcrypt/ciphers.py index 42fa282..6e3739b 100644 --- a/wolfcrypt/ciphers.py +++ b/wolfcrypt/ciphers.py @@ -422,7 +422,8 @@ def __init__(self, key, IV, tag_bytes=16): raise WolfCryptError("Init error (%d)" % ret) def __del__(self): - _lib.wc_AesFree(self._native_object) + if hasattr(self, '_native_object'): + _lib.wc_AesFree(self._native_object) def set_aad(self, data): """ @@ -1231,6 +1232,7 @@ def make_key(cls, size, rng=None): ret = _lib.wc_ecc_set_rng(ecc.native_object, rng.native_object) if ret < 0: raise WolfCryptError("Error setting ECC RNG (%d)" % ret) + ecc._rng = rng return ecc @@ -2247,9 +2249,7 @@ def priv_key_size(self): if ret < 0: # pragma: no cover raise WolfCryptError("wc_MlDsaKey_GetPrivLen() error (%d)" % ret) - key_pair_size = size[0] - - return key_pair_size - self.pub_key_size + return size[0] - self.pub_key_size def encode_pub_key(self): """ diff --git a/wolfcrypt/hashes.py b/wolfcrypt/hashes.py index 7276947..2e11a1c 100644 --- a/wolfcrypt/hashes.py +++ b/wolfcrypt/hashes.py @@ -225,19 +225,21 @@ class Sha3(_Hash): SHA3_384_DIGEST_SIZE = 48 SHA3_512_DIGEST_SIZE = 64 + _SHA3_FREE = { + 28: _lib.wc_Sha3_224_Free, + 32: _lib.wc_Sha3_256_Free, + 48: _lib.wc_Sha3_384_Free, + 64: _lib.wc_Sha3_512_Free, + } + def __del__(self): - if self.digest_size == Sha3.SHA3_224_DIGEST_SIZE: - _lib.wc_Sha3_224_Free(self._native_object) - elif self.digest_size == Sha3.SHA3_256_DIGEST_SIZE: - _lib.wc_Sha3_256_Free(self._native_object) - elif self.digest_size == Sha3.SHA3_384_DIGEST_SIZE: - _lib.wc_Sha3_384_Free(self._native_object) - elif self.digest_size == Sha3.SHA3_512_DIGEST_SIZE: - _lib.wc_Sha3_512_Free(self._native_object) + if hasattr(self, '_delete'): + self._delete(self._native_object) def __init__(self, string=None, size=SHA3_384_DIGEST_SIZE): # pylint: disable=W0231 self._native_object = _ffi.new(self._native_type) self.digest_size = size + self._delete = self._SHA3_FREE.get(size) ret = self._init() if ret < 0: # pragma: no cover raise WolfCryptError("Sha3 init error (%d)" % ret)