Skip to content

Commit

Permalink
test time (#120)
Browse files Browse the repository at this point in the history
More groundwork for #39
  • Loading branch information
0xricksanchez authored Oct 25, 2022
1 parent 6247319 commit 7b29359
Show file tree
Hide file tree
Showing 10 changed files with 487 additions and 21 deletions.
9 changes: 9 additions & 0 deletions examples/like_dbg_confs/echo_module_arm64.ini
Original file line number Diff line number Diff line change
@@ -0,0 1,9 @@
[general]
arch = arm64

[kernel_builder]
## Provide a path to a parent directory that houses custom kernel modules (see the example)
custom_modules = examples/c_kmod/

[kernel_dl]
tag = 5.15
7 changes: 2 additions & 5 deletions src/debuggee.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 49,9 @@ def infer_panic_behavior(self) -> int:
return 0
elif "wait" in self.panic:
try:
ret = self.panic.split(" ")[1]
if not ret:
return 15
ret = int(self.panic.split(" ")[1])
return ret
except IndexError:
except (IndexError, ValueError):
return 15
else:
logger.error("Unknown requested panic behavior...")
Expand All @@ -72,7 70,6 @@ def run_container(self):
self.cmd = f"qemu-system-{self.qemu_arch} -m {self.memory} -smp {self.smp} -kernel {kernel}"
if self.qemu_arch == "aarch64":
self.cmd = " -cpu cortex-a72"
# self._add_smep_smap()
self.cmd = ' -machine type=virt -append "console=ttyAMA0 root=/dev/vda'
elif self.qemu_arch == "x86_64":
self.cmd = " -cpu qemu64"
Expand Down
33 changes: 20 additions & 13 deletions src/debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 20,39 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
user_cfg = kwargs.get("user_cfg", "")
cfg_setter(self, ["general", "debugger"], user_cfg, exclude_keys=["kernel_root"])
ctf_ctx = kwargs.get("ctf_ctx", False)
if ctf_ctx:
self.ctf_kernel = Path(kwargs.get("ctf_kernel", ""))
self.project_dir = Path.cwd() / self.ctf_dir
vmlinux = Path(self.project_dir) / "vmlinux"
if not vmlinux.exists() or b"ELF" not in sp.run(f"file {vmlinux}", shell=True, capture_output=True).stdout:
self._extract_vmlinux()
if kwargs.get("ctf_ctx", False):
self.ctf = True
self._set_ctf_ctx(kwargs)
else:
self.ctf = False
self.project_dir = Path.cwd() / self.kernel_root
self.ctf = 1 if ctf_ctx else 0
self.custom_gdb_script = Path("/home/") / self.user / Path(self.gdb_script).name
self.script_logging = "set -e" if kwargs.get("log_level", "INFO") == "INFO" else "set -eux"
self.skip_prompts = kwargs.get("skip_prompts", False)

def _extract_vmlinux(self) -> None:
def _set_ctf_ctx(self, kwargs) -> None:
self.ctf_kernel = Path(kwargs.get("ctf_kernel", ""))
self.project_dir = Path(self.ctf_dir).resolve().absolute()
vmlinux = Path(self.project_dir) / "vmlinux"
if not vmlinux.exists() or b"ELF" not in sp.run(f"file {vmlinux}", shell=True, capture_output=True).stdout:
if self._extract_vmlinux():
exit(-1)

def _extract_vmlinux(self) -> int:
vml_ext = Path(glob("**/extract-vmlinux.sh", recursive=True)[0]).resolve().absolute()
pkernel = self.ctf_kernel.resolve().absolute()
with new_context(self.ctf_dir):
vml_ext = glob("**/extract*", recursive=True)[0]
ret = sp.run(f"./{vml_ext} {Path(*self.ctf_kernel.parts[1:])} > vmlinux", shell=True, capture_output=True)
cmd = f"{vml_ext} {pkernel} > vmlinux"
ret = sp.run(f"{cmd}", shell=True, capture_output=True)
if ret.returncode == 0:
logger.info("Successfully extracted 'vmlinux' from compressed kernel")
return 0
else:
logger.error("Failed to extract 'vmlinux'")
exit(-1)
return 1

def run_container(self) -> None:
entrypoint = f'/bin/bash -c "{self.script_logging}; . /home/{self.user}/debugger.sh -a {self.arch} -p {self.docker_mnt} -c {self.ctf} -g {self.custom_gdb_script}"'
entrypoint = f'/bin/bash -c "{self.script_logging}; . /home/{self.user}/debugger.sh -a {self.arch} -p {self.docker_mnt} -c {int(self.ctf)} -g {self.custom_gdb_script}"'
runner = f'docker run -it --rm --security-opt seccomp=unconfined --cap-add=SYS_PTRACE -v {self.project_dir}:/io --net="host" {self.tag} {entrypoint}'
tmux("selectp -t 2")
tmux_shell(runner)
Expand Down
2 changes: 1 addition & 1 deletion src/rootfs_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 15,7 @@ class RootFSBuilder(DockerRunner):
def __init__(self, partial_run: bool = False, **kwargs) -> None:
super().__init__(**kwargs)
user_cfg = kwargs.get("user_cfg", "")
cfg_setter(self, ["rootfs_general", "rootfs_builder", "general"], user_cfg)
cfg_setter(self, ["general", "rootfs_general", "rootfs_builder"], user_cfg)
self.partial = partial_run
self.fs_name = self.rootfs_base self.arch self.rootfs_ftype
self.rootfs_path = self.rootfs_dir self.fs_name
Expand Down
Binary file added src/tests/files/testKernel_packed
Binary file not shown.
211 changes: 211 additions & 0 deletions src/tests/test_debuggee.py
Original file line number Diff line number Diff line change
@@ -0,0 1,211 @@
from pathlib import Path

from src.debuggee import Debuggee
from unittest.mock import patch, MagicMock
import pytest


@patch("subprocess.run")
def test_infer_qemu_fs_mount_cpio(sp_mock) -> None:
d = Debuggee(**{"kroot": "foo"})
mock = MagicMock()
mock.configure_mock(**{"stdout": b"A cpio archive dummy archive"})
sp_mock.return_value = mock
assert d.infer_qemu_fs_mount() == f" -initrd {d.rootfs}"


@patch("subprocess.run")
def test_infer_qemu_fs_mount_filesystem(sp_mock) -> None:
d = Debuggee(**{"kroot": "foo"})
mock = MagicMock()
mock.configure_mock(**{"stdout": b"Some filesystem data..."})
sp_mock.return_value = mock
assert d.infer_qemu_fs_mount() == f" -drive file={d.rootfs},format=raw"


@patch("subprocess.run")
def test_infer_qemu_fs_mount_error(sp_mock) -> None:
d = Debuggee(**{"kroot": "foo"})
mock = MagicMock()
mock.configure_mock(**{"stdout": b"foo bar baz"})
sp_mock.return_value = mock
with pytest.raises(SystemExit) as ext:
d.infer_qemu_fs_mount()
assert ext.type == SystemExit
assert ext.value.code == -1


@patch("subprocess.run")
def test_infer_qemu_fs_mount_cpio_ctf(sp_mock) -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": True})
mock = MagicMock()
mock.configure_mock(**{"stdout": b"A cpio archive dummy archive"})
sp_mock.return_value = mock
assert d.infer_qemu_fs_mount() == f" -initrd {d.rootfs.name}"


def test_assert_ctf_ctx_mode() -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": True, "ctf_mount": "/foo", "ctf_kernel": "/a/path", "ctf_fs": "/another/path"})
assert d.ctf is True
assert d.ctf_mount == "/foo"
assert d.kernel == Path("/a/path")
assert d.rootfs == Path("/another/path")


@patch("src.misc.cfg_setter", return_value=None)
def test_assert_normal_mode(self) -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.kernel_root = "/foo"
d.arch = "x86_64"
d.rootfs_ftype = ""
assert d.ctf is False
assert d.kernel == Path(f"{d.docker_mnt}/{d.kernel_root}/arch/{d.arch}/boot/Image")
assert d.rootfs == Path(f"{d.docker_mnt}/{d.rootfs_dir}/{d.rootfs_base d.arch d.rootfs_ftype}")


def test_infer_panic_behavior_panic() -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.panic = "reboot"
assert d.infer_panic_behavior() == -1


def test_infer_panic_behavior_halt() -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.panic = "halt"
assert d.infer_panic_behavior() == 0


def test_infer_panic_behavior_wait_90() -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.panic = "wait 90"
assert d.infer_panic_behavior() == 90


def test_infer_panic_behavior_wait_split_fail() -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.panic = "wait"
assert d.infer_panic_behavior() == 15


def test_infer_panic_behavior_wait_conversion_fail() -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.panic = "wait a"
assert d.infer_panic_behavior() == 15


def test_infer_panic_behavior_wait_unknown() -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.panic = "foo"
with pytest.raises(SystemExit) as ext:
d.infer_panic_behavior()
assert ext.type == SystemExit
assert ext.value.code == -1


def test_add_smep() -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.smep = True
d.smap = False
tmp = "some cmd -cpu foo"
d.cmd = tmp
d._add_smep_smap()
assert d.cmd == f"{tmp}, smep"


def test_add_smap() -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.smep = False
d.smap = True
tmp = "some cmd -cpu foo"
d.cmd = tmp
d._add_smep_smap()
assert d.cmd == f"{tmp}, smap"


def test_add_smep_smap() -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.smep = True
d.smap = True
tmp = "some cmd -cpu foo"
d.cmd = tmp
d._add_smep_smap()
assert d.cmd == f"{tmp}, smep, smap"


@patch("src.misc.tmux")
@patch("src.misc.tmux_shell")
@patch("termios.tcflush", return_value=True)
@patch("builtins.input", lambda *args: "y")
@patch.object(Debuggee, "infer_qemu_fs_mount", return_value=" -initrd /foo/bar.cpio")
def test_run_x86_all_mitigations_kvm_gdb(tmock, tsmock, infer_mock, flush_mock) -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.kaslr = True
d.smep = True
d.smap = True
d.kpti = True
d.kvm = True
d.arch = "x86_64"
d.kernel = "/some/kernel/Image"
d.rootfs = Path("/foo/rootfs")
d.run()
assert (
d.cmd
== 'qemu-system-x86_64 -m 1024 -smp 1 -kernel /some/kernel/Image -cpu qemu64, smep, smap -append "console=ttyS0 root=/dev/sda earlyprintk=serial net.ifnames=0 kaslr pti=on oops=panic panic=0" -initrd /foo/bar.cpio -net user,host=10.0.2.10,hostfwd=tcp:127.0.0.1:10021-:22 -net nic,model=e1000 -nographic -pidfile vm.pid -enable-kvm -S -s'
)


@patch("src.misc.tmux")
@patch("src.misc.tmux_shell")
@patch("termios.tcflush", return_value=True)
@patch("builtins.input", lambda *args: "y")
@patch.object(Debuggee, "infer_qemu_fs_mount", return_value=" -initrd /foo/bar.cpio")
def test_run_x86_no_mitigations_kvm_gdb(tmock, tsmock, infer_mock, flush_mock) -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.kaslr = False
d.smep = False
d.smap = False
d.kpti = False
d.kvm = False
d.gdb = False
d.arch = "x86_64"
d.kernel = "/some/kernel/Image"
d.rootfs = Path("/foo/rootfs")
d.run()
assert (
d.cmd
== 'qemu-system-x86_64 -m 1024 -smp 1 -kernel /some/kernel/Image -cpu qemu64 -append "console=ttyS0 root=/dev/sda earlyprintk=serial net.ifnames=0 nokaslr nosmep nosmap nopti oops=panic panic=0" -initrd /foo/bar.cpio -net user,host=10.0.2.10,hostfwd=tcp:127.0.0.1:10021-:22 -net nic,model=e1000 -nographic -pidfile vm.pid'
)


@patch("src.misc.tmux")
@patch("src.misc.tmux_shell")
@patch("termios.tcflush", return_value=True)
@patch("builtins.input", lambda *args: "y")
@patch.object(Debuggee, "infer_qemu_fs_mount", return_value=" -initrd /foo/bar.cpio")
def test_run_arm_no_mitigations_kvm_on(tmock, tsmock, infer_mock, flush_mock) -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.kaslr = False
d.smep = False
d.smap = False
d.kpti = False
d.kvm = True
d.gdb = False
d.qemu_arch = "aarch64"
d.kernel = "/some/kernel/Image"
d.rootfs = Path("/foo/rootfs")
d.run()
assert (
d.cmd
== 'qemu-system-aarch64 -m 1024 -smp 1 -kernel /some/kernel/Image -cpu cortex-a72 -machine type=virt -append "console=ttyAMA0 root=/dev/vda earlyprintk=serial net.ifnames=0 nokaslr nosmep nosmap nopti oops=panic panic=0" -initrd /foo/bar.cpio -net user,host=10.0.2.10,hostfwd=tcp:127.0.0.1:10021-:22 -net nic,model=e1000 -nographic -pidfile vm.pid'
)


@patch("termios.tcflush", return_value=True)
@patch("builtins.input", lambda *args: "y")
def test_run_unknown_arch(flush_mock) -> None:
d = Debuggee(**{"kroot": "foo", "ctf_ctx": False})
d.qemu_arch = "foobar"
with pytest.raises(SystemExit) as ext:
d.run()
assert ext.type == SystemExit
assert ext.value.code == -1
92 changes: 92 additions & 0 deletions src/tests/test_debugger.py
Original file line number Diff line number Diff line change
@@ -0,0 1,92 @@
from pathlib import Path
from src.debugger import GDB_SCRIPT_HIST, Debugger
from unittest.mock import patch
import hashlib

TPATH = Path("/tmp/.hist")
TPATH_NEW = Path("/tmp/.hist_new")
PACKED_KERNEL = Path("src/tests/files/testKernel_packed")


def test_is_gdb_script_success() -> None:
d = Debugger(**{"kroot": "foo"})
GDB_SCRIPT_HIST.touch()
assert d._is_gdb_script_hist() is True


@patch("src.debugger.GDB_SCRIPT_HIST", Path("/tmp/.fake_file"))
def test_is_gdb_script_fail() -> None:
d = Debugger(**{"kroot": "foo"})
assert d._is_gdb_script_hist() is False


@patch("src.debugger.GDB_SCRIPT_HIST", TPATH)
def test_handle_gdb_change_update_existing() -> None:
d = Debugger(**{"kroot": "foo"})
d.force_rebuild = False
TPATH.touch()
d.gdb_script = Path("/tmp/.hist2")
d.gdb_script.touch()
cntn = "start\nbreak *0xdeadbeef\ncontinue"
d.gdb_script.write_text(cntn)
d._handle_gdb_change()
assert d.force_rebuild is True
assert TPATH.read_text() == hashlib.sha256(cntn.encode()).hexdigest()
TPATH.unlink()
d.gdb_script.unlink()


@patch("src.debugger.GDB_SCRIPT_HIST", TPATH_NEW)
def test_handle_gdb_change_new() -> None:
d = Debugger(**{"kroot": "foo"})
d.force_rebuild = False
d.gdb_script = Path("/tmp/.hist2")
d.gdb_script.touch()
cntn = "start\nbreak *0xdeadbeef\ncontinue"
d.gdb_script.write_text(cntn)
d._handle_gdb_change()
assert d.force_rebuild is False
assert TPATH_NEW.read_text() == hashlib.sha256(cntn.encode()).hexdigest()
d.gdb_script.unlink()
TPATH_NEW.unlink()


def test_set_ctf_ctx_in_init(tmp_path) -> None:
d = Debugger(**{"kroot": "foo", "ctf_ctx": True, "ctf_dir": tmp_path, "ctf_kernel": PACKED_KERNEL})
assert d.ctf is True


def test_set_ctf_ctx(tmp_path) -> None:
d = Debugger(**{"kroot": "foo"})
d.ctf_dir = tmp_path
d._set_ctf_ctx({"ctf_kernel": PACKED_KERNEL})
assert d.ctf is False


def test_extract_vmlinux_success(tmp_path) -> None:
d = Debugger(**{"kroot": "foo"})
d.ctf_kernel = PACKED_KERNEL
d.ctf_dir = tmp_path
assert d._extract_vmlinux() == 0


def test_extract_vmlinux_fail(tmp_path) -> None:
d = Debugger(**{"kroot": "foo"})
p = Path(tmp_path / "fake_kernel")
p.touch()
d.ctf_kernel = p
d.ctf_dir = tmp_path
assert d._extract_vmlinux() == 1


@patch("src.misc.tmux", return_value=None)
@patch("src.misc.tmux_shell")
@patch("termios.tcflush", return_value=True)
@patch("builtins.input", lambda *args: "y")
def test_run_container(tflush, tsmock, tmock) -> None:
d = Debugger(**{"kroot": "foo"})
d.project_dir = "/some/project_dir"
d.tag = "tag"
d.run_container()
expected = f"send-keys 'docker run -it --rm --security-opt seccomp=unconfined --cap-add=SYS_PTRACE -v {d.project_dir}:{d.docker_mnt} --net=\"host\" {d.tag} /bin/bash -c \"set -e; . /home/user/debugger.sh -a {d.arch} -p /io -c 0 -g /home/user/gdb_script\"' 'C-m'"
tmock.assert_called_with(expected)
Loading

0 comments on commit 7b29359

Please sign in to comment.