mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-24 01:25:37 +01:00
Merge pull request #141 from calebstewart/issue-140-linux-file-writer-close
- Removed `C-d` loop in `LinuxWriter.close` - Added double `C-d` routine based on last character written to `LinuxWriter.close` - Changed upload success message to match size calculations from `rich.progress`. - Added better file IO test cases (small text, large text, small binary, large binary)
This commit is contained in:
commit
5ec4e35bb1
@ -14,6 +14,9 @@ and simply didn't have the time to go back and retroactively create one.
|
||||
### Changed
|
||||
- Changed session tracking so session IDs aren't reused
|
||||
- Changed zsh prompt to match CWD of other shell prompts
|
||||
- Changed LinuxWriter close routine again to account for needed EOF signals ([#140](https://github.com/calebstewart/pwncat/issues/140))
|
||||
### Added
|
||||
- Added better file io test cases
|
||||
|
||||
## [0.4.2] - 2021-06-15
|
||||
Quick patch release due to corrected bug in `ChannelFile` which caused command
|
||||
|
@ -45,17 +45,6 @@ class Command(CommandDefinition):
|
||||
|
||||
if not args.destination:
|
||||
args.destination = f"./{os.path.basename(args.source)}"
|
||||
# else:
|
||||
# access = pwncat.victim.access(args.destination)
|
||||
# if Access.DIRECTORY in access:
|
||||
# args.destination = os.path.join(
|
||||
# args.destination, os.path.basename(args.source)
|
||||
# )
|
||||
# elif Access.PARENT_EXIST not in access:
|
||||
# console.log(
|
||||
# f"[cyan]{args.destination}[/cyan]: no such file or directory"
|
||||
# )
|
||||
# return
|
||||
|
||||
try:
|
||||
length = os.path.getsize(args.source)
|
||||
|
@ -420,7 +420,7 @@ class LinuxWriter(BufferedIOBase):
|
||||
for idx, c in enumerate(b):
|
||||
|
||||
# Track when the last new line was
|
||||
if c == 0x0D:
|
||||
if c == 0x0A:
|
||||
self.since_newline = 0
|
||||
else:
|
||||
self.since_newline += 1
|
||||
@ -432,7 +432,7 @@ class LinuxWriter(BufferedIOBase):
|
||||
# Track all characters in translated buffer
|
||||
translated.append(c)
|
||||
|
||||
if self.since_newline >= 4095:
|
||||
if self.since_newline >= 2048:
|
||||
# Flush read immediately to prevent truncation of line
|
||||
translated.append(0x04)
|
||||
self.since_newline = 0
|
||||
@ -463,21 +463,22 @@ class LinuxWriter(BufferedIOBase):
|
||||
self.detach()
|
||||
return
|
||||
|
||||
# The number of C-d's needed to trigger an EOF in
|
||||
# the process and exit is inconsistent based on the
|
||||
# previous input. So, instead of trying to be deterministic,
|
||||
# we simply send one and check. We do this until we find
|
||||
# the ending delimeter and then exit. If the `on_close`
|
||||
# hook was setup properly, this should be fine.
|
||||
while True:
|
||||
try:
|
||||
self.popen.stdin.write(b"\x04")
|
||||
self.popen.stdin.flush()
|
||||
# Check for completion
|
||||
self.popen.wait(timeout=0.1)
|
||||
break
|
||||
except pwncat.subprocess.TimeoutExpired:
|
||||
continue
|
||||
# Trigger EOF in remote process
|
||||
self.popen.platform.channel.send(b"\x04")
|
||||
self.popen.platform.channel.send(b"\x04")
|
||||
if self.since_newline != 0:
|
||||
self.popen.platform.channel.send(b"\x04")
|
||||
self.popen.platform.channel.send(b"\x04")
|
||||
|
||||
# Wait for the process to exit
|
||||
try:
|
||||
self.popen.wait()
|
||||
except KeyboardInterrupt:
|
||||
# We *should* wait for the process to send the return codes.
|
||||
# however, if the user wants to stop this process, we should
|
||||
# at least attempt to do whatever cleanup we can.
|
||||
self.popen.kill()
|
||||
self.popen.wait()
|
||||
|
||||
# Ensure we don't touch stdio again
|
||||
self.detach()
|
||||
|
@ -118,9 +118,9 @@ def isprintable(data) -> bool:
|
||||
|
||||
def human_readable_size(size, decimal_places=2):
|
||||
for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
|
||||
if size < 1024.0:
|
||||
if size < 1000.0:
|
||||
return f"{size:.{decimal_places}f}{unit}"
|
||||
size /= 1024.0
|
||||
size /= 1000.0
|
||||
return f"{size:.{decimal_places}f}{unit}"
|
||||
|
||||
|
||||
|
49
tests/test_fileio.py
Normal file
49
tests/test_fileio.py
Normal file
@ -0,0 +1,49 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from pwncat.util import random_string
|
||||
|
||||
|
||||
def do_file_test(session, content):
|
||||
"""Do a generic file test"""
|
||||
|
||||
name = random_string() + ".txt"
|
||||
mode = "b" if isinstance(content, bytes) else ""
|
||||
|
||||
with session.platform.open(name, mode + "w") as filp:
|
||||
assert filp.write(content) == len(content)
|
||||
|
||||
with session.platform.open(name, mode + "r") as filp:
|
||||
assert filp.read() == content
|
||||
|
||||
# In some cases, the act of reading/writing causes a shell to hang
|
||||
# so double check that.
|
||||
result = session.platform.run(
|
||||
["echo", "hello world"], capture_output=True, text=True
|
||||
)
|
||||
assert result.stdout == "hello world\n"
|
||||
|
||||
|
||||
def test_small_text(session):
|
||||
"""Test writing a small text-only file"""
|
||||
|
||||
do_file_test(session, "hello world")
|
||||
|
||||
|
||||
def test_large_text(session):
|
||||
"""Test writing and reading a large text file"""
|
||||
|
||||
contents = ("A" * 1000 + "\n") * 10
|
||||
do_file_test(session, contents)
|
||||
|
||||
|
||||
def test_small_binary(session):
|
||||
"""Test writing a small amount of binary data"""
|
||||
|
||||
contents = bytes(list(range(32)))
|
||||
do_file_test(session, contents)
|
||||
|
||||
|
||||
def test_large_binary(session):
|
||||
|
||||
contents = bytes(list(range(32))) * 400
|
||||
do_file_test(session, contents)
|
@ -10,27 +10,8 @@ from pwncat.util import random_string
|
||||
from pwncat.platform.windows import PowershellError
|
||||
|
||||
|
||||
def test_platform_file_io(session):
|
||||
""" Test file read/write of printable data """
|
||||
|
||||
# Generate random binary data
|
||||
contents = os.urandom(1024)
|
||||
|
||||
# Create a new temporary file
|
||||
with session.platform.tempfile(mode="wb") as filp:
|
||||
filp.write(contents)
|
||||
path = filp.name
|
||||
|
||||
# Ensure it exists
|
||||
assert session.platform.Path(path).exists()
|
||||
|
||||
# Read the data back and ensure it matches
|
||||
with session.platform.open(path, "rb") as filp:
|
||||
assert contents == filp.read()
|
||||
|
||||
|
||||
def test_platform_dir_io(session):
|
||||
""" Test creating a directory and interacting with the contents """
|
||||
"""Test creating a directory and interacting with the contents"""
|
||||
|
||||
# Create a path object representing the new remote directory
|
||||
path = session.platform.Path(random_string())
|
||||
@ -61,7 +42,7 @@ def test_platform_run(session):
|
||||
|
||||
|
||||
def test_platform_su(session):
|
||||
""" Test running `su` """
|
||||
"""Test running `su`"""
|
||||
|
||||
try:
|
||||
session.platform.su("john", "P@ssw0rd")
|
||||
@ -77,7 +58,7 @@ def test_platform_su(session):
|
||||
|
||||
|
||||
def test_platform_sudo(session):
|
||||
""" Testing running `sudo` """
|
||||
"""Testing running `sudo`"""
|
||||
|
||||
try:
|
||||
|
||||
@ -103,7 +84,7 @@ def test_platform_sudo(session):
|
||||
|
||||
|
||||
def test_windows_powershell(windows):
|
||||
""" Test powershell execution """
|
||||
"""Test powershell execution"""
|
||||
|
||||
# Run a real powershell snippet
|
||||
r = windows.platform.powershell("$PSVersionTable.PSVersion")
|
||||
|
Loading…
Reference in New Issue
Block a user