1
0
mirror of https://github.com/calebstewart/pwncat.git synced 2024-11-27 19:04:15 +01:00

Multiple things

This commit is contained in:
Caleb Stewart 2020-11-06 00:19:52 -05:00
parent 97d329365f
commit 5072b01340
9 changed files with 708 additions and 75 deletions

View File

@ -173,14 +173,11 @@ class ChannelFile(RawIOBase):
if self.eof:
return 0
try:
n = self.channel.send(data)
except (BlockingIOError):
n = 0
if n == 0:
return None
written = 0
while written < len(data):
written += self.channel.send(data)
return n
return written
class Channel:

View File

@ -53,7 +53,15 @@ class Connect(Channel):
""" Send data to the remote shell. This is a blocking call
that only returns after all data is sent. """
self.client.sendall(data)
try:
written = 0
while written < len(data):
try:
written += self.client.send(data[written:])
except BlockingIOError:
pass
except BrokenPipeError as exc:
raise ChannelClosed(self) from exc
return len(data)

View File

@ -242,6 +242,10 @@ class CommandParser:
self.manager.log(f"[red]warning[/red]: {exc.channel}: channel closed")
# Ensure any existing sessions are cleaned from the manager
exc.cleanup(self.manager)
except pwncat.manager.InteractiveExit:
# Within a script, `exit` means to exit the script, not the
# interpreter
break
except Exception as exc:
console.log(
f"[red]error[/red]: [cyan]{name}[/cyan]: [yellow]{command}[/yellow]: {str(exc)}"
@ -255,11 +259,9 @@ class CommandParser:
try:
line = self.prompt.prompt().strip()
except (EOFError, OSError, KeyboardInterrupt):
pass
else:
if line != "":
self.dispatch_line(line)
except (EOFError, OSError, KeyboardInterrupt, pwncat.manager.InteractiveExit):
return
def run(self):
@ -550,8 +552,7 @@ class RemotePathCompleter(Completer):
if path == "":
path = "."
for name in self.manager.target.listdir(path):
name = name.decode("utf-8").strip()
for name in self.manager.target.platform.listdir(path):
if name.startswith(partial_name):
yield Completion(
name,

View File

@ -6,18 +6,13 @@ from pwncat.commands.base import CommandDefinition, Complete, Parameter
class Command(CommandDefinition):
"""
Exit pwncat. You must provide the "--yes" parameter.
This prevents accidental closing of your remote session.
Exit the interactive prompt. If sessions are active, you will
be prompted to confirm. This shouldn't be run from a configuration
script.
"""
PROG = "exit"
ARGS = {
"--yes,-y": Parameter(
Complete.NONE,
action="store_true",
help="Confirm you would like to close pwncat",
)
}
ARGS = {}
LOCAL = True
def run(self, manager, args):

View File

@ -39,7 +39,7 @@ class Command(CommandDefinition):
"destination": Parameter(Complete.REMOTE_FILE, nargs="?",),
}
def run(self, args):
def run(self, manager: "pwncat.manager.Manager", args):
# Create a progress bar for the download
progress = Progress(
@ -56,17 +56,17 @@ class Command(CommandDefinition):
if not args.destination:
args.destination = f"./{os.path.basename(args.source)}"
else:
access = pwncat.victim.access(args.destination)
if Access.DIRECTORY in access:
args.destination = os.path.join(
args.destination, os.path.basename(args.source)
)
elif Access.PARENT_EXIST not in access:
console.log(
f"[cyan]{args.destination}[/cyan]: no such file or directory"
)
return
# else:
# access = pwncat.victim.access(args.destination)
# if Access.DIRECTORY in access:
# args.destination = os.path.join(
# args.destination, os.path.basename(args.source)
# )
# elif Access.PARENT_EXIST not in access:
# console.log(
# f"[cyan]{args.destination}[/cyan]: no such file or directory"
# )
# return
try:
length = os.path.getsize(args.source)
@ -76,8 +76,8 @@ class Command(CommandDefinition):
"upload", filename=args.destination, total=length, start=False
)
with open(args.source, "rb") as source:
with pwncat.victim.open(
args.destination, "wb", length=length
with manager.target.platform.open(
args.destination, "wb"
) as destination:
progress.start_task(task_id)
copyfileobj(

View File

@ -21,9 +21,15 @@ class Group(Base):
host = relationship("Host", back_populates="groups")
name = Column(String)
members = relationship(
"User", back_populates="groups", secondary=SecondaryGroupAssociation
"User",
back_populates="groups",
secondary=SecondaryGroupAssociation,
lazy="selectin",
)
def __repr__(self):
return f"""Group(gid={self.id}, name={repr(self.name)}), members={repr(",".join(m.name for m in self.members))})"""
class User(Base):
@ -32,7 +38,7 @@ class User(Base):
# The users UID
id = Column(Integer, primary_key=True)
host_id = Column(Integer, ForeignKey("host.id"), primary_key=True)
host = relationship("Host", back_populates="users")
host = relationship("Host", back_populates="users", lazy="selectin")
# The users GID
gid = Column(Integer, ForeignKey("groups.id"))
# The actual DB Group object representing that group
@ -51,7 +57,10 @@ class User(Base):
shell = Column(String)
# The user's secondary groups
groups = relationship(
"Group", back_populates="members", secondary=SecondaryGroupAssociation
"Group",
back_populates="members",
secondary=SecondaryGroupAssociation,
lazy="selectin",
)
def __repr__(self):

View File

@ -75,12 +75,15 @@ class Session:
# Initialize the host reference
self.hash = self.platform.get_host_hash()
with self.db as session:
self.host = session.query(pwncat.db.Host).filter_by(hash=self.hash).first()
if self.host is None:
host = session.query(pwncat.db.Host).filter_by(hash=self.hash).first()
if host is None:
self.register_new_host()
else:
self.host = host.id
self.log("loaded known host from db")
self.platform.get_pty()
@property
def config(self):
""" Get the configuration object for this manager. This
@ -93,10 +96,13 @@ class Session:
hash has already been stored in ``self.hash`` """
# Create a new host object and add it to the database
self.host = pwncat.db.Host(hash=self.hash, platform=self.platform.name)
host = pwncat.db.Host(hash=self.hash, platform=self.platform.name)
with self.db as session:
session.add(self.host)
session.add(host)
session.commit()
self.host = host.id
self.log("registered new host w/ db")
@ -147,17 +153,12 @@ class Session:
"""
new_session = self._db_session is None
try:
if new_session:
if self._db_session is None:
self._db_session = self.manager.create_db_session()
yield self._db_session
finally:
if new_session and self._db_session is not None:
session = self._db_session
self._db_session = None
session.close()
self._db_session.commit()
@contextlib.contextmanager
def task(self, *args, **kwargs):
@ -194,6 +195,16 @@ class Session:
self._progress.update(task, *args, **kwargs)
def died(self):
if self not in self.manager.sessions:
return
self.manager.sessions.remove(self)
if self.manager.target == self:
self.manager.target = None
class Manager:
"""
@ -324,7 +335,7 @@ class Manager:
@target.setter
def target(self, value: Session):
if value not in self.sessions:
if value is not None and value not in self.sessions:
raise ValueError("invalid target")
self._target = value
@ -395,15 +406,14 @@ class Manager:
)
else:
data = self.target.platform.channel.recv(4096)
if data is None or len(data) == 0:
done = True
break
sys.stdout.buffer.write(data)
except RawModeExit:
pwncat.util.restore_terminal(term_state)
except ChannelClosed:
pwncat.util.restore_terminal(term_state)
self.log(f"[yellow]warning[/yellow]: {self.target}: connection reset")
self.log(
f"[yellow]warning[/yellow]: {self.target.platform}: connection reset"
)
self.target.died()
except Exception:
pwncat.util.restore_terminal(term_state)

View File

@ -4,6 +4,9 @@ import enum
import pathlib
import logging
import logging.handlers
import fnmatch
import stat
import os
import pwncat
import pwncat.subprocess
@ -19,7 +22,7 @@ class PlatformError(Exception):
""" Generic platform error. """
class Path(pathlib.PurePath):
class Path:
"""
A Concrete-Path. An instance of this class is bound to a
specific victim, and supports all semantics of a standard
@ -27,6 +30,266 @@ class Path(pathlib.PurePath):
`Path.cwd`.
"""
_target: "Platform"
_stat: os.stat_result
_lstat: os.stat_result
parts = []
def stat(self) -> os.stat_result:
""" Run `stat` on the path and return a stat result """
if self._stat is not None:
return self._stat
self._stat = self._target.stat(str(self))
return self._stat
def chmod(self, mode: int):
""" Execute `chmod` on the remote file to change permissions """
self._target.chmod(str(self), mode)
def exists(self) -> bool:
""" Return true if the specified path exists on the remote system """
try:
self.stat()
return True
except FileNotFoundError:
return False
def expanduser(self) -> "Path":
""" Return a new path object with ~ and ~user expanded """
if not self.parts[0].startswith("~"):
return self.__class__(self)
if self.parts[0] == "~":
return self.__class__(
self._target.find_user(self._target.whoami()).homedir, *self.parts[1:]
)
else:
return self.__class__(
self._target.find_user(self.parts[0][1:]).homedir, *self.parts[1:]
)
def glob(self, pattern: str) -> Generator["Path", None, None]:
""" Glob the given relative pattern in the directory represented
by this path, yielding all matching files (of any kind) """
for name in self._target.listdir(str(self)):
if fnmatch.fnmatch(name, pattern):
yield self / name
def group(self) -> str:
""" Returns the name of the group owning the file. KeyError is raised
if the file's GID isn't found in the system database. """
return self._target.find_group(id=self.stat().st_gid).name
def is_dir(self) -> bool:
""" Returns True if the path points to a directory (or a symbolic link
pointing to a directory). False if it points to another kind of file.
"""
try:
return stat.S_ISDIR(self.stat().st_mode)
except (FileNotFoundError, PermissionError):
return False
def is_file(self) -> bool:
""" Returns True if the path points to a regular file """
try:
return stat.S_ISREG(self.stat().st_mode)
except (FileNotFoundError, PermissionError):
return False
def is_mount(self) -> bool:
""" Returns True if the path is a mount point. """
def is_symlink(self) -> bool:
""" Returns True if the path points to a symbolic link, False otherwise """
try:
return stat.S_ISLNK(self.stat().st_mode)
except (FileNotFoundError, PermissionError):
return False
def is_socket(self) -> bool:
""" Returns True if the path points to a Unix socket """
try:
return stat.S_ISSOCK(self.stat().st_mode)
except (FileNotFoundError, PermissionError):
return False
def is_fifo(self) -> bool:
""" Returns True if the path points to a FIFO """
try:
return stat.S_ISFIFO(self.stat().st_mode)
except (FileNotFoundError, PermissionError):
return False
def is_block_device(self) -> bool:
""" Returns True if the path points to a block device """
try:
return stat.S_ISBLK(self.stat().st_mode)
except (FileNotFoundError, PermissionError):
return False
def is_char_device(self) -> bool:
""" Returns True if the path points to a character device """
try:
return stat.S_ISCHR(self.stat().st_mode)
except (FileNotFoundError, PermissionError):
return False
def iterdir(self) -> bool:
""" When the path points to a directory, yield path objects of the
directory contents. """
if not self.is_dir():
raise NotADirectoryError
for name in self._target.listdir(str(self)):
if name == "." or name == "..":
continue
yield self.__class__(*self.parts, name)
def lchmod(self, mode: int):
""" Modify a symbolic link's mode (same as chmod for non-symbolic links) """
self._target.chmod(str(self), mode, link=True)
def lstat(self) -> os.stat_result:
""" Same as stat except operate on the symbolic link file itself rather
than the file it points to. """
if self._lstat is not None:
return self._lstat
self._lstat = self._target.lstat(str(self))
return self._lstat
def mkdir(self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False):
""" Create a new directory at this given path. """
def open(
self,
mode: str = "r",
buffering: int = -1,
encoding: str = None,
errors: str = None,
newline: str = None,
):
""" Open the file pointed to by the path, like Platform.open """
return self._target.open(
self,
mode=mode,
buffering=buffering,
encoding=encoding,
errors=errors,
newline=newline,
)
def owner(self) -> str:
""" Return the name of the user owning the file. KeyError is raised if
the file's uid is not found in the System database """
return self._target.find_user(id=self.stat().st_uid).name
def read_bytes(self) -> bytes:
""" Return the binary contents of the pointed-to file as a bytes object """
with self.open("rb") as filp:
return filp.read()
def read_text(self, encoding: str = None, errors: str = None) -> str:
""" Return the decoded contents of the pointed-to file as a string """
with self.open("r", encoding=encoding, errors=errors) as filp:
return filp.read()
def readlink(self) -> "Path":
""" Return the path to which the symbolic link points """
return self._target.readlink(str(self))
def rename(self, target) -> "Path":
""" Rename the file or directory to the given target (str or Path). """
def replace(self, target) -> "Path":
""" Sawme as `rename` for Linux """
def resolve(self, strict: bool = False):
""" Resolve the current path into an absolute path """
return self.__class__(self._target.abspath(str(self)))
def rglob(self, pattern: str) -> Generator["Path", None, None]:
""" This is like calling Path.glob() with "**/" added to in the front
of the given relative pattern """
return self.glob("**/" + pattern)
def rmdir(self):
""" Remove this directory. The directory must be empty. """
def samefile(self, otherpath: "Path"):
""" Return whether this path points to the same file as other_path
which can be either a Path object or a string. """
if not isinstance(otherpath, Path):
otherpath = self.__class__(otherpath)
stat1 = self.stat()
stat2 = otherpath.stat()
return os.path.samestat(stat1, stat2)
def symlink_to(self, target, target_is_directory: bool = False):
""" Make this path a symbolic link to target. """
def touch(self, mode: int = 0o666, exist_ok: bool = True):
""" Createa file at this path. If the file already exists, function
succeeds if exist_ok is true (and it's modification time is updated).
Otherwise FileExistsError is raised. """
existed = self.exists()
if not exist_ok and existed:
raise FileExistsError(str(self))
self._target.touch(str(self))
if not existed:
self.chmod(mode)
def unlink(self, missing_ok: bool = False):
""" Remove the file or symbolic link. """
def link_to(self, target):
""" Create a hard link pointing to a path named target """
def write_bytes(self, data: bytes):
""" Open the file pointed to in bytes mode and write data to it. """
with self.open("wb") as filp:
filp.write(data)
def write_text(self, data: str, encoding: str = None, errors: str = None):
""" Open the file pointed to in text mode, and write data to it. """
with self.open("w", encoding=encoding, errors=errors) as filp:
filp.write(data)
class Platform:
""" Abstracts interactions with a target of a specific platform.
@ -63,9 +326,154 @@ class Platform:
handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
self.logger.addHandler(handler)
base_path = self.PATH_TYPE
target = self
class RemotePath(base_path, Path):
_target = target
_stat = None
def __init__(self, *args):
base_path.__init__(*args)
self.PATH_TYPE = RemotePath
def __str__(self):
return str(self.channel)
def reload_users(self):
""" Reload the user and group cache. This is automatically called
if the cache hasn't been built yet, but may be called manually
if you know the users have changed. This method is also called
if a lookup for a specific user or group ID fails. """
raise NotImplementedError(f"{self.name} did not implement reload_users")
def iter_users(self) -> Generator["pwncat.db.User", None, None]:
""" Iterate over all users on the remote system """
with self.session.db as db:
users = db.query(pwncat.db.User).filter_by(host_id=self.session.host).all()
if users is None:
self.reload_users()
users = (
db.query(pwncat.db.User).filter_by(host_id=self.session.host).all()
)
if users is not None:
for user in users:
_ = user.groups
yield user
return
def find_user(
self,
name: Optional[str] = None,
id: Optional[int] = None,
_recurse: bool = True,
) -> "pwncat.db.User":
""" Locate a user by name or UID. If the user/group cache has not
been built, then reload_users is automatically called. If the
lookup fails, reload_users is called automatically to ensure that
there has not been a user/group update remotely. If the user
still cannot be found, a KeyError is raised. """
with self.session.db as db:
user = db.query(pwncat.db.User).filter_by(host_id=self.session.host)
if name is not None:
user = user.filter_by(name=name)
if id is not None:
user = user.filter_by(id=id)
user = user.first()
if user is None and _recurse:
self.reload_users()
return self.find_user(name=name, id=id, _recurse=False)
elif user is None:
raise KeyError
return user
def iter_groups(self) -> Generator["pwncat.db.Group", None, None]:
""" Iterate over all groups on the remote system """
with self.session.db as db:
groups = (
db.query(pwncat.db.Group).filter_by(host_id=self.session.host).all()
)
if groups is None:
self.reload_users()
groups = (
db.query(pwncat.db.Group).filter_by(host_id=self.session.host).all()
)
if groups is not None:
for group in groups:
_ = group.members
yield group
return
def find_group(
self,
name: Optional[str] = None,
id: Optional[int] = None,
_recurse: bool = True,
) -> "pwncat.db.Group":
""" Locate a group by name or GID. If the user/group cache has not
been built, then reload_users is automatically called. If the
lookup fails, reload_users is called automatically to ensure that
there has not been a user/group update remotely. If the group
still cannot be found, a KeyError is raised. """
with self.session.db as db:
group = db.query(pwncat.db.Group).filter_by(host_id=self.session.host)
if name is not None:
group = group.filter_by(name=name)
if id is not None:
group = group.filter_by(id=id)
group = group.first()
if group is None and _recurse:
self.reload_users()
return self.find_group(name=name, id=id, _recurse=False)
elif group is None:
raise KeyError
return group
def stat(self, path: str) -> os.stat_result:
""" Run stat on a path on the remote system and return a stat result
This is mainly used by the concrete Path type to fill in a majority
of it's methods. If the specified path does not exist or cannot be
accessed, a FileNotFoundError or PermissionError is raised respectively
"""
def lstat(self, path: str) -> os.stat_result:
""" Run stat on the symbolic link and return a stat result object.
This has the same semantics as the `stat` method. """
def abspath(self, path: str) -> str:
""" Attempt to resolve a path to an absolute path """
def readlink(self, path: str):
""" Attempt to read the target of a link """
def current_user(self):
""" Retrieve a user object for the current user """
def whoami(self):
""" Retrieve's only name of the current user (may be faster depending
on platform) """
def listdir(self, path=None) -> Generator[str, None, None]:
""" List the contents of a directory. If ``path`` is None,
then the contents of the current directory is listed. The
@ -220,7 +628,7 @@ class Platform:
return completed_proc
def path(self, path: Optional[str] = None) -> Path:
def Path(self, path: Optional[str] = None) -> Path:
"""
Takes the given string and returns a concrete path for this host.
This path object conforms to the "concrete path" definition of the
@ -235,6 +643,8 @@ class Platform:
:rtype: Path
"""
return self.PATH_TYPE(path)
def chdir(self, path: Union[str, Path]):
"""
Change directories to the given path. This method returns the current

View File

@ -412,10 +412,16 @@ class LinuxPath(pathlib.PurePosixPath):
super().__init__(*pathsegments)
self._target = target
self._stat = None
def stat(self) -> os.stat_result:
""" Run `stat` on the path and return a stat result """
if self._stat is not None:
return self._stat
return self._target.stat(str(self))
def chmod(self, mode: int):
""" Execute `chmod` on the remote file to change permissions """
@ -435,7 +441,8 @@ class LinuxPath(pathlib.PurePosixPath):
def is_dir(self) -> bool:
""" Returns True if the path points to a directory (or a symbolic link
pointing to a directory). False if it points to another kind of file. """
pointing to a directory). False if it points to another kind of file.
"""
def is_file(self) -> bool:
""" Returns True if the path points to a regular file """
@ -543,6 +550,8 @@ class Linux(Platform):
information on the implemented methods and interface definition.
"""
PATH_TYPE = pathlib.PurePosixPath
def __init__(self, session, channel: pwncat.channel.Channel, log: str = None):
super().__init__(session, channel, log)
@ -704,6 +713,19 @@ class Linux(Platform):
does not exist, or you do not have execute permissions.
"""
try:
p = self.run(
["ls", "--all", "-1", path],
encoding="utf-8",
capture_output=True,
check=True,
)
except CalledProcessError:
return
for name in p.stdout.split("\n"):
yield name
def which(self, name: str, quote: bool = False) -> str:
"""
Locate the specified binary on the remote host. Normally, this is done through
@ -887,21 +909,6 @@ class Linux(Platform):
code_delim.encode("utf-8") + b"\n",
)
def Path(self, path: Optional[str] = None) -> Path:
"""
Takes the given string and returns a concrete path for this host.
This path object conforms to the "concrete path" definition of the
standard python ``pathlib`` library. Generally speaking, it is a
subclass of ``pathlib.PurePath`` which implements the concrete
features by being bound to this specific victim. If no path is
specified, a path representing the current directory is returned.
:param path: a relative or absolute path path
:type path: str
:return: a concrete path object
:rtype: pwncat.platform.Path
"""
def chdir(self, path: Union[str, Path]):
"""
Change directories to the given path. This method returns the current
@ -1137,3 +1144,199 @@ class Linux(Platform):
self.channel.send(command.encode("utf-8"))
self._interactive = value
def whoami(self):
""" Get the name of the current user """
return self.run(
["whoami"], capture_output=True, check=True, encoding="utf-8"
).stdout.rstrip("\n")
def reload_users(self):
""" Reload users from the remote host and cache them in the database """
with self.session.db as db:
# Delete existing users from the database
db.query(pwncat.db.User).filter_by(host_id=self.session.host).delete()
db.query(pwncat.db.Group).filter_by(host_id=self.session.host).delete()
db.commit()
with self.open("/etc/passwd") as filp:
etc_passwd = filp.readlines()
with self.open("/etc/group") as filp:
etc_group = filp.readlines()
with self.session.db as db:
users = {}
for user_line in etc_passwd:
name, password, uid, gid, description, home, shell = user_line.rstrip(
"\n"
).split(":")
user = pwncat.db.User(
host_id=self.session.host,
id=uid,
gid=gid,
name=name,
homedir=home,
shell=shell,
fullname=description,
)
# Some users have a hash here, but they shouldn't
if password != "x":
user.hash = password
# Track for group creation
users[name] = user
# Add to the database
db.add(user)
for group_line in etc_group:
name, _, id, *members = group_line.rstrip("\n").split(":")
members = ":".join(members).split(",")
# Build the group
group = pwncat.db.Group(host_id=self.session.host, id=id, name=name)
for name in members:
if name in users:
group.members.append(users[name])
db.add(group)
db.commit()
def _parse_stat(self, result: str) -> os.stat_result:
""" Parse the output of a stat command """
# Reverse the string. The filename may have a space in it, so we do this
# to properly parse it.
result = result.rstrip("\n")[::-1]
fields = [field[::-1] for field in result.split(" ")]
# Field order:
# 0 optimal I/O transfer size
# 1 time of file birth
# 2 time of last status change
# 3 time of last data modification
# 4 time of last access
# 5 minor device type (hex)
# 6 major device type (hex)
# 7 number of hard links
# 8 inode number
# 9 device number (hex)
# 10 group id of owner
# 11 user id of owner
# 12 raw mode (hex)
# 13 number of blocks allocated
# 14 total size in bytes
stat = os.stat_result(
tuple(
[
int(fields[12], 16),
int(fields[8]),
int(fields[9], 16),
int(fields[7]),
int(fields[11]),
int(fields[10]),
int(fields[14]),
int(fields[4]),
int(fields[3]),
int(fields[2]),
int(fields[13]),
int(fields[1]),
]
)
)
return stat
def stat(self, path: str) -> os.stat_result:
""" Perform the equivalent of the stat syscall on
the remote host """
try:
result = self.run(
[
"stat",
"-L",
"-c",
"%n %s %b %f %u %g %D %i %h %t %T %X %Y %Z %W %o",
path,
],
capture_output=True,
encoding="utf-8",
check=True,
)
except CalledProcessError as exc:
raise FileNotFoundError(path) from exc
return self._parse_stat(result.stdout)
def lstat(self, path: str) -> os.stat_result:
""" Perform the equivalent of the lstat syscall """
try:
result = self.run(
[
"stat",
"-c",
"%n %s %b %f %u %g %D %i %h %t %T %X %Y %Z %W %o",
path,
],
capture_output=True,
encoding="utf-8",
check=True,
)
except CalledProcessError as exc:
raise FileNotFoundError(path) from exc
return self._parse_stat(result.stdout)
def abspath(self, path: str) -> str:
""" Attempt to resolve a path to an absolute path """
try:
result = self.run(
["realpath", path], capture_output=True, text=True, check=True
)
return result.stdout.rstrip("\n")
except CalledProcessError as exc:
raise FileNotFoundError(path) from exc
def readlink(self, path: str):
""" Attempt to read the target of a link """
try:
self.lstat(path)
result = self.run(
["readlink", path], capture_output=True, text=True, check=True
)
return result.stdout.rstrip("\n")
except CalledProcessError as exc:
raise OSError(f"Invalid argument: '{path}'") from exc
def umask(self, mask: int = None):
""" Set or retrieve the current umask value """
if mask is None:
return int(self.run(["umask"], capture_output=True, text=True).stdout, 8)
self.run(["umask", oct(mask)[2:]])
return mask
def touch(self, path: str):
""" Update a file modification time and possibly create it """
self.run(["touch", path])
def chmod(self, path: str, mode: int, link: bool = False):
""" Update the file permissions """
if link:
self.run(["chmod", "-h", oct(mode)[2:], path])
else:
self.run(["chmod", oct(mode)[2:], path])