#!/usr/bin/env python3
"""
SSCTopper Web Application - Zero-dependency Python web server.
Serves the SSC CGL question bank with syllabus browser and practice interface.
"""

import contextlib
import http.server
import json
import sqlite3
import os
import sys
import urllib.parse
import hashlib
import uuid
import http.cookies
import urllib.request
import signal

# Put this file's directory on sys.path so the local ``db`` package imports.
sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
from db.init import DB_PATH, init_db, get_db

PORT = 8080
ROOT = os.path.dirname(os.path.abspath(__file__))
GOOGLE_CLIENT_ID = "273072123939-dd82h4o1rt3k7811sri6qgsig73b3916.apps.googleusercontent.com"

GOOGLE_TOKENINFO_URL = "https://oauth2.googleapis.com/tokeninfo"
# The server handles one request at a time, so an unbounded outbound call
# would stall every other client.
GOOGLE_TIMEOUT_SECONDS = 10

# SQL fragments appended to the per-user statistics queries. Only these fixed
# literals are ever interpolated into SQL; the client merely selects a key.
_TIMEFRAME_FILTERS = {
    'daily': "AND ua.answered_at >= datetime('now', '-1 day')",
    'weekly': "AND ua.answered_at >= datetime('now', '-7 days')",
    'monthly': "AND ua.answered_at >= datetime('now', 'start of month')",
}

# Column list and join chain shared by the question listing, its COUNT(*)
# companion and the mock-test query. Column order must match
# _question_from_row().
_QUESTION_COLUMNS = """q.id, q.question_text, q.option_a, q.option_b, q.option_c, q.option_d,
       q.correct_option, q.explanation, q.difficulty,
       qt.name AS qtype_name, t.name AS topic_name,
       st.name AS subtopic_name, s.name AS subject_name"""

_QUESTION_JOINS = """FROM questions q
    JOIN question_types qt ON q.question_type_id = qt.id
    JOIN topics t ON qt.topic_id = t.id
    JOIN subtopics st ON t.subtopic_id = st.id
    JOIN subjects s ON st.subject_id = s.id"""

_INSERT_USER_SQL = "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)"


def _hash_password(password):
    """Return the hex SHA-256 digest of *password* (a ``str``).

    NOTE(review): unsalted SHA-256 is weak for password storage. Switching to
    a salted KDF (e.g. ``hashlib.scrypt``) requires migrating existing rows,
    so the stored format is deliberately left unchanged here.
    """
    return hashlib.sha256(password.encode()).hexdigest()


def _int_param(params, name, default=None):
    """Return query parameter *name* as an int, or ``None`` when absent.

    *params* is a ``urllib.parse.parse_qs`` mapping (name -> list of values).
    Raises ``ValueError`` when the supplied value is not an integer literal.
    """
    raw = params.get(name, [default])[0]
    return None if raw is None else int(raw)


def _question_from_row(row):
    """Convert a row selected with ``_QUESTION_COLUMNS`` into the API dict."""
    return {
        'id': row[0],
        'question_text': row[1],
        'options': {'A': row[2], 'B': row[3], 'C': row[4], 'D': row[5]},
        'correct_option': row[6],
        'explanation': row[7],
        'difficulty': row[8],
        'qtype': row[9],
        'topic': row[10],
        'subtopic': row[11],
        'subject': row[12],
    }


class SSCHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler for SSCTopper."""

    # session_id -> user_id. Class-level, so it is shared by all handler
    # instances; it lives in memory only.
    sessions = {}

    def get_user_id(self):
        """Return the user id bound to the request's session cookie, or None."""
        cookie_header = self.headers.get('Cookie')
        if not cookie_header:
            return None
        try:
            cookie = http.cookies.SimpleCookie(cookie_header)
        except http.cookies.CookieError:
            # A malformed Cookie header is treated as "not logged in".
            return None
        session_id = cookie.get('session_id')
        if not session_id:
            return None
        return self.sessions.get(session_id.value)

    def do_POST(self):
        """Read the JSON request body and dispatch to the matching handler.

        A body that is not valid UTF-8 JSON, or is not a JSON object, is
        treated as an empty object so handlers report their own
        "missing field" errors.
        """
        path = urllib.parse.urlparse(self.path).path

        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except ValueError:
            content_length = -1
        if content_length < 0:
            # A negative length would make rfile.read() block until EOF.
            return self.json_response({'error': 'Invalid Content-Length'}, 400)

        raw_body = self.rfile.read(content_length)
        try:
            # UnicodeDecodeError and JSONDecodeError are both ValueErrors.
            data = json.loads(raw_body.decode('utf-8'))
        except (ValueError, RecursionError):
            data = {}
        if not isinstance(data, dict):
            data = {}

        if path == '/api/auth/signup':
            self.handle_signup(data)
        elif path == '/api/auth/login':
            self.handle_login(data)
        elif path == '/api/auth/google':
            self.handle_google_login(data)
        elif path == '/api/user/progress':
            self.handle_progress(data)
        else:
            self.send_error(404)

    def _start_session(self, user_id, username):
        """Register a new session for *user_id* and send the login response.

        Writes a 200 JSON response carrying an HttpOnly ``session_id`` cookie.
        """
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = user_id

        cookie = http.cookies.SimpleCookie()
        cookie['session_id'] = session_id
        cookie['session_id']['path'] = '/'
        cookie['session_id']['httponly'] = True

        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Set-Cookie', cookie.output(header=''))
        self.end_headers()
        self.wfile.write(json.dumps({'success': True, 'username': username}).encode())

    def _find_or_create_google_user(self, email):
        """Return ``(user_id, username)`` for *email*, creating the user if new.

        A new user gets the email's local part as username, falling back to
        the full email when that insert violates a uniqueness constraint.
        """
        with contextlib.closing(get_db()) as conn:
            user = conn.execute("SELECT id, username FROM users WHERE email=?", (email,)).fetchone()
            if user:
                return user[0], user[1]

            # Random password (cannot be guessed): the account is only
            # reachable through Google sign-in.
            pass_hash = _hash_password(str(uuid.uuid4()))
            username = email.split('@')[0]
            try:
                cursor = conn.execute(_INSERT_USER_SQL, (username, email, pass_hash))
            except sqlite3.IntegrityError:
                username = email
                cursor = conn.execute(_INSERT_USER_SQL, (username, email, pass_hash))
            user_id = cursor.lastrowid
            conn.commit()
            return user_id, username

    def handle_google_login(self, data):
        """Verify a Google ID token and log the matching user in.

        Responds 400 when the token or the email claim is missing, 401 when
        Google reports an error or the token was issued for another client,
        and 500 when verification itself fails.
        """
        id_token = data.get('id_token')
        if not id_token:
            return self.json_response({'error': 'Missing ID token'}, 400)

        # Verify token with Google API (Zero-dependency way)
        try:
            # Encode the token so it cannot inject extra query parameters.
            query = urllib.parse.urlencode({'id_token': id_token})
            url = f"{GOOGLE_TOKENINFO_URL}?{query}"
            with urllib.request.urlopen(url, timeout=GOOGLE_TIMEOUT_SECONDS) as response:
                google_data = json.loads(response.read().decode())

            # Check for error in Google response
            if 'error_description' in google_data:
                return self.json_response({'error': google_data['error_description']}, 401)

            # Security check: Verify audience (aud) matches our Client ID
            if google_data.get('aud') != GOOGLE_CLIENT_ID:
                return self.json_response({'error': 'Token was not issued for this application'}, 401)

            email = google_data.get('email')
            if not email:
                return self.json_response({'error': 'Email not provided by Google'}, 400)

            user_id, username = self._find_or_create_google_user(email)
            self._start_session(user_id, username)
        except Exception as e:
            print(f"Google Auth Error: {e}")
            self.json_response({'error': 'Failed to verify Google account'}, 500)

    def handle_signup(self, data):
        """Create a user from ``username``, ``email`` and ``password``.

        Responds 400 when a field is missing, is not a string, or when the
        username or email is already taken.
        """
        username = data.get('username')
        email = data.get('email')
        password = data.get('password')
        if not all([username, email, password]):
            return self.json_response({'error': 'Missing fields'}, 400)
        if not all(isinstance(value, str) for value in (username, email, password)):
            return self.json_response({'error': 'Invalid fields'}, 400)

        password_hash = _hash_password(password)
        with contextlib.closing(get_db()) as conn:
            try:
                conn.execute(_INSERT_USER_SQL, (username, email, password_hash))
                conn.commit()
            except sqlite3.IntegrityError:
                return self.json_response({'error': 'Username or email already exists'}, 400)
        self.json_response({'success': True})

    def handle_login(self, data):
        """Authenticate by ``username``/``password`` and start a session.

        Responds 400 on missing or non-string fields and 401 on bad
        credentials.
        """
        username = data.get('username')
        password = data.get('password')
        if not all([username, password]):
            return self.json_response({'error': 'Missing fields'}, 400)
        if not (isinstance(username, str) and isinstance(password, str)):
            return self.json_response({'error': 'Invalid fields'}, 400)

        password_hash = _hash_password(password)
        with contextlib.closing(get_db()) as conn:
            user = conn.execute(
                "SELECT id FROM users WHERE username=? AND password_hash=?",
                (username, password_hash),
            ).fetchone()

        if not user:
            return self.json_response({'error': 'Invalid credentials'}, 401)
        self._start_session(user[0], username)

    def handle_progress(self, data):
        """Record one answered question for the logged-in user.

        Expects ``question_id`` and ``is_correct``; ``time_taken`` defaults
        to 0.0. Responds 401 without a session and 400 on missing or
        non-numeric fields.
        """
        user_id = self.get_user_id()
        if not user_id:
            return self.json_response({'error': 'Unauthorized'}, 401)

        question_id = data.get('question_id')
        is_correct = data.get('is_correct')
        time_taken = data.get('time_taken', 0.0)
        if question_id is None or is_correct is None:
            return self.json_response({'error': 'Missing fields'}, 400)

        try:
            row = (user_id, int(question_id), bool(is_correct), float(time_taken))
        except (TypeError, ValueError):
            return self.json_response({'error': 'Invalid fields'}, 400)

        with contextlib.closing(get_db()) as conn:
            conn.execute(
                "INSERT INTO user_answers (user_id, question_id, is_correct, time_taken) VALUES (?, ?, ?, ?)",
                row,
            )
            conn.commit()
        self.json_response({'success': True})

    def do_GET(self):
        """Route GET requests to the page, static-file and API handlers."""
        parsed = urllib.parse.urlparse(self.path)
        path = parsed.path
        params = urllib.parse.parse_qs(parsed.query)

        if path == '/' or path == '/index.html':
            self.serve_html()
        elif path == '/robots.txt':
            self.serve_static('robots.txt', 'text/plain')
        elif path == '/sitemap.xml':
            self.serve_static('sitemap.xml', 'application/xml')
        elif path == '/api/syllabus':
            self.api_syllabus()
        elif path == '/api/questions':
            self.api_questions(params)
        elif path == '/api/stats':
            self.api_stats()
        elif path == '/api/mock-test':
            self.api_mock_test(params)
        elif path == '/api/user/profile':
            self.api_user_profile()
        else:
            self.send_error(404)

    def api_user_profile(self):
        """Return the logged-in user's profile, stats and per-topic progress.

        The optional ``timeframe`` query parameter (``daily``, ``weekly``,
        ``monthly``; anything else means overall) restricts which answers
        are counted.
        """
        user_id = self.get_user_id()
        if not user_id:
            return self.json_response({'error': 'Unauthorized'}, 401)

        # Parse timeframe
        params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        timeframe = params.get('timeframe', ['overall'])[0]
        time_filter = _TIMEFRAME_FILTERS.get(timeframe, "")

        with contextlib.closing(get_db()) as conn:
            user = conn.execute(
                "SELECT username, email, created_at FROM users WHERE id=?", (user_id,)
            ).fetchone()
            if user is None:
                # The session refers to a user row that no longer exists.
                return self.json_response({'error': 'User not found'}, 404)

            # Get overall stats for timeframe
            stats_row = conn.execute(
                "SELECT COUNT(*), SUM(CASE WHEN is_correct=1 THEN 1 ELSE 0 END), AVG(time_taken) "
                f"FROM user_answers ua WHERE user_id=? {time_filter}",
                (user_id,),
            ).fetchone()

            # Topic-wise progress with time tracking. Only correct answers
            # are joined, so "answered" counts distinct questions answered
            # correctly.
            rows = conn.execute(f"""
                SELECT t.id, t.name, st.name as subtopic, s.name as subject,
                       COUNT(DISTINCT q.id) as total_questions,
                       COUNT(DISTINCT ua.question_id) as answered_questions,
                       AVG(ua.time_taken) as avg_time
                FROM topics t
                JOIN subtopics st ON t.subtopic_id = st.id
                JOIN subjects s ON st.subject_id = s.id
                LEFT JOIN question_types qt ON qt.topic_id = t.id
                LEFT JOIN questions q ON q.question_type_id = qt.id
                LEFT JOIN user_answers ua ON ua.question_id = q.id
                     AND ua.user_id = ? AND ua.is_correct = 1 {time_filter}
                GROUP BY t.id
            """, (user_id,)).fetchall()

        total_attempts = stats_row[0] or 0
        correct_attempts = stats_row[1] or 0
        avg_time_overall = round(stats_row[2] or 0, 1)

        topic_progress = [
            {
                'topic_id': r[0],
                'topic': r[1],
                'subtopic': r[2],
                'subject': r[3],
                'total': r[4],
                'answered': r[5],
                'percent': round(r[5] * 100 / r[4], 1) if r[4] > 0 else 0,
                'avg_time': round(r[6] or 0, 1),
            }
            for r in rows
        ]

        self.json_response({
            'username': user[0],
            'email': user[1],
            'joined': user[2],
            'stats': {
                'total_attempts': total_attempts,
                'correct_attempts': correct_attempts,
                'accuracy': round(correct_attempts * 100 / total_attempts, 1) if total_attempts > 0 else 0,
                'avg_time': avg_time_overall,
            },
            'topic_progress': topic_progress,
        })

    def serve_html(self):
        """Serve ``static/index.html`` as UTF-8 HTML."""
        # Sending the file's bytes unchanged avoids decoding it with the
        # platform's default encoding before re-encoding as UTF-8.
        self.serve_static('index.html', 'text/html; charset=utf-8')

    def serve_static(self, filename, content_type):
        """Send ``static/<filename>`` with *content_type*, or 404 if unreadable."""
        file_path = os.path.join(ROOT, 'static', filename)
        try:
            with open(file_path, 'rb') as f:
                content = f.read()
        except OSError:
            self.send_error(404)
            return
        self.send_response(200)
        self.send_header('Content-Type', content_type)
        self.end_headers()
        self.wfile.write(content)

    def json_response(self, data, status=200):
        """Send *data* as a UTF-8 JSON body with HTTP status *status*."""
        self.send_response(status)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8'))

    def api_syllabus(self):
        """Return the subject > subtopic > topic tree with question counts."""
        subjects = []
        with contextlib.closing(get_db()) as conn:
            for s in conn.execute(
                "SELECT id, name, tier, description, target_questions FROM subjects"
            ).fetchall():
                subject = {
                    'id': s[0], 'name': s[1], 'tier': s[2],
                    'description': s[3], 'target': s[4], 'subtopics': [],
                }
                for st in conn.execute(
                    "SELECT id, name, description FROM subtopics WHERE subject_id=?", (s[0],)
                ).fetchall():
                    subtopic = {'id': st[0], 'name': st[1], 'description': st[2], 'topics': []}
                    for t in conn.execute(
                        "SELECT id, name, description FROM topics WHERE subtopic_id=?", (st[0],)
                    ).fetchall():
                        q_count = conn.execute(
                            "SELECT COUNT(*) FROM questions q "
                            "JOIN question_types qt ON q.question_type_id=qt.id "
                            "WHERE qt.topic_id=?",
                            (t[0],),
                        ).fetchone()[0]
                        qtypes = [
                            {'id': qt[0], 'name': qt[1]}
                            for qt in conn.execute(
                                "SELECT id, name FROM question_types WHERE topic_id=?", (t[0],)
                            ).fetchall()
                        ]
                        subtopic['topics'].append({
                            'id': t[0], 'name': t[1], 'description': t[2],
                            'question_count': q_count, 'question_types': qtypes,
                        })
                    subject['subtopics'].append(subtopic)

                # Count total questions for subject
                subject['question_count'] = conn.execute("""
                    SELECT COUNT(*) FROM questions q
                    JOIN question_types qt ON q.question_type_id = qt.id
                    JOIN topics t ON qt.topic_id = t.id
                    JOIN subtopics st ON t.subtopic_id = st.id
                    WHERE st.subject_id = ?
                """, (s[0],)).fetchone()[0]
                subjects.append(subject)
        self.json_response(subjects)

    def api_questions(self, params):
        """Return a random page of questions matching the optional filters.

        Recognised integer parameters: ``topic_id``, ``qtype_id``,
        ``subject_id``, ``difficulty``, ``limit`` (default 20) and
        ``offset`` (default 0). ``total`` in the response is the number of
        matching questions before paging. Responds 400 when a parameter is
        not an integer.
        """
        try:
            filters = [
                ("t.id", _int_param(params, 'topic_id')),
                ("qt.id", _int_param(params, 'qtype_id')),
                ("s.id", _int_param(params, 'subject_id')),
                ("q.difficulty", _int_param(params, 'difficulty')),
            ]
            limit = _int_param(params, 'limit', 20)
            offset = _int_param(params, 'offset', 0)
        except ValueError:
            return self.json_response({'error': 'Invalid query parameter'}, 400)

        # Column names come from the fixed list above; values are bound.
        where = " WHERE 1=1"
        args = []
        for column, value in filters:
            if value is not None:
                where += f" AND {column} = ?"
                args.append(value)

        with contextlib.closing(get_db()) as conn:
            # Get total count. Built from the shared FROM/WHERE text rather
            # than by string-replacing the SELECT list, which silently
            # produced a wrong total whenever the whitespace did not match.
            total = conn.execute(
                f"SELECT COUNT(*) {_QUESTION_JOINS}{where}", args
            ).fetchone()[0]
            rows = conn.execute(
                f"SELECT {_QUESTION_COLUMNS} {_QUESTION_JOINS}{where}"
                " ORDER BY RANDOM() LIMIT ? OFFSET ?",
                args + [limit, offset],
            ).fetchall()

        questions = [_question_from_row(r) for r in rows]
        self.json_response({'total': total, 'questions': questions})

    def api_stats(self):
        """Return question counts per subject plus overall totals."""
        stats = {}
        with contextlib.closing(get_db()) as conn:
            rows = conn.execute("""
                SELECT s.name, COUNT(q.id)
                FROM subjects s
                LEFT JOIN subtopics st ON st.subject_id = s.id
                LEFT JOIN topics t ON t.subtopic_id = st.id
                LEFT JOIN question_types qt ON qt.topic_id = t.id
                LEFT JOIN questions q ON q.question_type_id = qt.id
                GROUP BY s.id
            """).fetchall()
            for subject_name, question_count in rows:
                stats[subject_name] = question_count

            stats['total'] = conn.execute("SELECT COUNT(*) FROM questions").fetchone()[0]
            stats['topic_count'] = conn.execute("SELECT COUNT(*) FROM topics").fetchone()[0]
            stats['subject_count'] = conn.execute("SELECT COUNT(*) FROM subjects").fetchone()[0]
        self.json_response(stats)

    def api_mock_test(self, params):
        """Return ``num`` (default 25) random questions, optionally by subject.

        Responds 400 when ``subject_id`` or ``num`` is not an integer.
        """
        try:
            subject_id = _int_param(params, 'subject_id')
            num = _int_param(params, 'num', 25)
        except ValueError:
            return self.json_response({'error': 'Invalid query parameter'}, 400)

        query = f"SELECT {_QUESTION_COLUMNS} {_QUESTION_JOINS}"
        args = []
        if subject_id is not None:
            query += " WHERE s.id = ?"
            args.append(subject_id)
        query += " ORDER BY RANDOM() LIMIT ?"
        args.append(num)

        with contextlib.closing(get_db()) as conn:
            rows = conn.execute(query, args).fetchall()

        questions = [_question_from_row(r) for r in rows]
        self.json_response({'questions': questions, 'total': len(questions)})

    def log_message(self, format, *args):
        pass  # Suppress access logs


def main():
    """Prepare the database if needed, then serve until interrupted."""
    # Initialize DB if needed
    if not os.path.exists(DB_PATH):
        print("Database not found. Running generation pipeline...")
        init_db()

    # Check if we have questions, if not, generate some
    conn = get_db()
    try:
        count = conn.execute("SELECT COUNT(*) FROM questions").fetchone()[0]
        if count == 0:
            print("Database is empty. Generating question bank...")
            from generators.run_all import main as generate
            generate()
    except Exception as e:
        # Best effort: a failed check or generation must not stop the server.
        print(f"Error checking question count: {e}")
    finally:
        conn.close()

    server = http.server.HTTPServer(('0.0.0.0', PORT), SSCHandler)
    print(f"\nšŸš€ SSCTopper running at http://localhost:{PORT}")
    print(f" Database: {DB_PATH}")
    print(f" Press Ctrl+C to stop\n")

    # No custom SIGINT handler: server.shutdown() blocks until serve_forever()
    # returns, so calling it from the thread that is running serve_forever()
    # deadlocks. Ctrl+C raises KeyboardInterrupt here instead.
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()


if __name__ == '__main__':
    main()