mirror of
https://gitlab.nic.cz/labs/bird.git
synced 2025-01-03 15:41:54 +00:00
aa14b546b8
The first iteration of MIB tree with tests.
94 lines
2.3 KiB
Python
Executable File
94 lines
2.3 KiB
Python
Executable File
#! /bin/env python3
|
|
|
|
"""
|
|
A very simple script used to test that BIRD does not segfaults on randomly split
|
|
AgentX PDUs. A split is when the AgentX PDUs do not align with socket's
|
|
write()/read() boundaries.
|
|
"""
|
|
|
|
import socket
|
|
import random
|
|
import time
|
|
import math
|
|
|
|
ADDRESS = "127.0.0.1"
|
|
PORT = 5000
|
|
AGENTX_MASTER_PORT = 705
|
|
|
|
def chunks(lst, n):
|
|
l = len(lst)
|
|
for i in range(0, l, n):
|
|
yield lst[i: min(i+n, l)]
|
|
|
|
def print_msg(header, msg, total):
|
|
print(header, "{}/{}".format(len(msg), total))
|
|
for line in chunks(msg, 16):
|
|
print(" ", end="")
|
|
for char in line:
|
|
print("0x{:02x} ".format(char), end="")
|
|
|
|
print()
|
|
|
|
def create_listen():
|
|
tun = socket.socket()
|
|
print(f"Binding to port {PORT} on {ADDRESS}")
|
|
tun.bind((ADDRESS, PORT))
|
|
print(f"Listening on {ADDRESS} port {PORT}")
|
|
tun.listen(2)
|
|
|
|
return tun
|
|
|
|
def io_loop(rx, tx):
|
|
while True:
|
|
try:
|
|
to_master = rx.recv(8192)
|
|
except BlockingIOError:
|
|
to_master = None
|
|
|
|
try:
|
|
to_subagent = tx.recv(8192)
|
|
except BlockingIOError:
|
|
to_subagent = None
|
|
|
|
if to_master is not None and len(to_master) > 0:
|
|
print_msg("S=>M: ", to_master, len(to_master))
|
|
tx.send(to_master)
|
|
|
|
if to_subagent is not None and len(to_subagent) > 0:
|
|
limit = 5 * len(to_subagent) / 100
|
|
part_len = random.randint(math.ceil(limit),
|
|
math.floor(len(to_subagent) - limit))
|
|
|
|
print(f"M->S: {len(to_subagent[:part_len])}/{len(to_subagent)}")
|
|
rx.send(to_subagent[:part_len])
|
|
time.sleep(0.4)
|
|
print_msg("M=>S: ", to_subagent, len(to_subagent))
|
|
rx.send(to_subagent[part_len:])
|
|
|
|
time.sleep(0.02)
|
|
|
|
def safe_io_loop(tun):
|
|
while True:
|
|
try:
|
|
rx, addr = tun.accept()
|
|
print("Subagent connected")
|
|
|
|
tx = socket.socket()
|
|
tx.connect((ADDRESS, AGENTX_MASTER_PORT))
|
|
print("Connected to master agent")
|
|
|
|
rx.setblocking(False)
|
|
tx.setblocking(False)
|
|
|
|
io_loop(rx, tx)
|
|
except BrokenPipeError:
|
|
rx.close()
|
|
tx.close()
|
|
|
|
def main():
|
|
with create_listen() as listening:
|
|
safe_io_loop(listening)
|
|
|
|
if __name__ == '__main__':
|
|
main()
|