# Source code for signify.core.keeping

# -*- encoding: utf-8 -*-
"""
SIGNIFY
signify.core.keeping module

"""


import importlib

from keri import kering
from keri.app import keeping
from keri.core import coring
from keri.core.coring import Tiers, MtrDex


class Manager:

    def __init__(self, salter, extern_modules=None):
        self.salter = salter
        extern_modules = extern_modules if extern_modules is not None else []
        self.modules = dict()
        for module in extern_modules:
            typ = module["type"]
            name = module["name"]
            params = module["params"]

            pkg = importlib.import_module(name)
            mod = pkg.module(**params)

            self.modules[typ] = mod

    def new(self, algo, pidx, **kwargs):
        match algo:
            case keeping.Algos.salty:
                return SaltyKeeper(salter=self.salter, pidx=pidx, **kwargs)

            case keeping.Algos.group:
                return GroupKeeper(mgr=self, **kwargs)

            case keeping.Algos.randy:
                return RandyKeeper(salter=self.salter, **kwargs)

            case keeping.Algos.extern:
                typ = kwargs["extern_type"]
                if typ not in self.modules:
                    raise kering.ConfigurationError(f"unsupported external module type {typ}")
                mod = self.modules[typ]

                eargs = kwargs["extern"]
                return mod.shim(pidx=pidx, **eargs)

    def get(self, aid):
        pre = coring.Prefixer(qb64=aid["prefix"])
        if keeping.Algos.salty in aid:
            kwargs = aid[keeping.Algos.salty]
            if "pidx" not in kwargs:
                raise kering.ConfigurationError(f"missing pidx in {kwargs}")
            return SaltyKeeper(salter=self.salter, **kwargs)

        elif keeping.Algos.randy in aid:
            kwargs = aid[keeping.Algos.randy]
            return RandyKeeper(salter=self.salter, transferable=pre.transferable, **kwargs)

        elif keeping.Algos.group in aid:
            kwargs = aid[keeping.Algos.group]
            return GroupKeeper(mgr=self, **kwargs)


class BaseKeeper:
    """
    Base class for the keeper classes in this module.

    Provides the ``algo`` discriminator and the shared signing helper.
    """

    @property
    def algo(self):
        """
        Algorithm identifier for this keeper, chosen from keeping.Algos by the
        concrete class of the instance. Any class other than SaltyKeeper,
        RandyKeeper or GroupKeeper is reported as extern.
        """
        if isinstance(self, SaltyKeeper):
            return keeping.Algos.salty
        elif isinstance(self, RandyKeeper):
            return keeping.Algos.randy
        elif isinstance(self, GroupKeeper):
            return keeping.Algos.group
        else:
            return keeping.Algos.extern

    @staticmethod
    def __sign__(ser, signers, indexed=False, indices=None, ondices=None):
        """
        Sign ``ser`` with every signer and return the qb64 of each signature.

        Parameters:
            ser (bytes): serialization to sign
            signers (list): objects exposing ``sign``; one signature is
                produced per signer, in order
            indexed (bool): True to produce indexed signatures using
                ``indices``/``ondices``; False to call ``signer.sign(ser)``
                with no index
            indices (list): optional signing index per signer. When empty or
                None the signer's position in ``signers`` is used.
            ondices (list): optional other index per signer. An entry of None
                requests a current-only signature; when the list is empty or
                None the signing index is reused.

        Returns:
            list: ``qb64`` of each signature, in signer order

        Raises:
            ValueError: if an index is not a non-negative int, or an ondex is
                neither None nor a non-negative int. ``bool`` is rejected in
                both cases.
        """
        if not indexed:
            return [signer.sign(ser).qb64 for signer in signers]

        sigs = []
        for j, signer in enumerate(signers):
            if indices:  # not the default, get index from indices
                i = indices[j]  # must be whole number
                # bool is a subclass of int; reject it explicitly so the check
                # agrees with the ondices validation below
                if not isinstance(i, int) or isinstance(i, bool) or i < 0:
                    raise ValueError(f"Invalid signing index = {i}, not "
                                     f"whole number.")
            else:  # the default
                i = j  # same as position in signers

            if ondices:  # not the default, get ondex from ondices
                o = ondices[j]  # int means both, None means current only
                if not (o is None or
                        isinstance(o, int) and not isinstance(o, bool) and o >= 0):
                    raise ValueError(f"Invalid other signing index = {o}, not "
                                     f"None or not whole number.")
            else:  # default
                o = i  # must both be same value int

            # only=True asks the signer for a current-only signature
            siger = signer.sign(ser, index=i, only=o is None, ondex=o)
            sigs.append(siger.qb64)

        return sigs


class SaltyKeeper(BaseKeeper):
    """
    Keeper class for managing keys for an AID that uses a hierarchical deterministic key chain with a salt per AID.
    The passcode is used as an encryption key to encrypt and store the AID's salt on the server.

    This class can either be instantiated with an encrypted salt or None which will create a random salt for this AID.
    """

    # Default path prefix used when no stem is supplied to the constructor
    stem = "signify:aid"

    def __init__(self, salter, pidx, kidx=0, tier=Tiers.low, transferable=False, stem=None,
                 code=MtrDex.Ed25519_Seed, count=1, icodes=None, ncode=MtrDex.Ed25519_Seed,
                 ncount=1, ncodes=None, dcode=MtrDex.Blake3_256, bran=None, sxlt=None):
        """
        Create an instance of a SaltyKeeper for managing keys for a single AID. This can be created from data
        saved externally to recreate keys at a given point in time or with values for a new AID. The sxlt
        parameter can contain an existing encrypted salt to use for the HDK algorithm for this key chain or
        None to create a new random one for a new AID.

        Parameters:
            salter (Salter): encryption salter used for encrypting the AID salt. Typically user passcode
            pidx (int): prefix relative index for this AID in a Habery of AIDs
            kidx (int): key index for the current state of the key chain
            tier (Tiers): secret derivation security tier
            transferable (bool): True if the AID for this keeper can establish new keys
            stem (str): prefix for the path generated for key creation; defaults to the class-level stem
            code (str): derivation code for signing key creation
            count (int): number of signing keys
            icodes (list): alternate to code and count to be specific about key codes
            ncode (str): derivation code for rotation key creation
            ncount (int): number of rotation keys
            ncodes (list): alternate to ncode and ncount to be specific about rotation key codes
            dcode (str): derivation code for hashing algorithm for next key digests
            bran (str): AID specific salt to use for key generation for this AID inception
            sxlt (str): qualified base64 of cipher of AID salt.
        """
        if not icodes:  # if not codes make list len count of same code
            icodes = [code] * count
        if not ncodes:
            ncodes = [ncode] * ncount

        # Salter is the entered passcode and used for enc/dec of salts for each AID
        signer = salter.signer(transferable=False)
        self.aeid = signer.verfer.qb64
        self.encrypter = coring.Encrypter(verkey=self.aeid)
        self.decrypter = coring.Decrypter(seed=signer.qb64)

        self.tier = tier
        self.icodes = icodes
        self.ncodes = ncodes
        self.dcode = dcode
        self.pidx = pidx
        self.kidx = kidx
        self.transferable = transferable

        # Keep the effective stem on the instance so params() reports the stem
        # actually used for key derivation, not just the class default.
        self.stem = stem if stem is not None else self.stem

        # sxlt is encrypted salt for this AID or None if incepting
        if bran is not None:
            bran = coring.MtrDex.Salt_128 + 'A' + bran[:21]
            self.creator = keeping.SaltyCreator(salt=bran, stem=self.stem, tier=tier)
            self.sxlt = self.encrypter.encrypt(self.creator.salt).qb64
        elif sxlt is None:
            self.creator = keeping.SaltyCreator(stem=self.stem, tier=tier)
            self.sxlt = self.encrypter.encrypt(self.creator.salt).qb64
        else:
            self.sxlt = sxlt
            ciph = coring.Cipher(qb64=self.sxlt)
            self.creator = keeping.SaltyCreator(self.decrypter.decrypt(cipher=ciph).qb64,
                                                stem=self.stem, tier=tier)

    def params(self):
        """ Get AID parameters to store externally """
        return dict(
            sxlt=self.sxlt,
            pidx=self.pidx,
            kidx=self.kidx,
            stem=self.stem,
            tier=self.tier,
            icodes=self.icodes,
            ncodes=self.ncodes,
            dcode=self.dcode,
            transferable=self.transferable
        )

    def incept(self, transferable):
        """ Create verfers and digers for inception event for AID represented by this Keeper

        Resets the key index to 0 and records ``transferable`` on the keeper.

        Args:
            transferable (bool): True if the AID for this keeper can establish new keys

        Returns:
            verfers(list): qualified base64 of signing public keys
            digers(list): qualified base64 of hash of rotation public keys

        """
        self.transferable = transferable
        self.kidx = 0

        signers = self.creator.create(codes=self.icodes, pidx=self.pidx, kidx=self.kidx,
                                      transferable=transferable)
        verfers = [signer.verfer.qb64 for signer in signers]

        # Rotation keys start right after the inception signing keys
        nsigners = self.creator.create(codes=self.ncodes, pidx=self.pidx, kidx=len(self.icodes),
                                       transferable=self.transferable)
        digers = [coring.Diger(ser=nsigner.verfer.qb64b, code=self.dcode).qb64 for nsigner in nsigners]

        return verfers, digers

    def rotate(self, ncodes, transferable, **_):
        """ Rotate and return verfers and digers for next rotation event for AID represented by this Keeper

        Advances the key index by the number of current signing key codes.

        Args:
            ncodes (list): derivation codes for rotation key creation
            transferable (bool): transferable flag used when creating the new rotation keys

        Returns:
            verfers(list): qualified base64 of signing public keys
            digers(list): qualified base64 of hash of rotation public keys

        """
        # NOTE(review): self.icodes, self.ncodes and self.transferable are not
        # updated here, and every offset uses len(self.icodes). A later sign()
        # re-creates keys from self.icodes at the advanced kidx, which lines up
        # with the keys rotated in below only when icodes and ncodes are
        # equivalent - confirm this is intended.
        signers = self.creator.create(codes=self.ncodes, pidx=self.pidx, kidx=self.kidx + len(self.icodes),
                                      transferable=self.transferable)
        verfers = [signer.verfer.qb64 for signer in signers]

        self.kidx = self.kidx + len(self.icodes)
        nsigners = self.creator.create(codes=ncodes, pidx=self.pidx, kidx=self.kidx + len(self.icodes),
                                       transferable=transferable)
        digers = [coring.Diger(ser=nsigner.verfer.qb64b, code=self.dcode).qb64 for nsigner in nsigners]

        return verfers, digers

    def sign(self, ser, indexed=True, indices=None, ondices=None):
        """ Sign provided data using the current signing keys for AID

        Args:
            ser (bytes): data to sign
            indexed (bool): True indicates the signatures are to be indexed signatures (indexed code)
            indices (list): specified signing indices for each signature generated
            ondices (list): specified rotation indices for each signature generated

        Returns:
            list: qualified b64 CESR encoded signatures

        """
        # Signing keys are re-derived on demand from the current key index
        signers = self.creator.create(codes=self.icodes, pidx=self.pidx, kidx=self.kidx,
                                      transferable=self.transferable)

        return self.__sign__(ser, signers=signers, indexed=indexed, indices=indices, ondices=ondices)
class RandyKeeper(BaseKeeper):
    """
    Keeper that creates its key pairs through keeping.RandyCreator and holds
    them as encrypted qb64 ciphers: ``prxs`` for the current signing keys and
    ``nxts`` for the next (rotation) keys.
    """

    def __init__(self, salter, code=MtrDex.Ed25519_Seed, count=1, icodes=None, transferable=False,
                 ncode=MtrDex.Ed25519_Seed, ncount=1, ncodes=None, dcode=MtrDex.Blake3_256,
                 prxs=None, nxts=None):
        """
        Parameters:
            salter: salter whose non-transferable signer supplies the key used
                to encrypt and decrypt the stored private keys
            code (str): derivation code for signing keys when icodes is empty
            count (int): number of signing keys when icodes is empty
            icodes (list): explicit derivation codes for the signing keys
            transferable (bool): transferable flag recorded on the keeper
            ncode (str): derivation code for rotation keys when ncodes is empty
            ncount (int): number of rotation keys when ncodes is empty
            ncodes (list): explicit derivation codes for the rotation keys
            dcode (str): derivation code for the next key digests
            prxs (list): qb64 ciphers of the current signing keys
            nxts (list): qb64 ciphers of the next keys
        """
        self.salter = salter

        # Fall back to `count` copies of the single code when no list is given
        self.icodes = icodes or [code] * count
        self.ncodes = ncodes or [ncode] * ncount
        self.dcode = dcode

        passcode_signer = salter.signer(transferable=False)
        self.aeid = passcode_signer.verfer.qb64
        self.encrypter = coring.Encrypter(verkey=self.aeid)
        self.decrypter = coring.Decrypter(seed=passcode_signer.qb64)

        self.prxs = prxs
        self.nxts = nxts
        self.transferable = transferable

        self.creator = keeping.RandyCreator()

    def params(self):
        """ Get AID parameters to store externally """
        return {
            "prxs": self.prxs,
            "nxts": self.nxts,
            "transferable": self.transferable,
        }

    def incept(self, transferable):
        """
        Create current and next key sets, store their ciphers on the keeper
        and return (verfers, digers) as lists of qb64 strings.
        """
        self.transferable = transferable

        current = self.creator.create(codes=self.icodes, transferable=transferable)
        self.prxs = [self.encrypter.encrypt(matter=signer).qb64 for signer in current]
        verfers = [signer.verfer.qb64 for signer in current]

        upcoming = self.creator.create(codes=self.ncodes, transferable=transferable)
        self.nxts = [self.encrypter.encrypt(matter=signer).qb64 for signer in upcoming]
        digers = [coring.Diger(ser=signer.verfer.qb64b, code=self.dcode).qb64 for signer in upcoming]

        return verfers, digers

    def rotate(self, ncodes, transferable, **_):
        """
        Promote the stored next keys to current, create a fresh set of next
        keys from ``ncodes`` and return (verfers, digers) as qb64 lists.
        """
        self.transferable = transferable
        self.prxs = self.nxts

        # The previously committed next keys become the signing keys
        promoted = []
        for nxt in self.nxts:
            cipher = coring.Cipher(qb64=nxt)
            promoted.append(self.decrypter.decrypt(cipher=cipher, transferable=self.transferable))
        verfers = [signer.verfer.qb64 for signer in promoted]

        upcoming = self.creator.create(codes=ncodes, transferable=transferable)
        self.nxts = [self.encrypter.encrypt(matter=signer).qb64 for signer in upcoming]
        digers = [coring.Diger(ser=signer.verfer.qb64b, code=self.dcode).qb64 for signer in upcoming]

        return verfers, digers

    def sign(self, ser, indexed=True, indices=None, ondices=None, **_):
        """
        Decrypt the current signing keys and sign ``ser`` with them; returns
        the list of qb64 signatures produced by the shared signing helper.
        """
        signers = []
        for prx in self.prxs:
            sealed = coring.Cipher(qb64=prx).qb64b
            signers.append(self.decrypter.decrypt(ser=sealed, transferable=self.transferable))

        return self.__sign__(ser, signers=signers, indexed=indexed, indices=indices, ondices=ondices)


class GroupKeeper(BaseKeeper):
    """
    Keeper for a group AID. Holds the group's signing keys and next key
    digests and delegates signing to the keeper of the member AID ``mhab``.
    """

    def __init__(self, mgr: Manager, mhab=None, states=None, rstates=None, keys=None, ndigs=None):
        """
        Parameters:
            mgr (Manager): manager used to look up the member AID's keeper
            mhab (dict): member AID record; sign() reads its 'state' entry
            states (list): key states; when given, the first 'k' entry of each
                replaces ``keys``
            rstates (list): key states; when given, the first 'n' entry of
                each replaces ``ndigs``
            keys (list): group signing keys, used when states is None
            ndigs (list): group next key digests, used when rstates is None
        """
        self.mgr = mgr
        self.mhab = mhab
        self.gkeys = keys if states is None else [state['k'][0] for state in states]
        self.gdigs = ndigs if rstates is None else [state['n'][0] for state in rstates]

    def incept(self, **_):
        """ Return the stored group keys and next key digests """
        return self.gkeys, self.gdigs

    def rotate(self, states, rstates, **_):
        """ Replace the group keys and digests from the given states and return them """
        self.gkeys = [state['k'][0] for state in states]
        self.gdigs = [state['n'][0] for state in rstates]
        return self.gkeys, self.gdigs

    def sign(self, ser, indexed=True, **_):
        """
        Sign ``ser`` with the member AID's keeper, using the positions of the
        member's first key and first next digest in the group lists as the
        signing index and other index.
        """
        member_state = self.mhab['state']
        key = member_state['k'][0]
        ndig = member_state['n'][0]

        csi = self.gkeys.index(key)
        pni = self.gdigs.index(ndig)

        member_keeper = self.mgr.get(self.mhab)
        return member_keeper.sign(ser, indexed=indexed, indices=[csi], ondices=[pni])

    def params(self):
        """ Get AID parameters to store externally """
        return {
            "mhab": self.mhab,
            "keys": self.gkeys,
            "ndigs": self.gdigs,
        }