Skip to content

Commit

Permalink
ipk: Expand reading API, add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
matteodelabre committed Mar 1, 2022
1 parent 9f7f644 commit 3f11cc7
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 42 deletions.
164 changes: 164 additions & 0 deletions tests/test_ipk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# Copyright (c) 2022 The Toltec Contributors
# SPDX-License-Identifier: MIT

import unittest
import tarfile
from tempfile import TemporaryDirectory
import os
from toltec import ipk
from io import BytesIO


test_package = {
"epoch": 1337,
"metadata": "Test: OK\n",
"scripts": {"postinst": "echo OK"},
"data": [
{
"type": "file",
"path": "./file1",
"contents": "file 1\ntest\n",
"mode": 0o644,
},
{
"type": "dir",
"path": "./zsubfolder",
"mode": 0o755,
},
{
"type": "file",
"path": "./zsubfolder/file2",
"contents": "this is\nfile 2",
"mode": 0o400,
},
],
}


class TestIpk(unittest.TestCase):
def setUp(self):
# Create data tree
self.temp_dir = TemporaryDirectory()
temp = self.temp_dir.name

for member in test_package["data"]:
if member["type"] == "file":
with open(temp + "/" + member["path"], "w") as file:
file.write(member["contents"])

os.chmod(temp + "/" + member["path"], member["mode"])
else:
os.mkdir(temp + "/" + member["path"], member["mode"])

def tearDown(self):
self.temp_dir.cleanup()

def test_write_control(self):
output = BytesIO()
ipk.write_control(
output,
test_package["epoch"],
test_package["metadata"],
test_package["scripts"],
)
output.seek(0)

with tarfile.TarFile.open(fileobj=output) as archive:
members = archive.getmembers()
self.assertEqual(len(members), 3)

self.assertTrue(members[0].isdir())
self.assertEqual(members[0].name, ".")
self.assertEqual(members[0].size, 0)
self.assertEqual(members[0].mode, 0o755)

self.assertTrue(members[1].isfile())
self.assertEqual(members[1].name, "./control")
self.assertEqual(members[1].size, len(test_package["metadata"]))
self.assertEqual(members[1].mode, 0o644)
self.assertEqual(
archive.extractfile(members[1]).read().decode(),
test_package["metadata"],
)

self.assertTrue(members[2].isfile())
self.assertEqual(members[2].name, "./postinst")
self.assertEqual(
members[2].size, len(test_package["scripts"]["postinst"])
)
self.assertEqual(members[2].mode, 0o755)
self.assertEqual(
archive.extractfile(members[2]).read().decode(),
test_package["scripts"]["postinst"],
)

for member in members:
self.assertEqual(member.uid, 0)
self.assertEqual(member.gid, 0)
self.assertEqual(member.uname, "")
self.assertEqual(member.gname, "")
self.assertEqual(member.mtime, test_package["epoch"])

def test_write_data_empty(self):
output = BytesIO()
ipk.write_data(output, test_package["epoch"])
output.seek(0)

with tarfile.TarFile.open(fileobj=output) as archive:
self.assertEqual(archive.getmembers(), [])

def test_write_data_dir(self):
output = BytesIO()
ipk.write_data(output, test_package["epoch"], self.temp_dir.name)
output.seek(0)

with tarfile.TarFile.open(fileobj=output) as archive:
members = archive.getmembers()
self.assertEqual(len(members), 4)

self.assertTrue(members[0].isdir())
self.assertEqual(members[0].name, ".")
self.assertEqual(members[0].size, 0)
self.assertEqual(members[0].mode, 0o700)

for expect, member in zip(test_package["data"], members[1:]):
if expect["type"] == "file":
self.assertTrue(member.isfile())
self.assertEqual(member.name, expect["path"])
self.assertEqual(member.size, len(expect["contents"]))
self.assertEqual(member.mode, expect["mode"])
self.assertEqual(
archive.extractfile(member).read().decode(),
expect["contents"],
)
else:
self.assertTrue(member.isdir())
self.assertEqual(member.name, expect["path"])
self.assertEqual(member.size, 0)
self.assertEqual(member.mode, expect["mode"])

self.assertEqual(member.uid, 0)
self.assertEqual(member.gid, 0)
self.assertEqual(member.uname, "")
self.assertEqual(member.gname, "")
self.assertEqual(member.mtime, test_package["epoch"])

def test_write_read(self):
output = BytesIO()
ipk.write(
output,
test_package["epoch"],
test_package["metadata"],
test_package["scripts"],
self.temp_dir.name,
)

output.seek(0)

with ipk.Reader(output) as result:
self.assertEqual(result.metadata, test_package["metadata"])
self.assertEqual(result.scripts, test_package["scripts"])
self.assertEqual(
result.data.getnames(),
["."] + [member["path"] for member in test_package["data"]],
)
2 changes: 1 addition & 1 deletion toltec/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ def _archive(package: Package, pkg_dir: str, ar_path: str) -> None:
epoch = int(package.parent.timestamp.timestamp())

with open(ar_path, "wb") as file:
ipk.make_ipk(
ipk.write(
file,
epoch=epoch,
pkg_dir=pkg_dir,
Expand Down
140 changes: 103 additions & 37 deletions toltec/ipk.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
# Copyright (c) 2021 The Toltec Contributors
# SPDX-License-Identifier: MIT
"""Make ipk packages."""
"""Read and write ipk packages."""

from gzip import GzipFile
from typing import Dict, IO, Optional
from typing import Dict, IO, Optional, Type, Union
from types import TracebackType
from io import BytesIO
import tarfile
import operator
import os


def _targz_open(fileobj: IO[bytes], epoch: int) -> tarfile.TarFile:
"""
Open a gzip compressed tar archive for writing.
Modified from :func:`tarfile.TarFile.gzopen` to support
setting the `mtime` attribute on `GzipFile`.
"""
"""Open a gzip compressed tar archive for writing."""
# HACK: Modified code from `tarfile.TarFile.gzopen` to support
# setting the `mtime` attribute on `GzipFile`
gzipobj = GzipFile(
filename="", mode="wb", compresslevel=9, fileobj=fileobj, mtime=epoch
)
Expand Down Expand Up @@ -82,23 +80,24 @@ def _add_file(
archive.addfile(_clean_info(None, epoch, info), BytesIO(data))


def make_control(
def write_control(
file: IO[bytes], epoch: int, metadata: str, scripts: Dict[str, str]
) -> None:
"""
Create the control sub-archive.
Create the control sub-archive of an ipk package.
See <https://www.debian.org/doc/debian-policy/ch-controlfields.html>
and <https://www.debian.org/doc/debian-policy/ch-maintainerscripts.html>.
:param file: file to which the sub-archive will be written
:param epoch: fixed modification time to set
:param epoch: fixed modification time to set in the archive metadata
:param metadata: package metadata (main control file)
:param scripts: optional maintainer scripts
"""
with _targz_open(file, epoch) as archive:
root_info = tarfile.TarInfo("./")
root_info.type = tarfile.DIRTYPE
root_info.mode = 0o755
archive.addfile(_clean_info(None, epoch, root_info))

_add_file(archive, "control", 0o644, epoch, metadata.encode())
Expand All @@ -107,35 +106,42 @@ def make_control(
_add_file(archive, name, 0o755, epoch, script.encode())


def make_data(file: IO[bytes], epoch: int, pkg_dir: str) -> None:
def write_data(
file: IO[bytes],
epoch: int,
pkg_dir: Optional[str] = None,
) -> None:
"""
Create the data sub-archive.
Create the data sub-archive of an ipk package.
:param file: file to which the sub-archive will be written
:param epoch: fixed modification time to set
:param pkg_dir: directory in which the package tree exists
:param epoch: fixed modification time to set in the archive metadata
:param pkg_dir: directory containing the package tree to include in the
data sub-archive, leave empty to generate an empty data archive
"""
with _targz_open(file, epoch) as archive:
archive.add(
pkg_dir, filter=lambda info: _clean_info(pkg_dir, epoch, info)
)
if pkg_dir is not None:
archive.add(
pkg_dir, filter=lambda info: _clean_info(pkg_dir, epoch, info)
)


def make_ipk(
def write(
file: IO[bytes],
epoch: int,
pkg_dir: str,
metadata: str,
scripts: Dict[str, str],
pkg_dir: Optional[str] = None,
) -> None:
"""
Create an ipk package.
:param file: file to which the package will be written
:param epoch: fixed modification time to set
:param pkg_dir: directory in which the package tree exists
:param epoch: fixed modification time to set in the archives metadata
:param metadata: package metadata (main control file)
:param scripts: optional maintainer scripts
:param pkg_dir: directory containing the package tree to include in the
data sub-archive, leave empty to generate an empty data archive
"""
with BytesIO() as control, BytesIO() as data, _targz_open(
file, epoch
Expand All @@ -144,29 +150,89 @@ def make_ipk(
root_info.type = tarfile.DIRTYPE
archive.addfile(_clean_info(None, epoch, root_info))

make_control(control, epoch, metadata, scripts)
write_control(control, epoch, metadata, scripts)
_add_file(archive, "control.tar.gz", 0o644, epoch, control.getvalue())

make_data(data, epoch, pkg_dir)
write_data(data, epoch, pkg_dir)
_add_file(archive, "data.tar.gz", 0o644, epoch, data.getvalue())

_add_file(archive, "debian-binary", 0o644, epoch, b"2.0\n")


def read_ipk_metadata(file: IO[bytes]) -> str:
"""
Read the metadata of an ipk package.
class Reader:
"""Read from ipk packages."""

:param file: package file from which to read metadata
:returns: metadata document
"""
with tarfile.TarFile.open(fileobj=file, mode="r:gz") as root_archive:
def __init__(self, file: Union[str, IO[bytes]]):
"""
Create a package reader.
:param file: path to the package file to read, or opened
file object for a package file (in the second case, the
package file object will not by closed on exit)
"""
self._file: Optional[IO[bytes]] = None

if isinstance(file, str):
self._file = open(file, "rb") # pylint:disable=consider-using-with
self._close = True
else:
self._file = file
self._close = False

self._root_archive: Optional[tarfile.TarFile] = None
self._data_file: Optional[IO[bytes]] = None

self.data: Optional[tarfile.TarFile] = None
self.metadata: Optional[str] = None
self.scripts: Dict[str, str] = {}

def __enter__(self) -> "Reader":
"""Load package data to memory."""
root_archive = tarfile.TarFile.open(fileobj=self._file)
control_file = root_archive.extractfile("./control.tar.gz")
assert control_file is not None

with tarfile.TarFile.open(
fileobj=control_file, mode="r:gz"
) as control_archive:
metadata_file = control_archive.extractfile("./control")
assert metadata_file is not None
return metadata_file.read().decode("utf-8")
with control_file:
with tarfile.TarFile.open(fileobj=control_file) as control_archive:
for member in control_archive.getmembers():
if member.isfile():
file = control_archive.extractfile(member)
assert file is not None
with file:
contents = file.read().decode("utf-8")
if member.name == "./control":
self.metadata = contents
else:
self.scripts[member.name[2:]] = contents

data_file = root_archive.extractfile("./data.tar.gz")
assert data_file is not None
self._root_archive = root_archive
self._data_file = data_file
self.data = tarfile.TarFile.open(fileobj=data_file)

return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_inst: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
"""Free resources containing package data."""
if self.data is not None:
self.data.close()
self.data = None

if self._data_file is not None:
self._data_file.close()
self._data_file = None

if self._root_archive is not None:
self._root_archive.close()
self._root_archive = None

if self._file is not None:
if self._close:
self._file.close()

self._file = None
2 changes: 1 addition & 1 deletion toltec/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Package: # pylint: disable=too-many-instance-attributes
# Name of this package, unique among all recipes of a repository
name: str

# Recipe that declares this package
# Recipe used to generate this package
parent: Recipe

# Version number
Expand Down
Loading

0 comments on commit 3f11cc7

Please sign in to comment.