0
0
mirror of https://gitlab.nic.cz/labs/bird.git synced 2025-01-05 16:41:53 +00:00
bird/proto/snmp/splitter.py

94 lines
2.3 KiB
Python
Raw Normal View History

#! /bin/env python3
"""
A very simple script used to test that BIRD does not segfaults on randomly split
AgentX PDUs. A split is when the AgentX PDUs do not align with socket's
write()/read() boundaries.
"""
import socket
import random
import time
import math
ADDRESS = "127.0.0.1"
PORT = 5000
AGENTX_MASTER_PORT = 705
def chunks(lst, n):
l = len(lst)
for i in range(0, l, n):
yield lst[i: min(i+n, l)]
def print_msg(header, msg, total):
print(header, "{}/{}".format(len(msg), total))
for line in chunks(msg, 16):
print(" ", end="")
for char in line:
print("0x{:02x} ".format(char), end="")
print()
def create_listen():
tun = socket.socket()
print(f"Binding to port {PORT} on {ADDRESS}")
tun.bind((ADDRESS, PORT))
print(f"Listening on {ADDRESS} port {PORT}")
tun.listen(2)
return tun
def io_loop(rx, tx):
while True:
try:
to_master = rx.recv(8192)
except BlockingIOError:
to_master = None
try:
to_subagent = tx.recv(8192)
except BlockingIOError:
to_subagent = None
if to_master is not None and len(to_master) > 0:
print_msg("S=>M: ", to_master, len(to_master))
tx.send(to_master)
if to_subagent is not None and len(to_subagent) > 0:
limit = 5 * len(to_subagent) / 100
part_len = random.randint(math.ceil(limit),
math.floor(len(to_subagent) - limit))
print(f"M->S: {len(to_subagent[:part_len])}/{len(to_subagent)}")
rx.send(to_subagent[:part_len])
time.sleep(0.4)
print_msg("M=>S: ", to_subagent, len(to_subagent))
rx.send(to_subagent[part_len:])
time.sleep(0.02)
def safe_io_loop(tun):
while True:
try:
rx, addr = tun.accept()
print("Subagent connected")
tx = socket.socket()
tx.connect((ADDRESS, AGENTX_MASTER_PORT))
print("Connected to master agent")
rx.setblocking(False)
tx.setblocking(False)
io_loop(rx, tx)
except BrokenPipeError:
rx.close()
tx.close()
def main():
with create_listen() as listening:
safe_io_loop(listening)
if __name__ == '__main__':
main()