S2OJ/judger/judge_client
Baoshuo bcf5ce8b06
All checks were successful
continuous-integration/drone/push Build is passing
feat(judger): uoj_judger_v2
ref: https://github-redirect.dependabot.com/UniversalOJ/UOJ-System/pull/113

Co-authored-by: vfleaking <vfleaking@163.com>
Co-authored-by: Yefori-Go <110314400+Yefori-Go@users.noreply.github.com>
2022-10-04 20:57:49 +08:00

605 lines
20 KiB
Python
Executable File

#!/usr/bin/python3
import json
import sys
import os
import pipes
import socket
from threading import Thread, Lock, Condition, RLock
import fcntl
import shutil
import time
import logging
import codecs
from contextlib import closing
from typing import *
import requests
from queue import Queue, Empty
jconf: dict
# os related functions
def clean_up_folder(path):
    """Delete everything inside ``path`` while keeping ``path`` itself.

    Real directories are removed recursively; every other kind of entry
    (regular file, symbolic link, FIFO, ...) is unlinked.

    :param path: directory whose contents are removed
    :raises OSError: if the directory cannot be listed or an entry cannot
        be removed
    """
    for name in os.listdir(path):
        entry = os.path.join(path, name)
        # A symlink has to be unlinked even when it points at a directory:
        # shutil.rmtree() refuses to work on symbolic links, and a broken
        # link is neither a file nor a directory.
        if os.path.islink(entry) or not os.path.isdir(entry):
            os.unlink(entry)
        else:
            shutil.rmtree(entry)
def execute(cmd):
    """Run ``cmd`` through ``os.system`` and fail loudly on a non-zero status.

    :param cmd: shell command line, passed to the shell verbatim
    :raises Exception: when ``os.system`` returns anything other than 0
    """
    status = os.system(cmd)
    if status == 0:
        return
    raise Exception('failed to execute: %s' % cmd)
def freopen(f, g):
    """Make the descriptor behind ``f`` refer to the file opened as ``g``.

    The descriptor of ``g`` is duplicated onto the descriptor of ``f``;
    ``g`` is closed afterwards.

    :param f: open file object whose descriptor is replaced
    :param g: open file object that supplies the new target
    """
    source_fd = g.fileno()
    target_fd = f.fileno()
    os.dup2(source_fd, target_fd)
    g.close()
# utf 8
def uoj_text_replace(e):
    """Codec error handler that renders the offending bytes as bold hex escapes.

    Every item of ``e.object[e.start:e.end]`` becomes ``<b>\\xNN</b>``.

    :param e: the Unicode error describing the failing range
    :return: ``(replacement text, position at which to resume)``
    """
    offending = e.object[e.start:e.end]
    pieces = ['<b>\\x%02x</b>' % byte for byte in offending]
    return ''.join(pieces), e.end


# expose the handler under errors='uoj_text_replace'
codecs.register_error('uoj_text_replace', uoj_text_replace)
# init
def init():
    """Load ``.conf.json`` from this script's directory into the global ``jconf``.

    The working directory is changed to the directory that contains this
    file, so relative paths used afterwards resolve against it.  When
    ``judger_cpus`` is absent it defaults to ``['0']``.

    :raises OSError: if ``.conf.json`` cannot be opened
    :raises ValueError: if the file is not valid JSON or a required key is
        missing
    """
    global jconf
    os.chdir(os.path.dirname(os.path.realpath(__file__)))
    with open('.conf.json', 'r', encoding='utf-8') as fp:
        jconf = json.load(fp)

    # Explicit validation instead of assert: assertions vanish under -O.
    required_keys = (
        'uoj_protocol',
        'uoj_host',
        'judger_name',
        'judger_password',
        'socket_port',
        'socket_password',
    )
    missing = [key for key in required_keys if key not in jconf]
    if missing:
        raise ValueError('missing keys in .conf.json: %s' % ', '.join(missing))

    if 'judger_cpus' not in jconf:
        # it should be distinct physical cores! Usually, you can just set it to some even numbers
        jconf['judger_cpus'] = ['0']
# path related functions
def uoj_url(uri):
    """Build the URL of ``uri`` on the configured UOJ server.

    The protocol and host come from ``jconf``; trailing slashes are removed
    from the final string.

    :param uri: path part, appended directly after the host
    :return: ``<protocol>://<host><uri>`` without trailing ``/``
    """
    full_url = f"{jconf['uoj_protocol']}://{jconf['uoj_host']}{uri}"
    return full_url.rstrip('/')
def uoj_judger_path(path=''):
    """Return ``path`` appended to ``<current working directory>/uoj_judger``.

    :param path: suffix to append; expected to start with ``/`` when non-empty
    """
    return ''.join((os.getcwd(), "/uoj_judger", path))
class RWLock:
    """Readers-writer lock in which waiting writers hold back new readers.

    ``rwlock`` encodes the state: ``0`` is free, a positive value is the
    number of active readers and ``-1`` means a writer holds the lock.
    ``writers_waiting`` counts writers currently blocked.
    """

    def __init__(self):
        self.rwlock = 0
        self.writers_waiting = 0
        # Both conditions share this mutex, so holding it is sufficient to
        # wait on or notify either of them.
        self.monitor = Lock()
        self.readers_ok = Condition(self.monitor)
        self.writers_ok = Condition(self.monitor)

    def _wait_until_free(self):
        """Block until nobody holds the lock; ``monitor`` must be held."""
        while self.rwlock != 0:
            self.writers_waiting += 1
            try:
                self.writers_ok.wait()
            finally:
                # keep the counter correct even if wait() is interrupted
                self.writers_waiting -= 1

    def acquire_read(self):
        """Take a shared lock; blocks while a writer holds or awaits the lock."""
        with self.monitor:
            while self.rwlock < 0 or self.writers_waiting:
                self.readers_ok.wait()
            self.rwlock += 1

    def acquire_write(self):
        """Take the exclusive lock; blocks until the lock is free."""
        with self.monitor:
            self._wait_until_free()
            self.rwlock = -1

    def promote(self):
        """Turn the caller's shared lock into the exclusive lock.

        The caller's read hold is dropped first, then the call waits for
        the remaining holders to leave.
        """
        with self.monitor:
            self.rwlock -= 1
            self._wait_until_free()
            self.rwlock = -1

    def demote(self):
        """Turn the caller's exclusive lock into a shared lock."""
        with self.monitor:
            self.rwlock = 1
            self.readers_ok.notify_all()

    def release(self):
        """Give up the caller's hold, shared or exclusive, and wake waiters.

        One waiting writer is woken once the lock becomes free; readers are
        woken only when no writer is waiting.
        """
        with self.monitor:
            if self.rwlock < 0:
                self.rwlock = 0
            else:
                self.rwlock -= 1
            if self.writers_waiting:
                if self.rwlock == 0:
                    self.writers_ok.notify()
            else:
                self.readers_ok.notify_all()
class Judger:
    """One judging worker.

    The worker fetches submissions from the UOJ web server, runs the main
    judger on them inside ``/tmp/<judger_name>/<judger_id>`` and sends the
    results back.  Its main thread is steered through the control tasks
    ``'suspend'``, ``'resume'`` and ``'exit'`` posted on ``_taskQ``.
    """

    # shared by all judgers: guards the problem data under uoj_judger/data
    problem_data_lock = RWLock()
    # shared by all judgers: serializes the requests made by _send_and_fetch
    send_and_fetch_lock = RLock()

    judger_id: int
    cpus: str
    main_path: str
    submission: Optional[dict]
    submission_judged: bool
    main_thread: Optional[Thread]
    report_thread: Optional[Thread]
    _taskQ: Queue

    def log_judge_client_status(self):
        """Log the submission currently held by this judger."""
        logging.info('submission: ' + str(self.submission))

    def __init__(self, judger_id, cpus):
        """Set up the worker without starting it.

        :param judger_id: index of this worker, used in its work path and
            in log messages
        :param cpus: CPU list handed to ``taskset -c``; the value ``'all'``
            disables pinning
        """
        self.judger_id = judger_id
        self.cpus = cpus
        self.main_path = '/tmp/' + jconf['judger_name'] + '/' + str(self.judger_id)
        self.submission = None
        self.main_thread = Thread(target=self._main_loop)
        self.report_thread = None
        self.submission_judged = False
        self._taskQ = Queue()

    def start(self):
        """Create the ``result`` and ``work`` directories and start the main thread."""
        # os.makedirs avoids passing the config-derived path through a shell
        os.makedirs(self.main_path + '/result', exist_ok=True)
        os.makedirs(self.main_path + '/work', exist_ok=True)

        self.log_judge_client_status()
        logging.info('hello from judger #%d' % self.judger_id)

        self.main_thread.start()

    def suspend(self):
        """Post ``'suspend'`` and wait until the main thread has taken it."""
        self._taskQ.put('suspend')
        self._taskQ.join()

    def resume(self):
        """Post ``'resume'`` and wait until the main thread has taken it."""
        self._taskQ.put('resume')
        self._taskQ.join()

    def exit(self):
        """Post ``'exit'``, wait for it to be taken and join the main thread."""
        self._taskQ.put('exit')
        self._taskQ.join()
        self.main_thread.join()

    # report thread
    def _report_loop(self):
        """Forward the judger's progress to the server until judging ends.

        Nothing is reported for hacks.  Otherwise up to 100 characters of
        ``result/cur_status.txt`` are read under a shared ``flock`` and
        posted every 0.2 seconds while ``submission_judged`` is false.
        """
        if 'is_hack' in self.submission:
            return
        status_path = self.main_path + '/result/cur_status.txt'
        while not self.submission_judged:
            try:
                with open(status_path, 'r') as f:
                    fcntl.flock(f, fcntl.LOCK_SH)
                    try:
                        status = f.read(100)
                    except Exception:
                        status = None
                    finally:
                        fcntl.flock(f, fcntl.LOCK_UN)

                if status is not None:
                    data = {
                        'update-status': True,
                        'id': self.submission['id'],
                    }
                    if 'is_custom_test' in self.submission:
                        data['is_custom_test'] = True
                    data['status'] = status
                    uoj_interact(data)
            except Exception:
                # Best effort: the status file may not exist yet, and a
                # failed progress report must not disturb the judging.
                pass
            # Sleep on every iteration, including failed ones; otherwise a
            # missing status file turns this loop into a busy spin.
            time.sleep(0.2)

    def _get_result(self):
        """Parse ``result/result.txt`` into a result dictionary.

        Lines are ``<key> <value>`` pairs, an ``error <message>`` line, or
        the marker ``details`` after which the rest of the file is stored
        verbatim.  ``score``, ``time`` and ``memory`` default to 0 and are
        converted to ``int``.

        :return: the parsed result with ``status`` set to ``'Judged'``
        :raises ValueError: on a blank or malformed line, or a non-integer
            score/time/memory
        """
        res = {'score': 0, 'time': 0, 'memory': 0}
        result_path = self.main_path + '/result/result.txt'
        with open(result_path, 'r', encoding='utf-8', errors='uoj_text_replace') as fres:
            while True:
                line = fres.readline()
                if line == '':
                    break
                line = line.strip()

                if line == 'details':
                    res['details'] = fres.read()
                    break

                sp = line.split()
                # explicit checks instead of assert, which -O would strip
                if not sp:
                    raise ValueError('blank line in result.txt')
                if sp[0] == 'error':
                    res['error'] = line[len('error') + 1:]
                elif len(sp) == 2:
                    res[sp[0]] = sp[1]
                else:
                    raise ValueError('malformed line in result.txt: %r' % line)
        res['score'] = int(res['score'])
        res['time'] = int(res['time'])
        res['memory'] = int(res['memory'])
        res['status'] = 'Judged'
        return res

    def _remove_problem_data(self, problem_id):
        """Delete ``uoj_judger/data/<problem_id>`` after making it writable."""
        quoted_path = pipes.quote(uoj_judger_path(f'/data/{problem_id}'))
        execute(f'chmod 700 {quoted_path} -R && rm -rf {quoted_path}')

    def _update_problem_data_atime(self, problem_id):
        """Refresh the access time of ``uoj_judger/data/<problem_id>``."""
        path = uoj_judger_path(f'/data/{problem_id}')
        execute(f'touch -a {pipes.quote(path)}')

    def _update_problem_data(self, problem_id, problem_mtime):
        """Make sure the local copy of a problem's data is up to date.

        An existing copy whose mtime is at least ``problem_mtime`` is only
        touched.  Otherwise the read lock is promoted to a write lock, a
        stale copy is removed, all but the 99 most recently accessed
        entries of the data directory are deleted, and the data is
        downloaded and unpacked again.

        :param problem_id: numeric problem id (formatted with ``%d``)
        :param problem_mtime: modification time the local copy must reach
        :raises Exception: if any step of the update fails
        """
        Judger.problem_data_lock.acquire_read()
        try:
            copy_name = uoj_judger_path(f'/data/{problem_id}')
            copy_zip_name = uoj_judger_path(f'/data/{problem_id}.zip')

            if os.path.isdir(copy_name):
                if os.path.getmtime(copy_name) >= problem_mtime:
                    self._update_problem_data_atime(problem_id)
                    return
                else:
                    Judger.problem_data_lock.promote()
                    self._remove_problem_data(problem_id)
            else:
                Judger.problem_data_lock.promote()

            # evict the least recently accessed entries, keeping 99
            del_list = sorted(os.listdir(uoj_judger_path('/data')),
                              key=lambda p: os.path.getatime(uoj_judger_path(f'/data/{p}')))[:-99]
            for p in del_list:
                self._remove_problem_data(p)

            uoj_download(f'/problem/{problem_id}', copy_zip_name)
            execute('cd %s && unzip -q %d.zip && rm %d.zip && chmod -w %d -R' % (
                pipes.quote(uoj_judger_path('/data')), problem_id, problem_id, problem_id))
            os.utime(uoj_judger_path(f'/data/{problem_id}'), (time.time(), problem_mtime))
        except Exception:
            self.log_judge_client_status()
            logging.exception('problem update error')
            raise Exception(f'failed to update problem data of #{problem_id}')
        else:
            self.log_judge_client_status()
            logging.info(f'updated problem data of #{problem_id} successfully')
        finally:
            Judger.problem_data_lock.release()

    def _judge(self):
        """Judge the current submission and return the parsed result.

        The work and result directories are emptied, the problem data is
        refreshed, the submission archive is unpacked and
        ``work/submission.conf`` is written.  The main judger then runs
        under a read lock on the problem data while a report thread
        forwards its progress.

        :return: the dictionary produced by ``_get_result``
        :raises Exception: if any preparation step or the judger fails
        """
        clean_up_folder(self.main_path + '/work')
        clean_up_folder(self.main_path + '/result')
        self._update_problem_data(self.submission['problem_id'], self.submission['problem_mtime'])

        with open(self.main_path + '/work/submission.conf', 'w') as fconf:
            uoj_download(self.submission['content']['file_name'], self.main_path + '/work/all.zip')
            execute("cd %s && unzip -q all.zip && rm all.zip" % pipes.quote(self.main_path + '/work'))
            for k, v in self.submission['content']['config']:
                print(k, v, file=fconf)

            if 'is_hack' in self.submission:
                if self.submission['hack']['input_type'] == 'USE_FORMATTER':
                    uoj_download(self.submission['hack']['input'], self.main_path + '/work/hack_input_raw.txt')
                    execute('%s <%s >%s' % (
                        pipes.quote(uoj_judger_path('/run/formatter')),
                        pipes.quote(self.main_path + '/work/hack_input_raw.txt'),
                        pipes.quote(self.main_path + '/work/hack_input.txt')
                    ))
                else:
                    uoj_download(self.submission['hack']['input'], self.main_path + '/work/hack_input.txt')
                print('test_new_hack_only on', file=fconf)
            elif 'is_custom_test' in self.submission:
                print('custom_test on', file=fconf)

        self.submission_judged = False
        self.report_thread = Thread(target=self._report_loop)
        self.report_thread.start()

        try:
            Judger.problem_data_lock.acquire_read()
            try:
                main_judger_cmd = pipes.quote(uoj_judger_path('/main_judger'))
                if self.cpus != 'all':
                    main_judger_cmd = 'taskset -c %s %s' % (pipes.quote(self.cpus), main_judger_cmd)
                execute('cd %s && %s' % (pipes.quote(self.main_path), main_judger_cmd))
            finally:
                Judger.problem_data_lock.release()
        finally:
            # stop the report thread whether or not the judger succeeded
            self.submission_judged = True
            self.report_thread.join()
            self.report_thread = None

        return self._get_result()

    def _send_and_fetch(self, result=None, fetch_new=True):
        """send judgement result, and fetch new submission to judge

        :param result: ``None`` to send nothing, ``False`` to report a
            failed judgement, or the result dictionary of ``_judge``
        :param fetch_new: when false the server is told not to hand out a
            new submission
        :return: ``True`` if the reply was JSON and became
            ``self.submission``; ``False`` otherwise (``self.submission``
            is then ``None``)
        """
        data = {}
        files = {}

        if not fetch_new:
            data['fetch_new'] = False

        if result is not None:
            data['submit'] = True
            if 'is_hack' in self.submission:
                data['is_hack'] = True
                data['id'] = self.submission['hack']['id']
                if result is not False and result['score']:
                    try:
                        logging.info("succ hack!")
                        for field in ('hack_input', 'std_output'):
                            files[field] = open('%s/work/%s.txt' % (self.main_path, field), 'rb')
                    except Exception:
                        self.log_judge_client_status()
                        logging.exception('hack: submit error')
                        result = False
                        # do not upload (or leak) a partial set of files
                        for fobj in files.values():
                            fobj.close()
                        files = {}
            elif 'is_custom_test' in self.submission:
                data['is_custom_test'] = True
                data['id'] = self.submission['id']
            else:
                data['id'] = self.submission['id']

            if 'judge_time' in self.submission:
                data['judge_time'] = self.submission['judge_time']
            if 'tid' in self.submission:
                data['tid'] = self.submission['tid']

            if result is False:
                result = {
                    'score': 0,
                    'error': 'Judgment Failed',
                    'details': 'Unknown Error'
                }
            result['status'] = 'Judged'
            data['result'] = json.dumps(result, ensure_ascii=False)

        try:
            # retry every 2 seconds until the server answers
            while True:
                # a previous attempt may have consumed the upload files
                for fobj in files.values():
                    fobj.seek(0)
                with Judger.send_and_fetch_lock:
                    try:
                        ret = uoj_interact(data, files)
                    except Exception:
                        self.log_judge_client_status()
                        logging.exception('uoj_interact error')
                    else:
                        break
                time.sleep(2)
        finally:
            for fobj in files.values():
                fobj.close()

        try:
            self.submission = json.loads(ret)
        except Exception:
            if ret != 'Nothing to judge':
                logging.info(ret)
            self.submission = None
            return False
        else:
            return True

    def _main_loop(self):
        """Body of the main thread: fetch, judge and report until told to exit.

        Whenever control tasks are queued the loop stops fetching and
        consumes tasks until it sees ``'resume'`` (continue) or ``'exit'``
        (return).  While tasks are pending, results are still sent but no
        new submission is requested.
        """
        while True:
            if not self._taskQ.empty():
                # paused: block here until resumed or asked to exit
                while True:
                    task = self._taskQ.get()
                    self._taskQ.task_done()
                    if task == 'resume':
                        break
                    if task == 'exit':
                        return

            if not self._send_and_fetch():
                time.sleep(2)
                continue

            self.log_judge_client_status()
            logging.info('judging')

            while True:
                try:
                    res = self._judge()
                except Exception:
                    self.log_judge_client_status()
                    logging.exception('judge error')
                    res = False

                if not self._send_and_fetch(result=res, fetch_new=self._taskQ.empty()):
                    break
class JudgerServer:
    """Runs the judger workers and a TCP control socket.

    Clients send one JSON object containing ``password`` and ``cmd``; the
    commands handled by ``run`` are ``'update'``, ``'self-update'`` and
    ``'stop'``.
    """

    taskQ: Queue
    judgers: List[Judger]
    socket_thread: Thread

    def __init__(self):
        """Create one ``Judger`` per entry of ``jconf['judger_cpus']``."""
        self.taskQ = Queue()
        self.judgers = [Judger(i, cpus) for i, cpus in enumerate(jconf['judger_cpus'])]
        self.socket_thread = Thread(target=self._socket_loop)

    @staticmethod
    def update(broadcast=False):
        """Download the judger sources and rebuild them.

        A failed download or build is reported and otherwise ignored.

        NOTE(review): ``main()`` calls ``JudgerServer.update(broadcast=...)``
        but the class had no such method; this definition was derived from
        the update code that used to be inlined in ``run``.  ``broadcast``
        presumably decides whether the other judge clients are asked to
        sync -- confirm the intended meaning.

        :param broadcast: when true and this judger is ``main_judger``,
            call ``uoj_sync_judge_client`` afterwards
        """
        try:
            uoj_download('/judger', 'judger_update.zip')
            execute('unzip -o judger_update.zip && cd %s && make clean && make' % uoj_judger_path())
        except Exception:
            logging.exception('error when update')
            print("error when update", file=sys.stderr)
        if broadcast and jconf['judger_name'] == 'main_judger':
            uoj_sync_judge_client()

    def _socket_loop(self):
        """Accept control connections and queue the tasks they carry.

        A connection is rejected (and logged) when its payload is not
        valid JSON, the password does not match or ``cmd`` is missing.
        After a ``'stop'`` task has been processed by ``run`` the client
        receives ``b'ok'`` and the loop ends.  Any error of the listening
        socket itself queues a ``'stop'`` task.
        """
        try:
            SOCK_CLOEXEC = 524288
            with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM | SOCK_CLOEXEC)) as s:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind(('', jconf['socket_port']))
                s.listen(5)

                while True:
                    try:
                        conn, addr = s.accept()
                        with closing(conn) as conn:
                            data = conn.recv(1024)
                            task = json.loads(data)
                            # Explicit checks, not assert: under -O an assert
                            # would be removed and disable authentication.
                            if 'password' not in task or task['password'] != jconf['socket_password']:
                                raise ValueError('wrong socket password')
                            if 'cmd' not in task:
                                raise ValueError('task without cmd')

                            self.taskQ.put(task)

                            if task['cmd'] == 'stop':
                                logging.info('the judge client is closing...')
                                # wait until run() has acknowledged the task
                                self.taskQ.join()
                                conn.sendall(b'ok')
                                return 'stop'
                    except Exception:
                        logging.exception('connection rejected')
                    else:
                        logging.info('a new task accomplished')
        except Exception:
            self.taskQ.put({'cmd': 'stop'})
            logging.exception('socket server error!')

    def run(self):
        """Start all threads and process control tasks until stopped.

        For every batch of queued tasks the judgers are suspended first.
        ``'update'``/``'self-update'`` rebuild the judger and restart this
        program via ``os.execl`` once the batch is drained; ``'stop'``
        shuts the judgers down and exits the process with status 0.
        """
        self.socket_thread.start()
        for judger in self.judgers:
            judger.start()

        while True:
            task = self.taskQ.get()

            need_restart = False

            for judger in self.judgers:
                judger.suspend()

            try:
                while True:
                    if task['cmd'] in ['update', 'self-update']:
                        # broadcast=True keeps the former behaviour: the
                        # main judger syncs the clients for both commands
                        self.update(broadcast=True)
                        need_restart = True
                    elif task['cmd'] == 'stop':
                        self.taskQ.task_done()
                        self.socket_thread.join()
                        for judger in self.judgers:
                            judger.exit()
                        logging.info("goodbye!")
                        sys.exit(0)

                    self.taskQ.task_done()
                    # drain whatever else is queued; Empty ends the batch
                    task = self.taskQ.get(block=False)
            except Empty:
                pass

            if need_restart:
                os.execl('./judge_client', './judge_client')

            for judger in self.judgers:
                judger.resume()
# interact with uoj web server
# TODO: set a larger timeout
def uoj_interact(data, files=None):
    """POST ``data`` together with the judger credentials to ``/judge/submit``.

    :param data: form fields to send; the mapping is copied, the caller's
        object is left unchanged
    :param files: optional files for the upload, in a form accepted by the
        ``files`` argument of ``requests.post``; ``None`` sends no files
    :return: the response body as text
    """
    payload = data.copy()
    payload.update({
        'judger_name': jconf['judger_name'],
        'password': jconf['judger_password']
    })
    return requests.post(uoj_url('/judge/submit'), data=payload, files=files).text
def uoj_download(uri, filename):
    """Download ``/judge/download<uri>`` from the UOJ server into ``filename``.

    The body is streamed to disk in 64 KiB chunks.  The target file is
    only opened after the server has answered successfully.

    :param uri: path appended to ``/judge/download``
    :param filename: local path that receives the downloaded bytes
    :raises requests.HTTPError: if the server answers with an error status
    :raises OSError: if ``filename`` cannot be written
    """
    data = {
        'judger_name': jconf['judger_name'],
        'password': jconf['judger_password']
    }
    # closing() releases the streamed connection even if writing fails
    with closing(requests.post(uoj_url('/judge/download' + uri), data=data, stream=True)) as r:
        # do not store an HTTP error page as if it were the requested file
        r.raise_for_status()
        with open(filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=65536):
                if chunk:
                    f.write(chunk)
def uoj_sync_judge_client():
    """POST the judger credentials to ``/judge/sync-judge-client``.

    :raises Exception: if the server replies with anything but ``ok``
    """
    credentials = dict(
        judger_name=jconf['judger_name'],
        password=jconf['judger_password'],
    )
    reply = requests.post(uoj_url('/judge/sync-judge-client'), data=credentials).text
    if reply == "ok":
        return
    raise Exception('failed to sync judge clients: %s' % reply)
# main function
def main():
    """Command-line entry point.

    Without arguments the server runs in the foreground.  With one
    argument:

    * ``start`` -- fork; the child redirects stdout to ``os.devnull`` and
      stderr to ``log/judge.log`` and runs the server;
    * ``update`` / ``self-update`` -- send the command to the control
      socket on 127.0.0.1, or call ``JudgerServer.update`` directly when
      that raises ``OSError``;
    * ``stop`` -- send ``stop`` to the control socket and expect ``ok``.

    :raises Exception: for any other argument list or a failed command
    """
    init()

    logging.basicConfig(filename='log/judge.log', level=logging.INFO,
                        format='%(asctime)-15s [%(levelname)s]: %(message)s')

    argc = len(sys.argv)

    if argc == 1:
        JudgerServer().run()

    if argc == 2:
        command = sys.argv[1]
        server_address = ('127.0.0.1', jconf['socket_port'])

        if command == 'start':
            pid = os.fork()
            if pid == -1:
                raise Exception('fork failed')
            if pid > 0:
                # parent: the child carries on as the daemon
                return
            freopen(sys.stdout, open(os.devnull, 'w'))
            freopen(sys.stderr, open('log/judge.log', 'a', encoding='utf-8', errors='uoj_text_replace'))
            JudgerServer().run()

        elif command in ['update', 'self-update']:
            try:
                message = json.dumps({
                    'password': jconf['socket_password'],
                    'cmd': command
                }).encode()
                try:
                    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
                        s.connect(server_address)
                        s.sendall(message)
                    return
                except OSError:
                    # no server reachable: perform the update in this process
                    JudgerServer.update(broadcast=command == 'update')
                    return
            except Exception:
                logging.exception('update error')
                raise Exception('update failed')

        elif command == 'stop':
            try:
                message = json.dumps({
                    'password': jconf['socket_password'],
                    'cmd': 'stop'
                }).encode()
                with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
                    s.connect(server_address)
                    s.sendall(message)
                    if s.recv(10) != b'ok':
                        raise Exception('stop failed')
                return
            except Exception:
                logging.exception('stop error')
                raise Exception('stop failed')

    raise Exception('invalid argument')
# Script entry: run main() and turn any uncaught error into a logged
# failure with exit status 1.
try:
    main()
except Exception:
    logging.exception('critical error!')
    raise SystemExit(1)