S2OJ/judger/judge_client
daklqw 8b9bc9211c
Update judge_client
print(sys.stderr) => print(file=sys.stderr)
2021-04-16 10:37:37 +08:00

438 lines
11 KiB
Python

#!/usr/bin/python3
import json
import sys
import os
import pipes
import socket
from threading import Thread
import fcntl
import shutil
import traceback
import time
from contextlib import closing
import requests
import queue as queue
from queue import Queue, Empty
taskQ = Queue()
submission = None
# path related function
def uoj_url(uri):
return ("%s://%s%s" % (jconf['uoj_protocol'], jconf['uoj_host'], uri)).rstrip('/')
def uoj_judger_path(path = ''):
return "uoj_judger" + path
# os related funciton
def clean_up_folder(path):
for f in os.listdir(path):
f_path = os.path.join(path, f)
if os.path.isfile(f_path):
os.unlink(f_path)
else:
shutil.rmtree(f_path)
def execute(cmd):
if os.system(cmd):
raise Exception('failed to execute: %s' % cmd)
def freopen(f, g):
os.dup2(g.fileno(), f.fileno())
g.close()
# init
def init():
global jconf
os.chdir(os.path.dirname(os.path.realpath(__file__)))
with open('.conf.json', 'r') as fp:
jconf = json.load(fp)
assert 'uoj_protocol' in jconf
assert 'uoj_host' in jconf
assert 'judger_name' in jconf
assert 'judger_password' in jconf
assert 'socket_port' in jconf
assert 'socket_password' in jconf
# socket server
def socket_server_loop():
SOCK_CLOEXEC = 524288
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM | SOCK_CLOEXEC)) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', jconf['socket_port']))
s.listen(5)
while True:
try:
conn, addr = s.accept()
with closing(conn) as conn:
data = conn.recv(1024).decode()
assert data != None
task = json.loads(data)
assert task['password'] == jconf['socket_password']
assert 'cmd' in task
taskQ.put(task)
if task['cmd'] == 'stop':
print('the judge client is closing...')
taskQ.join()
conn.sendall(b'ok')
return 'stop'
except Exception:
print('['+time.asctime()+']', 'connection rejected', file=sys.stderr)
traceback.print_exc()
else:
print('['+time.asctime()+']', 'a new task accomplished', file=sys.stderr)
def start_judger_server():
global socket_server_thread
print_judge_client_status()
print('hello!', file=sys.stderr)
socket_server_thread = Thread(target = socket_server_loop)
socket_server_thread.setDaemon(True)
socket_server_thread.start()
judger_loop()
# report thread
def report_loop():
if 'is_hack' in submission:
return
while not submission_judged:
try:
with open(uoj_judger_path('/result/cur_status.txt'), 'r') as f:
fcntl.flock(f, fcntl.LOCK_SH)
try:
status = f.read(100)
except Exception:
status = None
finally:
fcntl.flock(f, fcntl.LOCK_UN)
if status != None:
data = {}
data['update-status'] = True
data['id'] = submission['id']
if 'is_custom_test' in submission:
data['is_custom_test'] = True
data['status'] = status
uoj_interact(data)
time.sleep(0.2)
except Exception:
pass
# handle task in main thread
def handle_task():
need_restart = False
try:
while True:
task = taskQ.get_nowait()
if task['cmd'] == 'update':
try:
uoj_download('/judger', 'judger_update.zip')
execute('unzip -o judger_update.zip && cd %s && make clean && make' % uoj_judger_path())
except:
print("error when update", file=sys.stderr)
if jconf['judger_name'] == 'main_judger':
uoj_sync_judge_client()
need_restart = True
elif task['cmd'] == 'stop':
taskQ.task_done()
socket_server_thread.join()
print_judge_client_status()
print("goodbye!", file=sys.stderr)
sys.exit(0)
taskQ.task_done()
except Empty:
pass
if need_restart:
os.execl('./judge_client', './judge_client')
def print_judge_client_status():
print('[' + time.asctime() + ']', end=' ', file=sys.stderr)
if submission != None:
print(submission, end=' ', file=sys.stderr)
print(file=sys.stderr)
# interact with uoj_judger
def get_judger_result():
res = {}
with open(uoj_judger_path('/result/result.txt'), 'r') as fres:
res['score'] = 0
res['time'] = 0
res['memory'] = 0
while True:
line = fres.readline()
if line == '':
break
line = line.strip()
if line == 'details':
res['details'] = fres.read()
break
sp = line.split()
assert len(sp) >= 1
if sp[0] == 'error':
res['error'] = line[len('error') + 1:]
else:
assert len(sp) == 2
res[sp[0]] = sp[1]
res['score'] = int(res['score'])
res['time'] = int(res['time'])
res['memory'] = int(res['memory'])
res['status'] = 'Judged'
return res
def update_problem_data(problem_id, problem_mtime):
try:
if jconf['judger_name'] == 'main_judger':
return
copy_name = uoj_judger_path('/data/%d' % problem_id)
copy_zip_name = uoj_judger_path('/data/%d.zip' % problem_id)
if os.path.isdir(copy_name):
quoted_copy_name = pipes.quote(copy_name)
if os.path.getmtime(copy_name) >= problem_mtime:
execute('touch -a %s' % quoted_copy_name)
return
else:
execute('chmod 700 %s -R && rm -rf %s' % (quoted_copy_name, quoted_copy_name))
del_list = sorted(os.listdir(uoj_judger_path('/data')), key=lambda fname: os.path.getatime(uoj_judger_path('/data/%s' % fname)))[:-99]
for fname in del_list:
quoted_fname = pipes.quote(uoj_judger_path('/data/%s' % fname))
os.system('chmod 700 %s -R && rm -rf %s' % (quoted_fname, quoted_fname))
uoj_download('/problem/%d' % problem_id, copy_zip_name)
execute('cd %s && unzip -q %d.zip && rm %d.zip && chmod -w %d -R' % (uoj_judger_path('/data'), problem_id, problem_id, problem_id))
except Exception:
print_judge_client_status()
traceback.print_exc()
raise Exception('failed to update problem data of #%d' % problem_id)
else:
print_judge_client_status()
print('updated problem data of #%d successfully' % problem_id, file=sys.stderr)
def judge():
global report_thread
global submission_judged
clean_up_folder(uoj_judger_path('/work'))
clean_up_folder(uoj_judger_path('/result'))
update_problem_data(submission['problem_id'], submission['problem_mtime'])
with open(uoj_judger_path('/work/submission.conf'), 'w') as fconf:
uoj_download(submission['content']['file_name'], uoj_judger_path('/work/all.zip'))
execute("cd %s && unzip -q all.zip && rm all.zip" % pipes.quote(uoj_judger_path('/work')))
for k, v in submission['content']['config']:
print(k, v, file=fconf)
if 'is_hack' in submission:
if submission['hack']['input_type'] == 'USE_FORMATTER':
uoj_download(submission['hack']['input'], uoj_judger_path('/work/hack_input_raw.txt'))
execute('%s <%s >%s' % (
pipes.quote(uoj_judger_path('/run/formatter')),
pipes.quote(uoj_judger_path('/work/hack_input_raw.txt')),
pipes.quote(uoj_judger_path('/work/hack_input.txt'))))
else:
uoj_download(submission['hack']['input'], uoj_judger_path('/work/hack_input.txt'))
print('test_new_hack_only on', file=fconf)
elif 'is_custom_test' in submission:
print('custom_test on', file=fconf)
report_thread = Thread(target = report_loop)
report_thread.setDaemon(True)
submission_judged = False
report_thread.start()
execute(pipes.quote(uoj_judger_path('/main_judger')))
submission_judged = True
report_thread.join()
return get_judger_result()
# interact with uoj web server
def uoj_interact(data, files = {}):
data = data.copy()
data.update({
'judger_name': jconf['judger_name'],
'password': jconf['judger_password']
})
return requests.post(uoj_url('/judge/submit'), data=data, files=files).text
def uoj_download(uri, filename):
data = {
'judger_name': jconf['judger_name'],
'password': jconf['judger_password']
}
with open(filename, 'wb') as f:
r = requests.post(uoj_url('/judge/download' + uri), data=data, stream=True)
for chunk in r.iter_content(chunk_size=65536):
if chunk:
f.write(chunk)
def uoj_sync_judge_client():
data = {
'judger_name': jconf['judger_name'],
'password': jconf['judger_password']
}
ret = requests.post(uoj_url('/judge/sync-judge-client'), data=data).text
if ret != "ok":
raise Exception('failed to sync judge clients: %s' % ret)
def send_and_fetch(result = None, fetch_new = True):
global submission
"""send judgement result, and fetch new submission to judge"""
data = {}
files = {}
if not fetch_new:
data['fetch_new'] = False
if result != None:
data['submit'] = True
if 'is_hack' in submission:
data['is_hack'] = True
data['id'] = submission['hack']['id']
if result != False and result['score']:
try:
print("succ hack!", file=sys.stderr)
files = {
('hack_input', open('uoj_judger/work/hack_input.txt', 'r')),
('std_output', open('uoj_judger/work/std_output.txt', 'r'))
}
except Exception:
print_judge_client_status()
traceback.print_exc()
result = False
elif 'is_custom_test' in submission:
data['is_custom_test'] = True
data['id'] = submission['id']
else:
data['id'] = submission['id']
if result == False:
result = {
'score': 0,
'error': 'Judgement Failed',
'details': 'Unknown Error'
}
result['status'] = 'Judged'
data['result'] = json.dumps(result, ensure_ascii=False)
while True:
try:
ret = uoj_interact(data, files)
print(ret)
except Exception:
print_judge_client_status()
traceback.print_exc()
else:
break
time.sleep(2)
try:
submission = json.loads(ret)
except Exception as e:
submission = None
return False
else:
return True
# judge client
def judger_loop():
ok = False
while True:
fetch_new = True
if ok and not (taskQ.empty() and socket_server_thread.isAlive()):
fetch_new = False
if not ok:
while True:
if not taskQ.empty():
handle_task()
if not socket_server_thread.isAlive():
raise Exception('socket server exited unexpectedly')
if send_and_fetch():
break
print('['+time.asctime()+']', 'Nothing to judge...')
time.sleep(2)
ok = True
print_judge_client_status()
print('judging', file=sys.stderr)
try:
res = judge()
except Exception:
print_judge_client_status()
traceback.print_exc()
res = False
ok = send_and_fetch(result=res,fetch_new=fetch_new)
# main function
def main():
init()
if len(sys.argv) == 1:
start_judger_server()
if len(sys.argv) == 2:
if sys.argv[1] == 'start':
pid = os.fork()
if pid == -1:
raise Exception('fork failed')
elif pid > 0:
return
else:
freopen(sys.stdout, open(os.devnull, 'wb'))
freopen(sys.stderr, open('log/judge.log', 'ab', buffering=0))
start_judger_server()
elif sys.argv[1] == 'update':
try:
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.connect(('127.0.0.1', jconf['socket_port']))
s.sendall(json.dumps({
'password': jconf['socket_password'],
'cmd': 'update'
}).encode())
return
except Exception:
traceback.print_exc()
raise Exception('update failed')
elif sys.argv[1] == 'stop':
try:
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.connect(('127.0.0.1', jconf['socket_port']))
s.sendall(json.dumps({
'password': jconf['socket_password'],
'cmd': 'stop'
}).encode())
if s.recv(10).decode() != 'ok':
raise Exception('stop failed')
return
except Exception:
traceback.print_exc()
raise Exception('stop failed')
raise Exception('invalid argument')
try:
main()
except Exception:
print_judge_client_status()
traceback.print_exc()
sys.exit(1)