1
0
mirror of https://github.com/calebstewart/pwncat.git synced 2024-12-03 13:54:15 +01:00

Merge branch 'platforms' of github.com:calebstewart/pwncat into platforms

This commit is contained in:
Caleb Stewart 2021-05-07 18:54:01 -04:00
commit 3f292b971f
4 changed files with 63 additions and 59 deletions

View File

@ -34,6 +34,8 @@ refer to the documentation for up to date usage and API documentation!
pwncat [documentation] is being built out on Read the Docs. Head there for
the latest usage and development documentation!
**pwncat requires Python 3.9+.**
## Modules
Recently, the architecture of the pwncat framework was redesigned to

View File

@ -26,7 +26,7 @@ from pwncat.target import Target
class InteractiveExit(Exception):
""" Indicates we should exit the interactive terminal """
"""Indicates we should exit the interactive terminal"""
class Session:
@ -97,7 +97,7 @@ class Session:
@property
def target(self) -> Target:
""" Retrieve the target object for this session """
"""Retrieve the target object for this session"""
try:
# Find target object
@ -133,7 +133,7 @@ class Session:
return user
def run(self, module: str, **kwargs):
""" Run a module on this session """
"""Run a module on this session"""
module_name = module
module = self.manager.modules.get(module_name)
@ -192,12 +192,12 @@ class Session:
@contextlib.contextmanager
def task(self, *args, **kwargs):
""" Get a new task in this session's progress instance """
"""Get a new task in this session's progress instance"""
# Ensure the variable exists even if an exception happens
# prior to task creation
task = None
started = self._progress._started
started = self._progress.live.is_started
if "status" not in kwargs:
kwargs["status"] = "..."
@ -223,7 +223,7 @@ class Session:
self._progress.stop()
def update_task(self, task, *args, **kwargs):
""" Update an active task """
"""Update an active task"""
self._progress.update(task, *args, **kwargs)
@ -238,7 +238,7 @@ class Session:
self.manager.target = None
def close(self):
""" Close the session and remove from manager tracking """
"""Close the session and remove from manager tracking"""
self.platform.channel.close()
@ -327,12 +327,12 @@ class Manager:
pass
def __enter__(self):
""" Begin manager context tracking """
"""Begin manager context tracking"""
return self
def __exit__(self, _, __, ___):
""" Ensure all sessions are closed """
"""Ensure all sessions are closed"""
while self.sessions:
self.sessions[0].close()
@ -364,7 +364,7 @@ class Manager:
self.parser = CommandParser(self)
def create_db_session(self):
""" Create a new SQLAlchemy database session and return it """
"""Create a new SQLAlchemy database session and return it"""
# Initialize a fallback database if needed
if self.db is None:
@ -396,7 +396,7 @@ class Manager:
setattr(self.modules[module_name], "name", module_name)
def log(self, *args, **kwargs):
""" Output a log entry """
"""Output a log entry"""
if self.target is not None:
self.target._progress.log(*args, **kwargs)
@ -412,7 +412,7 @@ class Manager:
@property
def target(self) -> Session:
""" Retrieve the currently focused target """
"""Retrieve the currently focused target"""
return self._target
@target.setter
@ -444,7 +444,7 @@ class Manager:
pwnlib.term.term_mode = False
def interactive(self):
""" Start interactive prompt """
"""Start interactive prompt"""
self.interactive_running = True
@ -541,7 +541,7 @@ class Manager:
return session
def _process_input(self, data: bytes, has_prefix: bool):
""" Process stdin data from the user in raw mode """
"""Process stdin data from the user in raw mode"""
for byte in data:
byte = bytes([byte])

View File

@ -25,7 +25,7 @@ function. """
class PlatformError(Exception):
""" Generic platform error. """
"""Generic platform error."""
class Path:
@ -43,16 +43,16 @@ class Path:
@classmethod
def cwd(cls):
""" Return a new concrete path referencing the current directory """
"""Return a new concrete path referencing the current directory"""
return cls(".").resolve()
@classmethod
def home(cls):
""" Return a new concrete path referencing the current user home directory """
"""Return a new concrete path referencing the current user home directory"""
return cls("~").resolve()
def writable(self) -> bool:
""" This is non-standard, but is useful """
"""This is non-standard, but is useful"""
user = self._target.current_user()
mode = self.stat().st_mode
@ -74,7 +74,7 @@ class Path:
return False
def stat(self) -> os.stat_result:
""" Run `stat` on the path and return a stat result """
"""Run `stat` on the path and return a stat result"""
if self._stat is not None:
return self._stat
@ -84,12 +84,12 @@ class Path:
return self._stat
def chmod(self, mode: int):
""" Execute `chmod` on the remote file to change permissions """
"""Execute `chmod` on the remote file to change permissions"""
self._target.chmod(str(self), mode)
def exists(self) -> bool:
""" Return true if the specified path exists on the remote system """
"""Return true if the specified path exists on the remote system"""
try:
self.stat()
@ -98,7 +98,7 @@ class Path:
return False
def expanduser(self) -> "Path":
""" Return a new path object with ~ and ~user expanded """
"""Return a new path object with ~ and ~user expanded"""
if not self.parts[0].startswith("~"):
return self.__class__(self)
@ -137,7 +137,7 @@ class Path:
return False
def is_file(self) -> bool:
""" Returns True if the path points to a regular file """
"""Returns True if the path points to a regular file"""
try:
return stat.S_ISREG(self.stat().st_mode)
@ -145,7 +145,7 @@ class Path:
return False
def is_mount(self) -> bool:
""" Returns True if the path is a mount point. """
"""Returns True if the path is a mount point."""
if str(self) == "/":
return True
@ -156,7 +156,7 @@ class Path:
return False
def is_symlink(self) -> bool:
""" Returns True if the path points to a symbolic link, False otherwise """
"""Returns True if the path points to a symbolic link, False otherwise"""
try:
return stat.S_ISLNK(self.stat().st_mode)
@ -164,7 +164,7 @@ class Path:
return False
def is_socket(self) -> bool:
""" Returns True if the path points to a Unix socket """
"""Returns True if the path points to a Unix socket"""
try:
return stat.S_ISSOCK(self.stat().st_mode)
@ -172,7 +172,7 @@ class Path:
return False
def is_fifo(self) -> bool:
""" Returns True if the path points to a FIFO """
"""Returns True if the path points to a FIFO"""
try:
return stat.S_ISFIFO(self.stat().st_mode)
@ -180,7 +180,7 @@ class Path:
return False
def is_block_device(self) -> bool:
""" Returns True if the path points to a block device """
"""Returns True if the path points to a block device"""
try:
return stat.S_ISBLK(self.stat().st_mode)
@ -188,7 +188,7 @@ class Path:
return False
def is_char_device(self) -> bool:
""" Returns True if the path points to a character device """
"""Returns True if the path points to a character device"""
try:
return stat.S_ISCHR(self.stat().st_mode)
@ -208,7 +208,7 @@ class Path:
yield self.__class__(*self.parts, name)
def lchmod(self, mode: int):
""" Modify a symbolic link's mode (same as chmod for non-symbolic links) """
"""Modify a symbolic link's mode (same as chmod for non-symbolic links)"""
self._target.chmod(str(self), mode, link=True)
@ -224,7 +224,7 @@ class Path:
return self._lstat
def mkdir(self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False):
""" Create a new directory at this given path. """
"""Create a new directory at this given path."""
if not exist_ok and self.exists():
raise FileExistsError(str(self))
@ -242,7 +242,7 @@ class Path:
errors: str = None,
newline: str = None,
):
""" Open the file pointed to by the path, like Platform.open """
"""Open the file pointed to by the path, like Platform.open"""
return self._target.open(
self,
@ -260,24 +260,24 @@ class Path:
return self._target.find_user(id=self.stat().st_uid).name
def read_bytes(self) -> bytes:
""" Return the binary contents of the pointed-to file as a bytes object """
"""Return the binary contents of the pointed-to file as a bytes object"""
with self.open("rb") as filp:
return filp.read()
def read_text(self, encoding: str = None, errors: str = None) -> str:
""" Return the decoded contents of the pointed-to file as a string """
"""Return the decoded contents of the pointed-to file as a string"""
with self.open("r", encoding=encoding, errors=errors) as filp:
return filp.read()
def readlink(self) -> "Path":
""" Return the path to which the symbolic link points """
"""Return the path to which the symbolic link points"""
return self._target.readlink(str(self))
def rename(self, target) -> "Path":
""" Rename the file or directory to the given target (str or Path). """
"""Rename the file or directory to the given target (str or Path)."""
self._target.rename(str(self), str(target))
@ -287,12 +287,12 @@ class Path:
return target
def replace(self, target) -> "Path":
""" Same as `rename` for Linux """
"""Same as `rename` for Linux"""
return self.rename(target)
def resolve(self, strict: bool = False):
""" Resolve the current path into an absolute path """
"""Resolve the current path into an absolute path"""
return self.__class__(self._target.abspath(str(self)))
@ -303,7 +303,7 @@ class Path:
return self.glob("**/" + pattern)
def rmdir(self):
""" Remove this directory. The directory must be empty. """
"""Remove this directory. The directory must be empty."""
if not self.is_dir():
raise NotADirectoryError(str(self))
@ -323,7 +323,7 @@ class Path:
return os.path.samestat(stat1, stat2)
def symlink_to(self, target, target_is_directory: bool = False):
""" Make this path a symbolic link to target. """
"""Make this path a symbolic link to target."""
if not isinstance(target, self.__class__):
target = self.__class__(target)
@ -355,7 +355,7 @@ class Path:
self.chmod(mode)
def unlink(self, missing_ok: bool = False):
""" Remove the file or symbolic link. """
"""Remove the file or symbolic link."""
if not missing_ok and not self.exists():
raise FileNotFoundError(str(self))
@ -370,7 +370,7 @@ class Path:
raise
def link_to(self, target):
""" Create a hard link pointing to a path named target """
"""Create a hard link pointing to a path named target"""
if not isinstance(target, self.__class__):
target = self.__class__(target)
@ -387,13 +387,13 @@ class Path:
raise OSError(exc.stdout) from exc
def write_bytes(self, data: bytes):
""" Open the file pointed to in bytes mode and write data to it. """
"""Open the file pointed to in bytes mode and write data to it."""
with self.open("wb") as filp:
filp.write(data)
def write_text(self, data: str, encoding: str = None, errors: str = None):
""" Open the file pointed to in text mode, and write data to it. """
"""Open the file pointed to in text mode, and write data to it."""
with self.open("w", encoding=encoding, errors=errors) as filp:
filp.write(data)
@ -502,12 +502,12 @@ class Platform(ABC):
return data
def __str__(self):
""" Retrieve a string describing the platform connection """
"""Retrieve a string describing the platform connection"""
return str(self.channel)
@abstractmethod
def getuid(self):
""" Get the current user ID """
"""Get the current user ID"""
@abstractmethod
def getenv(self, name: str) -> str:
@ -533,11 +533,11 @@ class Platform(ABC):
@abstractmethod
def abspath(self, path: str) -> str:
""" Attempt to resolve a path to an absolute path """
"""Attempt to resolve a path to an absolute path"""
@abstractmethod
def readlink(self, path: str):
""" Attempt to read the target of a link """
"""Attempt to read the target of a link"""
@abstractmethod
def whoami(self):
@ -830,31 +830,31 @@ class Platform(ABC):
@abstractmethod
def lstat(self, path: str) -> os.stat_result:
""" Perform the equivalent of the lstat syscall """
"""Perform the equivalent of the lstat syscall"""
@abstractmethod
def abspath(self, path: str) -> str:
""" Attempt to resolve a path to an absolute path """
"""Attempt to resolve a path to an absolute path"""
@abstractmethod
def readlink(self, path: str):
""" Attempt to read the target of a link """
"""Attempt to read the target of a link"""
@abstractmethod
def umask(self, mask: int = None):
""" Set or retrieve the current umask value """
"""Set or retrieve the current umask value"""
@abstractmethod
def touch(self, path: str):
""" Update a file modification time and possibly create it """
"""Update a file modification time and possibly create it"""
@abstractmethod
def chmod(self, path: str, mode: int, link: bool = False):
""" Update the file permissions """
"""Update the file permissions"""
@abstractmethod
def mkdir(self, path: str, mode: int = 0o777, parents: bool = False):
""" Create a new directory """
"""Create a new directory"""
@abstractmethod
def rename(self, source: str, target: str):
@ -863,19 +863,19 @@ class Platform(ABC):
@abstractmethod
def rmdir(self, target: str):
""" Remove the specified directory. It must be empty. """
"""Remove the specified directory. It must be empty."""
@abstractmethod
def symlink_to(self, source: str, target: str):
""" Create a symbolic link to source from target """
"""Create a symbolic link to source from target"""
@abstractmethod
def link_to(self, source: str, target: str):
""" Create a filesystem hard link. """
"""Create a filesystem hard link."""
@abstractmethod
def unlink(self, target: str):
""" Remove a link to a file (similar to `rm`) """
"""Remove a link to a file (similar to `rm`)"""
def register(platform: Type[Platform]):

View File

@ -36,3 +36,5 @@ typing-extensions==3.7.4.2
urllib3==1.25.9
wcwidth==0.1.9
python-rapidjson==0.9.1
ZODB==5.6.0
zodburi==2.4.0