"""Download German subtitles for every video on a YouTube channel via yt-dlp,
then convert the .vtt files into deduplicated, one-sentence-per-line .txt files."""

import glob
import json
import os
import re
import subprocess
import sys


def convert_vtt_to_txt(input_dir="transcripts", output_dir="transcripts_txt"):
    """Convert every .vtt file in input_dir into a cleaned .txt file in output_dir."""
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    vtt_files = glob.glob(os.path.join(input_dir, "*.vtt"))
    for vtt_file in vtt_files:
        # Extract the video ID from the filename (everything before the first dot).
        base = os.path.basename(vtt_file)
        video_id = base.split(".")[0]
        txt_path = os.path.join(output_dir, f"{video_id}.txt")
        if os.path.exists(txt_path):
            continue
        with open(vtt_file, "r", encoding="utf-8") as f:
            lines = f.readlines()
        text_lines = []
        seen_lines = set()
        for line in lines:
            line = line.strip()
            # Skip empty, header, timestamp, cue-number, and metadata lines.
            # fullmatch is used for cue numbers so that text lines merely
            # *starting* with digits (e.g. "2023 war ...") are not discarded.
            if (
                not line
                or line.startswith("WEBVTT")
                or re.match(r"\d{2}:\d{2}:\d{2}", line)
                or re.fullmatch(r"\d{1,4}", line)
                or line.startswith("Kind:")
                or line.startswith("Language:")
            ):
                continue
            # Remove VTT markup such as <c>, </c>, and inline timestamps like <00:00:01.000>.
            line = re.sub(r"<.*?>", "", line)
            # Deduplicate at line level (auto-generated subtitles repeat lines heavily).
            if line in seen_lines:
                continue
            seen_lines.add(line)
            text_lines.append(line)
        full_text = " ".join(text_lines)
        sentences = split_into_sentences(full_text)
        # Remove consecutive and near-consecutive duplicate sentences: drop a
        # sentence if it already appears among the last `window` kept sentences.
        # (This replaces the earlier run-limiting loop, which could delete
        # unrelated preceding sentences when trimming a run of repeats.)
        cleaned_sentences = []
        window = 5  # number of previously kept sentences to compare against
        for s in sentences:
            if s in cleaned_sentences[-window:]:
                continue
            cleaned_sentences.append(s)
        with open(txt_path, "w", encoding="utf-8") as f:
            for sentence in cleaned_sentences:
                f.write(sentence + "\n")
        print(f"Converted {vtt_file} -> {txt_path}")


def get_video_ids(channel_url):
    """Use yt-dlp to extract all video IDs from the channel."""
    print("Fetching video list from channel...")
    command = [
        sys.executable, "-m", "yt_dlp",
        "--flat-playlist",
        "--dump-json",
        channel_url,
    ]
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        print("Error fetching video list:", result.stderr)
        return []
    video_ids = []
    for line in result.stdout.strip().split("\n"):
        try:
            data = json.loads(line)
            if "id" in data:
                video_ids.append(data["id"])
        except json.JSONDecodeError:
            continue
    return video_ids


def split_into_sentences(text):
    """Basic regex sentence splitter.

    Handles common abbreviations poorly, but works well enough for subtitles.
    """
    sentences = re.split(r"(?<=[.!?])\s+", text)
    return [s.strip() for s in sentences if s.strip()]


def download_transcripts(video_ids, output_dir="transcripts"):
    """Download German subtitles for each video ID and save them as sentence-per-line .txt files."""
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for i, video_id in enumerate(video_ids):
        txt_path = os.path.join(output_dir, f"{video_id}.txt")
        vtt_files = glob.glob(os.path.join(output_dir, f"{video_id}*.vtt"))
        if os.path.exists(txt_path) or vtt_files:
            print(f"[{i+1}/{len(video_ids)}] Transcript or .vtt already exists for: {video_id}, skipping.")
            continue
        vtt_path = os.path.join(output_dir, f"{video_id}.de.vtt")
        url = f"https://www.youtube.com/watch?v={video_id}"
        print(f"[{i+1}/{len(video_ids)}] Downloading subtitles for: {url}")
        # yt-dlp inserts the language code before the extension when writing
        # subtitles, so this template yields "<video_id>.de.vtt". The previous
        # "%(sub_lang)s" placeholder is not a valid output-template field.
        output_template = os.path.join(output_dir, f"{video_id}.%(ext)s")
        # Try manually created subtitles first.
        manual_cmd = [
            sys.executable, "-m", "yt_dlp",
            "--write-sub", "--sub-lang", "de",
            "--skip-download",
            "-o", output_template,
            url,
        ]
        result = subprocess.run(manual_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        if not os.path.exists(vtt_path):
            # Fall back to auto-generated subtitles if no manual ones were found.
            auto_cmd = [
                sys.executable, "-m", "yt_dlp",
                "--write-auto-sub", "--sub-lang", "de",
                "--skip-download",
                "-o", output_template,
                url,
            ]
            result = subprocess.run(auto_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            if result.returncode != 0:
                print(f" ❌ Error downloading subtitles: {result.stderr}")
                continue
        if not os.path.exists(vtt_path):
            print(" ⚠️ No subtitles found for this video.")
            continue
        with open(vtt_path, "r", encoding="utf-8") as vtt_file:
            lines = vtt_file.readlines()
        # Strip the header, timestamps, and cue numbers; keep only the text.
        text_lines = []
        for line in lines:
            line = line.strip()
            if (
                not line
                or line.startswith("WEBVTT")
                or re.match(r"\d{2}:\d{2}:\d{2}", line)
                or re.fullmatch(r"\d{1,4}", line)
            ):
                continue
            # Remove inline VTT markup such as <c> tags and <00:...> timestamps.
            line = re.sub(r"<.*?>", "", line)
            text_lines.append(line)
        full_text = " ".join(text_lines)
        sentences = split_into_sentences(full_text)
        # Write each sentence on its own line in the .txt file.
        with open(txt_path, "w", encoding="utf-8") as f:
            for sentence in sentences:
                f.write(sentence + "\n")
        print(f" ✅ Transcript saved to {video_id}.txt")


def main():
    channel_url = "https://www.youtube.com/@MichaelSchloter-k4y"
    video_ids = get_video_ids(channel_url)
    if not video_ids:
        print("No videos found or failed to fetch.")
        return
    print(f"Found {len(video_ids)} videos. Starting transcript download...\n")
    download_transcripts(video_ids)
    print("\n✅ All available transcripts downloaded.")
    print("\nConverting all .vtt files to .txt files in transcripts_txt directory...")
    convert_vtt_to_txt()
    print("\n✅ All .vtt files converted to .txt files in transcripts_txt.")


if __name__ == "__main__":
    main()