| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524 |
- #!/usr/bin/env python3
- """
- Tests for PDF Advanced Features (Priority 2 & 3)
- Tests cover:
- - OCR support for scanned PDFs
- - Password-protected PDFs
- - Table extraction
- - Parallel processing
- - Caching
- """
- import unittest
- import sys
- import tempfile
- import shutil
- import io
- from pathlib import Path
- from unittest.mock import Mock, patch, MagicMock
- # Add parent directory to path for imports
- sys.path.insert(0, str(Path(__file__).parent.parent / "cli"))
# Optional third-party dependencies. Each flag records whether the import
# succeeded so the tests below can skip themselves instead of erroring out.
try:
    import fitz  # PyMuPDF
except ImportError:
    PYMUPDF_AVAILABLE = False
else:
    PYMUPDF_AVAILABLE = True

# Both Pillow and pytesseract must import for the OCR flag to be set.
try:
    from PIL import Image
    import pytesseract
except ImportError:
    TESSERACT_AVAILABLE = False
else:
    TESSERACT_AVAILABLE = True
class TestOCRSupport(unittest.TestCase):
    """OCR support for scanned PDFs (Priority 2).

    Extractors are created with ``__new__`` so ``__init__`` never runs and
    no real PDF is opened; each test sets only the attributes it needs.
    """

    def setUp(self):
        """Skip without PyMuPDF; otherwise load PDFExtractor and a temp dir."""
        if not PYMUPDF_AVAILABLE:
            self.skipTest("PyMuPDF not installed")
        from pdf_extractor_poc import PDFExtractor

        self.PDFExtractor = PDFExtractor
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary directory if setUp got far enough to make it."""
        scratch = getattr(self, 'temp_dir', None)
        if scratch is not None:
            shutil.rmtree(scratch, ignore_errors=True)

    def _bare_extractor(self, **attrs):
        """Return a PDFExtractor built without ``__init__`` with *attrs* set."""
        instance = self.PDFExtractor.__new__(self.PDFExtractor)
        for attr_name, attr_value in attrs.items():
            setattr(instance, attr_name, attr_value)
        return instance

    def test_ocr_initialization(self):
        """The use_ocr flag can be stored on an extractor."""
        pdf = self._bare_extractor(use_ocr=True)
        self.assertTrue(pdf.use_ocr)

    def test_extract_text_with_ocr_disabled(self):
        """With OCR off, the page's own text is returned."""
        pdf = self._bare_extractor(use_ocr=False, verbose=False)

        # Page whose text layer already holds ordinary text.
        page = Mock(**{"get_text.return_value": "This is regular text"})

        result = pdf.extract_text_with_ocr(page)

        self.assertEqual(result, "This is regular text")
        page.get_text.assert_called_once_with("text")

    def test_extract_text_with_ocr_sufficient_text(self):
        """OCR is not attempted when the page already has enough text."""
        pdf = self._bare_extractor(use_ocr=True, verbose=False)

        page = Mock(**{
            "get_text.return_value":
                "This is a long paragraph with more than 50 characters",
        })

        result = pdf.extract_text_with_ocr(page)

        # 53 is the length of the sample text (it has no outer whitespace).
        self.assertEqual(len(result), 53)
        # No page rendering means OCR was never started.
        page.get_pixmap.assert_not_called()

    @patch('pdf_extractor_poc.TESSERACT_AVAILABLE', False)
    def test_ocr_unavailable_warning(self):
        """A warning is printed when OCR is requested without pytesseract."""
        pdf = self._bare_extractor(use_ocr=True, verbose=True)

        # Short text, so the extractor is expected to want OCR.
        page = Mock(**{"get_text.return_value": "Short"})

        # Redirect stdout so the printed warning can be inspected.
        captured = io.StringIO()
        with patch('sys.stdout', new=captured):
            result = pdf.extract_text_with_ocr(page)
        printed = captured.getvalue()

        self.assertIn("OCR requested but pytesseract not installed", printed)
        self.assertEqual(result, "Short")

    @unittest.skipUnless(TESSERACT_AVAILABLE, "pytesseract not installed")
    def test_ocr_extraction_triggered(self):
        """OCR output is used when the page's own text is minimal."""
        pdf = self._bare_extractor(use_ocr=True, verbose=False)

        # Fake rendered page: a 100x100 image with 3 bytes per pixel.
        side = 100
        pixmap = Mock(
            width=side,
            height=side,
            samples=b'\x00' * (side * side * 3),
        )
        page = Mock(**{
            "get_text.return_value": "X",
            "get_pixmap.return_value": pixmap,
        })

        with patch('pytesseract.image_to_string',
                   return_value="OCR extracted text here"):
            result = pdf.extract_text_with_ocr(page)

        # The longer OCR text should replace the one-character original.
        self.assertEqual(result, "OCR extracted text here")
        page.get_pixmap.assert_called_once()
class TestPasswordProtection(unittest.TestCase):
    """Password-protected PDF support (Priority 2).

    The document-level tests patch ``fitz.open`` and then call the patched
    API directly, so they check the mocked document's behaviour rather than
    any PDFExtractor method.
    """

    def setUp(self):
        """Skip without PyMuPDF; otherwise load PDFExtractor and a temp dir."""
        if not PYMUPDF_AVAILABLE:
            self.skipTest("PyMuPDF not installed")
        from pdf_extractor_poc import PDFExtractor

        self.PDFExtractor = PDFExtractor
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary directory if setUp got far enough to make it."""
        scratch = getattr(self, 'temp_dir', None)
        if scratch is not None:
            shutil.rmtree(scratch, ignore_errors=True)

    def _bare_extractor(self, **attrs):
        """Return a PDFExtractor built without ``__init__`` with *attrs* set."""
        instance = self.PDFExtractor.__new__(self.PDFExtractor)
        for attr_name, attr_value in attrs.items():
            setattr(instance, attr_name, attr_value)
        return instance

    def test_password_initialization(self):
        """The password attribute can be stored on an extractor."""
        pdf = self._bare_extractor(password="test_password")
        self.assertEqual(pdf.password, "test_password")

    def test_encrypted_pdf_detection(self):
        """An encrypted document is reported and accepts the password."""
        pdf = self._bare_extractor(
            pdf_path="test.pdf",
            password="mypassword",
            verbose=False,
        )

        # MagicMock is required so that __len__ can be configured.
        document = MagicMock(is_encrypted=True, metadata={})
        document.authenticate.return_value = True
        document.__len__.return_value = 10

        with patch('fitz.open', return_value=document):
            # Same calls the original notes say extract_all() would make.
            opened = fitz.open(pdf.pdf_path)
            self.assertTrue(opened.is_encrypted)
            authenticated = opened.authenticate(pdf.password)
            self.assertTrue(authenticated)

    def test_wrong_password_handling(self):
        """Authentication reports failure for a wrong password."""
        pdf = self._bare_extractor(
            pdf_path="test.pdf",
            password="wrong_password",
        )

        document = Mock(is_encrypted=True)
        document.authenticate.return_value = False

        with patch('fitz.open', return_value=document):
            opened = fitz.open(pdf.pdf_path)
            authenticated = opened.authenticate(pdf.password)
            self.assertFalse(authenticated)

    def test_missing_password_for_encrypted_pdf(self):
        """An encrypted document with no password configured."""
        pdf = self._bare_extractor(pdf_path="test.pdf", password=None)

        document = Mock(is_encrypted=True)

        with patch('fitz.open', return_value=document):
            opened = fitz.open(pdf.pdf_path)
            self.assertTrue(opened.is_encrypted)
            self.assertIsNone(pdf.password)
class TestTableExtraction(unittest.TestCase):
    """Table extraction from a page (Priority 2)."""

    def setUp(self):
        """Skip without PyMuPDF; otherwise load PDFExtractor and a temp dir."""
        if not PYMUPDF_AVAILABLE:
            self.skipTest("PyMuPDF not installed")
        from pdf_extractor_poc import PDFExtractor

        self.PDFExtractor = PDFExtractor
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary directory if setUp got far enough to make it."""
        scratch = getattr(self, 'temp_dir', None)
        if scratch is not None:
            shutil.rmtree(scratch, ignore_errors=True)

    def _bare_extractor(self, **attrs):
        """Return a PDFExtractor built without ``__init__`` with *attrs* set."""
        instance = self.PDFExtractor.__new__(self.PDFExtractor)
        for attr_name, attr_value in attrs.items():
            setattr(instance, attr_name, attr_value)
        return instance

    @staticmethod
    def _fake_table(rows, bbox):
        """Build a mock table whose ``extract()`` yields *rows*."""
        table = Mock(bbox=bbox)
        table.extract.return_value = rows
        return table

    @staticmethod
    def _page_with(found_tables):
        """Build a mock page whose ``find_tables()`` exposes *found_tables*."""
        page = Mock()
        page.find_tables.return_value = Mock(tables=found_tables)
        return page

    def test_table_extraction_initialization(self):
        """The extract_tables flag can be stored on an extractor."""
        pdf = self._bare_extractor(extract_tables=True)
        self.assertTrue(pdf.extract_tables)

    def test_table_extraction_disabled(self):
        """Nothing is extracted, or even searched for, when disabled."""
        pdf = self._bare_extractor(extract_tables=False, verbose=False)
        page = Mock()

        found = pdf.extract_tables_from_page(page)

        self.assertEqual(found, [])
        # The page must not be asked for tables at all.
        page.find_tables.assert_not_called()

    def test_table_extraction_basic(self):
        """A single 2x3 table is reported with its dimensions and index."""
        pdf = self._bare_extractor(extract_tables=True, verbose=False)

        table = self._fake_table(
            [
                ["Header 1", "Header 2", "Header 3"],
                ["Data 1", "Data 2", "Data 3"]
            ],
            (0, 0, 100, 100),
        )
        page = self._page_with([table])

        found = pdf.extract_tables_from_page(page)

        self.assertEqual(len(found), 1)
        only = found[0]
        self.assertEqual(only['row_count'], 2)
        self.assertEqual(only['col_count'], 3)
        self.assertEqual(only['table_index'], 0)

    def test_multiple_tables_extraction(self):
        """Two tables on one page are returned with sequential indexes."""
        pdf = self._bare_extractor(extract_tables=True, verbose=False)

        upper = self._fake_table([["A", "B"], ["1", "2"]], (0, 0, 50, 50))
        lower = self._fake_table(
            [["X", "Y", "Z"], ["10", "20", "30"]],
            (0, 60, 50, 110),
        )
        page = self._page_with([upper, lower])

        found = pdf.extract_tables_from_page(page)

        self.assertEqual(len(found), 2)
        self.assertEqual(found[0]['table_index'], 0)
        self.assertEqual(found[1]['table_index'], 1)

    def test_table_extraction_error_handling(self):
        """A failure inside find_tables yields an empty list, not an error."""
        pdf = self._bare_extractor(extract_tables=True, verbose=False)

        page = Mock(**{
            "find_tables.side_effect": Exception("Table extraction failed"),
        })

        # The exception must be absorbed by the extractor.
        found = pdf.extract_tables_from_page(page)
        self.assertEqual(found, [])
class TestCaching(unittest.TestCase):
    """Caching of expensive operations (Priority 3)."""

    def setUp(self):
        """Skip without PyMuPDF; otherwise load PDFExtractor and a temp dir."""
        if not PYMUPDF_AVAILABLE:
            self.skipTest("PyMuPDF not installed")
        from pdf_extractor_poc import PDFExtractor

        self.PDFExtractor = PDFExtractor
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary directory if setUp got far enough to make it."""
        scratch = getattr(self, 'temp_dir', None)
        if scratch is not None:
            shutil.rmtree(scratch, ignore_errors=True)

    def _extractor_with_cache(self, enabled):
        """Return a bare extractor with an empty cache and ``use_cache`` set."""
        instance = self.PDFExtractor.__new__(self.PDFExtractor)
        instance._cache = {}
        instance.use_cache = enabled
        return instance

    def test_cache_initialization(self):
        """The cache is a dict and the enable flag is stored."""
        pdf = self._extractor_with_cache(True)

        self.assertIsInstance(pdf._cache, dict)
        self.assertTrue(pdf.use_cache)

    def test_cache_set_and_get(self):
        """A stored value is returned by a later lookup."""
        pdf = self._extractor_with_cache(True)
        payload = {"page": 1, "text": "cached content"}

        pdf.set_cached("page_1", payload)

        self.assertEqual(pdf.get_cached("page_1"), payload)

    def test_cache_miss(self):
        """Looking up an unknown key returns None."""
        pdf = self._extractor_with_cache(True)

        self.assertIsNone(pdf.get_cached("nonexistent_key"))

    def test_cache_disabled(self):
        """With caching off, writes are dropped and reads return None."""
        pdf = self._extractor_with_cache(False)

        # The write must leave the cache empty.
        pdf.set_cached("page_1", {"data": "test"})
        self.assertEqual(len(pdf._cache), 0)

        # The read must find nothing.
        self.assertIsNone(pdf.get_cached("page_1"))

    def test_cache_overwrite(self):
        """Writing the same key twice keeps the most recent value."""
        pdf = self._extractor_with_cache(True)

        for version in (1, 2):
            pdf.set_cached("page_1", {"version": version})

        latest = pdf.get_cached("page_1")
        self.assertEqual(latest["version"], 2)
class TestParallelProcessing(unittest.TestCase):
    """Test parallel page processing (Priority 3).

    These tests only assign and read back attributes on an extractor
    created with ``__new__``; ``__init__`` is never run.
    """

    def setUp(self):
        """Skip without PyMuPDF; otherwise load PDFExtractor and a temp dir."""
        if not PYMUPDF_AVAILABLE:
            self.skipTest("PyMuPDF not installed")
        from pdf_extractor_poc import PDFExtractor

        self.PDFExtractor = PDFExtractor
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary directory created in setUp."""
        if hasattr(self, 'temp_dir'):
            shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_parallel_initialization(self):
        """Test parallel processing flag initialization"""
        extractor = self.PDFExtractor.__new__(self.PDFExtractor)
        extractor.parallel = True
        extractor.max_workers = 4

        self.assertTrue(extractor.parallel)
        self.assertEqual(extractor.max_workers, 4)

    def test_parallel_disabled_by_default(self):
        """Test the parallel flag can be stored as disabled"""
        extractor = self.PDFExtractor.__new__(self.PDFExtractor)
        extractor.parallel = False

        self.assertFalse(extractor.parallel)

    def test_worker_count_auto_detect(self):
        """Test worker count auto-detection"""
        import os

        # os.cpu_count() returns None when the count cannot be determined;
        # fall back to one worker so the assertions below do not fail (or
        # raise TypeError on the comparison) on such platforms.
        cpu_count = os.cpu_count() or 1

        extractor = self.PDFExtractor.__new__(self.PDFExtractor)
        extractor.max_workers = cpu_count

        self.assertIsNotNone(extractor.max_workers)
        self.assertGreater(extractor.max_workers, 0)

    def test_custom_worker_count(self):
        """Test custom worker count"""
        extractor = self.PDFExtractor.__new__(self.PDFExtractor)
        extractor.max_workers = 8

        self.assertEqual(extractor.max_workers, 8)
class TestIntegration(unittest.TestCase):
    """Integration tests for advanced features.

    Extractors are created with ``__new__`` so ``__init__`` never runs;
    the tests assign feature attributes and read them back.
    """

    def setUp(self):
        """Skip without PyMuPDF; otherwise load PDFExtractor and a temp dir."""
        if not PYMUPDF_AVAILABLE:
            self.skipTest("PyMuPDF not installed")
        from pdf_extractor_poc import PDFExtractor

        self.PDFExtractor = PDFExtractor
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary directory created in setUp."""
        if hasattr(self, 'temp_dir'):
            shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_full_initialization_with_all_features(self):
        """Test initialization with all advanced features enabled"""
        extractor = self.PDFExtractor.__new__(self.PDFExtractor)

        # Set all advanced features
        extractor.use_ocr = True
        extractor.password = "test_password"
        extractor.extract_tables = True
        extractor.parallel = True
        extractor.max_workers = 4
        extractor.use_cache = True
        extractor._cache = {}

        # Verify all features are set
        self.assertTrue(extractor.use_ocr)
        self.assertEqual(extractor.password, "test_password")
        self.assertTrue(extractor.extract_tables)
        self.assertTrue(extractor.parallel)
        self.assertEqual(extractor.max_workers, 4)
        self.assertTrue(extractor.use_cache)

    def test_feature_combinations(self):
        """Test various feature combinations"""
        combinations = [
            {"use_ocr": True, "extract_tables": True},
            {"password": "test", "parallel": True},
            {"use_cache": True, "extract_tables": True, "parallel": True},
            {"use_ocr": True, "password": "test", "extract_tables": True, "parallel": True}
        ]

        for combo in combinations:
            # subTest reports which combination failed and lets the
            # remaining combinations run instead of stopping at the first.
            with self.subTest(combo=combo):
                extractor = self.PDFExtractor.__new__(self.PDFExtractor)
                for key, value in combo.items():
                    setattr(extractor, key, value)

                # Verify all attributes are set correctly
                for key, value in combo.items():
                    self.assertEqual(getattr(extractor, key), value)

    def test_page_data_includes_tables(self):
        """Test that the sample page data has the expected keys.

        The dictionary checked here is a literal written in this test, not
        the output of an extraction call.
        """
        expected_keys = [
            'page_number', 'text', 'markdown', 'headings',
            'code_samples', 'images_count', 'extracted_images',
            'tables', 'char_count', 'code_blocks_count', 'tables_count'
        ]

        # Hand-written sample of the page_data structure.
        page_data = {
            'page_number': 1,
            'text': 'test',
            'markdown': 'test',
            'headings': [],
            'code_samples': [],
            'images_count': 0,
            'extracted_images': [],
            'tables': [],
            'char_count': 4,
            'code_blocks_count': 0,
            'tables_count': 0
        }

        for key in expected_keys:
            self.assertIn(key, page_data)
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|