1
0
mirror of https://github.com/calebstewart/pwncat.git synced 2024-11-27 19:04:15 +01:00

Finished up path abstraction

I believe the path and platform abstraction is more or less complete.
You are able to abstractly run processes and interactive with the remote
FS in the same way as the local one (mimicked pathlib and subprocess).
I now need to convert the modules and the rest of the commands to work
with the new manager/session architecture. 😭
This commit is contained in:
Caleb Stewart 2020-11-09 00:45:56 -05:00
parent 5072b01340
commit f80d6b65ee
3 changed files with 131 additions and 145 deletions

View File

@ -161,7 +161,7 @@ def run_decorator(real_run):
""" Decorate a run function to evaluate types """ """ Decorate a run function to evaluate types """
@functools.wraps(real_run) @functools.wraps(real_run)
def decorator(self, progress=None, **kwargs): def decorator(self, session, progress=None, **kwargs):
if "exec" in kwargs: if "exec" in kwargs:
has_exec = True has_exec = True
@ -189,7 +189,7 @@ def run_decorator(real_run):
self.progress = progress self.progress = progress
# Return the result # Return the result
result_object = real_run(self, **kwargs) result_object = real_run(self, session, **kwargs)
if inspect.isgenerator(result_object): if inspect.isgenerator(result_object):
@ -281,7 +281,7 @@ class BaseModule(metaclass=BaseModuleMeta):
# Filled in by reload # Filled in by reload
self.name = None self.name = None
def run(self, progress=None, **kwargs): def run(self, session, progress=None, **kwargs):
""" The run method is called via keyword-arguments with all the """ The run method is called via keyword-arguments with all the
parameters specified in the ``ARGUMENTS`` dictionary. If ``ALLOW_KWARGS`` parameters specified in the ``ARGUMENTS`` dictionary. If ``ALLOW_KWARGS``
was True, then other keyword arguments may also be passed. Any was True, then other keyword arguments may also be passed. Any

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from typing import List, Optional, Generator, Union, BinaryIO, Type from typing import List, Optional, Generator, Union, BinaryIO, Type
from subprocess import CalledProcessError
import enum import enum
import pathlib import pathlib
import logging import logging
@ -109,6 +110,11 @@ class Path:
def is_mount(self) -> bool: def is_mount(self) -> bool:
""" Returns True if the path is a mount point. """ """ Returns True if the path is a mount point. """
if self.parent.stat().st_dev != self.stat().st_dev:
return True
return False
def is_symlink(self) -> bool: def is_symlink(self) -> bool:
""" Returns True if the path points to a symbolic link, False otherwise """ """ Returns True if the path points to a symbolic link, False otherwise """
@ -180,6 +186,14 @@ class Path:
def mkdir(self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False): def mkdir(self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False):
""" Create a new directory at this given path. """ """ Create a new directory at this given path. """
if not exist_ok and self.exists():
raise FileExistsError(str(self))
if self.exists() and not self.is_dir():
raise FileExistsError(str(self))
self._target.mkdir(str(self), mode=mode, parents=parents)
def open( def open(
self, self,
mode: str = "r", mode: str = "r",
@ -225,8 +239,17 @@ class Path:
def rename(self, target) -> "Path": def rename(self, target) -> "Path":
""" Rename the file or directory to the given target (str or Path). """ """ Rename the file or directory to the given target (str or Path). """
self._target.rename(str(self), str(target))
if not isinstance(target, self.__class__):
return self.__class__(target)
return target
def replace(self, target) -> "Path": def replace(self, target) -> "Path":
""" Sawme as `rename` for Linux """ """ Same as `rename` for Linux """
return self.rename(target)
def resolve(self, strict: bool = False): def resolve(self, strict: bool = False):
""" Resolve the current path into an absolute path """ """ Resolve the current path into an absolute path """
@ -242,6 +265,11 @@ class Path:
def rmdir(self): def rmdir(self):
""" Remove this directory. The directory must be empty. """ """ Remove this directory. The directory must be empty. """
if not self.is_dir():
raise NotADirectoryError(str(self))
self._target.rmdir(str(self))
def samefile(self, otherpath: "Path"): def samefile(self, otherpath: "Path"):
""" Return whether this path points to the same file as other_path """ Return whether this path points to the same file as other_path
which can be either a Path object or a string. """ which can be either a Path object or a string. """
@ -257,6 +285,20 @@ class Path:
def symlink_to(self, target, target_is_directory: bool = False): def symlink_to(self, target, target_is_directory: bool = False):
""" Make this path a symbolic link to target. """ """ Make this path a symbolic link to target. """
if not isinstance(target, self.__class__):
target = self.__class__(target)
if not target.exists():
raise FileNotFoundError(str(target))
if self.exists():
raise FileExistsError(str(self))
try:
self._target.symlink_to(str(target), str(self))
except CalledProcessError as exc:
raise OSError(exc.stdout) from exc
def touch(self, mode: int = 0o666, exist_ok: bool = True): def touch(self, mode: int = 0o666, exist_ok: bool = True):
""" Createa file at this path. If the file already exists, function """ Createa file at this path. If the file already exists, function
succeeds if exist_ok is true (and it's modification time is updated). succeeds if exist_ok is true (and it's modification time is updated).
@ -275,9 +317,35 @@ class Path:
def unlink(self, missing_ok: bool = False): def unlink(self, missing_ok: bool = False):
""" Remove the file or symbolic link. """ """ Remove the file or symbolic link. """
if not missing_ok and not self.exists():
raise FileNotFoundError(str(self))
try:
self._target.unlink(str(self))
except FileNotFoundError as exc:
# In this case, we couldn't distinguish between errors
# so, we distinguish here based on stat results
if self.is_dir():
raise OSError(f"Directory not empty: {str(self)}") from exc
raise
def link_to(self, target): def link_to(self, target):
""" Create a hard link pointing to a path named target """ """ Create a hard link pointing to a path named target """
if not isinstance(target, self.__class__):
target = self.__class__(target)
if not target.exists():
raise FileNotFoundError(str(target))
if self.exists():
raise FileExistsError(str(self))
try:
self._target.link_to(str(target), str(self))
except CalledProcessError as exc:
raise OSError(exc.stdout) from exc
def write_bytes(self, data: bytes): def write_bytes(self, data: bytes):
""" Open the file pointed to in bytes mode and write data to it. """ """ Open the file pointed to in bytes mode and write data to it. """

View File

@ -402,147 +402,6 @@ class LinuxWriter(BufferedIOBase):
self.detach() self.detach()
class LinuxPath(pathlib.PurePosixPath):
""" Implementation of a concrete path based on the
pathlib.PurePosixPath class. This implements most of the
methods provided by concrete paths within the pathlib.Path
class. """
def __init__(self, target: "Linux", *pathsegments):
super().__init__(*pathsegments)
self._target = target
self._stat = None
def stat(self) -> os.stat_result:
""" Run `stat` on the path and return a stat result """
if self._stat is not None:
return self._stat
return self._target.stat(str(self))
def chmod(self, mode: int):
""" Execute `chmod` on the remote file to change permissions """
def exists(self) -> bool:
""" Return true if the specified path exists on the remote system """
def expanduser(self) -> "LinuxPath":
""" Return a new path object with ~ and ~user expanded """
def glob(self, pattern: str) -> Generator["LinuxPath", None, None]:
""" Glob the given relative pattern in the directory represented
by this path, yielding all matching files (of any kind) """
def group(self) -> str:
""" Returns the name of the group owning the file. KeyError is raised
if the file's GID isn't found in the system database. """
def is_dir(self) -> bool:
""" Returns True if the path points to a directory (or a symbolic link
pointing to a directory). False if it points to another kind of file.
"""
def is_file(self) -> bool:
""" Returns True if the path points to a regular file """
def is_mount(self) -> bool:
""" Returns True if the path is a mount point. """
def is_symlink(self) -> bool:
""" Returns True if the path points to a symbolic link, False otherwise """
def is_socket(self) -> bool:
""" Returns True if the path points to a Unix socket """
def is_fifo(self) -> bool:
""" Returns True if the path points to a FIFO """
def is_block_device(self) -> bool:
""" Returns True if the path points to a block device """
def is_char_device(self) -> bool:
""" Returns True if the path points to a character device """
def iterdir(self) -> bool:
""" When the path points to a directory, yield path objects of the
directory contents. """
def lchmod(self, mode: int):
""" Modify a symbolic link's mode (same as chmod for non-symbolic links) """
def lstat(self) -> os.stat_result:
""" Same as stat except operate on the symbolic link file itself rather
than the file it points to. """
def mkdir(self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False):
""" Create a new directory at this given path. """
def open(
self,
mode: str = "r",
buffering: int = -1,
encoding: str = None,
errors: str = None,
newline: str = None,
):
""" Open the file pointed to by the path, like Platform.open """
def owner(self) -> str:
""" Return the name of the user owning the file. KeyError is raised if
the file's uid is not found in the System database """
def read_bytes(self) -> bytes:
""" Return the binary contents of the pointed-to file as a bytes object """
def read_text(self, encoding: str = None, errors: str = None) -> str:
""" Return the decoded contents of the pointed-to file as a string """
def readlink(self) -> "LinuxPath":
""" Return the path to which the symbolic link points """
def rename(self, target) -> "LinuxPath":
""" Rename the file or directory to the given target (str or Path). """
def replace(self, target) -> "LinuxPath":
""" Sawme as `rename` for Linux """
def resolve(self, strict: bool = False):
""" Resolve the current path into an absolute path """
def rglob(self, pattern: str) -> Generator["LinuxPath", None, None]:
""" This is like calling Path.glob() with "**/" added to in the front
of the given relative pattern """
def rmdir(self):
""" Remove this directory. The directory must be empty. """
def samefile(self, otherpath: "LinuxPath"):
""" Return whether this path points to the same file as other_path
which can be either a Path object or a string. """
def symlink_to(self, target, target_is_directory: bool = False):
""" Make this path a symbolic link to target. """
def touch(self, mode: int = 0o666, exist_ok: bool = True):
""" Createa file at this path. If the file already exists, function
succeeds if exist_ok is true (and it's modification time is updated).
Otherwise FileExistsError is raised. """
def unlink(self, missing_ok: bool = False):
""" Remove the file or symbolic link. """
def link_to(self, target):
""" Create a hard link pointing to a path named target """
def write_bytes(self, data: bytes):
""" Open the file pointed to in bytes mode and write data to it. """
def write_text(self, data: str, encoding: str = None, errors: str = None):
""" Open the file pointed to in text mode, and write data to it. """
class Linux(Platform): class Linux(Platform):
""" """
Concrete platform class abstracting interaction with a GNU/Linux remote Concrete platform class abstracting interaction with a GNU/Linux remote
@ -1340,3 +1199,62 @@ class Linux(Platform):
self.run(["chmod", "-h", oct(mode)[2:], path]) self.run(["chmod", "-h", oct(mode)[2:], path])
else: else:
self.run(["chmod", oct(mode)[2:], path]) self.run(["chmod", oct(mode)[2:], path])
def mkdir(self, path: str, mode: int = 0o777, parents: bool = False):
""" Create a new directory """
try:
if parents:
self.run(
["mkdir", "-p", "-m", oct(mode)[2:], path], text=True, check=True
)
else:
self.run(
["mkdir", "-p", "-m", oct(mode)[2:], path], text=True, check=True
)
except CalledProcessError as exc:
if "exists" in exc.stdout:
raise FileExistsError(exc.stdout) from exc
else:
raise FileNotFoundError(exc.stdout) from exc
def rename(self, source: str, target: str):
""" Rename a file from the source to the target. This should
replace the target if it exists. """
try:
self.run(["mv", source, target], check=True)
except CalledProcessError as exc:
raise FileNotFoundError(source) from exc
def rmdir(self, target: str):
""" Remove the specified directory. It must be empty. """
try:
self.run(["rmdir", target], check=True)
except CalledProcessError as exc:
raise OSError(f"Directory not empty: {target}") from exc
def symlink_to(self, source: str, target: str):
""" Create a symbolic link to source from target """
# Since this function is unlikely to be called outside of
# the path abstraction, we don't do much error checking.
# We can't reliably tell what happened when the process
# fails without checking stat output which is easier from
# the path abstraction itself.
self.run(["ln", "-s", source, target], check=True)
def link_to(self, source: str, target: str):
""" Create a filesystem hard link. """
# Same warning as with symlink
self.run(["ln", source, target], check=True)
def unlink(self, target: str):
""" Remove a link to a file (similar to `rm`) """
try:
self.run(["rm", target], check=True)
except CalledProcessError as exc:
raise FileNotFoundError(target) from exc