1
0
mirror of https://github.com/calebstewart/pwncat.git synced 2024-11-24 01:25:37 +01:00

Merge branch 'master' into issue-134-dockerhub-builds-failing

This commit is contained in:
Caleb Stewart 2021-06-16 17:31:54 -04:00 committed by GitHub
commit 6500ba72ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 159 additions and 134 deletions

View File

@ -10,6 +10,10 @@ and simply didn't have the time to go back and retroactively create one.
## [Unreleased]
### Fixed
- Pinned container base image to alpine 3.13.5 and installed to virtualenv ([#134](https://github.com/calebstewart/pwncat/issues/134))
- Fixed syntax for f-strings in escalation command
### Changed
- Changed session tracking so session IDs aren't reused
- Changed zsh prompt to match CWD of other shell prompts
## [0.4.2] - 2021-06-15
Quick patch release due to corrected bug in `ChannelFile` which caused command

View File

@ -290,9 +290,16 @@ def main():
transient=True,
) as progress:
task = progress.add_task("task", status="...")
while manager.sessions:
progress.update(task, status=str(manager.sessions[0].platform))
manager.sessions[0].close()
# Retrieve the existing session IDs list
session_ids = list(manager.sessions.keys())
# Close each session based on its ``session_id``
for session_id in session_ids:
progress.update(
task, status=str(manager.sessions[session_id].platform)
)
manager.sessions[session_id].close()
progress.update(task, status="done!", completed=100)

View File

@ -53,7 +53,7 @@ class ChannelClosed(ChannelError):
super().__init__(ch, "channel unexpectedly closed")
def cleanup(self, manager: "pwncat.manager.Manager"):
""" Cleanup this channel from the manager """
"""Cleanup this channel from the manager"""
# If we don't have a session, there's nothing to do
session = manager.find_session_by_channel(self.channel)
@ -120,7 +120,7 @@ class ChannelFile(RawIOBase):
@property
def blocking(self) -> bool:
""" Indicates whether to act like a blocking file or not. """
"""Indicates whether to act like a blocking file or not."""
return self._blocking
@blocking.setter
@ -128,15 +128,15 @@ class ChannelFile(RawIOBase):
self._blocking = value
def readable(self) -> bool:
""" Test if this is a readable file. """
"""Test if this is a readable file."""
return "r" in self.mode
def writable(self) -> bool:
""" Test if this is writable file. """
"""Test if this is writable file."""
return "w" in self.mode
def close(self):
""" Close the file for reading/writing. This method calls the on_close hook. """
"""Close the file for reading/writing. This method calls the on_close hook."""
if self.eof:
return
@ -147,7 +147,7 @@ class ChannelFile(RawIOBase):
self.on_close(self)
def readall(self):
""" Read all data until EOF """
"""Read all data until EOF"""
data = b""
@ -351,7 +351,7 @@ class Channel(ABC):
"""
def drain(self):
""" Drain any buffered data until there is nothing left """
"""Drain any buffered data until there is nothing left"""
while True:
data = self.recv(4096)

View File

@ -17,7 +17,7 @@ from pwncat.channel import Channel, ChannelError, ChannelClosed
class Ssh(Channel):
""" Wrap SSH shell channel in a pwncat channel. """
"""Wrap SSH shell channel in a pwncat channel."""
def __init__(
self,

View File

@ -396,20 +396,20 @@ def resolve_blocks(source: str):
class DatabaseHistory(History):
""" Yield history from the host entry in the database """
"""Yield history from the host entry in the database"""
def __init__(self, manager):
super().__init__()
self.manager = manager
def load_history_strings(self) -> Iterable[str]:
""" Load the history from the database """
"""Load the history from the database"""
with self.manager.db.transaction() as conn:
yield from reversed(conn.root.history)
def store_string(self, string: str) -> None:
""" Store a command in the database """
"""Store a command in the database"""
with self.manager.db.transaction() as conn:
conn.root.history.append(string)
@ -423,7 +423,7 @@ class CommandParser:
command mode."""
def __init__(self, manager: "pwncat.manager.Manager"):
""" We need to dynamically load commands from pwncat.commands """
"""We need to dynamically load commands from pwncat.commands"""
self.manager = manager
self.commands: List["CommandDefinition"] = []
@ -460,7 +460,7 @@ class CommandParser:
@bindings.add("c-q")
def _(event):
""" Exit interactive mode """
"""Exit interactive mode"""
get_app().exit(exception=pwncat.manager.InteractiveExit())
@ -483,7 +483,7 @@ class CommandParser:
)
def _render_toolbar(self):
""" Render the formatted text for the bottom toolbar """
"""Render the formatted text for the bottom toolbar"""
if self.manager.target is None:
markup_result = "Active Session: [red]None[/red]"
@ -537,7 +537,7 @@ class CommandParser:
break
def run_single(self):
""" Execute one Read-Execute iteration. This will prompt the user for input. """
"""Execute one Read-Execute iteration. This will prompt the user for input."""
if self.prompt is None:
self.setup_prompt()
@ -605,7 +605,7 @@ class CommandParser:
continue
def dispatch_line(self, line: str, prog_name: str = None):
""" Parse the given line of command input and dispatch a command """
"""Parse the given line of command input and dispatch a command"""
# Account for blank or whitespace only lines
line = line.strip()
@ -785,7 +785,7 @@ class CommandLexer(RegexLexer):
@classmethod
def build(cls, commands: List["CommandDefinition"]) -> Type["CommandLexer"]:
""" Build the RegexLexer token list from the command definitions """
"""Build the RegexLexer token list from the command definitions"""
root = []
for command in commands:
@ -826,7 +826,7 @@ class CommandLexer(RegexLexer):
class RemotePathCompleter(Completer):
""" Complete remote file names/paths """
"""Complete remote file names/paths"""
def __init__(self, manager: "pwncat.manager.Manager", *args, **kwargs):
super().__init__(*args, **kwargs)
@ -853,7 +853,7 @@ class RemotePathCompleter(Completer):
class LocalPathCompleter(Completer):
""" Complete local file names/paths. """
"""Complete local file names/paths."""
def get_completions(self, document: Document, complete_event: CompleteEvent):
@ -883,7 +883,7 @@ class CommandCompleter(Completer):
def __init__(
self, manager: "pwncat.manager.Manager", commands: List["CommandDefinition"]
):
""" Construct a new command completer """
"""Construct a new command completer"""
self.layers = {}
local_file_completer = LocalPathCompleter()
@ -918,7 +918,7 @@ class CommandCompleter(Completer):
def get_completions(
self, document: Document, complete_event: CompleteEvent
) -> Iterable[Completion]:
""" Get a list of completions for the given document """
"""Get a list of completions for the given document"""
text = document.text_before_cursor.lstrip()
try:

View File

@ -5,7 +5,7 @@ from pwncat.commands import CommandDefinition
class Command(CommandDefinition):
""" Return to the remote terminal """
"""Return to the remote terminal"""
PROG = "back"
ARGS = {}

View File

@ -18,7 +18,7 @@ from pwncat.commands import Complete, Parameter, CommandDefinition
class Command(CommandDefinition):
""" Download a file from the remote host to the local host"""
"""Download a file from the remote host to the local host"""
PROG = "download"
ARGS = {

View File

@ -20,7 +20,7 @@ def get_user_choices(command: CommandDefinition):
class Link:
""" Link in the escalation chain """
"""Link in the escalation chain"""
def __init__(self, old_session, escalation, result):
@ -32,7 +32,7 @@ class Link:
if self.escalation.type == "escalate.spawn":
self.result.log(
"leaving behind open session as [cyan]{self.old_session.current_user().name}[/cyan]"
f"leaving behind open session as [cyan]{self.old_session.current_user().name}[/cyan]"
)
self.old_session.manager.target = self.old_session
@ -133,7 +133,7 @@ class Command(CommandDefinition):
console.log("[yellow]warning[/yellow]: no direct escalations found")
def do_escalate(self, manager: "pwncat.manager.Manager", task, user, args):
""" Execute escalations until we find one that works """
"""Execute escalations until we find one that works"""
attempted = []
chain = []
@ -163,7 +163,7 @@ class Command(CommandDefinition):
continue
except IndexError:
manager.target.log(
"[red]error[/red]: no working escalation paths found for {user.name}"
f"[red]error[/red]: no working escalation paths found for {user.name}"
)
break

View File

@ -10,7 +10,7 @@ from pwncat.commands import Complete, Parameter, CommandDefinition
class Command(CommandDefinition):
""" List known commands and print their associated help documentation. """
"""List known commands and print their associated help documentation."""
def get_command_names(self):
try:

View File

@ -10,7 +10,7 @@ from pwncat.commands import Complete, Parameter, CommandDefinition, get_module_c
class Command(CommandDefinition):
""" View info about a module """
"""View info about a module"""
PROG = "info"
ARGS = {

View File

@ -6,7 +6,7 @@ from pwncat.commands import CommandDefinition
class Command(CommandDefinition):
""" Run a local shell command on your attacking machine """
"""Run a local shell command on your attacking machine"""
PROG = "local"
ARGS = None

View File

@ -10,7 +10,7 @@ from pwncat.commands import Complete, Parameter, CommandDefinition
class Command(CommandDefinition):
""" View info about a module """
"""View info about a module"""
PROG = "search"
ARGS = {

View File

@ -40,15 +40,15 @@ class Command(CommandDefinition):
if args.list or (not args.kill and args.session_id is None):
table = Table(title="Active Sessions", box=box.MINIMAL_DOUBLE_HEAD)
table.add_column("")
table.add_column("ID")
table.add_column("User")
table.add_column("Host ID")
table.add_column("Platform")
table.add_column("Type")
table.add_column("Address")
for ident, session in enumerate(manager.sessions):
ident = str(ident)
for session_id, session in manager.sessions.items():
ident = str(session_id)
kwargs = {"style": ""}
if session is manager.target:
ident = "*" + ident
@ -71,7 +71,8 @@ class Command(CommandDefinition):
console.log("[red]error[/red]: no session id specified")
return
if args.session_id < 0 or args.session_id >= len(manager.sessions):
# check if a session with the provided ``session_id`` exists or not
if args.session_id not in manager.sessions:
console.log(f"[red]error[/red]: {args.session_id}: no such session!")
return

View File

@ -5,7 +5,7 @@ from pwncat.commands import Complete, Parameter, CommandDefinition
class Command(CommandDefinition):
""" Set variable runtime variable parameters for pwncat """
"""Set variable runtime variable parameters for pwncat"""
def get_config_variables(self):
options = ["state"] + list(self.manager.config.values)

View File

@ -17,7 +17,7 @@ from pwncat.commands import Complete, Parameter, CommandDefinition
class Command(CommandDefinition):
""" Upload a file from the local host to the remote host"""
"""Upload a file from the local host to the remote host"""
PROG = "upload"
ARGS = {

View File

@ -6,7 +6,7 @@ from pwncat.commands import Complete, Parameter, CommandDefinition, get_module_c
class Command(CommandDefinition):
""" Set the currently used module in the config handler """
"""Set the currently used module in the config handler"""
PROG = "use"
ARGS = {

View File

@ -61,14 +61,14 @@ class KeyType:
def local_file_type(value: str) -> str:
""" Ensure the local file exists """
"""Ensure the local file exists"""
if not os.path.isfile(value):
raise ValueError(f"{value}: no such file or directory")
return value
def local_dir_type(value: str) -> str:
""" Ensure the path specifies a local directory """
"""Ensure the path specifies a local directory"""
# Expand ~ in the path
value = os.path.expanduser(value)
@ -143,7 +143,7 @@ class Config:
return new
def binding(self, name_or_value: Union[str, bytes]) -> str:
""" Get a key binding by it's key name or key value. """
"""Get a key binding by it's key name or key value."""
if isinstance(name_or_value, bytes):
binding = [
@ -157,7 +157,7 @@ class Config:
return self.bindings[key]
def set(self, name: str, value: Any, glob: bool = False):
""" Set a config value """
"""Set a config value"""
if (
(glob and name not in self.values)
@ -176,7 +176,7 @@ class Config:
self.locals[name] = self.module.ARGUMENTS[name].type(value)
def get(self, name: str, default=None):
""" get a value """
"""get a value"""
try:
return self[name]
@ -191,13 +191,13 @@ class Config:
self.module = module
def back(self):
""" Remove the currently used module and clear locals """
"""Remove the currently used module and clear locals"""
self.locals = {}
self.module = None
def __getitem__(self, name: str) -> Any:
""" Get a configuration item """
"""Get a configuration item"""
if name in self.locals:
return self.locals[name]
@ -205,7 +205,7 @@ class Config:
return self.values[name]["value"]
def __setitem__(self, name: str, value: Any):
""" Set a configuration item """
"""Set a configuration item"""
return self.set(name, value, glob=False)
def __iter__(self):

View File

@ -279,12 +279,12 @@ class PrivateKey(Implant):
return self.content
def remove(self, session: "pwncat.manager.Session"):
""" Remove the implant types from this private key """
"""Remove the implant types from this private key"""
raise KeepImplantFact()
def escalate(self, session: "pwncat.manager.Session"):
""" Escalate to the owner of this private key with a local ssh call """
"""Escalate to the owner of this private key with a local ssh call"""
if not self.authorized:
raise ModuleFailed("key is not authorized or failed")
@ -342,7 +342,7 @@ class PrivateKey(Implant):
def trigger(
self, manager: "pwncat.manager.Manager", target: "pwncat.target.Target"
):
""" Connect remotely to this target with the specified user and key """
"""Connect remotely to this target with the specified user and key"""
if not self.authorized:
raise ModuleFailed("key is not authorized or failed")

View File

@ -15,7 +15,7 @@ from pwncat.db import Fact
class ImplantType(enum.Flag):
""" Type of implant which was installed """
"""Type of implant which was installed"""
SPAWN = enum.auto()
""" Capable of spawning a new session to escalate privileges locally """

View File

@ -40,7 +40,7 @@ class Tamper(Fact):
@property
def revertable(self):
""" Test if this tamper is currently revertable """
"""Test if this tamper is currently revertable"""
return False
def revert(self, session: "pwncat.manager.Session"):

View File

@ -58,6 +58,7 @@ class Session:
channel: Optional[Channel] = None,
**kwargs,
):
self.id = manager.session_id
self.manager = manager
self.background = None
self._db_session = None
@ -87,7 +88,7 @@ class Session:
)
# Register this session with the manager
self.manager.sessions.append(self)
self.manager.sessions[self.id] = self
self.manager.target = self
# Initialize the host reference
@ -290,10 +291,10 @@ class Session:
def died(self):
if self not in self.manager.sessions:
if self.id not in self.manager.sessions:
return
self.manager.sessions.remove(self)
del self.manager.sessions[self.id]
if self.manager.target == self:
self.manager.target = None
@ -343,7 +344,8 @@ class Manager:
def __init__(self, config: str = None):
self.config = Config()
self.sessions: List[Session] = []
self.session_id = 0 # start with 0-indexed session IDs
self.sessions: Dict[int, Session] = {}
self.modules: Dict[str, pwncat.modules.BaseModule] = {}
self._target = None
self.parser = CommandParser(self)
@ -415,8 +417,12 @@ class Manager:
def __exit__(self, _, __, ___):
"""Ensure all sessions are closed"""
while self.sessions:
self.sessions[0].close()
# Retrieve the existing session IDs list
session_ids = list(self.sessions.keys())
# Close each session based on its ``session_id``
for session_id in session_ids:
self.sessions[session_id].close()
def open_database(self):
"""Create the internal engine and session builder
@ -503,7 +509,7 @@ class Manager:
@target.setter
def target(self, value: Session):
if value is not None and value not in self.sessions:
if value is not None and value not in self.sessions.values():
raise ValueError("invalid target")
self._target = value
@ -623,6 +629,13 @@ class Manager:
"""
session = Session(self, platform, channel, **kwargs)
# Increment the ``session_id`` variable upon adding a new session
# Session constructor will automatically grab the current
# ``session_id`` from the ``manager`` object passed as the first argument
self.session_id += 1
return session
def _process_input(self, data: bytes, has_prefix: bool):

View File

@ -43,7 +43,7 @@ LOADED_MODULES = {}
class NoValue:
""" Indicates that the module argument has no default value and is required. """
"""Indicates that the module argument has no default value and is required."""
class ModuleFailed(Exception):

View File

@ -12,7 +12,7 @@ class Module(BaseModule):
PLATFORM = None
def run(self, session: "pwncat.manager.Session"):
""" Iterate over all tampers and revert what we can """
"""Iterate over all tampers and revert what we can"""
current_user = session.current_user()

View File

@ -25,7 +25,7 @@ class Module(BaseModule):
}
def run(self, session, remove, escalate):
""" Perform the requested action """
"""Perform the requested action"""
if (not remove and not escalate) or (remove and escalate):
raise ModuleFailed("expected one of escalate or remove")

View File

@ -42,7 +42,7 @@ class Module(BaseModule):
}
def generate_markdown_table(self, data: List[List], headers: bool = False):
""" Generate a markdown table from the given data and headers """
"""Generate a markdown table from the given data and headers"""
# Get column widths
widths = [
@ -73,7 +73,7 @@ class Module(BaseModule):
return " \n".join(rows)
def run(self, session: "pwncat.manager.Session", output, template, fmt, custom):
""" Perform enumeration and optionally write report """
"""Perform enumeration and optionally write report"""
if custom:
env = jinja2.Environment(

View File

@ -11,7 +11,7 @@ from pwncat.modules.linux.implant.passwd import PasswdImplant
class AppendPasswd(EscalationReplace):
""" Escalation through adding a new user to /etc/passwd """
"""Escalation through adding a new user to /etc/passwd"""
def __init__(self, source, ability):
super().__init__(source=source, source_uid=ability.source_uid, uid=ability.uid)
@ -73,7 +73,7 @@ class AppendPasswd(EscalationReplace):
class Module(EnumerateModule):
""" Check for possible methods of escalation via modiying /etc/passwd """
"""Check for possible methods of escalation via modiying /etc/passwd"""
PROVIDES = ["escalate.replace"]
SCHEDULE = Schedule.PER_USER

View File

@ -15,7 +15,7 @@ class Module(EnumerateModule):
SCHEDULE = Schedule.ALWAYS
def enumerate(self, session: "pwncat.manager.Session"):
""" Locate usable file read abilities and generate escalations """
"""Locate usable file read abilities and generate escalations"""
# Ensure users are already cached
list(session.iter_users())

View File

@ -13,7 +13,7 @@ from pwncat.modules.enumerate import Schedule, EnumerateModule
class CVE_2017_5618(ExecuteAbility):
""" Exploit CVE-2017-5618 """
"""Exploit CVE-2017-5618"""
def __init__(self, source: str, screen):
super().__init__(source=source, source_uid=None, uid=0)
@ -21,7 +21,7 @@ class CVE_2017_5618(ExecuteAbility):
self.screen = screen
def shell(self, session: "pwncat.manager.Session"):
""" Execute a shell """
"""Execute a shell"""
# Write the rootshell source code
rootshell_source = textwrap.dedent(
@ -151,20 +151,20 @@ class CVE_2017_5618(ExecuteAbility):
return lambda s: s.platform.channel.send(b"exit\n")
def title(self, session):
""" Grab the description for this fact """
"""Grab the description for this fact"""
return f"[cyan]{self.screen.path}[/cyan] vulnerable to [red]CVE-2017-5618[/red]"
class Module(EnumerateModule):
""" Identify systems vulnerable to CVE-2017-5618 """
"""Identify systems vulnerable to CVE-2017-5618"""
PROVIDES = ["ability.execute"]
PLATFORM = [Linux]
SCHEDULE = Schedule.PER_USER
def enumerate(self, session: "pwncat.manager.Session"):
""" check for vulnerable screen versions """
"""check for vulnerable screen versions"""
for screen in session.run("enumerate", types=["software.screen.version"]):
if not screen.vulnerable or (screen.perms & 0o4000) == 0:

View File

@ -17,7 +17,7 @@ class Module(EnumerateModule):
SCHEDULE = Schedule.PER_USER
def enumerate(self, session: "pwncat.manager.Session"):
""" Check for vulnerability """
"""Check for vulnerability"""
try:
# Utilize the version enumeration to grab sudo version

View File

@ -49,7 +49,7 @@ class ServiceData(Fact):
def build_service_data(session, source, service):
""" Build a service data object from a dictionary """
"""Build a service data object from a dictionary"""
# Grab the user name if available
user = service.get("User", None).strip()

View File

@ -10,7 +10,7 @@ from pwncat.modules.implant import ImplantModule
class AuthorizedKeyImplant(PrivateKey):
""" A public key added to a user's authorized keys file """
"""A public key added to a user's authorized keys file"""
def __init__(self, source, user, key, pubkey):
super().__init__(
@ -25,12 +25,12 @@ class AuthorizedKeyImplant(PrivateKey):
self.pubkey = pubkey
def title(self, session: "pwncat.manager.Session"):
""" Provide a human-readable description """
"""Provide a human-readable description"""
user = session.find_user(uid=self.uid)
return f"backdoor public key added to [blue]{user.name}[/blue] authorized_keys"
def description(self, session: "pwncat.manager.Session"):
""" We don't want to print the whole key, since we installed it. """
"""We don't want to print the whole key, since we installed it."""
return None
def remove(self, session: "pwncat.manager.Session"):

View File

@ -24,7 +24,7 @@ class PamImplant(Implant):
self.log = log
def escalate(self, session: "pwncat.manager.Session"):
""" Escalate to root with the pam implant """
"""Escalate to root with the pam implant"""
try:
session.platform.su("root", password=self.password)
@ -32,7 +32,7 @@ class PamImplant(Implant):
raise ModuleFailed(str(exc)) from exc
def remove(self, session: "pwncat.manager.Session"):
""" Remove the installed implant """
"""Remove the installed implant"""
if session.current_user().id != 0:
raise ModuleFailed("root permissions required to remove pam module")
@ -85,7 +85,7 @@ class Module(ImplantModule):
}
def install(self, session: "pwncat.manager.Session", password, log):
""" install the pam module """
"""install the pam module"""
if session.current_user().id != 0:
raise ModuleFailed("root permissions required to install pam module")

View File

@ -9,7 +9,7 @@ from pwncat.modules.implant import ImplantModule
class PasswdImplant(Implant):
""" Implant tracker for a user added directly to /etc/passwd """
"""Implant tracker for a user added directly to /etc/passwd"""
def __init__(self, source, user, password, added_line):
super().__init__(source=source, types=["implant.replace"], uid=0)
@ -19,7 +19,7 @@ class PasswdImplant(Implant):
self.added_line = added_line
def escalate(self, session: "pwncat.manager.Session"):
""" Escalate privileges to the fake root account """
"""Escalate privileges to the fake root account"""
try:
session.platform.su(self.user, password=self.password)
@ -28,7 +28,7 @@ class PasswdImplant(Implant):
raise ModuleFailed(f"authentication as {self.user} failed")
def remove(self, session: "pwncat.manager.Session"):
""" Remove the added line """
"""Remove the added line"""
if session.platform.getuid() != 0:
raise ModuleFailed("removal requires root privileges")
@ -50,7 +50,7 @@ class PasswdImplant(Implant):
class Module(ImplantModule):
""" Add a user to /etc/passwd with a known password and UID/GID of 0. """
"""Add a user to /etc/passwd with a known password and UID/GID of 0."""
TYPE = ImplantType.REPLACE
PLATFORM = [Linux]
@ -74,7 +74,7 @@ class Module(ImplantModule):
backdoor_pass,
shell,
):
""" Add the new user """
"""Add the new user"""
if session.current_user().id != 0:
raise ModuleFailed("installation required root privileges")

View File

@ -16,7 +16,7 @@ class DomainObject(Fact):
self.sid = sid
def __getitem__(self, name: str):
""" Shortcut for getting properties from the `self.domain` property. """
"""Shortcut for getting properties from the `self.domain` property."""
return self.domain[name]
@ -36,14 +36,14 @@ class DomainObject(Fact):
class Module(EnumerateModule):
""" Retrieve domain membership information """
"""Retrieve domain membership information"""
PLATFORM = [Windows]
PROVIDES = ["domain.details"]
SCHEDULE = Schedule.ONCE
def enumerate(self, session: "pwncat.manager.Session"):
""" Perform enumeration """
"""Perform enumeration"""
# Ensure we have PowerView loaded
yield Status("loading powersploit recon")

View File

@ -15,7 +15,7 @@ class ComputerObject(Fact):
self.computer = data
def __getitem__(self, name: str):
""" Shortcut for getting properties from the `self.domain` property. """
"""Shortcut for getting properties from the `self.domain` property."""
return self.computer[name]
@ -23,14 +23,14 @@ class ComputerObject(Fact):
return f"[blue]{self['dnshostname']}[/blue] ([cyan]{self['name']}[/cyan])"
def is_dc(self):
""" Query if this computer object is a domain controller """
"""Query if this computer object is a domain controller"""
uac = self.computer.get("useraccountcontrol") or 0
return (uac & 0x2000) > 0
def is_rodc(self):
""" Query if this computer object is a read only domain controller """
"""Query if this computer object is a read only domain controller"""
uac = self.computer.get("useraccountcontrol") or 0
@ -57,14 +57,14 @@ class ComputerObject(Fact):
class Module(EnumerateModule):
""" Retrieve information on all domain computers """
"""Retrieve information on all domain computers"""
PLATFORM = [Windows]
PROVIDES = ["domain.computer"]
SCHEDULE = Schedule.ONCE
def enumerate(self, session: "pwncat.manager.Session"):
""" Perform enumeration """
"""Perform enumeration"""
# Check that we are in a domain
if not session.run("enumerate", types=["domain.details"]):

View File

@ -7,14 +7,14 @@ from pwncat.modules.enumerate import Schedule, EnumerateModule
class Module(EnumerateModule):
""" Retrieve information on all domain computers """
"""Retrieve information on all domain computers"""
PLATFORM = [Windows]
PROVIDES = ["domain.fileserver"]
SCHEDULE = Schedule.ONCE
def enumerate(self, session: "pwncat.manager.Session"):
""" Perform enumeration """
"""Perform enumeration"""
# Check that we are in a domain
if not session.run("enumerate", types=["domain.details"]):

View File

@ -9,7 +9,7 @@ from pwncat.modules.enumerate import Schedule, EnumerateModule
class DomainGroup(WindowsGroup):
""" Builds on Windows Groups to add domain specific information """
"""Builds on Windows Groups to add domain specific information"""
def __init__(self, source: str, domain: str, data: Dict, members: List[str]):
super().__init__(
@ -48,14 +48,14 @@ class DomainGroup(WindowsGroup):
class Module(EnumerateModule):
""" Retrieve information on all domain computers """
"""Retrieve information on all domain computers"""
PLATFORM = [Windows]
PROVIDES = ["domain.group", "group"]
SCHEDULE = Schedule.ONCE
def enumerate(self, session: "pwncat.manager.Session"):
""" Perform enumeration """
"""Perform enumeration"""
# Check that we are in a domain
if not session.run("enumerate", types=["domain.details"]):

View File

@ -15,7 +15,7 @@ class SiteObject(Fact):
self.site = data
def __getitem__(self, name: str):
""" Shortcut for getting properties from the `self.site` property. """
"""Shortcut for getting properties from the `self.site` property."""
return self.site[name]
@ -24,14 +24,14 @@ class SiteObject(Fact):
class Module(EnumerateModule):
""" Retrieve information on all domain computers """
"""Retrieve information on all domain computers"""
PLATFORM = [Windows]
PROVIDES = ["domain.site"]
SCHEDULE = Schedule.ONCE
def enumerate(self, session: "pwncat.manager.Session"):
""" Perform enumeration """
"""Perform enumeration"""
# Check that we are in a domain
if not session.run("enumerate", types=["domain.details"]):

View File

@ -10,7 +10,7 @@ from pwncat.modules.enumerate import Schedule, EnumerateModule
class DomainUser(WindowsUser):
""" Builds on Windows Groups to add domain specific information """
"""Builds on Windows Groups to add domain specific information"""
def __init__(
self,
@ -72,14 +72,14 @@ class DomainUser(WindowsUser):
class Module(EnumerateModule):
""" Retrieve information on all domain computers """
"""Retrieve information on all domain computers"""
PLATFORM = [Windows]
PROVIDES = ["domain.user", "user"]
SCHEDULE = Schedule.ONCE
def enumerate(self, session: "pwncat.manager.Session"):
""" Perform enumeration """
"""Perform enumeration"""
# Check that we are in a domain
if not session.run("enumerate", types=["domain.details"]):

View File

@ -11,7 +11,7 @@ from pwncat.modules.enumerate import Schedule, EnumerateModule
class ProcessData(Fact):
""" Remote process information """
"""Remote process information"""
def __init__(
self,
@ -42,7 +42,7 @@ class ProcessData(Fact):
self.owner = None
def kill(self, session):
""" Attempt to kill the process """
"""Attempt to kill the process"""
try:
session.platform.powershell(f"(Get-Process -Id {self.pid}).Kill()")
@ -71,7 +71,7 @@ class ProcessData(Fact):
raise PermissionError(f"cannot wait for process w/ pid {self.pid}")
def title(self, session):
""" Build a formatted description for this process """
"""Build a formatted description for this process"""
out = "[cyan]{name}[/cyan] (PID [blue]{pid}[/blue]) is {state} "
@ -107,7 +107,7 @@ class ProcessData(Fact):
class Module(EnumerateModule):
""" Retrieve a list of current processes running on the target """
"""Retrieve a list of current processes running on the target"""
PROVIDES = ["system.processes"]
PLATFORM = [Windows]

View File

@ -8,7 +8,7 @@ from pwncat.modules.enumerate import Schedule, EnumerateModule
class Module(EnumerateModule):
""" Enumerate users from a windows target """
"""Enumerate users from a windows target"""
PROVIDES = ["user"]
PLATFORM = [Windows]

View File

@ -15,7 +15,7 @@ class Module(EnumerateModule):
SCHEDULE = Schedule.ONCE
def enumerate(self, session: "pwncat.manager.Session"):
""" Yield WindowsGroup objects """
"""Yield WindowsGroup objects"""
try:
groups = session.platform.powershell("Get-LocalGroup")

View File

@ -123,7 +123,7 @@ class Path:
return False
def stat(self) -> os.stat_result:
""" Request file stat details """
"""Request file stat details"""
if self._stat is not None:
return self._stat
@ -142,7 +142,7 @@ class Path:
self._target.chmod(str(self), mode)
def exists(self) -> bool:
""" Test if the path exists on the target system """
"""Test if the path exists on the target system"""
try:
self.stat()
@ -526,7 +526,7 @@ class Platform(ABC):
@property
def manager(self):
""" Shortcut to accessing the manager """
"""Shortcut to accessing the manager"""
return self.session.manager
def interactive_loop(self, interactive_complete: "threading.Event"):
@ -566,11 +566,11 @@ class Platform(ABC):
@abstractmethod
def exit(self):
""" Exit this session """
"""Exit this session"""
@abstractmethod
def refresh_uid(self) -> Union[int, str]:
""" Refresh the cached UID of the current session. """
"""Refresh the cached UID of the current session."""
@abstractmethod
def getuid(self) -> Union[int, str]:

View File

@ -495,7 +495,7 @@ class Linux(Platform):
PROMPTS = {
"sh": """'$(command printf "(remote) $(whoami)@$(hostname):$PWD\\$ ")'""",
"dash": """'$(command printf "(remote) $(whoami)@$(hostname):$PWD\\$ ")'""",
"zsh": """'%B%F{red}(remote) %B%F{yellow}%n@%M%B%F{reset}:%B%F{cyan}%(6~.%-1~/…/%4~.%5~)%B%(#.%b%F{white}#.%b%F{white}$)%b%F{reset} '""",
"zsh": """'%B%F{red}(remote) %B%F{yellow}%n@%M%B%F{reset}:%B%F{cyan}$PWD%B%(#.%b%F{white}#.%b%F{white}$)%b%F{reset} '""",
"default": """'$(command printf "\\[\\033[01;31m\\](remote)\\[\\033[0m\\] \\[\\033[01;33m\\]$(whoami)@$(hostname)\\[\\033[0m\\]:\\[\\033[1;36m\\]$PWD\\[\\033[0m\\]\\$ ")'""",
}
@ -573,7 +573,7 @@ class Linux(Platform):
self.refresh_uid()
def exit(self):
""" Exit this session """
"""Exit this session"""
self.channel.send(b"exit\n")
@ -776,7 +776,7 @@ class Linux(Platform):
raise PlatformError(str(exc)) from exc
def getuid(self):
""" Retrieve the current cached uid """
"""Retrieve the current cached uid"""
return self._uid
def getenv(self, name: str):
@ -1731,7 +1731,7 @@ class Linux(Platform):
self.run(["chmod", oct(mode)[2:], path])
def chown(self, path: str, uid: int, gid: int):
""" Change ownership of a file """
"""Change ownership of a file"""
try:
self.run(["chown", f"{uid}:{gid}", path], check=True)

View File

@ -221,7 +221,7 @@ class DotNetPlugin(object):
return functools.partial(self.run, key)
def run(self, method: str, *args):
""" Execute a method within the plugin """
"""Execute a method within the plugin"""
return self.platform.run_method("Reflection", "call", self.ident, method, args)
@ -415,7 +415,7 @@ class PopenWindows(pwncat.subprocess.Popen):
@dataclass
class BuiltinPluginInfo:
""" Tells pwncat where to find a builtin plugin """
"""Tells pwncat where to find a builtin plugin"""
name: str
""" A friendly name used when loading the plugin """
@ -541,7 +541,7 @@ class Windows(Platform):
self.run_method("StageTwo", "exit")
def parse_response(self, data: bytes):
""" Parse a line of data from the C2 """
"""Parse a line of data from the C2"""
with gzip.GzipFile(
fileobj=BytesIO(base64.b64decode(data.decode("utf-8").strip())),
@ -1432,7 +1432,7 @@ function prompt {
return False
def revert_to_self(self):
""" Revert any impersonations and return to the original user """
"""Revert any impersonations and return to the original user"""
return self.impersonate(0)

View File

@ -108,7 +108,7 @@ class Popen:
"""
def terminate(self):
""" Stop the child. """
"""Stop the child."""
def kill(self):
""" Kills the child """
"""Kills the child"""

View File

@ -13,7 +13,7 @@ from BTrees.OOBTree import OOBTree
class NAT(enum.Enum):
""" Indicates the current known state of NAT on the target host """
"""Indicates the current known state of NAT on the target host"""
UNKNOWN = enum.auto()
""" We currently don't have enough information to determine if NAT is used """

View File

@ -27,7 +27,7 @@ STORED_TERM_STATE = []
class State(Enum):
""" The current PtyHandler state """
"""The current PtyHandler state"""
NORMAL = auto()
RAW = auto()
@ -36,7 +36,7 @@ class State(Enum):
class Access(Flag):
""" Check if you are able to read/write/execute a file """
"""Check if you are able to read/write/execute a file"""
NONE = 0
EXISTS = auto()
@ -63,7 +63,7 @@ class Init(Enum):
class CommandSystemExit(Exception):
""" A command has requested that we exit pwncat (mostly used for exit command) """
"""A command has requested that we exit pwncat (mostly used for exit command)"""
class CompilationError(Exception):
@ -99,7 +99,7 @@ class RawModeExit(Exception):
def strip_markup(styled_text: str) -> str:
""" Strip rich markup from text """
"""Strip rich markup from text"""
text = markup.render(styled_text)
return text.plain
@ -221,7 +221,7 @@ def copyfileobj(src, dst, callback, nomv=False):
def random_string(length: int = 8):
""" Create a random alphanumeric string """
"""Create a random alphanumeric string"""
return random.choice(string.ascii_letters) + "".join(
random.choice(ALPHANUMERIC) for _ in range(length - 1)
)
@ -299,7 +299,7 @@ def pop_term_state():
def restore_terminal(state, new_line=True):
""" restore the stdio state from the result of "enter_raw_mode" """
"""restore the stdio state from the result of "enter_raw_mode" """
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, state[0])
# tty.setcbreak(sys.stdin)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, state[1])