1
0
mirror of https://github.com/calebstewart/pwncat.git synced 2024-11-27 19:04:15 +01:00

Merge branch 'master' into release-v0.5.0

This commit is contained in:
Caleb Stewart 2021-06-18 20:02:34 -04:00
commit 25fac6ae09
15 changed files with 329 additions and 147 deletions

View File

@ -11,6 +11,7 @@ and simply didn't have the time to go back and retroactively create one.
### Fixed ### Fixed
- Pinned container base image to alpine 3.13.5 and installed to virtualenv ([#134](https://github.com/calebstewart/pwncat/issues/134)) - Pinned container base image to alpine 3.13.5 and installed to virtualenv ([#134](https://github.com/calebstewart/pwncat/issues/134))
- Fixed syntax for f-strings in escalation command - Fixed syntax for f-strings in escalation command
- Re-added `readline` import for windows platform after being accidentally removed
### Changed ### Changed
- Changed session tracking so session IDs aren't reused - Changed session tracking so session IDs aren't reused
- Changed zsh prompt to match CWD of other shell prompts - Changed zsh prompt to match CWD of other shell prompts
@ -19,6 +20,10 @@ and simply didn't have the time to go back and retroactively create one.
- Added `ncat`-style ssl arguments to entrypoint and `connect` command - Added `ncat`-style ssl arguments to entrypoint and `connect` command
- Added query-string arguments to connection strings for both the entrypoint - Added query-string arguments to connection strings for both the entrypoint
and the `connect` command. and the `connect` command.
- Improved exception handling throughout framework ([#133](https://github.com/calebstewart/pwncat/issues/133))
- Added explicit permission checks when opening files
- Changed LinuxWriter close routine again to account for needed EOF signals ([#140](https://github.com/calebstewart/pwncat/issues/140))
- Added better file io test cases
## [0.4.2] - 2021-06-15 ## [0.4.2] - 2021-06-15
Quick patch release due to corrected bug in `ChannelFile` which caused command Quick patch release due to corrected bug in `ChannelFile` which caused command

View File

@ -43,7 +43,20 @@ class Connect(Socket):
) as progress: ) as progress:
progress.add_task("connecting", total=1, start=False) progress.add_task("connecting", total=1, start=False)
# Connect to the remote host # Connect to the remote host
# If we get an invalid host from the user, that cannot be resolved
# then we capture the GAI (getaddrinfo) exception and raise it as ChannelError
# so that it is handled properly by the parent methods
# We also try to catch ConnectionRefusedError after it
# this is caused when a wrong port number is used
try:
client = socket.create_connection((host, port)) client = socket.create_connection((host, port))
except socket.gaierror:
raise ChannelError(self, "invalid host provided")
except ConnectionRefusedError:
raise ChannelError(self, "connection refused, check your port")
progress.log( progress.log(
f"connection to " f"connection to "

View File

@ -22,7 +22,7 @@ import socket
import functools import functools
from typing import Optional from typing import Optional
from pwncat.channel import Channel, ChannelError, ChannelClosed from pwncat.channel import Channel, ChannelClosed
def connect_required(method): def connect_required(method):
@ -33,7 +33,7 @@ def connect_required(method):
@functools.wraps(method) @functools.wraps(method)
def _wrapper(self, *args, **kwargs): def _wrapper(self, *args, **kwargs):
if not self.connected: if not self.connected:
raise ChannelError(self, "channel not connected") raise ChannelClosed(self)
return method(self, *args, **kwargs) return method(self, *args, **kwargs)
return _wrapper return _wrapper
@ -129,7 +129,12 @@ class Socket(Channel):
return data return data
try: try:
data = data + self.client.recv(count) new_data = self.client.recv(count)
if new_data == b"":
self._connected = False
raise ChannelClosed(self)
return data + new_data
except BlockingIOError:
return data return data
except ssl.SSLWantReadError: except ssl.SSLWantReadError:
return data return data
@ -143,8 +148,10 @@ class Socket(Channel):
self._connected = False self._connected = False
raise ChannelClosed(self) from exc raise ChannelClosed(self) from exc
@connect_required
def close(self): def close(self):
if not self._connected:
return
self._connected = False self._connected = False
self.client.close() self.client.close()

View File

@ -49,7 +49,7 @@ class Ssh(Channel):
# Connect to the remote host's ssh server # Connect to the remote host's ssh server
sock = socket.create_connection((host, port)) sock = socket.create_connection((host, port))
except Exception as exc: except Exception as exc:
raise ChannelError(str(exc)) raise ChannelError(self, str(exc))
# Create a paramiko SSH transport layer around the socket # Create a paramiko SSH transport layer around the socket
t = paramiko.Transport(sock) t = paramiko.Transport(sock)

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import pwncat import pwncat
from pwncat.manager import RawModeExit
from pwncat.commands import CommandDefinition from pwncat.commands import CommandDefinition
@ -11,4 +10,6 @@ class Command(CommandDefinition):
ARGS = {} ARGS = {}
def run(self, manager: "pwncat.manager.Manager", args): def run(self, manager: "pwncat.manager.Manager", args):
raise RawModeExit # This is caught by ``CommandParser.run`` which interprets
# it as a `C-d` sequence, and returns to the remote prompt.
raise EOFError

View File

@ -7,8 +7,10 @@ from rich.progress import Progress
import pwncat import pwncat
from pwncat.util import console from pwncat.util import console
from pwncat.channel import ChannelError
from pwncat.modules import ModuleFailed from pwncat.modules import ModuleFailed
from pwncat.commands import Complete, Parameter, CommandDefinition from pwncat.commands import Complete, Parameter, CommandDefinition
from pwncat.platform import PlatformError
class Command(CommandDefinition): class Command(CommandDefinition):
@ -279,11 +281,14 @@ class Command(CommandDefinition):
manager.target = session manager.target = session
used_implant = implant used_implant = implant
break break
except ModuleFailed: except (ChannelError, PlatformError, ModuleFailed):
db.transaction_manager.commit() db.transaction_manager.commit()
continue continue
if used_implant is not None: if used_implant is not None:
manager.target.log(f"connected via {used_implant.title(manager.target)}") manager.target.log(f"connected via {used_implant.title(manager.target)}")
else: else:
try:
manager.create_session(**query_args) manager.create_session(**query_args)
except (ChannelError, PlatformError) as exc:
manager.log(f"connection failed: {exc}")

View File

@ -45,17 +45,6 @@ class Command(CommandDefinition):
if not args.destination: if not args.destination:
args.destination = f"./{os.path.basename(args.source)}" args.destination = f"./{os.path.basename(args.source)}"
# else:
# access = pwncat.victim.access(args.destination)
# if Access.DIRECTORY in access:
# args.destination = os.path.join(
# args.destination, os.path.basename(args.source)
# )
# elif Access.PARENT_EXIST not in access:
# console.log(
# f"[cyan]{args.destination}[/cyan]: no such file or directory"
# )
# return
try: try:
length = os.path.getsize(args.source) length = os.path.getsize(args.source)

View File

@ -16,6 +16,8 @@ even if there was an uncaught exception. The normal method of creating a manager
""" """
import os import os
import sys import sys
import queue
import signal
import fnmatch import fnmatch
import pkgutil import pkgutil
import threading import threading
@ -36,9 +38,9 @@ import pwncat.modules.enumerate
from pwncat.util import RawModeExit, console from pwncat.util import RawModeExit, console
from pwncat.config import Config from pwncat.config import Config
from pwncat.target import Target from pwncat.target import Target
from pwncat.channel import Channel, ChannelClosed from pwncat.channel import Channel, ChannelError, ChannelClosed
from pwncat.commands import CommandParser from pwncat.commands import CommandParser
from pwncat.platform import Platform from pwncat.platform import Platform, PlatformError
class InteractiveExit(Exception): class InteractiveExit(Exception):
@ -319,9 +321,13 @@ class Session:
while self.layers: while self.layers:
self.layers.pop()(self) self.layers.pop()(self)
try:
self.platform.exit() self.platform.exit()
self.platform.channel.close() self.platform.channel.close()
except (PlatformError, ChannelError) as exc:
self.log(
f"[yellow]warning[/yellow]: unexpected exception while closing: {exc}"
)
self.died() self.died()
@ -551,6 +557,8 @@ class Manager:
while self.interactive_running: while self.interactive_running:
try:
# This is it's own main loop that will continue until # This is it's own main loop that will continue until
# it catches a C-d sequence. # it catches a C-d sequence.
try: try:
@ -570,62 +578,86 @@ class Manager:
self.log("no active session, returning to local prompt") self.log("no active session, returning to local prompt")
continue continue
self.target.platform.interactive = True
interactive_complete = threading.Event() interactive_complete = threading.Event()
output_thread = None
def output_thread_main(): def output_thread_main(
target: Session, exception_queue: queue.SimpleQueue
):
while not interactive_complete.is_set(): while not interactive_complete.is_set():
try:
data = self.target.platform.channel.recv(4096) data = target.platform.channel.recv(4096)
if data != b"" and data is not None: if data != b"" and data is not None:
try: data = target.platform.process_output(data)
data = self.target.platform.process_output(data)
sys.stdout.buffer.write(data) sys.stdout.buffer.write(data)
sys.stdout.buffer.flush() sys.stdout.buffer.flush()
except RawModeExit:
interactive_complete.set()
else: else:
interactive_complete.wait(timeout=0.1) interactive_complete.wait(timeout=0.1)
output_thread = threading.Thread(target=output_thread_main) except ChannelError as exc:
output_thread.start() exception_queue.put(exc)
interactive_complete.set()
# This is a hack to get the interactive loop out of a blocking
# read call. The interactive loop will receive a KeyboardInterrupt
os.kill(os.getpid(), signal.SIGINT)
except RawModeExit:
interactive_complete.set()
os.kill(os.getpid(), signal.SIGINT)
channel_closed = False try:
self.target.platform.interactive = True
exception_queue = queue.Queue(maxsize=1)
output_thread = threading.Thread(
target=output_thread_main, args=[self.target, exception_queue]
)
output_thread.start()
try: try:
self.target.platform.interactive_loop(interactive_complete) self.target.platform.interactive_loop(interactive_complete)
except RawModeExit: except RawModeExit:
pass pass
try:
raise exception_queue.get(block=False)
except queue.Empty:
pass
self.target.platform.interactive = False
except ChannelClosed: except ChannelClosed:
channel_closed = True
self.log( self.log(
f"[yellow]warning[/yellow]: {self.target.platform}: connection reset" f"[yellow]warning[/yellow]: {self.target.platform}: connection reset"
) )
except Exception: self.target.died()
finally:
interactive_complete.set()
if output_thread is not None:
output_thread.join()
output_thread.join()
except: # noqa: E722
# We don't want to die because of an uncaught exception, but
# at least let the user know something happened. This should
# probably be configurable somewhere.
pwncat.util.console.print_exception() pwncat.util.console.print_exception()
# Trigger thread to exit
interactive_complete.set()
output_thread.join()
# Exit interactive mode
if channel_closed:
self.target.died()
else:
self.target.platform.interactive = False
def create_session(self, platform: str, channel: Channel = None, **kwargs): def create_session(self, platform: str, channel: Channel = None, **kwargs):
""" r"""
Open a new session from a new or existing platform. If the platform Create a new session from a new or existing channel. The platform specified
is a string, a new platform is created using ``create_platform`` and should be the name registered name (e.g. ``linux``) of a platform class. If
a session is built around the platform. In that case, the arguments no existing channel is provided, the keyword arguments are used to construct
are the same as for ``create_platform``. a new channel.
A new Session object is returned which contains the created or :param platform: name of the platform to use
specified platform. :type platform: str
:param channel: A pre-constructed channel (default: None)
:type channel: Optional[Channel]
:param \*\*kwargs: keyword arguments for constructing a new channel
:rtype: Session
:raises:
ChannelError: there was an error while constructing the new channel
PlatformError: construction of a platform around the channel failed
""" """
session = Session(self, platform, channel, **kwargs) session = Session(self, platform, channel, **kwargs)
@ -638,6 +670,21 @@ class Manager:
return session return session
def find_session_by_channel(self, channel: Channel):
"""
Locate a session by it's channel object. This is mainly used when a ChannelError
is raised in order to locate the misbehaving session object from the exception
data.
:param channel: the channel you are looking for
:type channel: Channel
:rtype: Session
"""
for session in self.sessions.values():
if session.platform.channel is channel:
return session
def _process_input(self, data: bytes, has_prefix: bool): def _process_input(self, data: bytes, has_prefix: bool):
"""Process stdin data from the user in raw mode""" """Process stdin data from the user in raw mode"""

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import pwncat import pwncat
from pwncat.modules import ModuleFailed from pwncat.modules import Status, ModuleFailed
from pwncat.facts.linux import LinuxGroup from pwncat.facts.linux import LinuxGroup
from pwncat.platform.linux import Linux from pwncat.platform.linux import Linux
from pwncat.modules.enumerate import Schedule, EnumerateModule from pwncat.modules.enumerate import Schedule, EnumerateModule
@ -20,6 +20,7 @@ class Module(EnumerateModule):
users = {user.gid: user for user in session.run("enumerate", types=["user"])} users = {user.gid: user for user in session.run("enumerate", types=["user"])}
group_file = session.platform.Path("/etc/group") group_file = session.platform.Path("/etc/group")
groups = []
try: try:
with group_file.open("r") as filp: with group_file.open("r") as filp:
@ -34,13 +35,17 @@ class Module(EnumerateModule):
members.append(users[gid].name) members.append(users[gid].name)
# Build a group object # Build a group object
group = LinuxGroup(self.name, group_name, hash, gid, members) groups.append(
LinuxGroup(self.name, group_name, hash, gid, members)
)
yield group yield Status(group_name)
except (KeyError, ValueError, IndexError): except (KeyError, ValueError, IndexError):
# Bad group line # Bad group line
continue continue
yield from groups
except (FileNotFoundError, PermissionError) as exc: except (FileNotFoundError, PermissionError) as exc:
raise ModuleFailed(str(exc)) from exc raise ModuleFailed(str(exc)) from exc

View File

@ -549,6 +549,11 @@ class Platform(ABC):
data = sys.stdin.buffer.read(64) data = sys.stdin.buffer.read(64)
has_prefix = self.session.manager._process_input(data, has_prefix) has_prefix = self.session.manager._process_input(data, has_prefix)
except KeyboardInterrupt:
# This is a hack to allow the output thread to signal completion
# We are in raw mode so this couldn't have come from a user pressing
# C-d.
pass
finally: finally:
pwncat.util.pop_term_state() pwncat.util.pop_term_state()
sys.stdin.reconfigure(line_buffering=False) sys.stdin.reconfigure(line_buffering=False)

View File

@ -11,6 +11,7 @@ Popen can be running at a time. It is imperative that you call
to calling any other pwncat methods. to calling any other pwncat methods.
""" """
import os import os
import stat
import time import time
import shlex import shlex
import shutil import shutil
@ -414,13 +415,16 @@ class LinuxWriter(BufferedIOBase):
if self.popen is None: if self.popen is None:
raise UnsupportedOperation("writer is detached") raise UnsupportedOperation("writer is detached")
if self.popen.poll() is not None:
raise PermissionError("file write failed")
if self.popen.platform.has_pty: if self.popen.platform.has_pty:
# Control sequences need escaping # Control sequences need escaping
translated = [] translated = []
for idx, c in enumerate(b): for idx, c in enumerate(b):
# Track when the last new line was # Track when the last new line was
if c == 0x0D: if c == 0x0A:
self.since_newline = 0 self.since_newline = 0
else: else:
self.since_newline += 1 self.since_newline += 1
@ -432,7 +436,7 @@ class LinuxWriter(BufferedIOBase):
# Track all characters in translated buffer # Track all characters in translated buffer
translated.append(c) translated.append(c)
if self.since_newline >= 4095: if self.since_newline >= 2048:
# Flush read immediately to prevent truncation of line # Flush read immediately to prevent truncation of line
translated.append(0x04) translated.append(0x04)
self.since_newline = 0 self.since_newline = 0
@ -463,26 +467,70 @@ class LinuxWriter(BufferedIOBase):
self.detach() self.detach()
return return
# The number of C-d's needed to trigger an EOF in # Trigger EOF in remote process
# the process and exit is inconsistent based on the self.popen.platform.channel.send(b"\x04")
# previous input. So, instead of trying to be deterministic, self.popen.platform.channel.send(b"\x04")
# we simply send one and check. We do this until we find if self.since_newline != 0:
# the ending delimeter and then exit. If the `on_close` self.popen.platform.channel.send(b"\x04")
# hook was setup properly, this should be fine. self.popen.platform.channel.send(b"\x04")
while True:
# Wait for the process to exit
try: try:
self.popen.stdin.write(b"\x04") self.popen.wait()
self.popen.stdin.flush() except KeyboardInterrupt:
# Check for completion # We *should* wait for the process to send the return codes.
self.popen.wait(timeout=0.1) # however, if the user wants to stop this process, we should
break # at least attempt to do whatever cleanup we can.
except pwncat.subprocess.TimeoutExpired: self.popen.kill()
continue self.popen.wait()
# Ensure we don't touch stdio again # Ensure we don't touch stdio again
self.detach() self.detach()
class LinuxPath(pathlib.PurePosixPath):
"""Special cases for Linux remote paths"""
def readable(self):
"""Test if a file is readable"""
uid = self._target._id["euid"]
gid = self._target._id["egid"]
groups = self._target._id["groups"]
file_uid = self.stat().st_uid
file_gid = self.stat().st_gid
file_mode = self.stat().st_mode
if uid == file_uid and (file_mode & stat.S_IRUSR):
return True
elif (gid == file_gid or file_gid in groups) and (file_mode & stat.S_IRGRP):
return True
elif file_mode & stat.S_IROTH:
return True
return False
def writable(self):
uid = self._target._id["euid"]
gid = self._target._id["egid"]
groups = self._target._id["groups"]
file_uid = self.stat().st_uid
file_gid = self.stat().st_gid
file_mode = self.stat().st_mode
if uid == file_uid and (file_mode & stat.S_IWUSR):
return True
elif (gid == file_gid or file_gid in groups) and (file_mode & stat.S_IWGRP):
return True
elif file_mode & stat.S_IWOTH:
return True
return False
class Linux(Platform): class Linux(Platform):
""" """
Concrete platform class abstracting interaction with a GNU/Linux remote Concrete platform class abstracting interaction with a GNU/Linux remote
@ -491,7 +539,7 @@ class Linux(Platform):
""" """
name = "linux" name = "linux"
PATH_TYPE = pathlib.PurePosixPath PATH_TYPE = LinuxPath
PROMPTS = { PROMPTS = {
"sh": """'$(command printf "(remote) $(whoami)@$(hostname):$PWD\\$ ")'""", "sh": """'$(command printf "(remote) $(whoami)@$(hostname):$PWD\\$ ")'""",
"dash": """'$(command printf "(remote) $(whoami)@$(hostname):$PWD\\$ ")'""", "dash": """'$(command printf "(remote) $(whoami)@$(hostname):$PWD\\$ ")'""",
@ -507,7 +555,7 @@ class Linux(Platform):
self.name = "linux" self.name = "linux"
self.command_running = None self.command_running = None
self._uid = None self._id = None
# This causes an stty to be sent. # This causes an stty to be sent.
# If we aren't in a pty, it doesn't matter. # If we aren't in a pty, it doesn't matter.
@ -663,7 +711,7 @@ class Linux(Platform):
) )
hostname = result.stdout.strip() hostname = result.stdout.strip()
except CalledProcessError: except CalledProcessError:
hostname = self.channel.getpeername()[0] hostname = self.channel.host
try: try:
self.session.update_task( self.session.update_task(
@ -766,10 +814,20 @@ class Linux(Platform):
while True: while True:
try: try:
proc = self.run( proc = self.run(
["id", "-ru"], capture_output=True, text=True, check=True "(id -ru;id -u;id -g;id -rg;id -G;)",
capture_output=True,
text=True,
check=True,
) )
self._uid = int(proc.stdout.rstrip("\n")) idents = proc.stdout.split("\n")
return self._uid self._id = {
"ruid": int(idents[0].strip()),
"euid": int(idents[1].strip()),
"rgid": int(idents[2].strip()),
"egid": int(idents[3].strip()),
"groups": [int(g.strip()) for g in idents[4].split(" ")],
}
return self._id["ruid"]
except ValueError: except ValueError:
continue continue
except CalledProcessError as exc: except CalledProcessError as exc:
@ -777,7 +835,7 @@ class Linux(Platform):
def getuid(self): def getuid(self):
"""Retrieve the current cached uid""" """Retrieve the current cached uid"""
return self._uid return self._id["ruid"]
def getenv(self, name: str): def getenv(self, name: str):
@ -1153,6 +1211,24 @@ class Linux(Platform):
if any(c not in "rwb" for c in mode): if any(c not in "rwb" for c in mode):
raise PlatformError(f"{mode}: unknown file mode") raise PlatformError(f"{mode}: unknown file mode")
if isinstance(path, str):
path = self.Path(path)
if "r" in mode and not path.exists():
raise FileNotFoundError(f"No such file or directory: {str(path)}")
if "r" in mode and not path.readable():
raise PermissionError(f"Permission Denied: {str(path)}")
if "w" in mode:
parent = path.parent
if "w" in mode and path.exists() and not path.writable():
raise PermissionError(f"Permission Denied: {str(path)}")
if "w" in mode and not path.exists() and not parent.writable():
raise PermissionError(f"Permission Denied: {str(path)}")
if "w" in mode and not path.exists() and not parent.exists():
raise FileNotFoundError(f"No such file or directory: {str(path)}")
# Save this just in case we are opening a text-mode stream # Save this just in case we are opening a text-mode stream
line_buffering = buffering == -1 or buffering == 1 line_buffering = buffering == -1 or buffering == 1
@ -1174,7 +1250,7 @@ class Linux(Platform):
except MissingBinary: except MissingBinary:
pass pass
else: else:
raise PlatformError("no available gtfobins writiers") raise PlatformError("no available gtfobins writers")
popen = self.Popen( popen = self.Popen(
payload, payload,
@ -1203,7 +1279,7 @@ class Linux(Platform):
except MissingBinary: except MissingBinary:
pass pass
else: else:
raise PlatformError("no available gtfobins writiers") raise PlatformError("no available gtfobins writers")
popen = self.Popen( popen = self.Popen(
payload, payload,

View File

@ -13,7 +13,6 @@ processes and open multiple files with this platform. However, you should be
careful to cleanup all processes and files prior to return from your method careful to cleanup all processes and files prior to return from your method
or code as the C2 will not attempt to garbage collect file or proces handles. or code as the C2 will not attempt to garbage collect file or proces handles.
""" """
import os
import sys import sys
import gzip import gzip
import json import json
@ -26,6 +25,7 @@ import hashlib
import pathlib import pathlib
import tarfile import tarfile
import binascii import binascii
import readline # noqa: F401
import functools import functools
import threading import threading
import subprocess import subprocess
@ -46,7 +46,7 @@ PWNCAT_WINDOWS_C2_VERSION = "v0.2.1"
PWNCAT_WINDOWS_C2_RELEASE_URL = "https://github.com/calebstewart/pwncat-windows-c2/releases/download/{version}/pwncat-windows-{version}.tar.gz" PWNCAT_WINDOWS_C2_RELEASE_URL = "https://github.com/calebstewart/pwncat-windows-c2/releases/download/{version}/pwncat-windows-{version}.tar.gz"
class PowershellError(Exception): class PowershellError(PlatformError):
"""Executing a powershell script caused an error""" """Executing a powershell script caused an error"""
def __init__(self, msg): def __init__(self, msg):
@ -55,7 +55,7 @@ class PowershellError(Exception):
self.message = msg self.message = msg
class ProtocolError(Exception): class ProtocolError(PlatformError):
def __init__(self, code: int, message: str): def __init__(self, code: int, message: str):
self.code = code self.code = code
self.message = message self.message = message
@ -908,7 +908,7 @@ function prompt {
transformed = bytearray(b"") transformed = bytearray(b"")
has_cr = False has_cr = False
for b in data: for idx, b in enumerate(data):
# Basically, we just transform bare \r to \r\n # Basically, we just transform bare \r to \r\n
if has_cr and b != ord("\n"): if has_cr and b != ord("\n"):
@ -924,10 +924,9 @@ function prompt {
if INTERACTIVE_END_MARKER[self.interactive_tracker] == b: if INTERACTIVE_END_MARKER[self.interactive_tracker] == b:
self.interactive_tracker += 1 self.interactive_tracker += 1
if self.interactive_tracker == len(INTERACTIVE_END_MARKER): if self.interactive_tracker == len(INTERACTIVE_END_MARKER):
# NOTE: this is a dirty hack to trigger the main input thread self.interactive_tracker = 0
# to leave interactive mode, because it's bound in an input call self.channel.unrecv(data[idx + 1 :])
os.kill(os.getpid(), signal.SIGINT) raise pwncat.util.RawModeExit
raise pwncat.manager.RawModeExit
else: else:
self.interactive_tracker = 0 self.interactive_tracker = 0

View File

@ -118,9 +118,9 @@ def isprintable(data) -> bool:
def human_readable_size(size, decimal_places=2): def human_readable_size(size, decimal_places=2):
for unit in ["B", "KiB", "MiB", "GiB", "TiB"]: for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
if size < 1024.0: if size < 1000.0:
return f"{size:.{decimal_places}f}{unit}" return f"{size:.{decimal_places}f}{unit}"
size /= 1024.0 size /= 1000.0
return f"{size:.{decimal_places}f}{unit}" return f"{size:.{decimal_places}f}{unit}"

49
tests/test_fileio.py Normal file
View File

@ -0,0 +1,49 @@
#!/usr/bin/env python3
from pwncat.util import random_string
def do_file_test(session, content):
"""Do a generic file test"""
name = random_string() + ".txt"
mode = "b" if isinstance(content, bytes) else ""
with session.platform.open(name, mode + "w") as filp:
assert filp.write(content) == len(content)
with session.platform.open(name, mode + "r") as filp:
assert filp.read() == content
# In some cases, the act of reading/writing causes a shell to hang
# so double check that.
result = session.platform.run(
["echo", "hello world"], capture_output=True, text=True
)
assert result.stdout == "hello world\n"
def test_small_text(session):
"""Test writing a small text-only file"""
do_file_test(session, "hello world")
def test_large_text(session):
"""Test writing and reading a large text file"""
contents = ("A" * 1000 + "\n") * 10
do_file_test(session, contents)
def test_small_binary(session):
"""Test writing a small amount of binary data"""
contents = bytes(list(range(32)))
do_file_test(session, contents)
def test_large_binary(session):
contents = bytes(list(range(32))) * 400
do_file_test(session, contents)

View File

@ -10,27 +10,8 @@ from pwncat.util import random_string
from pwncat.platform.windows import PowershellError from pwncat.platform.windows import PowershellError
def test_platform_file_io(session):
""" Test file read/write of printable data """
# Generate random binary data
contents = os.urandom(1024)
# Create a new temporary file
with session.platform.tempfile(mode="wb") as filp:
filp.write(contents)
path = filp.name
# Ensure it exists
assert session.platform.Path(path).exists()
# Read the data back and ensure it matches
with session.platform.open(path, "rb") as filp:
assert contents == filp.read()
def test_platform_dir_io(session): def test_platform_dir_io(session):
""" Test creating a directory and interacting with the contents """ """Test creating a directory and interacting with the contents"""
# Create a path object representing the new remote directory # Create a path object representing the new remote directory
path = session.platform.Path(random_string()) path = session.platform.Path(random_string())
@ -61,7 +42,7 @@ def test_platform_run(session):
def test_platform_su(session): def test_platform_su(session):
""" Test running `su` """ """Test running `su`"""
try: try:
session.platform.su("john", "P@ssw0rd") session.platform.su("john", "P@ssw0rd")
@ -77,7 +58,7 @@ def test_platform_su(session):
def test_platform_sudo(session): def test_platform_sudo(session):
""" Testing running `sudo` """ """Testing running `sudo`"""
try: try:
@ -103,7 +84,7 @@ def test_platform_sudo(session):
def test_windows_powershell(windows): def test_windows_powershell(windows):
""" Test powershell execution """ """Test powershell execution"""
# Run a real powershell snippet # Run a real powershell snippet
r = windows.platform.powershell("$PSVersionTable.PSVersion") r = windows.platform.powershell("$PSVersionTable.PSVersion")