 
 from __future__ import annotations
 
-import logging
 import traceback
 from itertools import chain
 from pathlib import Path
 from typing import Iterable
 
-import structlog
 from common_helper_unpacking_classifier import avg_entropy
+from structlog.testing import capture_logs
 from unblob.extractor import carve_unknown_chunk, carve_valid_chunk
 from unblob.file_utils import File
 from unblob.finder import search_chunks
@@ -26,9 +25,6 @@
 
 MIN_FILE_ENTROPY = 0.01
 
-# deactivate internal logger of unblob because it can slow down searching chunks
-structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.CRITICAL))
-
 
 class ZlibCarvingHandler(ZlibHandler):
     NAME = 'zlib_carver'
@@ -51,7 +47,8 @@ def unpack_function(file_path: str, tmp_dir: str) -> dict:
     path = Path(file_path)
 
     try:
-        with File.from_path(path) as file:
+        with File.from_path(path) as file, capture_logs() as log_list:
+            # unblob uses structlog for logging, but we can capture the logs with this convenient testing function
             for chunk in _find_chunks(path, file):
                 if isinstance(chunk, PaddingChunk):
                     continue
@@ -66,31 +63,30 @@ def unpack_function(file_path: str, tmp_dir: str) -> dict:
                 carve_valid_chunk(extraction_dir, file, chunk)
                 chunks.append(chunk.as_report(None).asdict())
 
-        report = _create_report(chunks) if chunks else 'No valid chunks found.'
+        report = _format_logs(log_list)
         if filter_report:
             report += f'\nFiltered chunks:\n{filter_report}'
+        if not chunks:
+            report += '\nNo valid chunks found.'
     except Exception as error:
         report = f'Error {error} during unblob extraction:\n{traceback.format_exc()}'
     return {'output': report}
 
 
+def _format_logs(logs: list[dict]) -> str:
+    output = ''
+    for entry in logs:
+        output += '\n'.join(f'{key}: {value}' for key, value in entry.items() if key not in {'_verbosity', 'log_level'}) + '\n'
+    return output
+
+
 def _find_chunks(file_path: Path, file: File) -> Iterable[Chunk]:
     task = Task(path=file_path, depth=0, blob_id='')
     known_chunks = remove_inner_chunks(search_chunks(file, file.size(), HANDLERS, TaskResult(task)))
     unknown_chunks = calculate_unknown_chunks(known_chunks, file.size())
     yield from chain(known_chunks, unknown_chunks)
 
 
-def _create_report(chunk_list: list[dict]) -> str:
-    report = ['Extracted chunks:']
-    for chunk in sorted(chunk_list, key=lambda c: c['start_offset']):
-        chunk_type = chunk.get('handler_name', 'unknown')
-        report.append(
-            f'start: {chunk["start_offset"]}, end: {chunk["end_offset"]}, size: {chunk["size"]}, type: {chunk_type}'
-        )
-    return '\n'.join(report)
-
-
 def _has_low_entropy(file: File, chunk: UnknownChunk) -> bool:
     file.seek(chunk.start_offset)
     content = file.read(chunk.size)
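
For context on the change: structlog's capture_logs() is a documented testing helper that collects every log event emitted inside its context manager as a plain dict instead of rendering it, and those dicts are what _format_logs() turns into the report text. A minimal standalone sketch of that behavior follows; the logger and the event fields are illustrative assumptions, not taken from the patch:

import structlog
from structlog.testing import capture_logs

logger = structlog.get_logger()

with capture_logs() as log_list:
    # hypothetical events standing in for unblob's internal chunk-search logs
    logger.info('found chunk', start_offset=0, size=42)

for entry in log_list:
    # each entry is a dict such as
    # {'start_offset': 0, 'size': 42, 'event': 'found chunk', 'log_level': 'info'};
    # _format_logs() above drops '_verbosity' and 'log_level' and renders the rest as 'key: value' lines
    print(entry)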