Alchemical Hands in the Hypnerotomachia Poliphili

Marginalia, Scholarship & Reception

← All Scripts

Chunk Documents

chunk_documents.py — 261 lines

Splits markdown files into ~1500-word semantic chunks for RAG/retrieval systems.

1"""Split markdown files into semantic chunks."""
2
3import re
4import os
5from pathlib import Path
6
7BASE_DIR = Path(__file__).resolve().parent.parent
8MD_DIR = BASE_DIR / "md"
9CHUNKS_DIR = BASE_DIR / "chunks"
10
11TARGET_CHUNK_SIZE = 1500  # words
12MIN_CHUNK_SIZE = 200  # words
13
14
15def parse_frontmatter(text):
16    """Extract YAML frontmatter and body from markdown."""
17    if text.startswith('---'):
18        end = text.find('---', 3)
19        if end > 0:
20            frontmatter = text[3:end].strip()
21            body = text[end + 3:].strip()
22            # Parse simple YAML
23            meta = {}
24            for line in frontmatter.split('\n'):
25                if ':' in line:
26                    key, _, val = line.partition(':')
27                    val = val.strip().strip('"').strip("'")
28                    meta[key.strip()] = val
29            return meta, body
30    return {}, text
31
32
33def find_section_breaks(text):
34    """Find natural section breaks in the text."""
35    # Look for markdown headings
36    heading_pattern = re.compile(r'^(#{1,3})\s+(.+)$', re.MULTILINE)
37    # Look for page markers
38    page_pattern = re.compile(r'^<!-- Page (\d+) -->$', re.MULTILINE)
39    # Look for horizontal rules
40    rule_pattern = re.compile(r'^---+$', re.MULTILINE)
41
42    breaks = []
43    for m in heading_pattern.finditer(text):
44        level = len(m.group(1))
45        breaks.append({
46            'pos': m.start(),
47            'type': 'heading',
48            'level': level,
49            'title': m.group(2).strip(),
50        })
51
52    for m in page_pattern.finditer(text):
53        breaks.append({
54            'pos': m.start(),
55            'type': 'page',
56            'level': 99,
57            'title': f'Page {m.group(1)}',
58        })
59
60    for m in rule_pattern.finditer(text):
61        breaks.append({
62            'pos': m.start(),
63            'type': 'rule',
64            'level': 99,
65            'title': '',
66        })
67
68    breaks.sort(key=lambda b: b['pos'])
69    return breaks
70
71
72def chunk_by_headings(text, meta):
73    """Split text into chunks based on headings, page markers, and size."""
74    breaks = find_section_breaks(text)
75    total_words = len(text.split())
76
77    # If we have good heading structure, split by headings
78    heading_breaks = [b for b in breaks if b['type'] == 'heading']
79
80    if len(heading_breaks) >= 3:
81        chunks = []
82        for i, brk in enumerate(heading_breaks):
83            start = brk['pos']
84            end = heading_breaks[i + 1]['pos'] if i + 1 < len(heading_breaks) else len(text)
85            chunk_text = text[start:end].strip()
86            word_count = len(chunk_text.split())
87
88            if word_count >= MIN_CHUNK_SIZE:
89                chunks.append({
90                    'text': chunk_text,
91                    'title': brk['title'],
92                    'word_count': word_count,
93                })
94            elif chunks:
95                # Merge small chunks with previous
96                chunks[-1]['text'] += '\n\n' + chunk_text
97                chunks[-1]['word_count'] += word_count
98
99        # Handle text before first heading
100        pre_text = text[:heading_breaks[0]['pos']].strip()
101        if pre_text and len(pre_text.split()) >= MIN_CHUNK_SIZE:
102            chunks.insert(0, {
103                'text': pre_text,
104                'title': 'Introduction',
105                'word_count': len(pre_text.split()),
106            })
107
108        # If chunks are still too big (>3000 words), sub-chunk them
109        final_chunks = []
110        for chunk in chunks:
111            if chunk['word_count'] > 3000:
112                sub = chunk_by_size(chunk['text'], base_title=chunk['title'])
113                final_chunks.extend(sub)
114            else:
115                final_chunks.append(chunk)
116        return final_chunks
117
118    # For large documents without headings, split by page markers
119    page_breaks = [b for b in breaks if b['type'] == 'page']
120    if page_breaks and total_words > TARGET_CHUNK_SIZE * 2:
121        return chunk_by_pages(text, page_breaks)
122
123    # Small documents: split by size if needed
124    if total_words > TARGET_CHUNK_SIZE * 2:
125        return chunk_by_size(text)
126
127    # Small enough to be one chunk
128    if total_words >= MIN_CHUNK_SIZE:
129        return [{'text': text, 'title': meta.get('title', 'Full Text'),
130                 'word_count': total_words}]
131    return []
132
133
134def chunk_by_pages(text, page_breaks):
135    """Group page markers into chunks of ~TARGET_CHUNK_SIZE words."""
136    chunks = []
137    current_text = []
138    current_words = 0
139    current_start_page = None
140
141    for i, brk in enumerate(page_breaks):
142        start = brk['pos']
143        end = page_breaks[i + 1]['pos'] if i + 1 < len(page_breaks) else len(text)
144        page_text = text[start:end].strip()
145        page_words = len(page_text.split())
146        page_num = brk['title'].replace('Page ', '')
147
148        if current_start_page is None:
149            current_start_page = page_num
150
151        if current_words + page_words > TARGET_CHUNK_SIZE and current_words >= MIN_CHUNK_SIZE:
152            chunks.append({
153                'text': '\n\n'.join(current_text),
154                'title': f'Pages {current_start_page}-{page_num}',
155                'word_count': current_words,
156            })
157            current_text = [page_text]
158            current_words = page_words
159            current_start_page = page_num
160        else:
161            current_text.append(page_text)
162            current_words += page_words
163
164    if current_text and current_words >= MIN_CHUNK_SIZE:
165        chunks.append({
166            'text': '\n\n'.join(current_text),
167            'title': f'Pages {current_start_page}-end',
168            'word_count': current_words,
169        })
170
171    return chunks
172
173
174def chunk_by_size(text, base_title=None):
175    """Split text into roughly equal chunks by word count."""
176    paragraphs = re.split(r'\n\n+', text)
177    chunks = []
178    current_text = []
179    current_words = 0
180
181    for para in paragraphs:
182        para_words = len(para.split())
183        if current_words + para_words > TARGET_CHUNK_SIZE and current_words >= MIN_CHUNK_SIZE:
184            n = len(chunks) + 1
185            title = f'{base_title} (part {n})' if base_title else f'Section {n}'
186            chunks.append({
187                'text': '\n\n'.join(current_text),
188                'title': title,
189                'word_count': current_words,
190            })
191            current_text = [para]
192            current_words = para_words
193        else:
194            current_text.append(para)
195            current_words += para_words
196
197    if current_text:
198        n = len(chunks) + 1
199        title = f'{base_title} (part {n})' if base_title and len(chunks) > 0 else (base_title or f'Section {n}')
200        chunks.append({
201            'text': '\n\n'.join(current_text),
202            'title': title,
203            'word_count': current_words,
204        })
205
206    return chunks
207
208
209def slugify(title):
210    """Create a filename-safe slug from a title."""
211    slug = re.sub(r'[^\w\s-]', '', title.lower())
212    slug = re.sub(r'[\s]+', '_', slug).strip('_')
213    return slug[:50] if slug else 'untitled'
214
215
216def main():
217    md_files = sorted(MD_DIR.glob('*.md'))
218    print(f"Found {len(md_files)} markdown files to chunk")
219
220    total_chunks = 0
221
222    for md_path in md_files:
223        doc_slug = md_path.stem
224        doc_chunks_dir = CHUNKS_DIR / doc_slug
225        doc_chunks_dir.mkdir(parents=True, exist_ok=True)
226
227        text = md_path.read_text(encoding='utf-8')
228        meta, body = parse_frontmatter(text)
229
230        chunks = chunk_by_headings(body, meta)
231
232        # Merge very small trailing chunks
233        if len(chunks) > 1 and chunks[-1]['word_count'] < MIN_CHUNK_SIZE:
234            chunks[-2]['text'] += '\n\n' + chunks[-1]['text']
235            chunks[-2]['word_count'] += chunks[-1]['word_count']
236            chunks.pop()
237
238        for i, chunk in enumerate(chunks):
239            chunk_slug = slugify(chunk['title'])
240            chunk_filename = f"chunk_{i + 1:03d}_{chunk_slug}.md"
241            chunk_path = doc_chunks_dir / chunk_filename
242
243            frontmatter = f"""---
244source: "md/{md_path.name}"
245chunk: {i + 1}
246total_chunks: {len(chunks)}
247section: "{chunk['title']}"
248word_count: {chunk['word_count']}
249---
250
251"""
252            chunk_path.write_text(frontmatter + chunk['text'], encoding='utf-8')
253
254        print(f"  {doc_slug}: {len(chunks)} chunks")
255        total_chunks += len(chunks)
256
257    print(f"\nTotal: {total_chunks} chunks from {len(md_files)} documents")
258
259
260if __name__ == "__main__":
261    main()