Source code for ai.backend.client.auth

import enum
import hashlib
import hmac

import attr

from .base import api_function

__all__ = (
    'Auth',
    'AuthToken',
    'AuthTokenTypes',
    'generate_signature',
)


class AuthTokenTypes(enum.Enum):
    KEYPAIR = 'keypair'
    JWT = 'jwt'


@attr.s
class AuthToken:
    type = attr.ib(default=AuthTokenTypes.KEYPAIR)  # type: AuthTokenTypes
    content = attr.ib(default=None)                 # type: str


def generate_signature(method, version, endpoint,
                       date, rel_url, content_type, content,
                       access_key, secret_key, hash_type):
    '''
    Generates the API request signature from the given parameters.
    '''
    hash_type = hash_type
    hostname = endpoint._val.netloc  # FIXME: migrate to public API
    if version >= 'v4.20181215':
        content = b''
    else:
        if content_type.startswith('multipart/'):
            content = b''
    body_hash = hashlib.new(hash_type, content).hexdigest()

    sign_str = '{}\n{}\n{}\nhost:{}\ncontent-type:{}\nx-backendai-version:{}\n{}'.format(  # noqa
        method.upper(),
        rel_url,
        date.isoformat(),
        hostname,
        content_type.lower(),
        version,
        body_hash
    )
    sign_bytes = sign_str.encode()

    sign_key = hmac.new(secret_key.encode(),
                        date.strftime('%Y%m%d').encode(), hash_type).digest()
    sign_key = hmac.new(sign_key, hostname.encode(), hash_type).digest()

    signature = hmac.new(sign_key, sign_bytes, hash_type).hexdigest()
    headers = {
        'Authorization': 'BackendAI signMethod=HMAC-{}, credential={}:{}'.format(
            hash_type.upper(),
            access_key,
            signature
        ),
    }
    return headers, signature


[docs]class Auth: ''' Provides the function interface for login session management and authorization. '''
[docs] @api_function @classmethod async def login(cls, user_id: str, password: str) -> dict: ''' Log-in into the endpoint with the given user ID and password. It creates a server-side web session and return a dictionary with ``"authenticated"`` boolean field and JSON-encoded raw cookie data. ''' from .request import Request rqst = Request(cls.session, 'POST', '/server/login') rqst.set_json({ 'username': user_id, 'password': password, }) async with rqst.fetch(anonymous=True) as resp: data = await resp.json() data['cookies'] = resp.raw_response.cookies data['config'] = { 'username': user_id, } return data
[docs] @api_function @classmethod async def logout(cls) -> None: ''' Log-out from the endpoint. It clears the server-side web session. ''' from .request import Request rqst = Request(cls.session, 'POST', '/server/logout') async with rqst.fetch() as resp: resp.raw_response.raise_for_status()
[docs] @api_function @classmethod async def update_password(cls, old_password: str, new_password: str, new_password2: str) -> dict: """ Update user's password. This API works only for account owner. """ from .request import Request rqst = Request(cls.session, 'POST', '/auth/update-password') rqst.set_json({ 'old_password': old_password, 'new_password': new_password, 'new_password2': new_password2, }) async with rqst.fetch() as resp: return await resp.json()