from pathlib import Path
from typing import Sequence, Union
import aiohttp
from aiohttp import hdrs
from tqdm import tqdm
from .base import api_function
from .compat import current_loop
from .config import DEFAULT_CHUNK_SIZE
from .exceptions import BackendAPIError
from .request import Request, AttachedFile
from .cli.pretty import ProgressReportingReader
__all__ = (
'VFolder',
)
[docs]class VFolder:
session = None
'''The client session instance that this function class is bound to.'''
def __init__(self, name: str):
self.name = name
[docs] @api_function
@classmethod
async def create(cls, name: str, host: str = None, group: str = None):
rqst = Request(cls.session, 'POST', '/folders')
rqst.set_json({
'name': name,
'host': host,
'group': group,
})
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
@classmethod
async def delete_by_id(cls, oid):
rqst = Request(cls.session, 'DELETE', '/folders')
rqst.set_json({'id': oid})
async with rqst.fetch():
return {}
[docs] @api_function
@classmethod
async def list(cls, list_all=False):
rqst = Request(cls.session, 'GET', '/folders')
rqst.set_json({'all': list_all})
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
@classmethod
async def list_hosts(cls):
rqst = Request(cls.session, 'GET', '/folders/_/hosts')
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
@classmethod
async def list_all_hosts(cls):
rqst = Request(cls.session, 'GET', '/folders/_/all_hosts')
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
@classmethod
async def list_allowed_types(cls):
rqst = Request(cls.session, 'GET', '/folders/_/allowed_types')
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
async def info(self):
rqst = Request(self.session, 'GET', '/folders/{0}'.format(self.name))
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
async def delete(self):
rqst = Request(self.session, 'DELETE', '/folders/{0}'.format(self.name))
async with rqst.fetch():
return {}
[docs] @api_function
async def rename(self, new_name):
rqst = Request(self.session, 'POST', '/folders/{0}/rename'.format(self.name))
rqst.set_json({
'new_name': new_name,
})
async with rqst.fetch() as resp:
self.name = new_name
return await resp.text()
[docs] @api_function
async def upload(self, files: Sequence[Union[str, Path]],
basedir: Union[str, Path] = None,
show_progress: bool = False):
base_path = (Path.cwd() if basedir is None
else Path(basedir).resolve())
files = [Path(file).resolve() for file in files]
total_size = 0
for file_path in files:
total_size += file_path.stat().st_size
tqdm_obj = tqdm(desc='Uploading files',
unit='bytes', unit_scale=True,
total=total_size,
disable=not show_progress)
with tqdm_obj:
attachments = []
for file_path in files:
try:
attachments.append(AttachedFile(
str(file_path.relative_to(base_path)),
ProgressReportingReader(str(file_path),
tqdm_instance=tqdm_obj),
'application/octet-stream',
))
except ValueError:
msg = 'File "{0}" is outside of the base directory "{1}".' \
.format(file_path, base_path)
raise ValueError(msg) from None
rqst = Request(self.session,
'POST', '/folders/{}/upload'.format(self.name))
rqst.attach_files(attachments)
async with rqst.fetch() as resp:
return await resp.text()
[docs] @api_function
async def mkdir(self, path: Union[str, Path]):
rqst = Request(self.session, 'POST',
'/folders/{}/mkdir'.format(self.name))
rqst.set_json({
'path': path,
})
async with rqst.fetch() as resp:
return await resp.text()
[docs] @api_function
async def rename_file(self, target_path: str, new_name: str):
rqst = Request(self.session, 'POST',
'/folders/{}/rename_file'.format(self.name))
rqst.set_json({
'target_path': target_path,
'new_name': new_name,
})
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
async def delete_files(self,
files: Sequence[Union[str, Path]],
recursive: bool = False):
rqst = Request(self.session, 'DELETE',
'/folders/{}/delete_files'.format(self.name))
rqst.set_json({
'files': files,
'recursive': recursive,
})
async with rqst.fetch() as resp:
return await resp.text()
[docs] @api_function
async def download(self, files: Sequence[Union[str, Path]],
show_progress: bool = False):
rqst = Request(self.session, 'GET',
'/folders/{}/download'.format(self.name))
rqst.set_json({
'files': files,
})
file_names = []
async with rqst.fetch() as resp:
if resp.status // 100 != 2:
raise BackendAPIError(resp.status, resp.reason,
await resp.text())
total_bytes = int(resp.headers['X-TOTAL-PAYLOADS-LENGTH'])
tqdm_obj = tqdm(desc='Downloading files',
unit='bytes', unit_scale=True,
total=total_bytes,
disable=not show_progress)
reader = aiohttp.MultipartReader.from_response(resp.raw_response)
with tqdm_obj as pbar:
loop = current_loop()
acc_bytes = 0
while True:
part = await reader.next()
if part is None:
break
assert part.headers.get(hdrs.CONTENT_ENCODING, 'identity').lower() in (
'identity',
'gzip', # Prior to v19.09.4, the server had a bug to set this incorrectly.
# This legacy handling will be removed in v19.12 release.
)
assert part.headers.get(hdrs.CONTENT_TRANSFER_ENCODING, 'binary').lower() in (
'binary', '8bit', '7bit',
)
with open(part.filename, 'wb') as fp:
while True:
chunk = await part.read_chunk(DEFAULT_CHUNK_SIZE)
if not chunk:
break
await loop.run_in_executor(None, lambda: fp.write(chunk))
acc_bytes += len(chunk)
pbar.update(len(chunk))
pbar.update(total_bytes - acc_bytes)
return {'file_names': file_names}
[docs] @api_function
async def list_files(self, path: Union[str, Path] = '.'):
rqst = Request(self.session, 'GET', '/folders/{}/files'.format(self.name))
rqst.set_json({
'path': path,
})
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
async def invite(self, perm: str, emails: Sequence[str]):
rqst = Request(self.session, 'POST', '/folders/{}/invite'.format(self.name))
rqst.set_json({
'perm': perm, 'user_ids': emails,
})
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
@classmethod
async def invitations(cls):
rqst = Request(cls.session, 'GET', '/folders/invitations/list')
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
@classmethod
async def accept_invitation(cls, inv_id: str):
rqst = Request(cls.session, 'POST', '/folders/invitations/accept')
rqst.set_json({'inv_id': inv_id})
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
@classmethod
async def delete_invitation(cls, inv_id: str):
rqst = Request(cls.session, 'DELETE', '/folders/invitations/delete')
rqst.set_json({'inv_id': inv_id})
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
@classmethod
async def get_fstab_contents(cls, agent_id=None):
rqst = Request(cls.session, 'GET', '/folders/_/fstab')
rqst.set_json({
'agent_id': agent_id,
})
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
@classmethod
async def list_mounts(cls):
rqst = Request(cls.session, 'GET', '/folders/_/mounts')
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
@classmethod
async def mount_host(cls, name: str, fs_location: str, options=None,
edit_fstab: bool = False):
rqst = Request(cls.session, 'POST', '/folders/_/mounts')
rqst.set_json({
'name': name,
'fs_location': fs_location,
'options': options,
'edit_fstab': edit_fstab,
})
async with rqst.fetch() as resp:
return await resp.json()
[docs] @api_function
@classmethod
async def umount_host(cls, name: str, edit_fstab: bool = False):
rqst = Request(cls.session, 'DELETE', '/folders/_/mounts')
rqst.set_json({
'name': name,
'edit_fstab': edit_fstab,
})
async with rqst.fetch() as resp:
return await resp.json()