from __future__ import annotations
import logging
from pathlib import (
Path,
PurePosixPath,
)
from queue import Queue
from shlex import quote as posix_quote
from typing import (
BinaryIO,
Callable,
)
from .common import DownloadResponseGenerator
from ..shell import (
ExecutionResult,
ShellCommandExecutor,
create_result,
)
from datalad_next.consts import COPY_BUFSIZE
__all__ = [
'DownloadResponseGenerator',
'DownloadResponseGeneratorPosix',
'upload',
'download',
'delete',
]
lgr = logging.getLogger("datalad.ext.next.shell.operations")
[docs]
class DownloadResponseGeneratorPosix(DownloadResponseGenerator):
"""A response generator for efficient download commands from Linux systems"""
[docs]
def get_final_command(self, remote_file_name: bytes) -> bytes:
"""Return a final command list for the download of ``remote_file_name``
The POSIX version for download response generators.
This method is usually only called by
:meth:`ShellCommandExecutor.__call__`.
Parameters
----------
remote_file_name : bytes
The name of the file that should be downloaded. If the file name
contains special character, e.g. space or ``$``, it must be
quoted for a POSIX shell, for example with ``shlex.quote``.
Returns
-------
bytes
The final command that will be executed in the persistent shell
in order to start the download in the connected shell.
"""
command = b"""
test -r {remote_file_name}
if [ $? -eq 0 ]; then
LC_ALL=C ls -dln -- {remote_file_name} | awk '{print $5; exit}'
cat {remote_file_name}
echo $?
else
echo -1;
fi
""".replace(b'{remote_file_name}', remote_file_name)
return command
[docs]
def upload(
shell: ShellCommandExecutor,
local_path: Path,
remote_path: PurePosixPath,
*,
progress_callback: Callable[[int, int], None] | None = None,
check: bool = False,
) -> ExecutionResult:
"""Upload a local file to a named file in the connected shell
This function uploads a file to the connected shell ``shell``. It uses
``head`` to limit the number of bytes that the remote shell will read.
This ensures that the upload is terminated.
The requirements for upload are:
- The connected shell must be a POSIX shell.
- ``head`` must be installed in the remote shell.
Parameters
----------
shell : ShellCommandExecutor
The shell that should be used to upload the file.
local_path : Path
The path of the file that should be uploaded.
remote_path : PurePosixPath
The path of the file on the connected shell that will contain the
uploaded content.
progress_callback : callable[[int, int], None], optional, default: None
If given, the callback is called with the number of bytes that have
been sent and the total number of bytes that should be sent.
check : bool, optional, default: False
If ``True``, raise a :class:`CommandError` if the remote operation does
not exit with a ``0`` as return code.
Returns
-------
ExecutionResult
The result of the upload operation.
Raises
-------
CommandError:
If the remote operation does not exit with a ``0`` as return code, and
``check`` is ``True``, a :class:`CommandError` is raised. It will
contain the exit code and the last (up to ``chunk_size`` (defined by the
``chunk_size`` keyword argument to :func:`shell`)) bytes of stderr
output.
"""
def signaling_read(
file: BinaryIO,
size: int,
queue: Queue,
*,
chunk_size: int = COPY_BUFSIZE
):
"""iterator that reads from a file and signals EOF via a queue
This iterator is used to prevent the situation where a file that
should be uploaded is completely read and uploaded, but the final
EOF-triggering `read()` call has not yet been made. In this case it can
happen that the server provides an answer. If the answer is interpreted
as indicator for a completed operation, the calling code assumes
that it can close all file handles associated with the operation. This
can lead to the final `read()` call being performed on a closed file,
which would raise a `ValueError`. To prevent this, ``signaling_read``
signals the end of the read-operation, i.e. an EOF was read, by
enqueuing ``Ǹone`` into the signaling queue. The caller can wait for
that event to ensure that the read operation is really done.
"""
processed = 0
while True:
data = file.read(chunk_size)
if data == b"":
break
yield data
processed += len(data)
if progress_callback is not None:
progress_callback(processed, size)
queue.put(None)
# The following command line ensures that content that we send to the shell
# either goes to the destination file or into `/dev/null`, but not into the
# stdin of the shell. In the latter case it would be interpreted as the
# next command, and that might be bad, e.g. if the uploaded content was
# `rm -rf $HOME`.
file_size = local_path.stat().st_size
cmd_line = (
f'head -c {file_size} > {posix_quote(str(remote_path))}'
f"|| (head -c {file_size} > /dev/null; test 1 == 2)"
)
with local_path.open("rb") as local_file:
# We use the `signaling_read` iterator to deal with the situation where
# the content of a file that should be uploaded is completely read and
# uploaded, but the final, EOF-triggering, `read()` call has not yet been
# made. In this case it can happen that the server provides an answer,
# and we leave the context, thereby closing the file. When the
# `iterable_subprocess.<locals>.input_to`-thread then tries to read
# from the file, a `ValueError` would be raised. This exception would
# in turn lead to the closing of stdin of the `shell`-subprocess and
# render it unusable.`signaling_read` allows us to wait for a completed
# read, including the EOF reading.
signal_queue: Queue = Queue()
result = shell(
cmd_line, stdin=signaling_read(local_file, file_size, signal_queue)
)
signal_queue.get()
if check:
result.to_exception(cmd_line, 'upload failed')
return result
[docs]
def download(
shell: ShellCommandExecutor,
remote_path: PurePosixPath,
local_path: Path,
*,
progress_callback: Callable[[int, int], None] | None = None,
response_generator_class: type[
DownloadResponseGenerator
] = DownloadResponseGeneratorPosix,
check: bool = False,
) -> ExecutionResult:
"""Download a file from the connected shell
This method downloads a file from the connected shell.
The requirements for download via instances of class
:class:`DownloadResponseGeneratorPosix` are:
- The connected shell must support `ls -dln`.
- The connected shell must support `echo -e`.
- The connected shell must support `awk`.
- The connected shell must support `cat`.
Parameters
----------
shell: ShellCommandExecutor
The shell from which a file should be downloaded.
remote_path : PurePosixPath
The path of the file on the connected shell that should be
downloaded.
local_path : Path
The path of the local file that will contain the downloaded content.
progress_callback : callable[[int, int], None], optional, default: None
If given, the callback is called with the number of bytes that have
been received and the total number of bytes that should be received.
response_generator_class : type[DownloadResponseGenerator], optional, default: DownloadResponseGeneratorPosix
The response generator that should be used to handle the download
output. It must be a subclass of :class:`DownloadResponseGenerator`.
The default works if the connected shell runs on a Unix-like system that
provides `ls -dln`, `cat`, `echo`, and `awk`, e.g. ``Linux`` or ``OSX``.
check : bool, optional, default: False
If ``True``, raise a :class:`CommandError` if the remote operation does
not exit with a ``0`` as return code.
Returns
-------
ExecutionResult
The result of the download operation.
Raises
-------
CommandError:
If the remote operation does not exit with a ``0`` as return code, and
``check`` is ``True``, a :class:`CommandError` is raised. It will
contain the exit code and the last (up to ``chunk_size`` (defined by the
``chunk_size`` keyword argument to :func:`shell`)) bytes of stderr
output.
"""
command = posix_quote(str(remote_path)).encode()
response_generator = response_generator_class(shell.stdout)
result_generator = shell.start(
command,
response_generator=response_generator,
)
with local_path.open("wb") as local_file:
processed = 0
for chunk in result_generator:
local_file.write(chunk)
processed += len(chunk)
if progress_callback is not None:
progress_callback(processed, response_generator.length)
stderr = b''.join(result_generator.stderr_deque)
result_generator.stderr_deque.clear()
return create_result(
result_generator,
command,
stdout=b'',
stderr=stderr,
check=check,
error_message='download failed',
)
[docs]
def delete(
shell: ShellCommandExecutor,
files: list[PurePosixPath],
*,
force: bool = False,
check: bool = False,
) -> ExecutionResult:
"""Delete files on the connected shell
The requirements for delete are:
- The connected shell must be a POSIX shell.
- ``rm`` must be installed in the remote shell.
Parameters
----------
shell: ShellCommandExecutor
The shell from which a file should be downloaded.
files : list[PurePosixPath]
The "paths" of the files that should be deleted.
force : bool
If ``True``, enforce removal, if possible. For example, the command
could change the permissions of the files to be deleted to ensure
their removal.
check : bool, optional, default: False
If ``True``, raise a :class:`CommandError` if the remote operation does
not exit with a ``0`` as return code.
Raises
-------
CommandError:
If the remote operation does not exit with a ``0`` as return code, and
``check`` is ``True``, a :class:`CommandError` is raised. It will
contain the exit code and the last (up to ``chunk_size`` (defined by the
``chunk_size`` keyword argument to :func:`shell`)) bytes of stderr
output.
"""
cmd_line = (
"rm "
+ ("-f " if force else "")
+ " ".join(
f"{posix_quote(str(f))}"
for f in files
)
)
result = shell(cmd_line.encode())
if check:
result.to_exception(cmd_line, 'delete failed')
return result