#!/usr/bin/python2 import json import sys import os import pipes import socket from threading import Thread import fcntl import shutil import traceback import time from contextlib import closing import requests import Queue as queue from Queue import Queue, Empty taskQ = Queue() submission = None # path related function def uoj_url(uri): return ("%s://%s%s" % (jconf['uoj_protocol'], jconf['uoj_host'], uri)).rstrip('/') def uoj_judger_path(path = ''): return "uoj_judger" + path # os related funciton def clean_up_folder(path): for f in os.listdir(path): f_path = os.path.join(path, f) if os.path.isfile(f_path): os.unlink(f_path) else: shutil.rmtree(f_path) def execute(cmd): if os.system(cmd): raise Exception('failed to execute: %s' % cmd) def freopen(f, g): os.dup2(g.fileno(), f.fileno()) g.close() # init def init(): global jconf os.chdir(os.path.dirname(os.path.realpath(__file__))) with open('.conf.json', 'r') as fp: jconf = json.load(fp) assert 'uoj_protocol' in jconf assert 'uoj_host' in jconf assert 'judger_name' in jconf assert 'judger_password' in jconf assert 'socket_port' in jconf assert 'socket_password' in jconf # socket server def socket_server_loop(): SOCK_CLOEXEC = 524288 with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM | SOCK_CLOEXEC)) as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('', jconf['socket_port'])) s.listen(5) while True: try: conn, addr = s.accept() with closing(conn) as conn: data = conn.recv(1024) assert data != None task = json.loads(data) assert task['password'] == jconf['socket_password'] assert 'cmd' in task taskQ.put(task) if task['cmd'] == 'stop': print 'the judge client is closing...' taskQ.join() conn.sendall('ok') return 'stop' except Exception: print >>sys.stderr, '['+time.asctime()+']', 'connection rejected' traceback.print_exc() else: print >>sys.stderr, '['+time.asctime()+']', 'a new task accomplished' def start_judger_server(): global socket_server_thread print_judge_client_status() print >>sys.stderr, 'hello!' socket_server_thread = Thread(target = socket_server_loop) socket_server_thread.setDaemon(True) socket_server_thread.start() judger_loop() # report thread def report_loop(): if 'is_hack' in submission: return while not submission_judged: try: with open(uoj_judger_path('/result/cur_status.txt'), 'rb') as f: fcntl.flock(f, fcntl.LOCK_SH) try: status = f.read(100) except Exception: status = None finally: fcntl.flock(f, fcntl.LOCK_UN) if status != None: data = {} data['update-status'] = True data['id'] = submission['id'] if 'is_custom_test' in submission: data['is_custom_test'] = True data['status'] = status uoj_interact(data) time.sleep(0.2) except Exception: pass # handle task in main thread def handle_task(): need_restart = False try: while True: task = taskQ.get_nowait() if task['cmd'] == 'update': if jconf['judger_name'] == 'main_judger': uoj_sync_judge_client() need_restart = True elif task['cmd'] == 'stop': taskQ.task_done() socket_server_thread.join() print_judge_client_status() print sys.stderr, "goodbye!" sys.exit(0) taskQ.task_done() except Empty: pass if need_restart: os.execl('./judge_client', './judge_client') def print_judge_client_status(): print >>sys.stderr, '[' + time.asctime() + ']', if submission != None: print >>sys.stderr, submission, print >>sys.stderr # interact with uoj_judger def get_judger_result(): res = {} with open(uoj_judger_path('/result/result.txt'), 'rb') as fres: res['score'] = 0 res['time'] = 0 res['memory'] = 0 while True: line = fres.readline() if line == '': break line = line.strip() if line == 'details': res['details'] = fres.read() break sp = line.split() assert len(sp) >= 1 if sp[0] == 'error': res['error'] = line[len('error') + 1:] else: assert len(sp) == 2 res[sp[0]] = sp[1] res['score'] = int(res['score']) res['time'] = int(res['time']) res['memory'] = int(res['memory']) res['status'] = 'Judged' return res def update_problem_data(problem_id, problem_mtime): try: if jconf['judger_name'] == 'main_judger': return copy_name = uoj_judger_path('/data/%d' % problem_id) copy_zip_name = uoj_judger_path('/data/%d.zip' % problem_id) if os.path.isdir(copy_name): quoted_copy_name = pipes.quote(copy_name) if os.path.getmtime(copy_name) >= problem_mtime: execute('touch -a %s' % quoted_copy_name) return else: execute('chmod 700 %s -R && rm -rf %s' % (quoted_copy_name, quoted_copy_name)) del_list = sorted(os.listdir(uoj_judger_path('/data')), key=lambda fname: os.path.getatime(uoj_judger_path('/data/%s' % fname)))[:-99] for fname in del_list: quoted_fname = pipes.quote(uoj_judger_path('/data/%s' % fname)) os.system('chmod 700 %s -R && rm -rf %s' % (quoted_fname, quoted_fname)) uoj_download('/problem/%d' % problem_id, copy_zip_name) execute('cd %s && unzip -q %d.zip && rm %d.zip && chmod -w %d -R' % (uoj_judger_path('/data'), problem_id, problem_id, problem_id)) except Exception: print_judge_client_status() traceback.print_exc() raise Exception('failed to update problem data of #%d' % problem_id) else: print_judge_client_status() print >>sys.stderr, 'updated problem data of #%d successfully' % problem_id def judge(): global report_thread global submission_judged clean_up_folder(uoj_judger_path('/work')) clean_up_folder(uoj_judger_path('/result')) update_problem_data(submission['problem_id'], submission['problem_mtime']) with open(uoj_judger_path('/work/submission.conf'), 'wb') as fconf: uoj_download(submission['content']['file_name'], uoj_judger_path('/work/all.zip')) execute("cd %s && unzip -q all.zip && rm all.zip" % pipes.quote(uoj_judger_path('/work'))) for k, v in submission['content']['config']: print >>fconf, k, v if 'is_hack' in submission: if submission['hack']['input_type'] == 'USE_FORMATTER': uoj_download(submission['hack']['input'], uoj_judger_path('/work/hack_input_raw.txt')) execute('%s <%s >%s' % ( pipes.quote(uoj_judger_path('/run/formatter')), pipes.quote(uoj_judger_path('/work/hack_input_raw.txt')), pipes.quote(uoj_judger_path('/work/hack_input.txt')))) else: uoj_download(submission['hack']['input'], uoj_judger_path('/work/hack_input.txt')) print >>fconf, 'test_new_hack_only on' elif 'is_custom_test' in submission: print >>fconf, 'custom_test on' report_thread = Thread(target = report_loop) report_thread.setDaemon(True) submission_judged = False report_thread.start() execute(pipes.quote(uoj_judger_path('/main_judger'))) submission_judged = True report_thread.join() return get_judger_result() # interact with uoj web server def uoj_interact(data, files = {}): data = data.copy() data.update({ 'judger_name': jconf['judger_name'], 'password': jconf['judger_password'] }) return requests.post(uoj_url('/judge/submit'), data=data, files=files).text def uoj_download(uri, filename): data = { 'judger_name': jconf['judger_name'], 'password': jconf['judger_password'] } with open(filename, 'wb') as f: r = requests.post(uoj_url('/judge/download' + uri), data=data, stream=True) for chunk in r.iter_content(chunk_size=65536): if chunk: f.write(chunk) def uoj_sync_judge_client(): data = { 'judger_name': jconf['judger_name'], 'password': jconf['judger_password'] } ret = requests.post(uoj_url('/judge/sync-judge-client'), data=data).text if ret != "ok": raise Exception('failed to sync judge clients: %s' % ret) def send_and_fetch(result = None, fetch_new = True): global submission """send judgement result, and fetch new submission to judge""" data = {} files = {} if not fetch_new: data['fetch_new'] = False if result != None: data['submit'] = True if 'is_hack' in submission: data['is_hack'] = True data['id'] = submission['hack']['id'] if result != False and result['score']: try: print >>sys.stderr, "succ hack!" files = { ('hack_input', open('uoj_judger/work/hack_input.txt', 'rb')), ('std_output', open('uoj_judger/work/std_output.txt', 'rb')) } except Exception: print_judge_client_status() traceback.print_exc() result = False elif 'is_custom_test' in submission: data['is_custom_test'] = True data['id'] = submission['id'] else: data['id'] = submission['id'] if result == False: result = { 'score': 0, 'error': 'Judgement Failed', 'details': 'Unknown Error' } result['status'] = 'Judged' data['result'] = json.dumps(result, ensure_ascii=False) while True: try: ret = uoj_interact(data, files) print ret except Exception: print_judge_client_status() traceback.print_exc() else: break time.sleep(2) try: submission = json.loads(ret) except Exception as e: submission = None return False else: return True # judge client def judger_loop(): ok = False while True: fetch_new = True if ok and not (taskQ.empty() and socket_server_thread.isAlive()): fetch_new = False if not ok: while True: if not taskQ.empty(): handle_task() if not socket_server_thread.isAlive(): raise Exception('socket server exited unexpectedly') if send_and_fetch(): break print '['+time.asctime()+']', 'Nothing to judge...' time.sleep(2) ok = True print_judge_client_status() print >>sys.stderr, 'judging' try: res = judge() except Exception: print_judge_client_status() traceback.print_exc() res = False ok = send_and_fetch(result=res,fetch_new=fetch_new) # main function def main(): init() if len(sys.argv) == 1: start_judger_server() if len(sys.argv) == 2: if sys.argv[1] == 'start': pid = os.fork() if pid == -1: raise Exception('fork failed') elif pid > 0: return else: freopen(sys.stdout, open(os.devnull, 'wb')) freopen(sys.stderr, open('log/judge.log', 'ab', buffering=0)) start_judger_server() elif sys.argv[1] == 'update': try: with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.connect(('127.0.0.1', jconf['socket_port'])) s.sendall(json.dumps({ 'password': jconf['socket_password'], 'cmd': 'update' })) return except Exception: traceback.print_exc() raise Exception('update failed') elif sys.argv[1] == 'stop': try: with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.connect(('127.0.0.1', jconf['socket_port'])) s.sendall(json.dumps({ 'password': jconf['socket_password'], 'cmd': 'stop' })) if s.recv(10) != 'ok': raise Exception('stop failed') return except Exception: traceback.print_exc() raise Exception('stop failed') raise Exception('invalid argument') try: main() except Exception: print_judge_client_status() traceback.print_exc() sys.exit(1)