# Source code for signify.app.clienting

# -*- encoding: utf-8 -*-
"""
Signify
signify.app.clienting module

"""
from dataclasses import dataclass
from urllib.parse import urlparse, urljoin, urlsplit

import requests
import sseclient
from keri import kering
from keri.core.coring import Tiers
from keri.help import helping
from requests import HTTPError
from requests.auth import AuthBase

from signify.core import keeping, authing
from signify.signifying import State


class SignifyClient:
    """HTTP client for a Signify cloud agent.

    The client keeps a local ``authing.Controller``, the remote
    ``authing.Agent`` and a ``requests.Session``.  After `connect` succeeds the
    session's ``auth`` is a ``SignifyAuth`` and its response hook is
    ``Authenticater.verify``.  The resource accessors (`identifiers`,
    `credentials`, ...) return helper objects bound to this client.
    """

    def __init__(self, passcode, url=None, tier=Tiers.low, extern_modules=None):
        """Create the client and, when `url` is given, connect right away.

        Args:
            passcode(str): secret kept as ``bran`` and passed to the
                controller; rejected when shorter than 21 characters
            url(str): optional base URL of the agent; when not None `connect`
                is called with it
            tier: passed through to ``authing.Controller``
            extern_modules: passed through to ``keeping.Manager`` on connect

        Raises:
            kering.ConfigurationError: when `passcode` has fewer than 21
                characters
        """

        if len(passcode) < 21:
            raise kering.ConfigurationError(f"bran of length {len(passcode)} is too short, must be 21 characters")

        self.bran = passcode
        self.pidx = 0
        self.tier = tier
        self.extern_modules = extern_modules

        # Populated by connect()
        self.mgr = None
        self.session = None
        self.agent = None
        self.authn = None
        self.base = None

        # Controller without remote state; connect() replaces it with one
        # built from the state reported by the agent.
        self.ctrl = authing.Controller(bran=self.bran, tier=self.tier)
        if url is not None:
            self.connect(url)

    def connect(self, url):
        """Open a session against `url` and set up signed communication.

        Loads the agent and controller state from the server, rebuilds the
        local controller and key manager from it, approves the delegation when
        the controller's event is still at sequence number 0 and finally
        installs request signing and response verification on the session.

        Args:
            url(str): base URL of the agent; its scheme must be listed in
                ``kering.Schemes``

        Raises:
            kering.ConfigurationError: on an unsupported scheme, or when the
                agent's ``delpre`` differs from the controller prefix
        """
        up = urlparse(url)
        if up.scheme not in kering.Schemes:
            raise kering.ConfigurationError(f"invalid scheme {up.scheme} for SignifyClient")

        self.base = url

        self.session = requests.Session()
        state = self.states()
        self.pidx = state.pidx

        # Create agent representing the AID of the cloud agent
        self.agent = authing.Agent(state=state.agent)

        # Create controller representing local auth AID
        self.ctrl = authing.Controller(bran=self.bran, tier=self.tier, state=state.controller)
        self.mgr = keeping.Manager(salter=self.ctrl.salter, extern_modules=self.extern_modules)

        if self.agent.delpre != self.ctrl.pre:
            raise kering.ConfigurationError("commitment to controller AID missing in agent inception event")

        if self.ctrl.serder.sn == 0:
            # Sent before the session gets its auth handler and response hook.
            self.approveDelegation()

        self.authn = authing.Authenticater(agent=self.agent, ctrl=self.ctrl)
        self.session.auth = SignifyAuth(self.authn)
        self.session.hooks = dict(response=self.authn.verify)

    def approveDelegation(self):
        """PUT the controller's delegation approval event to the agent."""
        serder, sigs = self.ctrl.approveDelegation(self.agent)
        data = dict(ixn=serder.ked, sigs=sigs)
        self.put(path=f"/agent/{self.controller}?type=ixn", json=data)

    def rotate(self, nbran, aids):
        """PUT the payload produced by ``Controller.rotate`` to the agent.

        Args:
            nbran: forwarded to ``self.ctrl.rotate`` as ``nbran``
            aids: forwarded to ``self.ctrl.rotate`` as ``aids``
        """
        data = self.ctrl.rotate(nbran=nbran, aids=aids)
        self.put(path=f"/agent/{self.controller}", json=data)

    @property
    def controller(self):
        """Prefix of the local controller (``self.ctrl.pre``)."""
        return self.ctrl.pre

    @property
    def icp(self):
        """The controller's ``serder``."""
        return self.ctrl.serder

    @property
    def salter(self):
        """The controller's ``salter``."""
        return self.ctrl.salter

    @property
    def manager(self):
        """Key manager created by `connect`, or None before connecting."""
        return self.mgr

    def states(self):
        """Fetch the agent and controller state for this controller.

        Returns:
            State: with ``controller``, ``agent`` and ``pidx`` taken from the
            response body; ``pidx`` is 0 when the body does not carry one

        Raises:
            kering.ConfigurationError: when the server answers 404
            HTTPError: for any other unsuccessful status
        """
        caid = self.ctrl.pre
        res = self.session.get(url=urljoin(self.base, f"/agent/{caid}"))
        if res.status_code == 404:
            raise kering.ConfigurationError(f"agent does not exist for controller {caid}")

        # Any other failure would otherwise surface as a JSON decoding error or
        # a KeyError below, hiding the actual HTTP status.
        if not res.ok:
            self.raiseForStatus(res)

        data = res.json()
        state = State()
        state.controller = data["controller"]
        state.agent = data["agent"]
        state.pidx = data.get("pidx", 0)

        return state

    def _save_old_salt(self, salt):
        """PUT `salt` to ``/salt/{controller prefix}``; True on a 204 reply."""
        caid = self.ctrl.pre
        body = dict(salt=salt)
        res = self.put(f"/salt/{caid}", json=body)
        return res.status_code == 204

    def _delete_old_salt(self):
        """DELETE ``/salt/{controller prefix}``; True on a 204 reply."""
        caid = self.ctrl.pre
        res = self.delete(f"/salt/{caid}")
        return res.status_code == 204

    @staticmethod
    def _options(**candidates):
        """Return the given keyword options without those that are None."""
        return {key: value for key, value in candidates.items() if value is not None}

    def _checked(self, res):
        """Return `res`, raising through `raiseForStatus` when it is not ok."""
        if not res.ok:
            self.raiseForStatus(res)

        return res

    def get(self, path, params=None, headers=None, body=None):
        """Send a GET request to `path` resolved against the base URL.

        Args:
            path(str): path joined onto ``self.base`` with ``urljoin``
            params: optional query parameters
            headers: optional request headers
            body: optional JSON payload, sent as ``json``

        Returns:
            the response object returned by the session

        Raises:
            HTTPError: when the response is not ok
        """
        url = urljoin(self.base, path)
        kwargs = self._options(params=params, headers=headers, json=body)
        return self._checked(self.session.get(url, **kwargs))

    def stream(self, path, params=None, headers=None, body=None):
        """Yield server-sent events read from `path`.

        This is a generator, so the connection is only attempted once
        iteration starts.

        Args:
            path(str): path joined onto ``self.base`` with ``urljoin``
            params: optional query parameters
            headers: optional request headers
            body: optional JSON payload, sent as ``json``

        Yields:
            events produced by ``sseclient.SSEClient``
        """
        url = urljoin(self.base, path)
        kwargs = self._options(params=params, headers=headers, json=body)

        client = sseclient.SSEClient(url, session=self.session, **kwargs)
        for event in client:
            yield event

    def delete(self, path, params=None, headers=None):
        """Send a DELETE request to `path` resolved against the base URL.

        Args:
            path(str): path joined onto ``self.base`` with ``urljoin``
            params: optional query parameters
            headers: optional request headers

        Returns:
            the response object returned by the session

        Raises:
            HTTPError: when the response is not ok
        """
        url = urljoin(self.base, path)
        kwargs = self._options(params=params, headers=headers)
        return self._checked(self.session.delete(url, **kwargs))

    def post(self, path, json, params=None, headers=None):
        """Send a POST request with a JSON payload to `path`.

        Args:
            path(str): path joined onto ``self.base`` with ``urljoin``
            json: payload, always passed to the session as ``json``
            params: optional query parameters
            headers: optional request headers

        Returns:
            the response object returned by the session

        Raises:
            HTTPError: when the response is not ok
        """
        url = urljoin(self.base, path)
        kwargs = self._options(params=params, headers=headers)
        kwargs["json"] = json  # passed even when None, unlike the optional ones
        return self._checked(self.session.post(url, **kwargs))

    def put(self, path, json, params=None, headers=None):
        """Send a PUT request with a JSON payload to `path`.

        Args:
            path(str): path joined onto ``self.base`` with ``urljoin``
            json: payload, always passed to the session as ``json``
            params: optional query parameters
            headers: optional request headers

        Returns:
            the response object returned by the session

        Raises:
            HTTPError: when the response is not ok
        """
        url = urljoin(self.base, path)
        kwargs = self._options(params=params, headers=headers)
        kwargs["json"] = json  # passed even when None, unlike the optional ones
        return self._checked(self.session.put(url, **kwargs))

    # The accessors below import their module inside the method body rather
    # than at module level (presumably to avoid import cycles - TODO confirm).

    def identifiers(self):
        """Return an ``Identifiers`` helper bound to this client."""
        from signify.app.aiding import Identifiers
        return Identifiers(client=self)

    def operations(self):
        """Return an ``Operations`` helper bound to this client."""
        from signify.app.coring import Operations
        return Operations(client=self)

    def oobis(self):
        """Return an ``Oobis`` helper bound to this client."""
        from signify.app.coring import Oobis
        return Oobis(client=self)

    def credentials(self):
        """Return a ``Credentials`` helper bound to this client."""
        from signify.app.credentialing import Credentials
        return Credentials(client=self)

    def keyStates(self):
        """Return a ``KeyStates`` helper bound to this client."""
        from signify.app.coring import KeyStates
        return KeyStates(client=self)

    def keyEvents(self):
        """Return a ``KeyEvents`` helper bound to this client."""
        from signify.app.coring import KeyEvents
        return KeyEvents(client=self)

    def escrows(self):
        """Return an ``Escrows`` helper bound to this client."""
        from signify.app.escrowing import Escrows
        return Escrows(client=self)

    def endroles(self):
        """Return an ``EndRoleAuthorizations`` helper bound to this client."""
        from signify.app.ending import EndRoleAuthorizations
        return EndRoleAuthorizations(client=self)

    def notifications(self):
        """Return a ``Notifications`` helper bound to this client."""
        from signify.app.notifying import Notifications
        return Notifications(client=self)

    def groups(self):
        """Return a ``Groups`` helper bound to this client."""
        from signify.app.grouping import Groups
        return Groups(client=self)

    def registries(self):
        """Return a ``Registries`` helper bound to this client."""
        from signify.app.credentialing import Registries
        return Registries(client=self)

    def exchanges(self):
        """Return an ``Exchanges`` helper bound to this client."""
        from signify.peer.exchanging import Exchanges
        return Exchanges(client=self)

    def ipex(self):
        """Return an ``Ipex`` helper bound to this client."""
        from signify.app.credentialing import Ipex
        return Ipex(client=self)

    def challenges(self):
        """Return a ``Challenges`` helper bound to this client."""
        from signify.app.challenging import Challenges
        return Challenges(client=self)

    def contacts(self):
        """Return a ``Contacts`` helper bound to this client."""
        from signify.app.contacting import Contacts
        return Contacts(client=self)

    @staticmethod
    def raiseForStatus(res):
        """Raise an ``HTTPError`` describing the response `res`.

        The reason is the ``description`` entry of the JSON body, else its
        ``title`` entry, else ``"Unknown"``; when the body cannot be read that
        way the raw response text is used instead.

        Args:
            res: response object to report on

        Raises:
            HTTPError: always, carrying `res` as ``response``
        """
        try:
            body = res.json()

            if "description" in body:
                reason = body["description"]
            elif "title" in body:
                reason = body["title"]
            else:
                reason = "Unknown"
        except Exception:
            # Deliberately broad: a body that is not JSON, or JSON of an
            # unexpected shape, must not mask the HTTP error being reported.
            reason = res.text

        if 400 <= res.status_code < 500:
            kind = "Client Error"
        elif 500 <= res.status_code < 600:
            kind = "Server Error"
        else:
            # Status outside the 4xx/5xx ranges: still give a usable message
            # instead of raising with an empty one.
            kind = "Error"

        http_error_msg = f"{res.status_code} {kind}: {reason} for url: {res.url}"
        raise HTTPError(http_error_msg, response=res)


class SignifyAuth(AuthBase):
    """``requests`` auth handler that signs each outgoing request."""

    def __init__(self, authn):
        """
        Args:
            authn(Authenticater): Provides request signing for AuthBase
        """
        self.authn = authn

    def __call__(self, req):
        """Add the Signify headers to `req` and replace them with the signed set.

        Args:
            req: prepared request handed in by ``requests``

        Returns:
            the same request object, with ``headers`` set to the result of
            ``self.authn.sign``
        """
        headers = req.headers
        headers['Signify-Resource'] = self.authn.ctrl.pre
        headers['Signify-Timestamp'] = helping.nowIso8601()

        # Only filled in when missing and the request has a non-empty body
        if "Content-Length" not in headers and req.body:
            headers["Content-Length"] = len(req.body)

        # Only the path component of the URL is handed to the signer,
        # defaulting to "/" when the URL has none
        p = urlsplit(req.url)
        path = p.path if p.path else "/"
        req.headers = self.authn.sign(headers, req.method, path)

        return req