diff --git a/requirements.txt b/requirements.txt index 84daaaa..06cfa7e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,3 @@ Flask==1.0.2 Flask-Login==0.4.1 pyserial==3.4 pymodbus==2.2.0 -cryptography diff --git a/webserver/credentials.py b/webserver/credentials.py index e6b3b2d..a79ef76 100644 --- a/webserver/credentials.py +++ b/webserver/credentials.py @@ -1,94 +1,85 @@ -from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography import x509 -from cryptography.x509.oid import NameOID -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.backends import default_backend -import datetime -import ipaddress +import subprocess import os class CertGen(): - """Generates a self-signed TLS certificate and private key.""" + """Generates a self-signed TLS certificate and private key using OpenSSL CLI.""" def __init__(self, hostname, ip_addresses=None): self.hostname = hostname - self.ip_addresses = ip_addresses - - self.now = datetime.datetime.utcnow() - self.subject = self.issuer = x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, hostname), - ]) - - self.alt_names = [x509.DNSName(hostname)] - if ip_addresses: - for addr in ip_addresses: - self.alt_names.append(x509.IPAddress(ipaddress.ip_address(addr))) - - self.san_extension = x509.SubjectAlternativeName(self.alt_names) - - def generate_key(self): - # Generate our key - self.key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) + self.ip_addresses = ip_addresses or [] def generate_self_signed_cert(self, cert_file="cert.pem", key_file="key.pem"): + """Generate a self-signed certificate using OpenSSL CLI with strong security parameters.""" print(f"Generating self-signed certificate for {self.hostname}...") + + san_list = [f"DNS:{self.hostname}"] + for ip in self.ip_addresses: + san_list.append(f"IP:{ip}") + san_string = ",".join(san_list) + + cmd = [ + "openssl", "req", + "-x509", + "-newkey", "rsa:4096", + "-sha256", + "-nodes", + "-keyout", str(key_file), + "-out", str(cert_file), + "-days", "36500", + "-subj", f"/CN={self.hostname}", + "-addext", f"subjectAltName={san_string}" + ] + + try: + result = subprocess.run( + cmd, + check=True, + capture_output=True, + text=True + ) + print(f"Certificate saved to {cert_file}") + print(f"Private key saved to {key_file}") + return f"Certificate generated successfully for {self.hostname}" + except subprocess.CalledProcessError as e: + error_msg = f"Error generating certificate: {e.stderr}" + print(error_msg) + raise RuntimeError(error_msg) + except FileNotFoundError: + error_msg = "OpenSSL not found. Please ensure OpenSSL is installed on the system." + print(error_msg) + raise RuntimeError(error_msg) - self.generate_key() - - cert = ( - x509.CertificateBuilder() - .subject_name(self.subject) - .issuer_name(self.issuer) - .public_key(self.key.public_key()) - .serial_number(x509.random_serial_number()) - .not_valid_before(self.now) - .not_valid_after(self.now + datetime.timedelta(days=365)) # Valid for 1 year - .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) - .add_extension(self.san_extension, critical=False) - .sign(self.key, hashes.SHA256(), default_backend()) - ) - - # Write our certificate and key to disk - with open(cert_file, "wb+") as f: - f.write(cert.public_bytes(serialization.Encoding.PEM)) - with open(key_file, "wb+") as f: - f.write(self.key.private_bytes( - serialization.Encoding.PEM, - serialization.PrivateFormat.PKCS8, - serialization.NoEncryption() - )) - print(f"Certificate saved to {cert_file}") - print(f"Private key saved to {key_file}") - - # TODO add a function to update the certificate on the client before expiration def is_certificate_valid(self, cert_file): - """Check if the certificate is valid.""" + """Check if the certificate exists and is not expired using OpenSSL.""" if not os.path.exists(cert_file): print(f"Certificate file not found: {cert_file}") return False - + try: - with open(cert_file, "rb") as f: - cert_data = f.read() - cert = x509.load_pem_x509_certificate(cert_data, default_backend()) - - now = datetime.datetime.utcnow() - - if now < cert.not_valid_before_utc: - print(f"Certificate is not yet valid. Valid from: {cert.not_valid_before}") + result = subprocess.run( + ["openssl", "x509", "-in", str(cert_file), "-noout", "-checkend", "0"], + check=False, + capture_output=True, + text=True + ) + + if result.returncode == 0: + date_result = subprocess.run( + ["openssl", "x509", "-in", str(cert_file), "-noout", "-enddate"], + check=True, + capture_output=True, + text=True + ) + expiry_line = date_result.stdout.strip() + print(f"Certificate is valid. {expiry_line}") + return True + else: + print(f"Certificate has expired.") return False - if now > cert.not_valid_after_utc: - print(f"Certificate has expired. Expired on: {cert.not_valid_after}") - return False - - print(f"Certificate is valid. Expires on: {cert.not_valid_after_utc}") - return True - - except Exception as e: - print(f"Error loading or parsing certificate: {e}") + + except subprocess.CalledProcessError as e: + print(f"Error checking certificate validity: {e.stderr}") + return False + except FileNotFoundError: + print("OpenSSL not found. Please ensure OpenSSL is installed on the system.") return False