Source code for ai.backend.client.agent

import textwrap
from typing import Iterable, Sequence

from .base import api_function
from .request import Request

__all__ = (
    'Agent',
    'AgentWatcher',
)


[docs]class Agent: ''' Provides a shortcut of :func:`Admin.query() <ai.backend.client.admin.Admin.query>` that fetches various agent information. .. note:: All methods in this function class require your API access key to have the *admin* privilege. ''' session = None '''The client session instance that this function class is bound to.'''
[docs] @api_function @classmethod async def list_with_limit(cls, limit, offset, status: str = 'ALIVE', fields: Iterable[str] = None) -> Sequence[dict]: ''' Fetches the list of agents with the given status with limit and offset for pagination. :param limit: number of agents to get :param offset: offset index of agents to get :param status: An upper-cased string constant representing agent status (one of ``'ALIVE'``, ``'TERMINATED'``, ``'LOST'``, etc.) :param fields: Additional per-agent query fields to fetch. ''' if fields is None: fields = ( 'id', 'addr', 'status', 'first_contact', 'mem_slots', 'cpu_slots', 'gpu_slots', ) q = 'query($limit: Int!, $offset: Int!, $status: String) {' \ ' agent_list(limit: $limit, offset: $offset, status: $status) {' \ ' items { $fields }' \ ' total_count' \ ' }' \ '}' q = q.replace('$fields', ' '.join(fields)) variables = { 'limit': limit, 'offset': offset, 'status': status, } rqst = Request(cls.session, 'POST', '/admin/graphql') rqst.set_json({ 'query': q, 'variables': variables, }) async with rqst.fetch() as resp: data = await resp.json() return data['agent_list']
[docs] @api_function @classmethod async def detail(cls, agent_id: str, fields: Iterable[str] = None) -> Sequence[dict]: if fields is None: fields = ('id', 'status', 'addr', 'region', 'first_contact', 'cpu_cur_pct', 'mem_cur_bytes', 'available_slots', 'occupied_slots') query = textwrap.dedent('''\ query($agent_id: String!) { agent(agent_id: $agent_id) {$fields} } ''') query = query.replace('$fields', ' '.join(fields)) variables = {'agent_id': agent_id} rqst = Request(cls.session, 'POST', '/admin/graphql') rqst.set_json({ 'query': query, 'variables': variables, }) async with rqst.fetch() as resp: data = await resp.json() return data['agent']
class AgentWatcher: ''' Provides a shortcut of :func:`Admin.query() <ai.backend.client.admin.Admin.query>` that manipulate agent status. .. note:: All methods in this function class require you to have the *superadmin* privilege. ''' session = None '''The client session instance that this function class is bound to.''' @api_function @classmethod async def get_status(cls, agent_id: str) -> dict: ''' Get agent and watcher status. ''' rqst = Request(cls.session, 'GET', '/resource/watcher') rqst.set_json({'agent_id': agent_id}) async with rqst.fetch() as resp: data = await resp.json() if 'message' in data: return data['message'] else: return data @api_function @classmethod async def agent_start(cls, agent_id: str) -> dict: ''' Start agent. ''' rqst = Request(cls.session, 'POST', '/resource/watcher/agent/start') rqst.set_json({'agent_id': agent_id}) async with rqst.fetch() as resp: data = await resp.json() if 'message' in data: return data['message'] else: return data @api_function @classmethod async def agent_stop(cls, agent_id: str) -> dict: ''' Stop agent. ''' rqst = Request(cls.session, 'POST', '/resource/watcher/agent/stop') rqst.set_json({'agent_id': agent_id}) async with rqst.fetch() as resp: data = await resp.json() if 'message' in data: return data['message'] else: return data @api_function @classmethod async def agent_restart(cls, agent_id: str) -> dict: ''' Restart agent. ''' rqst = Request(cls.session, 'POST', '/resource/watcher/agent/restart') rqst.set_json({'agent_id': agent_id}) async with rqst.fetch() as resp: data = await resp.json() if 'message' in data: return data['message'] else: return data