# -*- encoding: utf-8 -*-
"""
SIGNIFY
signify.app.aiding module
"""
from math import ceil
from keri import kering
from keri.app.keeping import Algos
from keri.core import eventing
from keri.core.coring import MtrDex, Tholder
from keri.kering import Roles
from signify.app.clienting import SignifyClient
from signify.core import httping
[docs]
class Identifiers:
""" Domain class for accessing, creating and rotating KERI Autonomic IDentifiers (AIDs) """
def __init__(self, client: SignifyClient):
self.client = client
def list(self, start=0, end=24):
headers = dict(Range=f"aids={start}-{end}")
res = self.client.get(f"/identifiers", headers=headers)
cr = res.headers["content-range"]
start, end, total = httping.parseRangeHeader(cr, "aids")
return dict(start=start, end=end, total=total, aids=res.json())
def get(self, name):
res = self.client.get(f"/identifiers/{name}")
return res.json()
def create(self, name, transferable=True, isith="1", nsith="1", wits=None, toad="0", proxy=None, delpre=None,
dcode=MtrDex.Blake3_256, data=None, algo=Algos.salty, estOnly=False, DnD=False, **kwargs):
# Get the algo specific key params
keeper = self.client.manager.new(algo, self.client.pidx, **kwargs)
keys, ndigs = keeper.incept(transferable=transferable)
wits = wits if wits is not None else []
data = [data] if data is not None else []
cnfg = []
if estOnly:
cnfg.append(eventing.TraitCodex.EstOnly)
if DnD:
cnfg.append(eventing.TraitCodex.DoNotDelegate)
if delpre is not None:
serder = eventing.delcept(delpre=delpre,
keys=keys,
isith=isith,
nsith=nsith,
ndigs=ndigs,
code=dcode,
wits=wits,
toad=toad,
cnfg=cnfg,
data=data)
else:
serder = eventing.incept(keys=keys,
isith=isith,
nsith=nsith,
ndigs=ndigs,
code=dcode,
wits=wits,
toad=toad,
cnfg=cnfg,
data=data)
sigs = keeper.sign(serder.raw)
json = dict(
name=name,
icp=serder.ked,
sigs=sigs,
proxy=proxy)
json[algo] = keeper.params()
if 'states' in kwargs:
json['smids'] = [state['i'] for state in kwargs['states']]
if 'rstates' in kwargs:
json['rmids'] = [state['i'] for state in kwargs['rstates']]
self.client.pidx = self.client.pidx + 1
res = self.client.post("/identifiers", json=json)
return serder, sigs, res.json()
def update(self, name, typ, **kwas):
if typ == "interact":
self.interact(name, **kwas)
elif typ == "rotate":
self.rotate(name, **kwas)
else:
raise kering.KeriError(f"{typ} invalid identifier update type, only 'rotate' or 'interact' allowed")
pass
def delete(self, name):
pass
def interact(self, name, data=None):
hab = self.get(name)
pre = hab["prefix"]
state = hab["state"]
sn = int(state["s"], 16)
dig = state["d"]
data = data if isinstance(data, list) else [data]
serder = eventing.interact(pre, sn=sn + 1, data=data, dig=dig)
keeper = self.client.manager.get(aid=hab)
sigs = keeper.sign(ser=serder.raw)
json = dict(
ixn=serder.ked,
sigs=sigs)
json[keeper.algo] = keeper.params()
res = self.client.put(f"/identifiers/{name}?type=ixn", json=json)
return serder, sigs, res.json()
def rotate(self, name, *, transferable=True, nsith=None, toad=None, cuts=None, adds=None,
data=None, ncode=MtrDex.Ed25519_Seed, ncount=1, ncodes=None, states=None, rstates=None):
hab = self.get(name)
pre = hab["prefix"]
state = hab["state"]
count = len(state['k'])
dig = state["d"]
ridx = int(state["s"], 16) + 1
wits = state['b']
isith = state["kt"] if "kt" in state else None
if nsith is None:
nsith = isith # use new current as default
if isith is None: # compute default from newly rotated verfers above
isith = f"{max(1, ceil(count / 2)):x}"
if nsith is None: # compute default from newly rotated digers above
nsith = f"{max(0, ceil(ncount / 2)):x}"
cst = Tholder(sith=isith).sith # current signing threshold
nst = Tholder(sith=nsith).sith # next signing threshold
# Regenerate next keys to sign rotation event
keeper = self.client.manager.get(hab)
# Create new keys for next digests
if ncodes is None:
ncodes = [ncode] * ncount
keys, ndigs = keeper.rotate(ncodes=ncodes, transferable=transferable, states=states, rstates=rstates)
cuts = cuts if cuts is not None else []
adds = adds if adds is not None else []
data = [data] if data is not None else []
serder = eventing.rotate(pre=pre,
keys=keys,
dig=dig,
sn=ridx,
isith=cst,
nsith=nst,
ndigs=ndigs,
toad=toad,
wits=wits,
cuts=cuts,
adds=adds,
data=data)
sigs = keeper.sign(ser=serder.raw)
json = dict(
rot=serder.ked,
sigs=sigs)
json[keeper.algo] = keeper.params()
if states is not None:
json['smids'] = [state['i'] for state in states]
if rstates is not None:
json['rmids'] = [state['i'] for state in rstates]
res = self.client.put(f"/identifiers/{name}", json=json)
return serder, sigs, res.json()
def addEndRole(self, name, *, role=Roles.agent, eid=None, stamp=None):
hab = self.get(name)
pre = hab["prefix"]
rpy = self.makeEndRole(pre, role, eid, stamp)
keeper = self.client.manager.get(aid=hab)
sigs = keeper.sign(ser=rpy.raw)
json = dict(
rpy=rpy.ked,
sigs=sigs
)
res = self.client.post(f"/identifiers/{name}/endroles", json=json)
return rpy, sigs, res.json()
def sign(self, name, ser):
hab = self.get(name)
keeper = self.client.manager.get(aid=hab)
sigs = keeper.sign(ser=ser.raw)
return sigs
def members(self, name):
res = self.client.get(f"/identifiers/{name}/members")
return res.json()
@staticmethod
def makeEndRole(pre, role=Roles.agent, eid=None, stamp=None):
data = dict(cid=pre, role=role)
if eid is not None:
data['eid'] = eid
route = "/end/role/add"
return eventing.reply(route=route, data=data, stamp=stamp)