schloter/api/main.py

141 lines
4.3 KiB
Python

import sqlite3
import random
from fastapi import FastAPI, HTTPException, Depends
from typing import List, Optional
from threading import Lock
from datetime import datetime, timedelta
import os
app = FastAPI()
DB_PATH = './database/schloter.API.db'
lock = Lock()
# Create allowed_tokens.txt with example keys if it doesn't exist
def ensure_allowed_tokens_file(path='allowed_tokens.txt'):
if not os.path.exists(path):
with open(path, 'w') as f:
f.write('examplekey1\nexamplekey2\n')
print(f"Created {path} with example keys.")
ensure_allowed_tokens_file('allowed_tokens.txt')
# Database initialization
def init_db():
import os
db_dir = os.path.dirname(DB_PATH)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir)
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS quotes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
quote TEXT NOT NULL,
lastUse TEXT
)
''')
conn.commit()
conn.close()
init_db()
# Static token authentication
def load_allowed_tokens(path='allowed_tokens.txt'):
try:
with open(path, 'r') as f:
return set(line.strip() for line in f if line.strip())
except Exception:
return set()
ALLOWED_TOKENS = load_allowed_tokens('allowed_tokens.txt')
def require_token(authorization: Optional[str] = Depends(lambda: None)):
from fastapi import Request
def _require_token(request: Request):
auth = request.headers.get("authorization")
if not auth or not auth.lower().startswith("bearer "):
raise HTTPException(status_code=401, detail="Missing or invalid token")
token = auth[7:].strip()
if token not in ALLOWED_TOKENS:
raise HTTPException(status_code=403, detail="Forbidden: Invalid token")
return _require_token
@app.get('/quotes/random')
def get_random_quote(token_check: None = Depends(require_token())):
with lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Get all quotes
cursor.execute('SELECT id, quote, lastUse FROM quotes')
rows = cursor.fetchall()
if not rows:
conn.close()
return {'error': 'No quotes found'}
# Calculate cutoff date (20 days ago)
cutoff = (datetime.now() - timedelta(days=20)).date()
# Filter quotes not used in last 20 days
available = [row for row in rows if not row[2] or row[2] < str(cutoff)]
if not available:
# If all quotes are recent, allow any
available = rows
chosen = random.choice(available)
# Update lastUse for chosen quote
cursor.execute('UPDATE quotes SET lastUse = ? WHERE id = ?', (str(datetime.now().date()), chosen[0]))
conn.commit()
conn.close()
return {'id': chosen[0], 'quote': chosen[1]}
# List all quotes
@app.get('/quotes/list/all')
def list_quotes(token_check: None = Depends(require_token())):
with lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT id, quote, lastUse FROM quotes')
rows = cursor.fetchall()
conn.close()
return [{'id': row[0], 'quote': row[1], 'lastUse': row[2]} for row in rows]
# List quote by specific ID
@app.get('/quotes/list/{id}')
def get_quote_by_id(id: int, token_check: None = Depends(require_token())):
with lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT id, quote, lastUse FROM quotes WHERE id = ?', (id,))
row = cursor.fetchone()
conn.close()
if not row:
raise HTTPException(status_code=404, detail='Quote not found')
return {'id': row[0], 'quote': row[1], 'lastUse': row[2]}
# Add a new quote
@app.post('/quotes/add')
def add_quote(quote: str, token_check: None = Depends(require_token())):
with lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('INSERT INTO quotes (quote, lastUse) VALUES (?, NULL)', (quote,))
conn.commit()
new_id = cursor.lastrowid
conn.close()
return {'id': new_id, 'quote': quote}
# Remove a quote by ID
@app.delete('/quotes/remove/{id}')
def remove_quote(id: int, token_check: None = Depends(require_token())):
with lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('DELETE FROM quotes WHERE id = ?', (id,))
conn.commit()
affected = cursor.rowcount
conn.close()
if affected == 0:
raise HTTPException(status_code=404, detail='Quote not found')
return {'status': 'deleted', 'id': id}
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)