mirror of
https://github.com/calebstewart/pwncat.git
synced 2024-11-24 01:25:37 +01:00
Fixed linux su; added more tests
This commit is contained in:
parent
8dccfdff77
commit
2212be9751
@ -2,9 +2,8 @@
|
||||
from typing import Optional
|
||||
|
||||
import rich.markup
|
||||
from persistent.list import PersistentList
|
||||
|
||||
from pwncat.db.fact import Fact
|
||||
from persistent.list import PersistentList
|
||||
|
||||
|
||||
class Group(Fact):
|
||||
|
@ -145,10 +145,12 @@ class Session:
|
||||
):
|
||||
return group
|
||||
|
||||
def iter_groups(self):
|
||||
def iter_groups(self, members: Optional[List[Union[str, int]]] = None):
|
||||
""" Iterate over groups for the target """
|
||||
|
||||
yield from self.run("enumerate.gather", progress=False, types=["group"])
|
||||
for group in self.run("enumerate.gather", progress=False, types=["group"]):
|
||||
if members is None or any(m in group.members for m in members):
|
||||
yield group
|
||||
|
||||
def register_fact(self, fact: "pwncat.db.Fact"):
|
||||
"""Register a fact with this session's target. This is useful when
|
||||
|
@ -1,42 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from pwncat.facts import EscalationSpawn
|
||||
from pwncat.channel import ChannelError
|
||||
from pwncat.modules import ModuleFailed
|
||||
from pwncat.modules.enumerate import Schedule, EnumerateModule
|
||||
from pwncat.platform.linux import Linux
|
||||
|
||||
|
||||
class TestNewSSHSession(EscalationSpawn):
|
||||
""" Escalation via SSH as root """
|
||||
|
||||
def __init__(self, source):
|
||||
super().__init__(source=source, source_uid=1000, uid=1001)
|
||||
|
||||
def escalate(self, session: "pwncat.manager.Manager") -> "pwncat.manager.Session":
|
||||
|
||||
try:
|
||||
new_session = session.manager.create_session(
|
||||
"linux",
|
||||
host="pwncat-ubuntu",
|
||||
user="john",
|
||||
identity="/home/caleb/.ssh/id_rsa",
|
||||
)
|
||||
except ChannelError as exc:
|
||||
raise ModuleFailed(str(exc)) from exc
|
||||
|
||||
return new_session
|
||||
|
||||
def title(self, session):
|
||||
return "ssh to [cyan]pwncat-ubuntu[cyan] as [blue]john[/blue]"
|
||||
|
||||
|
||||
class Module(EnumerateModule):
|
||||
""" Test enumeration to provide a EscalationSpawn fact """
|
||||
|
||||
PROVIDES = ["escalate.spawn"]
|
||||
SCHEDULE = Schedule.ONCE
|
||||
PLATFORM = [Linux]
|
||||
|
||||
def enumerate(self, session):
|
||||
yield TestNewSSHSession(self.name)
|
@ -145,7 +145,7 @@ class Path:
|
||||
"""Returns the name of the group owning the file. KeyError is raised
|
||||
if the file's GID isn't found in the system database."""
|
||||
|
||||
return self._target.find_group(id=self.stat().st_gid).name
|
||||
return self._target.session.find_group(id=self.stat().st_gid).name
|
||||
|
||||
def is_dir(self) -> bool:
|
||||
"""Returns True if the path points to a directory (or a symbolic link
|
||||
@ -278,7 +278,7 @@ class Path:
|
||||
"""Return the name of the user owning the file. KeyError is raised if
|
||||
the file's uid is not found in the System database"""
|
||||
|
||||
return self._target.find_user(id=self.stat().st_uid).name
|
||||
return self._target.session.find_user(id=self.stat().st_uid).name
|
||||
|
||||
def read_bytes(self) -> bytes:
|
||||
"""Return the binary contents of the pointed-to file as a bytes object"""
|
||||
|
@ -150,7 +150,10 @@ class PopenLinux(pwncat.subprocess.Popen):
|
||||
time.sleep(0.1)
|
||||
|
||||
# Flush more data to look for the EOF
|
||||
self.stdout_raw.read1(4096)
|
||||
try:
|
||||
self.stdout_raw.read1(4096)
|
||||
except BlockingIOError:
|
||||
pass
|
||||
|
||||
return self.returncode
|
||||
|
||||
@ -1299,17 +1302,15 @@ class Linux(Platform):
|
||||
proc.stdin.write(password + "\n")
|
||||
proc.stdin.flush()
|
||||
|
||||
# Retrieve the response (this may take some time if wrong)
|
||||
result = proc.stdout.readline().lower()
|
||||
# line from when we pressed enter above
|
||||
proc.stdout.readline()
|
||||
|
||||
proc.stdin.write("\n")
|
||||
proc.stdin.flush()
|
||||
|
||||
if result.strip() == "":
|
||||
result = proc.stdout.readline().lower()
|
||||
# check the next few bytes; we have to go around the proc.stdin
|
||||
# because we need a peek with a timeout.
|
||||
result = self.channel.peek(10, timeout=5)
|
||||
|
||||
# Check for keywords indicating failure
|
||||
if "fail" in result or "incorrect" in result:
|
||||
if b"su: " in result.lower():
|
||||
|
||||
try:
|
||||
# The call failed, wait for the result
|
||||
@ -1631,7 +1632,10 @@ class Linux(Platform):
|
||||
"""Set or retrieve the current umask value"""
|
||||
|
||||
if mask is None:
|
||||
return int(self.run(["umask"], capture_output=True, text=True).stdout, 8)
|
||||
return int(
|
||||
self.run(["umask"], capture_output=True, text=True, check=True).stdout,
|
||||
8,
|
||||
)
|
||||
|
||||
self.run(["umask", oct(mask)[2:]])
|
||||
return mask
|
||||
|
@ -1,4 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import subprocess
|
||||
|
||||
import pytest
|
@ -1,10 +1,12 @@
|
||||
#!/usr/bin/env python3
|
||||
import io
|
||||
import os
|
||||
import base64
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
from pwncat.util import random_string
|
||||
from pwncat.platform.windows import PowershellError
|
||||
|
||||
|
||||
def test_platform_file_io(session):
|
||||
@ -97,3 +99,22 @@ def test_platform_sudo(session):
|
||||
assert output != "john"
|
||||
except NotImplementedError:
|
||||
pass
|
||||
|
||||
|
||||
def test_windows_powershell(windows):
|
||||
""" Test powershell execution """
|
||||
|
||||
# Run a real powershell snippet
|
||||
r = windows.platform.powershell("$PSVersionTable.PSVersion")
|
||||
assert len(r) == 1
|
||||
assert isinstance(r[0], dict)
|
||||
|
||||
# Ensure we get an exception
|
||||
with pytest.raises(PowershellError):
|
||||
windows.platform.powershell("CommandletDoes-NotExist")
|
||||
|
||||
# Run from a file descriptor
|
||||
filp = io.BytesIO(b"""$PSVersionTable.PSVersion""")
|
||||
r = windows.platform.powershell(filp)
|
||||
assert len(r) == 1
|
||||
assert isinstance(r[0], dict)
|
Loading…
Reference in New Issue
Block a user