# schloter/extractor/extractor_yt.py

import glob
import json
import os
import re
import subprocess
import sys


def convert_vtt_to_txt(input_dir="transcripts", output_dir="transcripts_txt"):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    vtt_files = glob.glob(os.path.join(input_dir, "*.vtt"))
    for vtt_file in vtt_files:
        # Extract video ID from filename (before first dot)
        base = os.path.basename(vtt_file)
        video_id = base.split(".")[0]
        txt_path = os.path.join(output_dir, f"{video_id}.txt")
        if os.path.exists(txt_path):
            continue
        with open(vtt_file, "r", encoding="utf-8") as f:
            lines = f.readlines()
        text_lines = []
        seen_lines = set()
        for line in lines:
            line = line.strip()
            # Skip empty, header, timestamp, cue-number, and meta lines.
            # fullmatch ensures only bare cue numbers are dropped, not
            # text lines that happen to start with a digit.
            if (
                not line
                or line.startswith("WEBVTT")
                or re.match(r"\d{2}:\d{2}:\d{2}", line)
                or re.fullmatch(r"\d{1,4}", line)
                or line.startswith("Kind:")
                or line.startswith("Language:")
            ):
                continue
            # Remove VTT markup and tags like </c>, <c>, <00:...>
            line = re.sub(r"<.*?>", "", line).strip()
            if not line:
                continue
            # Deduplicate at line level
            if line in seen_lines:
                continue
            seen_lines.add(line)
            text_lines.append(line)
        full_text = " ".join(text_lines)
        sentences = split_into_sentences(full_text)
        # Remove consecutive and near-consecutive duplicates: drop a
        # sentence if it already occurs among the last `window` kept
        # sentences. Auto-generated captions repeat rolling lines, so
        # this collapses those runs to a single occurrence.
        cleaned_sentences = []
        window = 5  # Number of previous sentences to check for duplicates
        for s in sentences:
            if s in cleaned_sentences[-window:]:
                continue
            cleaned_sentences.append(s)
        with open(txt_path, "w", encoding="utf-8") as f:
            for sentence in cleaned_sentences:
                f.write(sentence + "\n")
        print(f"Converted {vtt_file} -> {txt_path}")


def get_video_ids(channel_url):
    # Use yt-dlp to extract all video IDs from the channel
    print("Fetching video list from channel...")
    command = [
        sys.executable, "-m", "yt_dlp",
        "--flat-playlist",
        "--dump-json",
        channel_url,
    ]
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        print("Error fetching video list:", result.stderr)
        return []
    video_ids = []
    for line in result.stdout.strip().split("\n"):
        try:
            data = json.loads(line)
            if "id" in data:
                video_ids.append(data["id"])
        except json.JSONDecodeError:
            continue
    return video_ids
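
# With --flat-playlist --dump-json, yt-dlp prints one JSON object per
# video, one per line; only the "id" field is used here. A hypothetical
# entry looks roughly like:
#   {"id": "dQw4w9WgXcQ", "title": "...", "url": "https://www...", ...}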


def split_into_sentences(text):
    # Basic sentence splitter using regex (handles common abbreviations
    # poorly, but works for most subtitles)
    sentences = re.split(r"(?<=[.!?])\s+", text)
    return [s.strip() for s in sentences if s.strip()]
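
# Illustrative behaviour of the splitter: it breaks after ., ! or ?
# followed by whitespace, so abbreviations are split as well, e.g.:
#   split_into_sentences("Das ist z.B. ein Test. Noch ein Satz!")
#   -> ["Das ist z.B.", "ein Test.", "Noch ein Satz!"]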


def download_transcripts(video_ids, output_dir="transcripts"):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for i, video_id in enumerate(video_ids):
        txt_path = os.path.join(output_dir, f"{video_id}.txt")
        vtt_glob = os.path.join(output_dir, f"{video_id}*.vtt")
        vtt_files = glob.glob(vtt_glob)
        if os.path.exists(txt_path) or vtt_files:
            print(f"[{i+1}/{len(video_ids)}] Transcript or .vtt already exists for: {video_id}, skipping.")
            continue
        vtt_path = os.path.join(output_dir, f"{video_id}.de.vtt")
        url = f"https://www.youtube.com/watch?v={video_id}"
        print(f"[{i+1}/{len(video_ids)}] Downloading subtitles for: {url}")
        # Try manually created German subtitles first
        manual_cmd = [
            sys.executable, "-m", "yt_dlp",
            "--write-sub", "--sub-lang", "de", "--skip-download",
            "-o", os.path.join(output_dir, f"{video_id}.%(sub_lang)s.vtt"), url,
        ]
        subprocess.run(manual_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        if not os.path.exists(vtt_path):
            # Fall back to auto-generated subtitles if no manual ones exist
            auto_cmd = [
                sys.executable, "-m", "yt_dlp",
                "--write-auto-sub", "--sub-lang", "de", "--skip-download",
                "-o", os.path.join(output_dir, f"{video_id}.%(sub_lang)s.vtt"), url,
            ]
            result = subprocess.run(auto_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            if result.returncode != 0:
                print(f" ❌ Error downloading subtitles: {result.stderr}")
                continue
        if not os.path.exists(vtt_path):
            print(" ⚠️ No subtitles found for this video.")
            continue
        with open(vtt_path, "r", encoding="utf-8") as vtt_file:
            lines = vtt_file.readlines()
        # Remove header, timestamps, and cue numbers; keep only the text
        text_lines = []
        for line in lines:
            line = line.strip()
            if not line or line.startswith("WEBVTT") or re.match(r"\d{2}:\d{2}:\d{2}", line) or re.fullmatch(r"\d{1,4}", line):
                continue
            text_lines.append(line)
        full_text = " ".join(text_lines)
        sentences = split_into_sentences(full_text)
        # Write each sentence to a new line in .txt
        with open(txt_path, "w", encoding="utf-8") as f:
            for sentence in sentences:
                f.write(sentence + "\n")
        print(f" ✅ Transcript saved to {video_id}.txt")


def main():
    channel_url = "https://www.youtube.com/@MichaelSchloter-k4y"
    video_ids = get_video_ids(channel_url)
    if not video_ids:
        print("No videos found or failed to fetch.")
        return
    print(f"Found {len(video_ids)} videos. Starting transcript download...\n")
    download_transcripts(video_ids)
    print("\n✅ All available transcripts downloaded.")
    print("\nConverting all .vtt files to .txt files in transcripts_txt directory...")
    convert_vtt_to_txt()
    print("\n✅ All .vtt files converted to .txt files in transcripts_txt.")


if __name__ == "__main__":
    main()
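
# Usage sketch (assumes the yt-dlp package is installed in the active
# environment, e.g. via `pip install yt-dlp`):
#   python extractor_yt.py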