Skip to content

Commit b76cec6

Browse files
committed
feat: added new unpacking plugin base class
1 parent 8f0f4f5 commit b76cec6

File tree

10 files changed

+222
-135
lines changed

10 files changed

+222
-135
lines changed
Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,51 @@
1+
from __future__ import annotations
2+
3+
import importlib.util
14
import logging
5+
import sys
6+
from importlib.machinery import SourceFileLoader
7+
from inspect import isclass
28
from pathlib import Path
9+
from typing import Iterable, Type
310

4-
from common_helper_files import get_dirs_in_dir
5-
from pluginbase import PluginBase
6-
7-
from helperFunctions.file_system import get_src_dir
8-
11+
from plugins.base_class import UnpackingPlugin
912

10-
def import_plugins(plugin_mount, plugin_base_dir):
11-
plugin_base = PluginBase(package=plugin_mount)
12-
plugin_src_dirs = _get_plugin_src_dirs(plugin_base_dir)
13-
return plugin_base.make_plugin_source(searchpath=plugin_src_dirs)
13+
SRC_DIR = Path(__file__).parent.parent
PLUGIN_DIR = SRC_DIR / 'plugins'


def _module_name_for(plugin_file: Path, search_root: Path) -> str:
    """Build the dotted module name under which `plugin_file` gets registered in `sys.modules`."""
    try:
        relative_path = plugin_file.relative_to(SRC_DIR)
    except ValueError:
        # The search root lies outside of the source tree (e.g. a temporary directory),
        # so the name is derived from the location below the search root instead.
        relative_path = plugin_file.relative_to(search_root)
    # Joining the path parts is independent of the path separator of the platform.
    return '.'.join(relative_path.with_suffix('').parts)


def import_plugins(path=PLUGIN_DIR) -> list:
    """
    Import every python file (except `__init__.py`) that is located in a `code` directory below `path`.

    Files that raise an exception during import are logged and skipped, so one broken plugin does not
    prevent the remaining plugins from being loaded.

    :param path: The directory (`Path` or `str`) that is searched recursively for plugin files.
    :return: A list of modules where each module is an unpacking plugin.
    """
    search_root = Path(path)
    plugins = []
    # sorted, because the order of `glob` results depends on the file system
    for plugin_file in sorted(search_root.glob('**/code/*.py')):
        if plugin_file.name == '__init__.py':
            continue

        module_name = _module_name_for(plugin_file, search_root)
        loader = SourceFileLoader(module_name, str(plugin_file))
        spec = importlib.util.spec_from_loader(loader.name, loader)
        plugin_module = importlib.util.module_from_spec(spec)

        # The module is registered before it is executed (same order as in the importlib recipe for
        # importing a source file directly) and unregistered again if the execution fails.
        sys.modules[spec.name] = plugin_module
        try:
            loader.exec_module(plugin_module)
        except Exception as error:  # probably missing dependencies
            sys.modules.pop(spec.name, None)
            logging.exception(f'Could not import plugin {module_name} due to exception: {error}')
        else:
            plugins.append(plugin_module)
    return plugins
38+
39+
40+
def find_plugin_classes(module) -> Iterable[Type[UnpackingPlugin]]:
    """
    Lazily yield the plugin classes of a module.

    All attributes of `module` whose name does not start with an underscore are inspected and those that
    are accepted by `_is_plugin_class` are yielded in the order in which `dir(module)` lists them.

    :param module: The (plugin) module that should be searched.
    :return: An iterator over the plugin classes that were found.
    """
    public_names = (name for name in dir(module) if not name.startswith('_'))
    candidates = (getattr(module, name) for name in public_names)
    yield from filter(_is_plugin_class, candidates)
48+
49+
50+
def _is_plugin_class(attr) -> bool:
    """
    Check whether `attr` is a plugin class, i.e. a proper subclass of `UnpackingPlugin`.

    :param attr: An arbitrary object (usually an attribute of a plugin module).
    :return: `True` if `attr` is a class derived from `UnpackingPlugin` but not `UnpackingPlugin` itself.
    """
    # `isclass` must be checked first: comparing arbitrary module attributes with `!=` may return
    # something that has no truth value (e.g. array-like objects) and `issubclass` raises a TypeError
    # for non-classes. The base class itself is excluded by identity.
    return isclass(attr) and attr is not UnpackingPlugin and issubclass(attr, UnpackingPlugin)
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from __future__ import annotations
2+
3+
from abc import ABC, abstractmethod
4+
from typing import Type
5+
6+
7+
class PluginError(Exception):
    """Error that is raised if an unpacking plugin is not valid."""
9+
10+
11+
class UnpackingPlugin(ABC):
    """
    Abstract base class for unpacking plugins.

    A plugin overwrites the class attributes `MIME_PATTERNS`, `NAME` and `VERSION` and implements
    `unpack_file`. Whether the attributes were overwritten can be checked with `validate`.
    """

    MIME_PATTERNS = ()  # must be overwritten by the subclass
    NAME = 'base'
    VERSION = '0.0.0'

    @abstractmethod
    def unpack_file(self, file_path: str, tmp_dir: str) -> dict:
        """
        Unpack the file `file_path` into the directory `tmp_dir`. Every concrete unpacker subclass must
        implement this method!

        The returned dictionary contains the metadata of the unpacking process. It is expected to have the
        key 'output' whose value should be the output of the used tool or relevant logging messages.

        :param file_path: Path to the file to be unpacked.
        :param tmp_dir: Path to the temporary directory where the file will be unpacked.
        :return: The metadata.
        """

    def validate(self):
        """
        Make sure that the subclass has overwritten the placeholder attributes of the base class.

        :raises PluginError: If `MIME_PATTERNS` is empty or `NAME` / `VERSION` still have their default value.
        """
        if not len(self.MIME_PATTERNS):
            raise PluginError(f'{self.NAME} is not a valid plugin (no MIME patterns defined)')
        if self.NAME == 'base':
            raise PluginError(f'{self.NAME} is not a valid plugin (no NAME defined)')
        if self.VERSION == '0.0.0':
            raise PluginError(f'{self.NAME} is not a valid plugin (no VERSION defined)')

    @classmethod
    def from_old_module(cls, old_module) -> Type[UnpackingPlugin]:
        """
        Dynamically create a plugin subclass from an old-style plugin module (backwards compatibility).

        :param old_module: A module that provides `NAME`, `VERSION`, `MIME_PATTERNS` and `unpack_function`.
        :return: A new subclass of `cls` that wraps the attributes and the unpack function of the module.
        """
        class_name = f'{old_module.__name__}.Unpacker'
        class_attributes = {
            'NAME': old_module.NAME,
            'VERSION': old_module.VERSION,
            'MIME_PATTERNS': old_module.MIME_PATTERNS,
            # a static method, because the old function does not expect `self` as first argument
            'unpack_file': staticmethod(old_module.unpack_function),
        }
        return type(class_name, (cls,), class_attributes)

fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import re
21
import zlib
32
from pathlib import Path
43

@@ -18,7 +17,7 @@ def test_unpacker_selection_generic(self):
1817
def test_extraction(self):
1918
in_file = f'{get_test_data_dir()}/generic_carver_test'
2019
files, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(
21-
in_file, self.tmp_dir.name, self.unpacker.unpacker_plugins['generic/carver']
20+
in_file, self.tmp_dir.name, self.unpacker.unpacking_plugins['generic_carver']
2221
)
2322
files = set(files)
2423
assert len(files) == 3, 'file number incorrect'
@@ -28,7 +27,7 @@ def test_extraction(self):
2827
def test_zlib_carving(self):
2928
in_file = TEST_DATA_DIR / 'zlib_carving_test'
3029
files, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(
31-
in_file, self.tmp_dir.name, self.unpacker.unpacker_plugins['generic/carver']
30+
in_file, self.tmp_dir.name, self.unpacker.unpacking_plugins['generic_carver']
3231
)
3332
assert len(files) == 9, 'file number incorrect'
3433
assert sum(1 if f.endswith('.zlib_carver') else 0 for f in files), 'wrong number of carved zlib streams'
@@ -40,7 +39,7 @@ def test_filter(self):
4039
in_file = TEST_DATA_DIR / 'carving_test_file'
4140
assert Path(in_file).is_file()
4241
files, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(
43-
str(in_file), self.tmp_dir.name, self.unpacker.unpacker_plugins['generic/carver']
42+
str(in_file), self.tmp_dir.name, self.unpacker.unpacking_plugins['generic_carver']
4443
)
4544
files = set(files)
4645
assert len(files) == 4, 'file number incorrect'
@@ -53,12 +52,11 @@ def test_fake_archives(self, file_format):
5352
in_file = TEST_DATA_DIR / f'fake_{file_format}.{file_format}'
5453
assert Path(in_file).is_file()
5554
meta = unpack_function(str(in_file), self.tmp_dir.name)
56-
assert meta == {'output': 'No valid chunks found.'}
55+
assert 'No valid chunks found.' in meta['output']
5756

5857
@pytest.mark.parametrize(('file_format', 'expected_size'), [('bz2', 52), ('zip', 170)])
5958
def test_trailing_data(self, file_format, expected_size):
6059
in_file = Path(get_test_data_dir()) / f'trailing_data.{file_format}'
6160
assert Path(in_file).is_file()
6261
meta = unpack_function(str(in_file), self.tmp_dir.name)
63-
carved_size = int(re.search(r'size: (\d+)', meta['output']).group(1))
64-
assert carved_size == expected_size
62+
assert f'chunk: 0x0-{hex(expected_size)}' in meta['output']

fact_extractor/plugins/unpacking/raw/test/test_raw.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def test_extraction(self):
2222
def test_extraction_encoded(self):
2323
input_file = Path(TEST_DATA_DIR, 'encoded.bin')
2424
unpacked_files, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(
25-
str(input_file), self.tmp_dir.name, self.unpacker.unpacker_plugins['data/raw']
25+
str(input_file), self.tmp_dir.name, self.unpacker.unpacking_plugins['RAW']
2626
)
2727
assert meta_data['Intel Hex'] == 1
2828
assert meta_data['Motorola S-Record'] == 1

fact_extractor/plugins/unpacking/senao/test/test_plugin_senao.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def test_extraction(self):
1515
files, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(
1616
str(in_file),
1717
self.tmp_dir.name,
18-
self.unpacker.unpacker_plugins['firmware/senao-v2b'],
18+
self.unpacker.unpacking_plugins['senao'],
1919
)
2020
assert len(files) == 1, 'unpacked file number incorrect'
2121
file = Path(files[0])
Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,7 @@
1-
def plugin_init():
2-
return 'plugin_loaded'
1+
# Dummy test plugin that is defined through module-level attributes and a function (no plugin class).
NAME = 'plugin_one'
VERSION = '0.0.1'
MIME_PATTERNS = ('foo/bar',)


def unpack_function(file_path, tmp_dir):  # noqa: ARG001
    """Placeholder unpack function: both arguments are ignored and nothing is unpacked."""
    return None
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from plugins.base_class import UnpackingPlugin
2+
3+
4+
class UnpackerTwo(UnpackingPlugin):
    """Dummy plugin that is defined as a subclass of `UnpackingPlugin`; it does not unpack anything."""

    NAME = 'plugin_two'
    VERSION = '0.0.1'
    MIME_PATTERNS = ('test/123',)

    def unpack_file(self, file_path, tmp_dir):
        """Placeholder implementation: both arguments are ignored and `None` is returned."""
        return None
Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
1-
from helperFunctions.plugin import _get_plugin_src_dirs, import_plugins
1+
from pathlib import Path
22

3-
TEST_PLUGINS_BASE_PATH = 'test/data/plugin_system'
3+
from helperFunctions.plugin import find_plugin_classes, import_plugins
44

5+
TEST_PLUGINS_BASE_PATH = Path(__file__).parent.parent.parent / 'data/plugin_system'
56

6-
class TestHelperFunctionsPlugin:
7-
def test_get_plugin_src_dirs(self):
8-
result = _get_plugin_src_dirs(TEST_PLUGINS_BASE_PATH)
9-
assert isinstance(result, list), 'result is not a list'
10-
assert 'plugin_one' in sorted(result)[0], 'plugin not found'
11-
assert len(result) == 2, 'number of found plugin directories not correct'
127

13-
def test_load_plugins(self):
14-
result = import_plugins('plugins.test', TEST_PLUGINS_BASE_PATH)
15-
imported_plugins = result.list_plugins()
16-
assert len(imported_plugins) == 1, 'wrong number of plugins imported'
17-
assert imported_plugins[0] == 'plugin_one', 'plugin name not correct'
8+
def test_load_plugins():
    """Both test plugin modules are imported, but only `plugin_two` contains a plugin class."""
    imported_modules = import_plugins(TEST_PLUGINS_BASE_PATH)
    assert len(imported_modules) == 2, 'wrong number of plugins imported'

    # map the last component of the dotted module name to the module
    modules = {}
    for imported_module in imported_modules:
        short_name = imported_module.__name__.split('.')[-1]
        modules[short_name] = imported_module

    for expected_name in ('plugin_one', 'plugin_two'):
        assert expected_name in modules

    classes_of_plugin_one = list(find_plugin_classes(modules['plugin_one']))
    classes_of_plugin_two = list(find_plugin_classes(modules['plugin_two']))
    assert len(classes_of_plugin_one) == 0
    assert len(classes_of_plugin_two) == 1

fact_extractor/test/unit/unpacker/test_unpacker.py

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,51 +2,59 @@
22

33
from __future__ import annotations
44

5-
import gc
65
import json
7-
import os
86
import shutil
97
from configparser import ConfigParser
108
from pathlib import Path
119
from tempfile import TemporaryDirectory
1210
from unittest.mock import Mock, patch
1311

1412
from helperFunctions.file_system import get_test_data_dir
13+
from plugins.base_class import UnpackingPlugin
1514
from unpacker.unpack import Unpacker
1615

1716

1817
class TestUnpackerBase:
19-
def setup_method(self):
20-
self.config = ConfigParser()
21-
self.ds_tmp_dir = TemporaryDirectory(prefix='fact_tests_')
22-
self.tmp_dir = TemporaryDirectory(prefix='fact_tests_')
18+
@classmethod
19+
def setup_class(cls):
20+
cls.config = ConfigParser()
21+
cls.ds_tmp_dir = TemporaryDirectory(prefix='fact_tests_')
22+
cls.files_dir = Path(cls.ds_tmp_dir.name) / 'files'
23+
cls.reports_dir = Path(cls.ds_tmp_dir.name) / 'reports'
24+
25+
cls.config.add_section('unpack')
26+
cls.config.set('unpack', 'data_folder', cls.ds_tmp_dir.name)
27+
cls.config.set('unpack', 'blacklist', 'text/plain, image/png')
28+
cls.config.add_section('ExpertSettings')
29+
cls.config.set('ExpertSettings', 'header_overhead', '256')
30+
cls.config.set('ExpertSettings', 'unpack_threshold', '0.8')
2331

24-
self.config.add_section('unpack')
25-
self.config.set('unpack', 'data_folder', self.ds_tmp_dir.name)
26-
self.config.set('unpack', 'blacklist', 'text/plain, image/png')
27-
self.config.add_section('ExpertSettings')
28-
self.config.set('ExpertSettings', 'header_overhead', '256')
29-
self.config.set('ExpertSettings', 'unpack_threshold', '0.8')
32+
cls.unpacker = Unpacker(config=cls.config)
3033

31-
self.unpacker = Unpacker(config=self.config)
32-
os.makedirs(str(self.unpacker._report_folder), exist_ok=True) # pylint: disable=protected-access
33-
os.makedirs(str(self.unpacker._file_folder), exist_ok=True) # pylint: disable=protected-access
34+
cls.test_file_path = Path(get_test_data_dir(), 'get_files_test/testfile1')
3435

35-
self.test_file_path = Path(get_test_data_dir(), 'get_files_test/testfile1')
36+
def setup_method(self):
37+
self.tmp_dir = TemporaryDirectory(prefix='fact_tests_')
38+
self.unpacker._report_folder.mkdir(parents=True, exist_ok=True)
39+
self.unpacker._file_folder.mkdir(parents=True, exist_ok=True)
3640

3741
def teardown_method(self):
38-
self.ds_tmp_dir.cleanup()
3942
self.tmp_dir.cleanup()
40-
gc.collect()
43+
shutil.rmtree(self.unpacker._report_folder)
44+
shutil.rmtree(self.unpacker._file_folder)
45+
46+
@classmethod
47+
def teardown_class(cls):
48+
cls.ds_tmp_dir.cleanup()
4149

4250
def get_unpacker_meta(self):
4351
return json.loads(
4452
Path(self.unpacker._report_folder, 'meta.json').read_text() # pylint: disable=protected-access
4553
)
4654

4755
def check_unpacker_selection(self, mime_type, plugin_name):
48-
name = self.unpacker.get_unpacker(mime_type)[1]
49-
assert name == plugin_name, 'wrong unpacker plugin selected'
56+
unpacker = self.unpacker.get_unpacker(mime_type)
57+
assert unpacker.NAME == plugin_name, 'wrong unpacker plugin selected' # noqa: SIM300
5058

5159
def check_unpacking_of_standard_unpack_set(
5260
self,
@@ -59,9 +67,9 @@ def check_unpacking_of_standard_unpack_set(
5967
files = {f for f in files if not any(rule in f for rule in ignore or set())}
6068
assert len(files) == 3, f'file number incorrect: {meta_data}'
6169
assert files == {
62-
os.path.join(self.tmp_dir.name, additional_prefix_folder, 'testfile1'),
63-
os.path.join(self.tmp_dir.name, additional_prefix_folder, 'testfile2'),
64-
os.path.join(self.tmp_dir.name, additional_prefix_folder, 'generic folder/test file 3_.txt'),
70+
str(Path(self.tmp_dir.name, additional_prefix_folder, 'testfile1')),
71+
str(Path(self.tmp_dir.name, additional_prefix_folder, 'testfile2')),
72+
str(Path(self.tmp_dir.name, additional_prefix_folder, 'generic folder/test file 3_.txt')),
6573
}, f'not all files found: {meta_data}'
6674
if output:
6775
assert 'output' in meta_data
@@ -70,9 +78,11 @@ def check_unpacking_of_standard_unpack_set(
7078

7179
class TestUnpackerCore(TestUnpackerBase):
7280
def test_generic_carver_found(self):
73-
assert 'generic/carver' in list(self.unpacker.unpacker_plugins), 'generic carver plugin not found'
74-
name = self.unpacker.unpacker_plugins['generic/carver'][1]
75-
assert name == 'generic_carver', 'generic_carver plugin not found'
81+
assert 'generic_carver' in self.unpacker.unpacking_plugins, 'generic carver plugin not found'
82+
assert 'generic/carver' in self.unpacker.plugin_by_mime, 'generic carver MIME type not found'
83+
plugin = self.unpacker.get_unpacker('generic/carver')
84+
assert isinstance(plugin, UnpackingPlugin)
85+
assert plugin.NAME == 'generic_carver', 'generic_carver plugin not found'
7686

7787
def test_unpacker_selection_unknown(self):
7888
self.check_unpacker_selection('unknown/blah', 'generic_carver')
@@ -167,17 +177,17 @@ def test_main_unpack_function(self):
167177
test_file_path = Path(get_test_data_dir(), 'container/test.zip')
168178
self.main_unpack_check(test_file_path, 3, 0, '7z')
169179

170-
def test_main_unpack_exclude_archive(self):
180+
def test_main_unpack_exclude_archive(self, monkeypatch):
171181
test_file_path = Path(get_test_data_dir(), 'container/test.zip')
172-
self.unpacker.exclude = ['*test.zip']
182+
monkeypatch.setattr(self.unpacker, 'exclude', ['*test.zip'])
173183
self.main_unpack_check(test_file_path, 0, 1, None)
174184

175-
def test_main_unpack_exclude_subdirectory(self):
185+
def test_main_unpack_exclude_subdirectory(self, monkeypatch):
176186
test_file_path = Path(get_test_data_dir(), 'container/test.zip')
177-
self.unpacker.exclude = ['*/generic folder/*']
187+
monkeypatch.setattr(self.unpacker, 'exclude', ['*/generic folder/*'])
178188
self.main_unpack_check(test_file_path, 2, 1, '7z')
179189

180-
def test_main_unpack_exclude_files(self):
190+
def test_main_unpack_exclude_files(self, monkeypatch):
181191
test_file_path = Path(get_test_data_dir(), 'container/test.zip')
182-
self.unpacker.exclude = ['*/get_files_test/*test*']
192+
monkeypatch.setattr(self.unpacker, 'exclude', ['*/get_files_test/*test*'])
183193
self.main_unpack_check(test_file_path, 0, 3, '7z')

0 commit comments

Comments
 (0)