1
0
mirror of https://github.com/calebstewart/pwncat.git synced 2024-11-30 12:24:14 +01:00

Added permission checks when opening files

Also fixed a tangential problem which arose regarding the group
enumerations which caused a recursive call the enumerate groups
from within the group enumeration.
This commit is contained in:
Caleb Stewart 2021-06-17 17:47:07 -04:00
parent 3c33d015e8
commit a1499f1a38
3 changed files with 90 additions and 9 deletions

View File

@ -15,6 +15,7 @@ and simply didn't have the time to go back and retroactively create one.
- Changed session tracking so session IDs aren't reused - Changed session tracking so session IDs aren't reused
- Changed zsh prompt to match CWD of other shell prompts - Changed zsh prompt to match CWD of other shell prompts
- Improved exception handling in `Manager.interactive` ([#133](https://github.com/calebstewart/pwncat/issues/133)) - Improved exception handling in `Manager.interactive` ([#133](https://github.com/calebstewart/pwncat/issues/133))
- Added explicit permission checks when opening files
## [0.4.2] - 2021-06-15 ## [0.4.2] - 2021-06-15
Quick patch release due to corrected bug in `ChannelFile` which caused command Quick patch release due to corrected bug in `ChannelFile` which caused command

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import pwncat import pwncat
from pwncat.modules import ModuleFailed from pwncat.modules import Status, ModuleFailed
from pwncat.facts.linux import LinuxGroup from pwncat.facts.linux import LinuxGroup
from pwncat.platform.linux import Linux from pwncat.platform.linux import Linux
from pwncat.modules.enumerate import Schedule, EnumerateModule from pwncat.modules.enumerate import Schedule, EnumerateModule
@ -20,6 +20,7 @@ class Module(EnumerateModule):
users = {user.gid: user for user in session.run("enumerate", types=["user"])} users = {user.gid: user for user in session.run("enumerate", types=["user"])}
group_file = session.platform.Path("/etc/group") group_file = session.platform.Path("/etc/group")
groups = []
try: try:
with group_file.open("r") as filp: with group_file.open("r") as filp:
@ -34,13 +35,17 @@ class Module(EnumerateModule):
members.append(users[gid].name) members.append(users[gid].name)
# Build a group object # Build a group object
group = LinuxGroup(self.name, group_name, hash, gid, members) groups.append(
LinuxGroup(self.name, group_name, hash, gid, members)
)
yield group yield Status(group_name)
except (KeyError, ValueError, IndexError): except (KeyError, ValueError, IndexError):
# Bad group line # Bad group line
continue continue
yield from groups
except (FileNotFoundError, PermissionError) as exc: except (FileNotFoundError, PermissionError) as exc:
raise ModuleFailed(str(exc)) from exc raise ModuleFailed(str(exc)) from exc

View File

@ -11,6 +11,7 @@ Popen can be running at a time. It is imperative that you call
to calling any other pwncat methods. to calling any other pwncat methods.
""" """
import os import os
import stat
import time import time
import shlex import shlex
import shutil import shutil
@ -414,6 +415,9 @@ class LinuxWriter(BufferedIOBase):
if self.popen is None: if self.popen is None:
raise UnsupportedOperation("writer is detached") raise UnsupportedOperation("writer is detached")
if self.popen.poll() is not None:
raise PermissionError("file write failed")
if self.popen.platform.has_pty: if self.popen.platform.has_pty:
# Control sequences need escaping # Control sequences need escaping
translated = [] translated = []
@ -483,6 +487,49 @@ class LinuxWriter(BufferedIOBase):
self.detach() self.detach()
class LinuxPath(pathlib.PurePosixPath):
"""Special cases for Linux remote paths"""
def readable(self):
"""Test if a file is readable"""
uid = self._target._id["euid"]
gid = self._target._id["egid"]
groups = self._target._id["groups"]
file_uid = self.stat().st_uid
file_gid = self.stat().st_gid
file_mode = self.stat().st_mode
if uid == file_uid and (file_mode & stat.S_IRUSR):
return True
elif (gid == file_gid or file_gid in groups) and (file_mode & stat.S_IRGRP):
return True
elif file_mode & stat.S_IROTH:
return True
return False
def writable(self):
uid = self._target._id["euid"]
gid = self._target._id["egid"]
groups = self._target._id["groups"]
file_uid = self.stat().st_uid
file_gid = self.stat().st_gid
file_mode = self.stat().st_mode
if uid == file_uid and (file_mode & stat.S_IWUSR):
return True
elif (gid == file_gid or file_gid in groups) and (file_mode & stat.S_IWGRP):
return True
elif file_mode & stat.S_IWOTH:
return True
return False
class Linux(Platform): class Linux(Platform):
""" """
Concrete platform class abstracting interaction with a GNU/Linux remote Concrete platform class abstracting interaction with a GNU/Linux remote
@ -491,7 +538,7 @@ class Linux(Platform):
""" """
name = "linux" name = "linux"
PATH_TYPE = pathlib.PurePosixPath PATH_TYPE = LinuxPath
PROMPTS = { PROMPTS = {
"sh": """'$(command printf "(remote) $(whoami)@$(hostname):$PWD\\$ ")'""", "sh": """'$(command printf "(remote) $(whoami)@$(hostname):$PWD\\$ ")'""",
"dash": """'$(command printf "(remote) $(whoami)@$(hostname):$PWD\\$ ")'""", "dash": """'$(command printf "(remote) $(whoami)@$(hostname):$PWD\\$ ")'""",
@ -507,7 +554,7 @@ class Linux(Platform):
self.name = "linux" self.name = "linux"
self.command_running = None self.command_running = None
self._uid = None self._id = None
# This causes an stty to be sent. # This causes an stty to be sent.
# If we aren't in a pty, it doesn't matter. # If we aren't in a pty, it doesn't matter.
@ -766,10 +813,20 @@ class Linux(Platform):
while True: while True:
try: try:
proc = self.run( proc = self.run(
["id", "-ru"], capture_output=True, text=True, check=True "(id -ru;id -u;id -g;id -rg;id -G;)",
capture_output=True,
text=True,
check=True,
) )
self._uid = int(proc.stdout.rstrip("\n")) idents = proc.stdout.split("\n")
return self._uid self._id = {
"ruid": int(idents[0].strip()),
"euid": int(idents[1].strip()),
"rgid": int(idents[2].strip()),
"egid": int(idents[3].strip()),
"groups": [int(g.strip()) for g in idents[4].split(" ")],
}
return self._id["ruid"]
except ValueError: except ValueError:
continue continue
except CalledProcessError as exc: except CalledProcessError as exc:
@ -777,7 +834,7 @@ class Linux(Platform):
def getuid(self): def getuid(self):
"""Retrieve the current cached uid""" """Retrieve the current cached uid"""
return self._uid return self._id["ruid"]
def getenv(self, name: str): def getenv(self, name: str):
@ -1153,6 +1210,24 @@ class Linux(Platform):
if any(c not in "rwb" for c in mode): if any(c not in "rwb" for c in mode):
raise PlatformError(f"{mode}: unknown file mode") raise PlatformError(f"{mode}: unknown file mode")
if isinstance(path, str):
path = self.Path(path)
if "r" in mode and not path.exists():
raise FileNotFoundError(f"No such file or directory: {str(path)}")
if "r" in mode and not path.readable():
raise PermissionError(f"Permission Denied: {str(path)}")
if "w" in mode:
parent = path.parent
if "w" in mode and path.exists() and not path.writable():
raise PermissionError(f"Permission Denied: {str(path)}")
if "w" in mode and not path.exists() and not parent.writable():
raise PermissionError(f"Permission Denied: {str(path)}")
if "w" in mode and not path.exists() and not parent.exists():
raise FileNotFoundError(f"No such file or directory: {str(path)}")
# Save this just in case we are opening a text-mode stream # Save this just in case we are opening a text-mode stream
line_buffering = buffering == -1 or buffering == 1 line_buffering = buffering == -1 or buffering == 1