diff --git a/pwncat/modules/__init__.py b/pwncat/modules/__init__.py index 9740992..96281cd 100644 --- a/pwncat/modules/__init__.py +++ b/pwncat/modules/__init__.py @@ -161,7 +161,7 @@ def run_decorator(real_run): """ Decorate a run function to evaluate types """ @functools.wraps(real_run) - def decorator(self, progress=None, **kwargs): + def decorator(self, session, progress=None, **kwargs): if "exec" in kwargs: has_exec = True @@ -189,7 +189,7 @@ def run_decorator(real_run): self.progress = progress # Return the result - result_object = real_run(self, **kwargs) + result_object = real_run(self, session, **kwargs) if inspect.isgenerator(result_object): @@ -281,7 +281,7 @@ class BaseModule(metaclass=BaseModuleMeta): # Filled in by reload self.name = None - def run(self, progress=None, **kwargs): + def run(self, session, progress=None, **kwargs): """ The run method is called via keyword-arguments with all the parameters specified in the ``ARGUMENTS`` dictionary. If ``ALLOW_KWARGS`` was True, then other keyword arguments may also be passed. Any diff --git a/pwncat/platform/__init__.py b/pwncat/platform/__init__.py index 344bc68..9789e0f 100644 --- a/pwncat/platform/__init__.py +++ b/pwncat/platform/__init__.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 from typing import List, Optional, Generator, Union, BinaryIO, Type +from subprocess import CalledProcessError import enum import pathlib import logging @@ -109,6 +110,11 @@ class Path: def is_mount(self) -> bool: """ Returns True if the path is a mount point. """ + if self.parent.stat().st_dev != self.stat().st_dev: + return True + + return False + def is_symlink(self) -> bool: """ Returns True if the path points to a symbolic link, False otherwise """ @@ -180,6 +186,14 @@ class Path: def mkdir(self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False): """ Create a new directory at this given path. """ + if not exist_ok and self.exists(): + raise FileExistsError(str(self)) + + if self.exists() and not self.is_dir(): + raise FileExistsError(str(self)) + + self._target.mkdir(str(self), mode=mode, parents=parents) + def open( self, mode: str = "r", @@ -225,8 +239,17 @@ class Path: def rename(self, target) -> "Path": """ Rename the file or directory to the given target (str or Path). """ + self._target.rename(str(self), str(target)) + + if not isinstance(target, self.__class__): + return self.__class__(target) + + return target + def replace(self, target) -> "Path": - """ Sawme as `rename` for Linux """ + """ Same as `rename` for Linux """ + + return self.rename(target) def resolve(self, strict: bool = False): """ Resolve the current path into an absolute path """ @@ -242,6 +265,11 @@ class Path: def rmdir(self): """ Remove this directory. The directory must be empty. """ + if not self.is_dir(): + raise NotADirectoryError(str(self)) + + self._target.rmdir(str(self)) + def samefile(self, otherpath: "Path"): """ Return whether this path points to the same file as other_path which can be either a Path object or a string. """ @@ -257,6 +285,20 @@ class Path: def symlink_to(self, target, target_is_directory: bool = False): """ Make this path a symbolic link to target. """ + if not isinstance(target, self.__class__): + target = self.__class__(target) + + if not target.exists(): + raise FileNotFoundError(str(target)) + + if self.exists(): + raise FileExistsError(str(self)) + + try: + self._target.symlink_to(str(target), str(self)) + except CalledProcessError as exc: + raise OSError(exc.stdout) from exc + def touch(self, mode: int = 0o666, exist_ok: bool = True): """ Createa file at this path. If the file already exists, function succeeds if exist_ok is true (and it's modification time is updated). @@ -275,9 +317,35 @@ class Path: def unlink(self, missing_ok: bool = False): """ Remove the file or symbolic link. """ + if not missing_ok and not self.exists(): + raise FileNotFoundError(str(self)) + + try: + self._target.unlink(str(self)) + except FileNotFoundError as exc: + # In this case, we couldn't distinguish between errors + # so, we distinguish here based on stat results + if self.is_dir(): + raise OSError(f"Directory not empty: {str(self)}") from exc + raise + def link_to(self, target): """ Create a hard link pointing to a path named target """ + if not isinstance(target, self.__class__): + target = self.__class__(target) + + if not target.exists(): + raise FileNotFoundError(str(target)) + + if self.exists(): + raise FileExistsError(str(self)) + + try: + self._target.link_to(str(target), str(self)) + except CalledProcessError as exc: + raise OSError(exc.stdout) from exc + def write_bytes(self, data: bytes): """ Open the file pointed to in bytes mode and write data to it. """ diff --git a/pwncat/platform/linux.py b/pwncat/platform/linux.py index 3ce274a..3a14587 100644 --- a/pwncat/platform/linux.py +++ b/pwncat/platform/linux.py @@ -402,147 +402,6 @@ class LinuxWriter(BufferedIOBase): self.detach() -class LinuxPath(pathlib.PurePosixPath): - """ Implementation of a concrete path based on the - pathlib.PurePosixPath class. This implements most of the - methods provided by concrete paths within the pathlib.Path - class. """ - - def __init__(self, target: "Linux", *pathsegments): - super().__init__(*pathsegments) - - self._target = target - self._stat = None - - def stat(self) -> os.stat_result: - """ Run `stat` on the path and return a stat result """ - - if self._stat is not None: - return self._stat - - return self._target.stat(str(self)) - - def chmod(self, mode: int): - """ Execute `chmod` on the remote file to change permissions """ - - def exists(self) -> bool: - """ Return true if the specified path exists on the remote system """ - - def expanduser(self) -> "LinuxPath": - """ Return a new path object with ~ and ~user expanded """ - - def glob(self, pattern: str) -> Generator["LinuxPath", None, None]: - """ Glob the given relative pattern in the directory represented - by this path, yielding all matching files (of any kind) """ - - def group(self) -> str: - """ Returns the name of the group owning the file. KeyError is raised - if the file's GID isn't found in the system database. """ - - def is_dir(self) -> bool: - """ Returns True if the path points to a directory (or a symbolic link - pointing to a directory). False if it points to another kind of file. - """ - - def is_file(self) -> bool: - """ Returns True if the path points to a regular file """ - - def is_mount(self) -> bool: - """ Returns True if the path is a mount point. """ - - def is_symlink(self) -> bool: - """ Returns True if the path points to a symbolic link, False otherwise """ - - def is_socket(self) -> bool: - """ Returns True if the path points to a Unix socket """ - - def is_fifo(self) -> bool: - """ Returns True if the path points to a FIFO """ - - def is_block_device(self) -> bool: - """ Returns True if the path points to a block device """ - - def is_char_device(self) -> bool: - """ Returns True if the path points to a character device """ - - def iterdir(self) -> bool: - """ When the path points to a directory, yield path objects of the - directory contents. """ - - def lchmod(self, mode: int): - """ Modify a symbolic link's mode (same as chmod for non-symbolic links) """ - - def lstat(self) -> os.stat_result: - """ Same as stat except operate on the symbolic link file itself rather - than the file it points to. """ - - def mkdir(self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False): - """ Create a new directory at this given path. """ - - def open( - self, - mode: str = "r", - buffering: int = -1, - encoding: str = None, - errors: str = None, - newline: str = None, - ): - """ Open the file pointed to by the path, like Platform.open """ - - def owner(self) -> str: - """ Return the name of the user owning the file. KeyError is raised if - the file's uid is not found in the System database """ - - def read_bytes(self) -> bytes: - """ Return the binary contents of the pointed-to file as a bytes object """ - - def read_text(self, encoding: str = None, errors: str = None) -> str: - """ Return the decoded contents of the pointed-to file as a string """ - - def readlink(self) -> "LinuxPath": - """ Return the path to which the symbolic link points """ - - def rename(self, target) -> "LinuxPath": - """ Rename the file or directory to the given target (str or Path). """ - - def replace(self, target) -> "LinuxPath": - """ Sawme as `rename` for Linux """ - - def resolve(self, strict: bool = False): - """ Resolve the current path into an absolute path """ - - def rglob(self, pattern: str) -> Generator["LinuxPath", None, None]: - """ This is like calling Path.glob() with "**/" added to in the front - of the given relative pattern """ - - def rmdir(self): - """ Remove this directory. The directory must be empty. """ - - def samefile(self, otherpath: "LinuxPath"): - """ Return whether this path points to the same file as other_path - which can be either a Path object or a string. """ - - def symlink_to(self, target, target_is_directory: bool = False): - """ Make this path a symbolic link to target. """ - - def touch(self, mode: int = 0o666, exist_ok: bool = True): - """ Createa file at this path. If the file already exists, function - succeeds if exist_ok is true (and it's modification time is updated). - Otherwise FileExistsError is raised. """ - - def unlink(self, missing_ok: bool = False): - """ Remove the file or symbolic link. """ - - def link_to(self, target): - """ Create a hard link pointing to a path named target """ - - def write_bytes(self, data: bytes): - """ Open the file pointed to in bytes mode and write data to it. """ - - def write_text(self, data: str, encoding: str = None, errors: str = None): - """ Open the file pointed to in text mode, and write data to it. """ - - class Linux(Platform): """ Concrete platform class abstracting interaction with a GNU/Linux remote @@ -1340,3 +1199,62 @@ class Linux(Platform): self.run(["chmod", "-h", oct(mode)[2:], path]) else: self.run(["chmod", oct(mode)[2:], path]) + + def mkdir(self, path: str, mode: int = 0o777, parents: bool = False): + """ Create a new directory """ + + try: + if parents: + self.run( + ["mkdir", "-p", "-m", oct(mode)[2:], path], text=True, check=True + ) + else: + self.run( + ["mkdir", "-p", "-m", oct(mode)[2:], path], text=True, check=True + ) + except CalledProcessError as exc: + if "exists" in exc.stdout: + raise FileExistsError(exc.stdout) from exc + else: + raise FileNotFoundError(exc.stdout) from exc + + def rename(self, source: str, target: str): + """ Rename a file from the source to the target. This should + replace the target if it exists. """ + + try: + self.run(["mv", source, target], check=True) + except CalledProcessError as exc: + raise FileNotFoundError(source) from exc + + def rmdir(self, target: str): + """ Remove the specified directory. It must be empty. """ + + try: + self.run(["rmdir", target], check=True) + except CalledProcessError as exc: + raise OSError(f"Directory not empty: {target}") from exc + + def symlink_to(self, source: str, target: str): + """ Create a symbolic link to source from target """ + + # Since this function is unlikely to be called outside of + # the path abstraction, we don't do much error checking. + # We can't reliably tell what happened when the process + # fails without checking stat output which is easier from + # the path abstraction itself. + self.run(["ln", "-s", source, target], check=True) + + def link_to(self, source: str, target: str): + """ Create a filesystem hard link. """ + + # Same warning as with symlink + self.run(["ln", source, target], check=True) + + def unlink(self, target: str): + """ Remove a link to a file (similar to `rm`) """ + + try: + self.run(["rm", target], check=True) + except CalledProcessError as exc: + raise FileNotFoundError(target) from exc