# aai/prepare_data.py
"""
📊 PREPARE_DATA - Przygotowanie danych z wielu folderów
"""
import os
import re
import csv
import json
import random
from pathlib import Path
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from config import logger, cfg

# ==================== DATA PREPARATION CLASSES ====================
class DataPreparer:
    """Main class for preparing training data"""

    def __init__(self):
        # Make sure the output directory exists before anything is written
        Path(cfg.prepared_dir).mkdir(parents=True, exist_ok=True)
        self.output_file = Path(cfg.prepared_dir) / "all_data.txt"
        self.metadata_file = Path(cfg.prepared_dir) / "metadata.json"
        self.stats = {
            "total_files": 0,
            "total_samples": 0,
            "total_chars": 0,
            "sources": {},
            "errors": []
        }

    def find_data_folders(self) -> List[Path]:
        """Find all folders whose names start with 'data_'"""
        current_dir = Path(".")
        data_folders = []
        for item in current_dir.iterdir():
            if item.is_dir() and item.name.startswith("data_"):
                data_folders.append(item)
                logger.info(f"📁 Found folder: {item.name}")
        # Also include the plain 'data' folder if it exists
        if Path("data").is_dir():
            data_folders.append(Path("data"))
        return data_folders
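
    # Illustrative layout only (the folder and file names below are
    # hypothetical); any top-level 'data_*' directory is picked up,
    # plus 'data/':
    #   data_news/articles.txt        -> split into paragraphs
    #   data_code/python_snippets.txt -> split into function chunks
    #   data/misc.csv                 -> one sample per row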

    def process_file(self, file_path: Path) -> List[str]:
        """Process a single file and return its samples"""
        samples = []
        try:
            if file_path.suffix == '.txt':
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
                # Different strategies for different kinds of files
                if "news" in file_path.name.lower() or "article" in file_path.name.lower():
                    # News/articles: split into paragraphs
                    paragraphs = re.split(r'\n\s*\n', content)
                    for para in paragraphs:
                        para = para.strip()
                        if 100 < len(para) < 5000:
                            samples.append(para)
                elif "code" in file_path.name.lower() or "python" in file_path.name.lower():
                    # Code: keep whole functions together
                    lines = content.split('\n')
                    current_chunk = []
                    for line in lines:
                        line = line.strip()
                        if line:
                            # A new 'def' starts a new chunk: flush the previous
                            # function before appending the new def line
                            if line.startswith('def ') and len(current_chunk) > 3:
                                samples.append('\n'.join(current_chunk))
                                current_chunk = []
                            current_chunk.append(line)
                    # Add the remaining chunk
                    if current_chunk and len('\n'.join(current_chunk)) > 50:
                        samples.append('\n'.join(current_chunk))
                else:
                    # Default: split into lines/paragraphs
                    lines = content.split('\n')
                    for line in lines:
                        line = line.strip()
                        if 20 < len(line) < 1000:
                            samples.append(line)
            elif file_path.suffix == '.json':
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                if isinstance(data, list):
                    for item in data:
                        if isinstance(item, str):
                            samples.append(item)
                        elif isinstance(item, dict):
                            # Flatten the dict into a single text line
                            text = ' '.join(f"{k}: {v}" for k, v in item.items())
                            if len(text) > 20:
                                samples.append(text)
            elif file_path.suffix == '.csv':
                with open(file_path, 'r', encoding='utf-8') as f:
                    reader = csv.DictReader(f)
                    for row in reader:
                        text = ' '.join(row.values())
                        if len(text) > 20:
                            samples.append(text)
        except Exception as e:
            self.stats["errors"].append(f"{file_path}: {str(e)}")
        return samples
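
    # Sketch of the JSON-dict flattening above (hypothetical record):
    #   {"title": "Hello", "body": "World"}  ->  "title: Hello body: World"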

    def process_folder(self, folder_path: Path) -> Dict[str, Any]:
        """Process a whole folder"""
        folder_stats = {
            "name": folder_path.name,
            "files_processed": 0,
            "samples_found": 0,
            "samples": []
        }
        # Find all supported files, recursively
        file_patterns = ['*.txt', '*.json', '*.csv']
        files = []
        for pattern in file_patterns:
            files.extend(folder_path.rglob(pattern))
        logger.info(f"  📂 {folder_path.name}: {len(files)} files")
        # Process files in parallel
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {executor.submit(self.process_file, file): file for file in files}
            for future in as_completed(futures):
                file = futures[future]
                try:
                    samples = future.result()
                    if samples:
                        folder_stats["samples"].extend(samples)
                        folder_stats["samples_found"] += len(samples)
                        folder_stats["files_processed"] += 1
                except Exception as e:
                    self.stats["errors"].append(f"{file}: {str(e)}")
        self.stats["total_files"] += folder_stats["files_processed"]
        return folder_stats
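
    # Shape of the dict returned above (values are illustrative):
    #   {"name": "data_news", "files_processed": 12,
    #    "samples_found": 3400, "samples": ["...", "..."]}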

    def prepare_all_data(self) -> None:
        """Main data preparation entry point"""
        logger.info("=" * 60)
        logger.info("📊 PREPARING DATA FROM ALL FOLDERS")
        logger.info("=" * 60)
        # Find folders
        data_folders = self.find_data_folders()
        if not data_folders:
            logger.error("❌ No folders starting with 'data_' found")
            return
        all_samples = []
        # Process each folder
        for folder in data_folders:
            logger.info(f"\n🔍 Processing folder: {folder.name}")
            folder_stats = self.process_folder(folder)
            if folder_stats["samples"]:
                all_samples.extend(folder_stats["samples"])
                self.stats["sources"][folder.name] = folder_stats["samples_found"]
                logger.info(f"  ✅ Found: {folder_stats['samples_found']} samples")
                logger.info("  📝 Examples:")
                for sample in random.sample(folder_stats["samples"], min(3, len(folder_stats["samples"]))):
                    logger.info(f"    {sample[:80]}...")
            else:
                logger.warning(f"  ⚠️ No data in folder {folder.name}")
        # Shuffle and cap
        if all_samples:
            random.shuffle(all_samples)
            # Cap at 1 million samples (to bound memory use)
            if len(all_samples) > 1_000_000:
                all_samples = all_samples[:1_000_000]
                logger.warning("⚠️ Capped at 1,000,000 samples")
            # Update stats before writing; metadata and the summary read these
            self.stats["total_samples"] = len(all_samples)
            self.stats["total_chars"] = sum(len(s) for s in all_samples)
            # Write everything to a single file
            self._save_to_file(all_samples)
            # Write metadata
            self._save_metadata()
            # Summary
            self._print_summary()
        else:
            logger.error("❌ No data found at all!")

    def _save_to_file(self, samples: List[str]) -> None:
        """Write all samples to a single file"""
        logger.info(f"\n💾 Writing {len(samples):,} samples to {self.output_file}")
        with open(self.output_file, 'w', encoding='utf-8') as f:
            for i, sample in enumerate(samples, 1):
                f.write(sample + "\n\n")
                # Progress update every 10k samples
                if i % 10000 == 0:
                    logger.info(f"  Wrote {i:,}/{len(samples):,} samples")
        logger.info(f"✅ Wrote all data to {self.output_file}")

    def _save_metadata(self) -> None:
        """Write metadata to a JSON file"""
        metadata = {
            "total_samples": self.stats["total_samples"],
            "total_chars": self.stats["total_chars"],
            "sources": self.stats["sources"],
            "created": os.path.getmtime(self.output_file),  # file mtime, epoch seconds
            "file_size": os.path.getsize(self.output_file),
            "errors": self.stats["errors"][:10]  # only the first 10 errors
        }
        with open(self.metadata_file, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)
        logger.info(f"📊 Metadata written to {self.metadata_file}")
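
    # Shape of the resulting metadata.json (all values below are
    # illustrative, not real output):
    #   {
    #     "total_samples": 120000,
    #     "total_chars": 45000000,
    #     "sources": {"data_news": 80000, "data_code": 40000},
    #     "created": 1700000000.0,
    #     "file_size": 46137344,
    #     "errors": []
    #   }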

    def _print_summary(self) -> None:
        """Log a summary of the run"""
        logger.info("=" * 60)
        logger.info("📈 DATA PREPARATION SUMMARY")
        logger.info("=" * 60)
        total_samples = self.stats["total_samples"]
        # chars ≈ bytes for mostly-ASCII text, so this is a rough size
        total_chars_mb = self.stats["total_chars"] / (1024 * 1024)
        logger.info("📊 STATISTICS:")
        logger.info(f"  • Total samples: {total_samples:,}")
        logger.info(f"  • Data size: {total_chars_mb:.1f} MB")
        logger.info(f"  • Data sources: {len(self.stats['sources'])}")
        logger.info("\n📁 SOURCES:")
        for source, count in self.stats["sources"].items():
            logger.info(f"  {source}: {count:,} samples")
        if self.stats["errors"]:
            logger.warning(f"\n⚠️ ERRORS ({len(self.stats['errors'])}):")
            for error in self.stats["errors"][:5]:
                logger.warning(f"  {error}")
        logger.info("\n💾 OUTPUT:")
        logger.info(f"  • Data: {self.output_file}")
        logger.info(f"  • Metadata: {self.metadata_file}")
        logger.info("\n🎮 USAGE:")
        logger.info("  python main.py --train    # Train on the prepared data")
        logger.info("  python main.py --prepare  # Re-run data preparation")
        logger.info("=" * 60)

# ==================== HELPER FUNCTIONS ====================
def clean_text(text: str) -> str:
    """Clean up a piece of text"""
    # Collapse runs of whitespace into single spaces
    text = re.sub(r'\s+', ' ', text)
    # Strip special characters (optional)
    # text = re.sub(r'[^\w\s.,!?;:()\-\'"ąćęłńóśźżĄĆĘŁŃÓŚŹŻ]', '', text)
    return text.strip()
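
# A minimal sketch of the expected behavior (the input is made up):
#   clean_text("  Hello \n\t world  ")  ->  "Hello world"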

def split_into_chunks(text: str, max_chunk_size: int = 1000) -> List[str]:
    """Split long text into chunks of at most ~max_chunk_size characters"""
    words = text.split()
    chunks = []
    current_chunk = []
    current_size = 0
    for word in words:
        word_size = len(word) + 1  # +1 for the space
        if current_size + word_size > max_chunk_size and current_chunk:
            chunks.append(' '.join(current_chunk))
            current_chunk = [word]
            current_size = word_size
        else:
            current_chunk.append(word)
            current_size += word_size
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    return chunks
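
# A minimal sketch of the chunking above (hypothetical input):
#   split_into_chunks("one two three four", max_chunk_size=8)
#   ->  ['one two', 'three', 'four']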

# ==================== MAIN FUNCTION ====================
def main():
    """Run the data preparation pipeline"""
    preparer = DataPreparer()
    preparer.prepare_all_data()


if __name__ == "__main__":
    main()