test_pdf_advanced_features.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. #!/usr/bin/env python3
  2. """
  3. Tests for PDF Advanced Features (Priority 2 & 3)
  4. Tests cover:
  5. - OCR support for scanned PDFs
  6. - Password-protected PDFs
  7. - Table extraction
  8. - Parallel processing
  9. - Caching
  10. """
  11. import unittest
  12. import sys
  13. import tempfile
  14. import shutil
  15. import io
  16. from pathlib import Path
  17. from unittest.mock import Mock, patch, MagicMock
  18. # Add parent directory to path for imports
  19. sys.path.insert(0, str(Path(__file__).parent.parent / "cli"))
  20. try:
  21. import fitz # PyMuPDF
  22. PYMUPDF_AVAILABLE = True
  23. except ImportError:
  24. PYMUPDF_AVAILABLE = False
  25. try:
  26. from PIL import Image
  27. import pytesseract
  28. TESSERACT_AVAILABLE = True
  29. except ImportError:
  30. TESSERACT_AVAILABLE = False
  31. class TestOCRSupport(unittest.TestCase):
  32. """Test OCR support for scanned PDFs (Priority 2)"""
  33. def setUp(self):
  34. if not PYMUPDF_AVAILABLE:
  35. self.skipTest("PyMuPDF not installed")
  36. from pdf_extractor_poc import PDFExtractor
  37. self.PDFExtractor = PDFExtractor
  38. self.temp_dir = tempfile.mkdtemp()
  39. def tearDown(self):
  40. if hasattr(self, 'temp_dir'):
  41. shutil.rmtree(self.temp_dir, ignore_errors=True)
  42. def test_ocr_initialization(self):
  43. """Test OCR flag initialization"""
  44. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  45. extractor.use_ocr = True
  46. self.assertTrue(extractor.use_ocr)
  47. def test_extract_text_with_ocr_disabled(self):
  48. """Test that OCR can be disabled"""
  49. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  50. extractor.use_ocr = False
  51. extractor.verbose = False
  52. # Create mock page with normal text
  53. mock_page = Mock()
  54. mock_page.get_text.return_value = "This is regular text"
  55. text = extractor.extract_text_with_ocr(mock_page)
  56. self.assertEqual(text, "This is regular text")
  57. mock_page.get_text.assert_called_once_with("text")
  58. def test_extract_text_with_ocr_sufficient_text(self):
  59. """Test OCR not triggered when sufficient text exists"""
  60. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  61. extractor.use_ocr = True
  62. extractor.verbose = False
  63. # Create mock page with enough text
  64. mock_page = Mock()
  65. mock_page.get_text.return_value = "This is a long paragraph with more than 50 characters"
  66. text = extractor.extract_text_with_ocr(mock_page)
  67. self.assertEqual(len(text), 53) # Length after .strip()
  68. # OCR should not be triggered
  69. mock_page.get_pixmap.assert_not_called()
  70. @patch('pdf_extractor_poc.TESSERACT_AVAILABLE', False)
  71. def test_ocr_unavailable_warning(self):
  72. """Test warning when OCR requested but pytesseract not available"""
  73. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  74. extractor.use_ocr = True
  75. extractor.verbose = True
  76. mock_page = Mock()
  77. mock_page.get_text.return_value = "Short" # Less than 50 chars
  78. # Capture output
  79. with patch('sys.stdout', new=io.StringIO()) as fake_out:
  80. text = extractor.extract_text_with_ocr(mock_page)
  81. output = fake_out.getvalue()
  82. self.assertIn("OCR requested but pytesseract not installed", output)
  83. self.assertEqual(text, "Short")
  84. @unittest.skipUnless(TESSERACT_AVAILABLE, "pytesseract not installed")
  85. def test_ocr_extraction_triggered(self):
  86. """Test OCR extraction when text is minimal"""
  87. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  88. extractor.use_ocr = True
  89. extractor.verbose = False
  90. # Create mock page with minimal text
  91. mock_page = Mock()
  92. mock_page.get_text.return_value = "X" # Less than 50 chars
  93. # Mock pixmap and PIL Image
  94. mock_pix = Mock()
  95. mock_pix.width = 100
  96. mock_pix.height = 100
  97. mock_pix.samples = b'\x00' * (100 * 100 * 3)
  98. mock_page.get_pixmap.return_value = mock_pix
  99. with patch('pytesseract.image_to_string', return_value="OCR extracted text here"):
  100. text = extractor.extract_text_with_ocr(mock_page)
  101. # Should use OCR text since it's longer
  102. self.assertEqual(text, "OCR extracted text here")
  103. mock_page.get_pixmap.assert_called_once()
  104. class TestPasswordProtection(unittest.TestCase):
  105. """Test password-protected PDF support (Priority 2)"""
  106. def setUp(self):
  107. if not PYMUPDF_AVAILABLE:
  108. self.skipTest("PyMuPDF not installed")
  109. from pdf_extractor_poc import PDFExtractor
  110. self.PDFExtractor = PDFExtractor
  111. self.temp_dir = tempfile.mkdtemp()
  112. def tearDown(self):
  113. if hasattr(self, 'temp_dir'):
  114. shutil.rmtree(self.temp_dir, ignore_errors=True)
  115. def test_password_initialization(self):
  116. """Test password parameter initialization"""
  117. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  118. extractor.password = "test_password"
  119. self.assertEqual(extractor.password, "test_password")
  120. def test_encrypted_pdf_detection(self):
  121. """Test detection of encrypted PDF"""
  122. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  123. extractor.pdf_path = "test.pdf"
  124. extractor.password = "mypassword"
  125. extractor.verbose = False
  126. # Mock encrypted document (use MagicMock for __len__)
  127. mock_doc = MagicMock()
  128. mock_doc.is_encrypted = True
  129. mock_doc.authenticate.return_value = True
  130. mock_doc.metadata = {}
  131. mock_doc.__len__.return_value = 10
  132. with patch('fitz.open', return_value=mock_doc):
  133. # This would be called in extract_all()
  134. doc = fitz.open(extractor.pdf_path)
  135. self.assertTrue(doc.is_encrypted)
  136. result = doc.authenticate(extractor.password)
  137. self.assertTrue(result)
  138. def test_wrong_password_handling(self):
  139. """Test handling of wrong password"""
  140. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  141. extractor.pdf_path = "test.pdf"
  142. extractor.password = "wrong_password"
  143. mock_doc = Mock()
  144. mock_doc.is_encrypted = True
  145. mock_doc.authenticate.return_value = False
  146. with patch('fitz.open', return_value=mock_doc):
  147. doc = fitz.open(extractor.pdf_path)
  148. result = doc.authenticate(extractor.password)
  149. self.assertFalse(result)
  150. def test_missing_password_for_encrypted_pdf(self):
  151. """Test error when password is missing for encrypted PDF"""
  152. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  153. extractor.pdf_path = "test.pdf"
  154. extractor.password = None
  155. mock_doc = Mock()
  156. mock_doc.is_encrypted = True
  157. with patch('fitz.open', return_value=mock_doc):
  158. doc = fitz.open(extractor.pdf_path)
  159. self.assertTrue(doc.is_encrypted)
  160. self.assertIsNone(extractor.password)
  161. class TestTableExtraction(unittest.TestCase):
  162. """Test table extraction (Priority 2)"""
  163. def setUp(self):
  164. if not PYMUPDF_AVAILABLE:
  165. self.skipTest("PyMuPDF not installed")
  166. from pdf_extractor_poc import PDFExtractor
  167. self.PDFExtractor = PDFExtractor
  168. self.temp_dir = tempfile.mkdtemp()
  169. def tearDown(self):
  170. if hasattr(self, 'temp_dir'):
  171. shutil.rmtree(self.temp_dir, ignore_errors=True)
  172. def test_table_extraction_initialization(self):
  173. """Test table extraction flag initialization"""
  174. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  175. extractor.extract_tables = True
  176. self.assertTrue(extractor.extract_tables)
  177. def test_table_extraction_disabled(self):
  178. """Test no tables extracted when disabled"""
  179. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  180. extractor.extract_tables = False
  181. extractor.verbose = False
  182. mock_page = Mock()
  183. tables = extractor.extract_tables_from_page(mock_page)
  184. self.assertEqual(tables, [])
  185. # find_tables should not be called
  186. mock_page.find_tables.assert_not_called()
  187. def test_table_extraction_basic(self):
  188. """Test basic table extraction"""
  189. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  190. extractor.extract_tables = True
  191. extractor.verbose = False
  192. # Create mock table
  193. mock_table = Mock()
  194. mock_table.extract.return_value = [
  195. ["Header 1", "Header 2", "Header 3"],
  196. ["Data 1", "Data 2", "Data 3"]
  197. ]
  198. mock_table.bbox = (0, 0, 100, 100)
  199. # Create mock tables result
  200. mock_tables = Mock()
  201. mock_tables.tables = [mock_table]
  202. mock_page = Mock()
  203. mock_page.find_tables.return_value = mock_tables
  204. tables = extractor.extract_tables_from_page(mock_page)
  205. self.assertEqual(len(tables), 1)
  206. self.assertEqual(tables[0]['row_count'], 2)
  207. self.assertEqual(tables[0]['col_count'], 3)
  208. self.assertEqual(tables[0]['table_index'], 0)
  209. def test_multiple_tables_extraction(self):
  210. """Test extraction of multiple tables from one page"""
  211. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  212. extractor.extract_tables = True
  213. extractor.verbose = False
  214. # Create two mock tables
  215. mock_table1 = Mock()
  216. mock_table1.extract.return_value = [["A", "B"], ["1", "2"]]
  217. mock_table1.bbox = (0, 0, 50, 50)
  218. mock_table2 = Mock()
  219. mock_table2.extract.return_value = [["X", "Y", "Z"], ["10", "20", "30"]]
  220. mock_table2.bbox = (0, 60, 50, 110)
  221. mock_tables = Mock()
  222. mock_tables.tables = [mock_table1, mock_table2]
  223. mock_page = Mock()
  224. mock_page.find_tables.return_value = mock_tables
  225. tables = extractor.extract_tables_from_page(mock_page)
  226. self.assertEqual(len(tables), 2)
  227. self.assertEqual(tables[0]['table_index'], 0)
  228. self.assertEqual(tables[1]['table_index'], 1)
  229. def test_table_extraction_error_handling(self):
  230. """Test error handling during table extraction"""
  231. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  232. extractor.extract_tables = True
  233. extractor.verbose = False
  234. mock_page = Mock()
  235. mock_page.find_tables.side_effect = Exception("Table extraction failed")
  236. # Should not raise, should return empty list
  237. tables = extractor.extract_tables_from_page(mock_page)
  238. self.assertEqual(tables, [])
  239. class TestCaching(unittest.TestCase):
  240. """Test caching of expensive operations (Priority 3)"""
  241. def setUp(self):
  242. if not PYMUPDF_AVAILABLE:
  243. self.skipTest("PyMuPDF not installed")
  244. from pdf_extractor_poc import PDFExtractor
  245. self.PDFExtractor = PDFExtractor
  246. self.temp_dir = tempfile.mkdtemp()
  247. def tearDown(self):
  248. if hasattr(self, 'temp_dir'):
  249. shutil.rmtree(self.temp_dir, ignore_errors=True)
  250. def test_cache_initialization(self):
  251. """Test cache is initialized"""
  252. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  253. extractor._cache = {}
  254. extractor.use_cache = True
  255. self.assertIsInstance(extractor._cache, dict)
  256. self.assertTrue(extractor.use_cache)
  257. def test_cache_set_and_get(self):
  258. """Test setting and getting cached values"""
  259. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  260. extractor._cache = {}
  261. extractor.use_cache = True
  262. # Set cache
  263. test_data = {"page": 1, "text": "cached content"}
  264. extractor.set_cached("page_1", test_data)
  265. # Get cache
  266. cached = extractor.get_cached("page_1")
  267. self.assertEqual(cached, test_data)
  268. def test_cache_miss(self):
  269. """Test cache miss returns None"""
  270. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  271. extractor._cache = {}
  272. extractor.use_cache = True
  273. cached = extractor.get_cached("nonexistent_key")
  274. self.assertIsNone(cached)
  275. def test_cache_disabled(self):
  276. """Test caching can be disabled"""
  277. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  278. extractor._cache = {}
  279. extractor.use_cache = False
  280. # Try to set cache
  281. extractor.set_cached("page_1", {"data": "test"})
  282. # Cache should be empty
  283. self.assertEqual(len(extractor._cache), 0)
  284. # Try to get cache
  285. cached = extractor.get_cached("page_1")
  286. self.assertIsNone(cached)
  287. def test_cache_overwrite(self):
  288. """Test cache can be overwritten"""
  289. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  290. extractor._cache = {}
  291. extractor.use_cache = True
  292. # Set initial value
  293. extractor.set_cached("page_1", {"version": 1})
  294. # Overwrite
  295. extractor.set_cached("page_1", {"version": 2})
  296. # Get cached value
  297. cached = extractor.get_cached("page_1")
  298. self.assertEqual(cached["version"], 2)
  299. class TestParallelProcessing(unittest.TestCase):
  300. """Test parallel page processing (Priority 3)"""
  301. def setUp(self):
  302. if not PYMUPDF_AVAILABLE:
  303. self.skipTest("PyMuPDF not installed")
  304. from pdf_extractor_poc import PDFExtractor
  305. self.PDFExtractor = PDFExtractor
  306. self.temp_dir = tempfile.mkdtemp()
  307. def tearDown(self):
  308. if hasattr(self, 'temp_dir'):
  309. shutil.rmtree(self.temp_dir, ignore_errors=True)
  310. def test_parallel_initialization(self):
  311. """Test parallel processing flag initialization"""
  312. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  313. extractor.parallel = True
  314. extractor.max_workers = 4
  315. self.assertTrue(extractor.parallel)
  316. self.assertEqual(extractor.max_workers, 4)
  317. def test_parallel_disabled_by_default(self):
  318. """Test parallel processing is disabled by default"""
  319. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  320. extractor.parallel = False
  321. self.assertFalse(extractor.parallel)
  322. def test_worker_count_auto_detect(self):
  323. """Test worker count auto-detection"""
  324. import os
  325. cpu_count = os.cpu_count()
  326. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  327. extractor.max_workers = cpu_count
  328. self.assertIsNotNone(extractor.max_workers)
  329. self.assertGreater(extractor.max_workers, 0)
  330. def test_custom_worker_count(self):
  331. """Test custom worker count"""
  332. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  333. extractor.max_workers = 8
  334. self.assertEqual(extractor.max_workers, 8)
  335. class TestIntegration(unittest.TestCase):
  336. """Integration tests for advanced features"""
  337. def setUp(self):
  338. if not PYMUPDF_AVAILABLE:
  339. self.skipTest("PyMuPDF not installed")
  340. from pdf_extractor_poc import PDFExtractor
  341. self.PDFExtractor = PDFExtractor
  342. self.temp_dir = tempfile.mkdtemp()
  343. def tearDown(self):
  344. if hasattr(self, 'temp_dir'):
  345. shutil.rmtree(self.temp_dir, ignore_errors=True)
  346. def test_full_initialization_with_all_features(self):
  347. """Test initialization with all advanced features enabled"""
  348. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  349. # Set all advanced features
  350. extractor.use_ocr = True
  351. extractor.password = "test_password"
  352. extractor.extract_tables = True
  353. extractor.parallel = True
  354. extractor.max_workers = 4
  355. extractor.use_cache = True
  356. extractor._cache = {}
  357. # Verify all features are set
  358. self.assertTrue(extractor.use_ocr)
  359. self.assertEqual(extractor.password, "test_password")
  360. self.assertTrue(extractor.extract_tables)
  361. self.assertTrue(extractor.parallel)
  362. self.assertEqual(extractor.max_workers, 4)
  363. self.assertTrue(extractor.use_cache)
  364. def test_feature_combinations(self):
  365. """Test various feature combinations"""
  366. combinations = [
  367. {"use_ocr": True, "extract_tables": True},
  368. {"password": "test", "parallel": True},
  369. {"use_cache": True, "extract_tables": True, "parallel": True},
  370. {"use_ocr": True, "password": "test", "extract_tables": True, "parallel": True}
  371. ]
  372. for combo in combinations:
  373. extractor = self.PDFExtractor.__new__(self.PDFExtractor)
  374. for key, value in combo.items():
  375. setattr(extractor, key, value)
  376. # Verify all attributes are set correctly
  377. for key, value in combo.items():
  378. self.assertEqual(getattr(extractor, key), value)
  379. def test_page_data_includes_tables(self):
  380. """Test that page data includes table count"""
  381. # This tests that the page_data structure includes tables
  382. expected_keys = [
  383. 'page_number', 'text', 'markdown', 'headings',
  384. 'code_samples', 'images_count', 'extracted_images',
  385. 'tables', 'char_count', 'code_blocks_count', 'tables_count'
  386. ]
  387. # Just verify the structure is correct
  388. # Actual extraction is tested in other test classes
  389. page_data = {
  390. 'page_number': 1,
  391. 'text': 'test',
  392. 'markdown': 'test',
  393. 'headings': [],
  394. 'code_samples': [],
  395. 'images_count': 0,
  396. 'extracted_images': [],
  397. 'tables': [],
  398. 'char_count': 4,
  399. 'code_blocks_count': 0,
  400. 'tables_count': 0
  401. }
  402. for key in expected_keys:
  403. self.assertIn(key, page_data)
  404. if __name__ == '__main__':
  405. unittest.main()