Source code for cfssl.cfssl

# -*- coding: utf-8 -*-
# Copyright 2016 LasLabs Inc.
# License MIT (https://opensource.org/licenses/MIT).

import requests
import logging

from .exceptions import CFSSLRemoteException

from .models.config_key import ConfigKey
from .utils import to_api

log = logging.getLogger(__name__)


[docs]class CFSSL(object): """ It provides Python bindings to a remote CFSSL server via HTTP(S). Additional documentation regarding the API endpoints is available at https://github.com/cloudflare/cfssl/tree/master/doc/api """ def __init__(self, host, port, ssl=True, verify_cert=True): """ Initialize the CFSSL object. Args: host (str): Host or IP of remote CFSSL instance. port (int): Port number of remote CFSSL instance. ssl (bool): Whether to use SSL. verify_cert (bool or str): File path of CA cert for verification, `True` to use system certs, or `False` to disable certificate verification. """ ssl = 'https' if ssl else 'http' self.verify = verify_cert self.uri_base = '%s://%s:%d' % (ssl, host, port)
[docs] def auth_sign(self, token, request, datetime=None, remote_address=None): """ It provides returns a signed certificate. Args: token (str): The authentication token. request (CertificateRequest): Signing request document. datetime (datetime.datetime): Authentication timestamp. remote_address (str): An address used in making the request. Returns: str: A PEM-encoded certificate that has been signed by the server. """ data = self._clean_mapping({ 'token': token, 'request': to_api(request), 'datetime': datetime, 'remote_address': remote_address, }) return self.call('authsign', 'POST', data=data)
[docs] def bundle(self, certificate, private_key=None, flavor='ubiquitous', domain=None, ip=None): """ It builds and returns certificate bundles. Args: certificate (str): The PEM-encoded certificate to be bundled. If the ``certificate`` parameter is present, the following four arguments are valid: private_key (str): The PEM-encoded private key to be included with the bundle. This is valid only if the server is not running in ``keyless`` mode. flavor (str): One of ``ubiquitous``, ``force``, or ``optimal``, with a default value of ``ubiquitous``. A ubiquitous bundle is one that has a higher probability of being verified everywhere, even by clients using outdated or unusual trust stores. Force will cause the endpoint to use the bundle provided in the ``certificate`` parameter, and will only verify that the bundle is a valid (verifiable) chain. domain (str): The domain name to verify as the hostname of the certificate. ip (str): The IP address to verify against the certificate IP SANs. If only the ``domain`` parameter is present, the following parameter is valid: ip (str): The IP address of the remote host; this will fetch the certificate from the IP, and verify that it is valid for the domain name. Returns: dict: Object representing the bundle, with the following keys: * bundle contains the concatenated list of PEM certificates forming the certificate chain; this forms the actual bundle. The remaining parameters are additional metadata supporting the bundle. * crl_support is true if CRL information is contained in the certificate. * crt contains the original certificate the bundle is built from. * expires contains the expiration date of the certificate. * hostnames contains the SAN hostnames for the certificate. * issuer contains the X.509 issuer information for the certificate. * key contains the private key for the certificate, if one was presented. * key_size contains the size of the key in bits for the certificate. It will be present even if the private key wasn't provided because this can be determined from the public key. * key_type contains a textual description of the key type, e.g. '2048-bit RSA'. * ocsp contains the OCSP URLs for the certificate, if present. * ocsp_support will be true if the certificate supports OCSP revocation checking. * signature contains the signature type used in the certificate, e.g. ``SHA1WithRSA``. * status contains a :type:`dict` of elements: * code is bit-encoded error code. 1st bit indicates whether there is a expiring certificate in the bundle. 2nd bit indicates whether there is a ubiquity issue with the bundle. * expiring_SKIs contains the SKIs (subject key identifiers) for any certificates that might expire soon (within 30 days). * messages is a list of human-readable warnings on bundle ubiquity and certificate expiration. For example, an expiration warning can be "The expiring cert is #1 in the chain", indicating the leaf certificate is expiring. Ubiquity warnings include SHA-1 deprecation warning (if the bundle triggers any major browser's SHA-1 deprecation policy), SHA-2 compatibility warning (if the bundle contains signatures using ECDSA SHA-2 hash algorithms, it will be rejected by Windows XP SP2), compatibility warning (if the bundle contains ECDSA certificates, it will be rejected by Windows XP, Android 2.2 and Android 2.3 etc) and root trust warning (if the bundle cannot be trusted by some major OSes or browsers). * rebundled indicates whether the server had to rebundle the certificate. The server will rebundle the uploaded certificate as needed; for example, if the certificate contains none of the required intermediates or a better set of intermediates was found. In this case, the server will mark rebundled as true. * untrusted_root_stores contains the names of any major OSes and browsers that doesn't trust the bundle. The names are used to construct the root trust warnings in the messages list * subject contains the X.509 subject identifier from the certificate. """ data = self._clean_mapping({ 'certificate': certificate, 'domain': domain, 'private_key': private_key, 'flavor': flavor, 'ip': ip, }) return self.call('bundle', 'POST', data=data)
[docs] def info(self, label, profile=None): """ It returns information about the CA, including the cert. Args: label (str): A string specifying the signer. profile (str): a string specifying the signing profile for the signer. Signing profile specifies what key usages should be used and how long the expiry should be set. Returns: dict: Mapping with three keys: * certificate (str): a PEM-encoded certificate of the signer. * usage (list of str): Key usages from the signing profile. * expiry (str): the expiry string from the signing profile. """ data = self._clean_mapping({ 'label': label, 'profile': profile, }) return self.call('info', 'POST', data=data)
[docs] def init_ca(self, certificate_request, ca=None): """ It initializes a new certificate authority. Args: certificate_request (CertificateRequest): The certificate request to use when creating the CA. ca (ConfigServer, optional): The configuration of the requested Certificate Authority. Returns: dict: Mapping with two keys: * private key (str): a PEM-encoded CA private key. * certificate (str): a PEM-encoded self-signed CA certificate. """ csr_api = to_api(certificate_request) data = self._clean_mapping({ 'hosts': csr_api['hosts'], 'names': csr_api['names'], 'CN': csr_api['CN'], 'key': csr_api['key'], 'ca': ca and to_api(ca) or None, }) return self.call('init_ca', 'POST', data=data)
[docs] def new_key(self, hosts, names, common_name=None, key=None, ca=None): """ It generates and returns a new private key + CSR. Args: hosts (tuple of Host): Subject Alternative Name(s) for the requested certificate. names (tuple of SubjectInfo): The Subject Info(s) for the requested certificate. CN (str): the common name for the certificate subject in the requestedrequested CA certificate. key (ConfigKey): Cipher and strength to use for certificate. ca (ConfigServer): the CA configuration of the requested CA. Returns: dict: Mapping with three keys: * private key (str): a PEM-encoded CA private key. * certificate (str): a PEM-encoded self-signed CA certificate. * sums (dict): Mapping holding both MD5 and SHA1 digests for the certificate request """ data = self._clean_mapping({ 'hosts': [ to_api(host) for host in hosts ], 'names': [ to_api(name) for name in names ], 'CN': common_name, 'key': key and to_api(key) or ConfigKey().to_api(), 'ca': ca and to_api(ca) or None, }) return self.call('newkey', 'POST', data=data)
[docs] def new_cert(self, request, label=None, profile=None, bundle=None): """ It generates and returns a new private key and certificate. Args: request (CertificateRequest): CSR to be used for certificate creation. label (str): Specifying which signer to be appointed to sign the CSR, useful when interacting with cfssl server that stands in front of a remote multi-root CA signer. profile (str): Specifying the signing profile for the signer. bundle (bool): Specifying whether to include an "optimal" certificate bundle along with the certificate. Returns: dict: mapping with these keys: * private key (str): a PEM-encoded private key. * certificate_request (str): a PEM-encoded certificate request. * certificate (str): a PEM-encoded certificate, signed by the server. * sums (dict): Holding both MD5 and SHA1 digests for the certificate request and the certificate. * bundle (str): See the result of endpoint_bundle.txt (only included if the bundle parameter was set). """ data = self._clean_mapping({ 'request': to_api(request), 'label': label, 'profile': profile, 'bundle': bundle, }) return self.call('newcert', 'POST', data=data)
[docs] def revoke(self, serial, authority_key_id, reason): """ It provides certificate revocation. Args: serial (str): Specifying the serial number of a certificate. authority_key_id (str): Specifying the authority key identifier of the certificate to be revoked; this is used to distinguish which private key was used to sign the certificate. reason (str): Identifying why the certificate was revoked; see, for example, ReasonStringToCode in the ocsp package or section 4.2.1.13 of RFC 5280. The "reasons" used here are the ReasonFlag names in said RFC. """ data = self._clean_mapping({ 'serial': serial, 'authority_key_id': authority_key_id, 'reason': reason, }) return self.call('revoke', 'POST', data=data)
[docs] def scan(self, host, ip=None, timeout=None, family=None, scanner=None): """ It scans servers to determine the quality of their TLS setup. Args: host (Host): The host to scan. ip (str): IP Address to override DNS lookup of host. timeout (str): The amount of time allotted for the scan to complete (default: 1 minute). family (str): regular expression specifying scan famil(ies) to run. scanner (str): regular expression specifying scanner(s) to run. Returns: dict: Mapping with keys for each scan family. Each of these objects contains keys for each scanner run in that family pointing to objects possibly containing the following keys: * grade (str): Describing the exit status of the scan. Can be: * "Good": host performing the expected state-of-the-art. * "Warning": host with non-ideal configuration, possibly maintaining support for legacy clients. * "Bad": host with serious misconfiguration or vulnerability * "Skipped": indicates that the scan was not performed for some reason. * error (str): Any error encountered during the scan process. * output: (dict) Arbitrary data retrieved during the scan. """ data = self._clean_mapping({ 'host': to_api(host), 'ip': ip, 'timeout': timeout, 'family': family, 'scanner': scanner, }) return self.call('scan', params=data)
[docs] def scan_info(self): """ It lists options available for scanning. Returns: dict: Mapping with keys for each scan family. For each family, there exists a `description` containing a string describing the family and a `scanners` object mapping each of the family's scanners to an object containing a ``description`` string. """ return self.call('scaninfo')
[docs] def sign(self, certificate_request, hosts=None, subject=None, serial_sequence=None, label=None, profile=None): """ It signs and returns a certificate. Args: certificate_request (str): the CSR bytes to be signed (in PEM). hosts (tuple of Host): of SAN (subject alternative .names) which overrides the ones in the CSR subject (str): The certificate subject which overrides the ones in the CSR. serial_sequence (str): Specify the prefix which the generated certificate serial should have. label (str): Specifying which signer to be appointed to sign the CSR, useful when interacting with a remote multi-root CA signer. profile (ConfigServer): Specifying the signing profile for the signer, useful when interacting with a remote multi-root CA signer. Returns: str: A PEM-encoded certificate that has been signed by the server. """ data = self._clean_mapping({ 'certificate_request': to_api(certificate_request), 'hosts': [ to_api(host) for host in hosts ], 'subject': subject, 'serial_sequence': serial_sequence, 'label': label, 'profile': to_api(profile), }) result = self.call('sign', 'POST', data=data) return result['certificate']
[docs] def call(self, endpoint, method='GET', params=None, data=None): """ It calls the remote endpoint and returns the result, if success. Args: endpoint (str): CFSSL endpoint to call (e.g. ``newcert``). method (str): HTTP method to utilize for the Request. params: (dict|bytes) Data to be sent in the query string for the Request. data: (dict or bytes or file) Data to send in the body of the Request. Returns: (mixed) Data contained in ``result`` key of the API response. Raises: CFSSLRemoteException: In the event of a ``False`` in the ``success`` key of the API response. """ endpoint = '%s/api/v1/cfssl/%s' % (self.uri_base, endpoint) response = requests.request( method=method, url=endpoint, params=params, json=data, verify=self.verify, ) response = response.json() if not response['success']: raise CFSSLRemoteException( '\n'.join([ 'Errors:', '\n'.join(map(CFSSL._format_response_message, response.get('errors', []))), 'Messages:' '\n'.join(map(CFSSL._format_response_message, response.get('messages', []))), ]) ) if 'messages' in response: for message in response['messages']: log.warning(CFSSL._format_response_message(message)) return response['result']
@staticmethod def _format_response_message(error): message = '' if isinstance(error, dict): message += (error['message'] if 'message' in error else '<undefined message>') if 'code' in error: message += ' (%s)' % error['code'] if not message: message = str(error) return message def _clean_mapping(self, mapping): """ It removes false entries from mapping """ return {k: v for k, v in mapping.items() if v}