Chunk Documents
Splits markdown files into ~1500-word semantic chunks for RAG/retrieval systems.
"""Split markdown files into semantic chunks.

Every ``*.md`` file found in ``MD_DIR`` is split into chunks of roughly
``TARGET_CHUNK_SIZE`` words and written to ``CHUNKS_DIR/<file stem>/`` as
``chunk_NNN_<slug>.md`` files, each prefixed with a small YAML frontmatter
header describing where the chunk came from.
"""

import re
import os
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
MD_DIR = BASE_DIR / "md"
CHUNKS_DIR = BASE_DIR / "chunks"

TARGET_CHUNK_SIZE = 1500  # words
MIN_CHUNK_SIZE = 200  # words

# Patterns are compiled once at import time instead of on every call.
# The closing frontmatter delimiter must sit on a line of its own so that a
# '---' inside a value (e.g. title: "A --- B") does not end the header early.
_FRONTMATTER_RE = re.compile(
    r'\A---[ \t]*\r?\n(.*?)^---[ \t]*\r?$', re.DOTALL | re.MULTILINE
)
_HEADING_RE = re.compile(r'^(#{1,3})\s+(.+)$', re.MULTILINE)
_PAGE_RE = re.compile(r'^<!-- Page (\d+) -->$', re.MULTILINE)
_RULE_RE = re.compile(r'^---+$', re.MULTILINE)


def _make_chunk(text, title, word_count):
    """Build the chunk dict shape shared by every chunking function."""
    return {'text': text, 'title': title, 'word_count': word_count}


def _yaml_quote(value):
    """Return *value* as a YAML double-quoted scalar (escapes ``\\`` and ``"``)."""
    escaped = str(value).replace('\\', '\\\\').replace('"', '\\"')
    return f'"{escaped}"'


def parse_frontmatter(text):
    """Extract YAML frontmatter and body from markdown.

    Only flat ``key: value`` lines are understood; surrounding single or
    double quotes are stripped from values and lines without a colon are
    ignored.

    Returns:
        ``(meta, body)`` where ``meta`` is a dict of the parsed keys and
        ``body`` is the stripped text after the closing ``---`` line. When
        the text has no complete frontmatter block, returns ``({}, text)``
        with the text untouched.
    """
    match = _FRONTMATTER_RE.match(text)
    if match is None:
        return {}, text

    meta = {}
    for line in match.group(1).splitlines():
        key, sep, val = line.partition(':')
        if sep:
            meta[key.strip()] = val.strip().strip('"').strip("'")
    return meta, text[match.end():].strip()


def find_section_breaks(text):
    """Find natural section breaks in the text.

    Looks for markdown headings (levels 1-3), ``<!-- Page N -->`` markers and
    horizontal rules.

    Returns:
        A list of dicts with keys ``pos`` (character offset of the match),
        ``type`` (``'heading'``, ``'page'`` or ``'rule'``), ``level`` (heading
        depth, or 99 for pages and rules) and ``title``, sorted by ``pos``.
    """
    breaks = [
        {
            'pos': m.start(),
            'type': 'heading',
            'level': len(m.group(1)),
            'title': m.group(2).strip(),
        }
        for m in _HEADING_RE.finditer(text)
    ]
    breaks.extend(
        {
            'pos': m.start(),
            'type': 'page',
            'level': 99,
            'title': f'Page {m.group(1)}',
        }
        for m in _PAGE_RE.finditer(text)
    )
    breaks.extend(
        {'pos': m.start(), 'type': 'rule', 'level': 99, 'title': ''}
        for m in _RULE_RE.finditer(text)
    )
    breaks.sort(key=lambda b: b['pos'])
    return breaks


def _merge_small_sections(sections):
    """Turn ``(title, text)`` sections into chunks of at least MIN_CHUNK_SIZE words.

    A section that is too small is appended to the previous chunk. If there
    is no previous chunk yet, it is held back and prepended to the next one,
    so leading short sections are not lost. Text is only discarded when all
    sections together stay below ``MIN_CHUNK_SIZE``.
    """
    chunks = []
    pending = []  # leading sections still too small to stand alone
    pending_title = None
    pending_words = 0

    for title, body in sections:
        words = len(body.split())
        if pending:
            pending.append(body)
            pending_words += words
            if words >= MIN_CHUNK_SIZE:
                # A full-size section names the chunk better than the
                # fragments that precede it.
                pending_title = title
            if pending_words >= MIN_CHUNK_SIZE:
                chunks.append(
                    _make_chunk('\n\n'.join(pending), pending_title, pending_words)
                )
                pending, pending_words = [], 0
        elif words >= MIN_CHUNK_SIZE:
            chunks.append(_make_chunk(body, title, words))
        elif chunks:
            # Merge small sections with the previous chunk.
            chunks[-1]['text'] += '\n\n' + body
            chunks[-1]['word_count'] += words
        else:
            pending, pending_title, pending_words = [body], title, words

    return chunks


def chunk_by_headings(text, meta):
    """Split text into chunks based on headings, page markers, and size.

    Strategy, in order:

    1. With at least three headings, split at headings. Text before the first
       heading becomes an ``'Introduction'`` section, undersized sections are
       merged with their neighbours, and chunks above twice the target size
       are sub-chunked by paragraph.
    2. Otherwise, for documents above twice the target size, group by page
       markers when present, else split by paragraph.
    3. Otherwise return the whole text as one chunk titled ``meta['title']``
       (default ``'Full Text'``), or ``[]`` if it is below ``MIN_CHUNK_SIZE``.

    Returns:
        A list of chunk dicts with keys ``text``, ``title`` and ``word_count``.
    """
    breaks = find_section_breaks(text)
    total_words = len(text.split())
    max_words = TARGET_CHUNK_SIZE * 2

    heading_breaks = [b for b in breaks if b['type'] == 'heading']

    if len(heading_breaks) >= 3:
        sections = []
        pre_text = text[:heading_breaks[0]['pos']].strip()
        if pre_text:
            sections.append(('Introduction', pre_text))
        for i, brk in enumerate(heading_breaks):
            if i + 1 < len(heading_breaks):
                end = heading_breaks[i + 1]['pos']
            else:
                end = len(text)
            sections.append((brk['title'], text[brk['pos']:end].strip()))

        final_chunks = []
        for chunk in _merge_small_sections(sections):
            if chunk['word_count'] > max_words:
                final_chunks.extend(
                    chunk_by_size(chunk['text'], base_title=chunk['title'])
                )
            else:
                final_chunks.append(chunk)
        return final_chunks

    # For large documents without headings, split by page markers.
    page_breaks = [b for b in breaks if b['type'] == 'page']
    if page_breaks and total_words > max_words:
        return chunk_by_pages(text, page_breaks)

    # No usable structure: split by size if needed.
    if total_words > max_words:
        return chunk_by_size(text)

    # Small enough to be one chunk.
    if total_words >= MIN_CHUNK_SIZE:
        return [_make_chunk(text, meta.get('title', 'Full Text'), total_words)]
    return []


def chunk_by_pages(text, page_breaks):
    """Group page markers into chunks of ~TARGET_CHUNK_SIZE words.

    Args:
        text: The document body.
        page_breaks: ``'page'`` entries from ``find_section_breaks(text)``,
            in document order.

    Returns:
        A list of chunk dicts. Titles are ``'Pages <first>-<last>'`` using the
        pages actually contained in the chunk; the final chunk is titled
        ``'Pages <first>-end'``. Text before the first page marker is kept
        with the first chunk, and a trailing group below ``MIN_CHUNK_SIZE``
        is merged into the previous chunk rather than dropped. Returns ``[]``
        when there are no page breaks or the total is below
        ``MIN_CHUNK_SIZE``.
    """
    if not page_breaks:
        return []

    chunks = []
    current_text = []
    current_words = 0
    first_page = None  # first page in the group being built
    last_page = None  # last page added to the group being built
    flushed_first = None  # first page of the most recently emitted chunk

    preamble = text[:page_breaks[0]['pos']].strip()
    if preamble:
        current_text.append(preamble)
        current_words = len(preamble.split())

    for i, brk in enumerate(page_breaks):
        start = brk['pos']
        end = page_breaks[i + 1]['pos'] if i + 1 < len(page_breaks) else len(text)
        page_text = text[start:end].strip()
        page_words = len(page_text.split())
        page_num = brk['title'].replace('Page ', '')

        if (current_words + page_words > TARGET_CHUNK_SIZE
                and current_words >= MIN_CHUNK_SIZE):
            # Emit the group *before* adding this page, so the title must end
            # at the last page already in the group, not at this page.
            if first_page is None:
                title = 'Introduction'  # group holds only pre-marker text
            else:
                title = f'Pages {first_page}-{last_page}'
            chunks.append(_make_chunk('\n\n'.join(current_text), title, current_words))
            flushed_first = first_page
            current_text, current_words = [], 0
            first_page = None

        current_text.append(page_text)
        current_words += page_words
        if first_page is None:
            first_page = page_num
        last_page = page_num

    remainder = '\n\n'.join(current_text)
    if current_words >= MIN_CHUNK_SIZE:
        chunks.append(_make_chunk(remainder, f'Pages {first_page}-end', current_words))
    elif chunks:
        chunks[-1]['text'] += '\n\n' + remainder
        chunks[-1]['word_count'] += current_words
        start_page = first_page if flushed_first is None else flushed_first
        chunks[-1]['title'] = f'Pages {start_page}-end'

    return chunks


def chunk_by_size(text, base_title=None):
    """Split text into chunks of ~TARGET_CHUNK_SIZE words at paragraph breaks.

    Paragraphs (separated by blank lines) are never split, so a single very
    long paragraph can exceed the target.

    Args:
        text: The text to split.
        base_title: Optional title; parts are named ``'<base_title> (part N)'``
            when given, ``'Section N'`` otherwise. A result with a single
            chunk is titled ``base_title`` itself.

    Returns:
        A list of chunk dicts. A trailing remainder below ``MIN_CHUNK_SIZE``
        is merged into the previous chunk; if it is the only chunk it is
        returned as is.
    """
    paragraphs = re.split(r'\n\n+', text)
    chunks = []
    current_text = []
    current_words = 0

    for para in paragraphs:
        para_words = len(para.split())
        if (current_words + para_words > TARGET_CHUNK_SIZE
                and current_words >= MIN_CHUNK_SIZE):
            n = len(chunks) + 1
            title = f'{base_title} (part {n})' if base_title else f'Section {n}'
            chunks.append(_make_chunk('\n\n'.join(current_text), title, current_words))
            current_text = [para]
            current_words = para_words
        else:
            current_text.append(para)
            current_words += para_words

    if not current_text:
        return chunks

    remainder = '\n\n'.join(current_text)
    if chunks and current_words < MIN_CHUNK_SIZE:
        # Too small to be useful on its own: fold it into the previous part.
        chunks[-1]['text'] += '\n\n' + remainder
        chunks[-1]['word_count'] += current_words
        return chunks

    n = len(chunks) + 1
    if chunks:
        title = f'{base_title} (part {n})' if base_title else f'Section {n}'
    else:
        title = base_title or f'Section {n}'
    chunks.append(_make_chunk(remainder, title, current_words))
    return chunks


def slugify(title):
    """Create a filename-safe slug from a title.

    Lowercases the title, removes everything except word characters,
    whitespace and hyphens, replaces whitespace runs with ``_`` and truncates
    to 50 characters. Returns ``'untitled'`` when nothing is left.
    """
    slug = re.sub(r'[^\w\s-]', '', title.lower())
    slug = re.sub(r'[\s]+', '_', slug).strip('_')
    return slug[:50] if slug else 'untitled'


def main(md_dir=MD_DIR, chunks_dir=CHUNKS_DIR):
    """Chunk every ``*.md`` file in *md_dir* into ``chunks_dir/<stem>/``.

    Args:
        md_dir: Directory containing the source markdown files.
        chunks_dir: Directory that receives one sub-directory per document.

    Existing ``chunk_*.md`` files in a document's output directory are removed
    before writing, so the directory always matches the ``total_chunks`` value
    recorded in each chunk's frontmatter.
    """
    md_dir = Path(md_dir)
    chunks_dir = Path(chunks_dir)

    md_files = sorted(md_dir.glob('*.md'))
    print(f"Found {len(md_files)} markdown files to chunk")

    total_chunks = 0

    for md_path in md_files:
        doc_slug = md_path.stem
        doc_chunks_dir = chunks_dir / doc_slug
        doc_chunks_dir.mkdir(parents=True, exist_ok=True)

        # Drop output of earlier runs; otherwise a run that produces fewer or
        # differently-titled chunks leaves stale files next to the new ones.
        for stale in list(doc_chunks_dir.glob('chunk_*.md')):
            stale.unlink()

        text = md_path.read_text(encoding='utf-8')
        meta, body = parse_frontmatter(text)

        chunks = chunk_by_headings(body, meta)

        # Merge a very small trailing chunk into its predecessor.
        if len(chunks) > 1 and chunks[-1]['word_count'] < MIN_CHUNK_SIZE:
            chunks[-2]['text'] += '\n\n' + chunks[-1]['text']
            chunks[-2]['word_count'] += chunks[-1]['word_count']
            chunks.pop()

        for number, chunk in enumerate(chunks, start=1):
            chunk_slug = slugify(chunk['title'])
            chunk_path = doc_chunks_dir / f"chunk_{number:03d}_{chunk_slug}.md"

            # Quoted values are escaped so titles containing '"' or '\' still
            # produce valid YAML.
            frontmatter = (
                '---\n'
                f'source: {_yaml_quote("md/" + md_path.name)}\n'
                f'chunk: {number}\n'
                f'total_chunks: {len(chunks)}\n'
                f'section: {_yaml_quote(chunk["title"])}\n'
                f'word_count: {chunk["word_count"]}\n'
                '---\n'
                '\n'
            )
            chunk_path.write_text(frontmatter + chunk['text'], encoding='utf-8')

        print(f"  {doc_slug}: {len(chunks)} chunks")
        total_chunks += len(chunks)

    print(f"\nTotal: {total_chunks} chunks from {len(md_files)} documents")


if __name__ == "__main__":
    main()