ssctopper/server.py

493 lines
20 KiB
Python

#!/usr/bin/env python3
"""
SSCTopper Web Application - Zero-dependency Python web server.
Serves the SSC CGL question bank with syllabus browser and practice interface.
"""
import http.server
import json
import sqlite3
import os
import sys
import urllib.parse
import hashlib
import uuid
import http.cookies
import urllib.request
import signal
sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
from db.init import DB_PATH, init_db, get_db
PORT = 8080
ROOT = os.path.dirname(os.path.abspath(__file__))
GOOGLE_CLIENT_ID = "273072123939-dd82h4o1rt3k7811sri6qgsig73b3916.apps.googleusercontent.com"
class SSCHandler(http.server.BaseHTTPRequestHandler):
"""HTTP request handler for SSCTopper."""
sessions = {} # session_id -> user_id
def get_user_id(self):
cookie_header = self.headers.get('Cookie')
if not cookie_header:
return None
cookie = http.cookies.SimpleCookie(cookie_header)
session_id = cookie.get('session_id')
if not session_id:
return None
return self.sessions.get(session_id.value)
def do_POST(self):
parsed = urllib.parse.urlparse(self.path)
path = parsed.path
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length).decode('utf-8')
try:
data = json.loads(post_data)
except:
data = {}
if path == '/api/auth/signup':
self.handle_signup(data)
elif path == '/api/auth/login':
self.handle_login(data)
elif path == '/api/auth/google':
self.handle_google_login(data)
elif path == '/api/user/progress':
self.handle_progress(data)
else:
self.send_error(404)
def handle_google_login(self, data):
id_token = data.get('id_token')
if not id_token:
return self.json_response({'error': 'Missing ID token'}, 400)
# Verify token with Google API (Zero-dependency way)
try:
url = f"https://oauth2.googleapis.com/tokeninfo?id_token={id_token}"
with urllib.request.urlopen(url) as response:
google_data = json.loads(response.read().decode())
# Check for error in Google response
if 'error_description' in google_data:
return self.json_response({'error': google_data['error_description']}, 401)
# Security check: Verify audience (aud) matches our Client ID
aud = google_data.get('aud')
if aud != GOOGLE_CLIENT_ID:
return self.json_response({'error': 'Token was not issued for this application'}, 401)
email = google_data.get('email')
name = google_data.get('name', email.split('@')[0])
if not email:
return self.json_response({'error': 'Email not provided by Google'}, 400)
conn = get_db()
user = conn.execute("SELECT id, username FROM users WHERE email=?", (email,)).fetchone()
if not user:
# Create new user with random password (cannot be guessed)
random_pass = str(uuid.uuid4())
pass_hash = hashlib.sha256(random_pass.encode()).hexdigest()
try:
# Use email handle as username if possible, otherwise use full email
username = email.split('@')[0]
# Ensure username uniqueness (this is simple, could be better)
cursor = conn.execute("INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
(username, email, pass_hash))
user_id = cursor.lastrowid
conn.commit()
username_final = username
except sqlite3.IntegrityError:
# Fallback to email as username
cursor = conn.execute("INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
(email, email, pass_hash))
user_id = cursor.lastrowid
conn.commit()
username_final = email
else:
user_id = user[0]
username_final = user[1]
conn.close()
# Set session
session_id = str(uuid.uuid4())
self.sessions[session_id] = user_id
cookie = http.cookies.SimpleCookie()
cookie['session_id'] = session_id
cookie['session_id']['path'] = '/'
cookie['session_id']['httponly'] = True
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Set-Cookie', cookie.output(header=''))
self.end_headers()
self.wfile.write(json.dumps({'success': True, 'username': username_final}).encode())
except Exception as e:
print(f"Google Auth Error: {e}")
self.json_response({'error': 'Failed to verify Google account'}, 500)
def handle_signup(self, data):
username = data.get('username')
email = data.get('email')
password = data.get('password')
if not all([username, email, password]):
return self.json_response({'error': 'Missing fields'}, 400)
password_hash = hashlib.sha256(password.encode()).hexdigest()
conn = get_db()
try:
conn.execute("INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
(username, email, password_hash))
conn.commit()
self.json_response({'success': True})
except sqlite3.IntegrityError:
self.json_response({'error': 'Username or email already exists'}, 400)
finally:
conn.close()
def handle_login(self, data):
username = data.get('username')
password = data.get('password')
if not all([username, password]):
return self.json_response({'error': 'Missing fields'}, 400)
password_hash = hashlib.sha256(password.encode()).hexdigest()
conn = get_db()
user = conn.execute("SELECT id FROM users WHERE username=? AND password_hash=?",
(username, password_hash)).fetchone()
conn.close()
if user:
session_id = str(uuid.uuid4())
self.sessions[session_id] = user[0]
cookie = http.cookies.SimpleCookie()
cookie['session_id'] = session_id
cookie['session_id']['path'] = '/'
cookie['session_id']['httponly'] = True
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Set-Cookie', cookie.output(header=''))
self.end_headers()
self.wfile.write(json.dumps({'success': True, 'username': username}).encode())
else:
self.json_response({'error': 'Invalid credentials'}, 401)
def handle_progress(self, data):
user_id = self.get_user_id()
if not user_id:
return self.json_response({'error': 'Unauthorized'}, 401)
question_id = data.get('question_id')
is_correct = data.get('is_correct')
time_taken = data.get('time_taken', 0.0)
if question_id is None or is_correct is None:
return self.json_response({'error': 'Missing fields'}, 400)
conn = get_db()
conn.execute("INSERT INTO user_answers (user_id, question_id, is_correct, time_taken) VALUES (?, ?, ?, ?)",
(user_id, int(question_id), bool(is_correct), float(time_taken)))
conn.commit()
conn.close()
self.json_response({'success': True})
def do_GET(self):
parsed = urllib.parse.urlparse(self.path)
path = parsed.path
params = urllib.parse.parse_qs(parsed.query)
if path == '/' or path == '/index.html':
self.serve_html()
elif path == '/robots.txt':
self.serve_static('robots.txt', 'text/plain')
elif path == '/sitemap.xml':
self.serve_static('sitemap.xml', 'application/xml')
elif path == '/api/syllabus':
self.api_syllabus()
elif path == '/api/questions':
self.api_questions(params)
elif path == '/api/stats':
self.api_stats()
elif path == '/api/mock-test':
self.api_mock_test(params)
elif path == '/api/user/profile':
self.api_user_profile()
else:
self.send_error(404)
def api_user_profile(self):
user_id = self.get_user_id()
if not user_id:
return self.json_response({'error': 'Unauthorized'}, 401)
# Parse timeframe
parsed = urllib.parse.urlparse(self.path)
params = urllib.parse.parse_qs(parsed.query)
timeframe = params.get('timeframe', ['overall'])[0]
time_filter = ""
if timeframe == 'daily':
time_filter = "AND ua.answered_at >= datetime('now', '-1 day')"
elif timeframe == 'weekly':
time_filter = "AND ua.answered_at >= datetime('now', '-7 days')"
elif timeframe == 'monthly':
time_filter = "AND ua.answered_at >= datetime('now', 'start of month')"
conn = get_db()
user = conn.execute("SELECT username, email, created_at FROM users WHERE id=?", (user_id,)).fetchone()
# Get overall stats for timeframe
stats_query = f"SELECT COUNT(*), SUM(CASE WHEN is_correct=1 THEN 1 ELSE 0 END), AVG(time_taken) FROM user_answers ua WHERE user_id=? {time_filter}"
stats_row = conn.execute(stats_query, (user_id,)).fetchone()
total_attempts = stats_row[0] or 0
correct_attempts = stats_row[1] or 0
avg_time_overall = round(stats_row[2] or 0, 1)
# Topic-wise progress with time tracking
topic_progress = []
rows = conn.execute(f"""
SELECT t.id, t.name, st.name as subtopic, s.name as subject,
COUNT(DISTINCT q.id) as total_questions,
COUNT(DISTINCT ua.question_id) as answered_questions,
AVG(ua.time_taken) as avg_time
FROM topics t
JOIN subtopics st ON t.subtopic_id = st.id
JOIN subjects s ON st.subject_id = s.id
LEFT JOIN question_types qt ON qt.topic_id = t.id
LEFT JOIN questions q ON q.question_type_id = qt.id
LEFT JOIN user_answers ua ON ua.question_id = q.id AND ua.user_id = ? AND ua.is_correct = 1 {time_filter}
GROUP BY t.id
""", (user_id,)).fetchall()
for r in rows:
topic_progress.append({
'topic_id': r[0], 'topic': r[1], 'subtopic': r[2], 'subject': r[3],
'total': r[4], 'answered': r[5],
'percent': round(r[5] * 100 / r[4], 1) if r[4] > 0 else 0,
'avg_time': round(r[6] or 0, 1)
})
conn.close()
self.json_response({
'username': user[0],
'email': user[1],
'joined': user[2],
'stats': {
'total_attempts': total_attempts,
'correct_attempts': correct_attempts,
'accuracy': round(correct_attempts * 100 / total_attempts, 1) if total_attempts > 0 else 0,
'avg_time': avg_time_overall
},
'topic_progress': topic_progress
})
def serve_html(self):
html_path = os.path.join(ROOT, 'static', 'index.html')
with open(html_path, 'r') as f:
content = f.read()
self.send_response(200)
self.send_header('Content-Type', 'text/html; charset=utf-8')
self.end_headers()
self.wfile.write(content.encode('utf-8'))
def serve_static(self, filename, content_type):
file_path = os.path.join(ROOT, 'static', filename)
if not os.path.exists(file_path):
self.send_error(404)
return
with open(file_path, 'rb') as f:
content = f.read()
self.send_response(200)
self.send_header('Content-Type', content_type)
self.end_headers()
self.wfile.write(content)
def json_response(self, data, status=200):
self.send_response(status)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8'))
def api_syllabus(self):
conn = get_db()
subjects = []
for s in conn.execute("SELECT id, name, tier, description, target_questions FROM subjects").fetchall():
subject = {'id': s[0], 'name': s[1], 'tier': s[2], 'description': s[3], 'target': s[4], 'subtopics': []}
for st in conn.execute("SELECT id, name, description FROM subtopics WHERE subject_id=?", (s[0],)).fetchall():
subtopic = {'id': st[0], 'name': st[1], 'description': st[2], 'topics': []}
for t in conn.execute("SELECT id, name, description FROM topics WHERE subtopic_id=?", (st[0],)).fetchall():
q_count = conn.execute("SELECT COUNT(*) FROM questions q JOIN question_types qt ON q.question_type_id=qt.id WHERE qt.topic_id=?", (t[0],)).fetchone()[0]
qtypes = [{'id': qt[0], 'name': qt[1]} for qt in conn.execute("SELECT id, name FROM question_types WHERE topic_id=?", (t[0],)).fetchall()]
subtopic['topics'].append({'id': t[0], 'name': t[1], 'description': t[2], 'question_count': q_count, 'question_types': qtypes})
subject['subtopics'].append(subtopic)
# Count total questions for subject
subject['question_count'] = conn.execute("""
SELECT COUNT(*) FROM questions q
JOIN question_types qt ON q.question_type_id = qt.id
JOIN topics t ON qt.topic_id = t.id
JOIN subtopics st ON t.subtopic_id = st.id
WHERE st.subject_id = ?
""", (s[0],)).fetchone()[0]
subjects.append(subject)
conn.close()
self.json_response(subjects)
def api_questions(self, params):
conn = get_db()
topic_id = params.get('topic_id', [None])[0]
qtype_id = params.get('qtype_id', [None])[0]
subject_id = params.get('subject_id', [None])[0]
difficulty = params.get('difficulty', [None])[0]
limit = int(params.get('limit', ['20'])[0])
offset = int(params.get('offset', ['0'])[0])
query = """SELECT q.id, q.question_text, q.option_a, q.option_b, q.option_c, q.option_d,
q.correct_option, q.explanation, q.difficulty,
qt.name as qtype_name, t.name as topic_name, st.name as subtopic_name, s.name as subject_name
FROM questions q
JOIN question_types qt ON q.question_type_id = qt.id
JOIN topics t ON qt.topic_id = t.id
JOIN subtopics st ON t.subtopic_id = st.id
JOIN subjects s ON st.subject_id = s.id
WHERE 1=1"""
args = []
if topic_id:
query += " AND t.id = ?"
args.append(int(topic_id))
if qtype_id:
query += " AND qt.id = ?"
args.append(int(qtype_id))
if subject_id:
query += " AND s.id = ?"
args.append(int(subject_id))
if difficulty:
query += " AND q.difficulty = ?"
args.append(int(difficulty))
# Get total count
count_query = query.replace("SELECT q.id, q.question_text, q.option_a, q.option_b, q.option_c, q.option_d,\n q.correct_option, q.explanation, q.difficulty,\n qt.name as qtype_name, t.name as topic_name, st.name as subtopic_name, s.name as subject_name", "SELECT COUNT(*)")
total = conn.execute(count_query, args).fetchone()[0]
query += " ORDER BY RANDOM() LIMIT ? OFFSET ?"
args.extend([limit, offset])
rows = conn.execute(query, args).fetchall()
questions = []
for r in rows:
questions.append({
'id': r[0], 'question_text': r[1],
'options': {'A': r[2], 'B': r[3], 'C': r[4], 'D': r[5]},
'correct_option': r[6], 'explanation': r[7], 'difficulty': r[8],
'qtype': r[9], 'topic': r[10], 'subtopic': r[11], 'subject': r[12]
})
conn.close()
self.json_response({'total': total, 'questions': questions})
def api_stats(self):
conn = get_db()
stats = {}
rows = conn.execute("""
SELECT s.name, COUNT(q.id) FROM subjects s
LEFT JOIN subtopics st ON st.subject_id = s.id
LEFT JOIN topics t ON t.subtopic_id = st.id
LEFT JOIN question_types qt ON qt.topic_id = t.id
LEFT JOIN questions q ON q.question_type_id = qt.id
GROUP BY s.id
""").fetchall()
for r in rows:
stats[r[0]] = r[1]
total = conn.execute("SELECT COUNT(*) FROM questions").fetchone()[0]
stats['total'] = total
topic_count = conn.execute("SELECT COUNT(*) FROM topics").fetchone()[0]
stats['topic_count'] = topic_count
stats['subject_count'] = conn.execute("SELECT COUNT(*) FROM subjects").fetchone()[0]
conn.close()
self.json_response(stats)
def api_mock_test(self, params):
conn = get_db()
subject_id = params.get('subject_id', [None])[0]
num = int(params.get('num', ['25'])[0])
query = """SELECT q.id, q.question_text, q.option_a, q.option_b, q.option_c, q.option_d,
q.correct_option, q.explanation, q.difficulty,
qt.name, t.name, st.name, s.name
FROM questions q
JOIN question_types qt ON q.question_type_id = qt.id
JOIN topics t ON qt.topic_id = t.id
JOIN subtopics st ON t.subtopic_id = st.id
JOIN subjects s ON st.subject_id = s.id"""
args = []
if subject_id:
query += " WHERE s.id = ?"
args.append(int(subject_id))
query += " ORDER BY RANDOM() LIMIT ?"
args.append(num)
rows = conn.execute(query, args).fetchall()
questions = []
for r in rows:
questions.append({
'id': r[0], 'question_text': r[1],
'options': {'A': r[2], 'B': r[3], 'C': r[4], 'D': r[5]},
'correct_option': r[6], 'explanation': r[7], 'difficulty': r[8],
'qtype': r[9], 'topic': r[10], 'subtopic': r[11], 'subject': r[12]
})
conn.close()
self.json_response({'questions': questions, 'total': len(questions)})
def log_message(self, format, *args):
pass # Suppress access logs
def main():
# Initialize DB if needed
if not os.path.exists(DB_PATH):
print("Database not found. Running generation pipeline...")
init_db()
# Check if we have questions, if not, generate some
conn = get_db()
try:
count = conn.execute("SELECT COUNT(*) FROM questions").fetchone()[0]
if count == 0:
print("Database is empty. Generating question bank...")
from generators.run_all import main as generate
generate()
except Exception as e:
print(f"Error checking question count: {e}")
finally:
conn.close()
server = http.server.HTTPServer(('0.0.0.0', PORT), SSCHandler)
print(f"\n🚀 SSCTopper running at http://localhost:{PORT}")
print(f" Database: {DB_PATH}")
print(f" Press Ctrl+C to stop\n")
signal.signal(signal.SIGINT, lambda s, f: (server.shutdown(), sys.exit(0)))
try:
server.serve_forever()
except KeyboardInterrupt:
server.shutdown()
if __name__ == '__main__':
main()