1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
| import telnetlib import time import sys import logging from logging.handlers import RotatingFileHandler
# --- Router connection settings -------------------------------------------
ROUTER_IP = "192.168.1.1"
TELNET_PORT = 23
USERNAME = "root"
# NOTE(review): the credential is stored in plain text in the source file;
# consider loading it from the environment or a protected file.
PASSWORD = "Zte521"
# Byte sequence the script waits for to decide the remote shell is ready.
PROMPT = b"#"

# --- Local logging ----------------------------------------------------------
# Path handed to the rotating file handler.
LOG_FILE = "/var/log/apply_iptables_rules.log"
# Every rule is appended to the temporary chain MY_PROXY_TMP.
_RULE_PREFIX = "ip6tables -A MY_PROXY_TMP"

# Destination mask that keeps only the low 64 bits of the address, so a rule
# matches on the address suffix regardless of the upper 64 bits.
_INTERFACE_ID_MASK = "::ffff:ffff:ffff:ffff"

# (protocol, destination port or port range, destination address suffix)
_ALLOWED_SERVICES = (
    ("tcp", "443", "::be24:11ff:fe16:ceb1"),
    ("tcp", "443", "::40e4:bff:feea:2201"),
    ("udp", "8090", "::be24:11ff:fe76:16f9"),
    ("udp", "8092", "::be24:11ff:fe76:16f9"),
    ("udp", "20000:30000", "::be24:11ff:fe76:16f9"),
    ("tcp", "443", "::be24:11ff:fe76:16f9"),
    ("udp", "8090", "::86a9:3eff:fe70:7209"),
    ("tcp", "920", "::86a9:3eff:fe70:7209"),
)

# Ordered list of ip6tables commands that populate the temporary chain.
# The two interface rules come first, followed by one ACCEPT per service.
CORE_RULES = [
    f"{_RULE_PREFIX} -i br0 -o ppp0 -j ACCEPT",
    f"{_RULE_PREFIX} -i ppp0 -m state --state ESTABLISHED,RELATED -j ACCEPT",
]
CORE_RULES += [
    f"{_RULE_PREFIX} -p {proto} --dport {dport} -d {host}/{_INTERFACE_ID_MASK} -j ACCEPT"
    for proto, dport, host in _ALLOWED_SERVICES
]
# Log line layout: timestamp, level name, message.
log_formatter = logging.Formatter(
    '%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# Rotating file log: rolls over at 2 MiB and keeps three backups.
log_handler = RotatingFileHandler(
    LOG_FILE,
    maxBytes=2 * 1024 * 1024,
    backupCount=3,
    encoding='utf-8',
)
log_handler.setFormatter(log_formatter)

# Configure the root logger so the rotating file handler is its only handler.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
if logger.hasHandlers():
    # Drop handlers installed earlier so messages are not written twice.
    logger.handlers.clear()
logger.addHandler(log_handler)
def log_and_print(message, level=logging.INFO):
    """Print *message* to stdout and record it in the log at *level*.

    Args:
        message: Text to print and log.
        level: Numeric ``logging`` level (``logging.INFO`` by default).

    Every level is forwarded to the logger; previously only INFO and ERROR
    were logged and any other level (for example WARNING) was printed but
    silently missing from the log file.
    """
    print(message)
    logger.log(level, message)
def execute_cmd(tn, cmd, fatal_on_error=True):
    """Send one shell command over the Telnet session and return its output.

    Args:
        tn: Connected Telnet object (needs ``read_very_eager``, ``write``
            and ``read_until``).
        cmd: ASCII command line to execute, without a trailing newline.
        fatal_on_error: When true, a missing prompt or an error keyword in
            the output aborts the run; when false the output is returned
            unchecked (best effort).

    Returns:
        The decoded, stripped text read up to and including the prompt.

    Raises:
        RuntimeError: In fatal mode, if the prompt was not received before
            the timeout or the output contains an error keyword.
    """
    # Discard bytes left over from the previous command so they are not
    # mistaken for this command's output.
    tn.read_very_eager()
    tn.write(cmd.encode('ascii') + b"\n")
    raw_output = tn.read_until(PROMPT, timeout=5)
    output = raw_output.decode('ascii', errors='ignore').strip()
    time.sleep(0.1)

    if not fatal_on_error:
        return output

    # read_until() does not raise on timeout; it returns whatever arrived.
    # Without the prompt the output is incomplete and the session is out of
    # sync, so the command must not be treated as successful.
    if not raw_output.endswith(PROMPT):
        error_msg = f"致命错误!命令执行超时,未收到提示符: {cmd}\n回显信息: {output}"
        log_and_print(error_msg, level=logging.ERROR)
        raise RuntimeError("规则注入中断,已触发熔断保护。")

    # "not found" also covers "command not found".
    error_keywords = ("error", "bad", "failed", "invalid", "no chain", "not found")
    lowered = output.lower()
    if any(keyword in lowered for keyword in error_keywords):
        error_msg = f"致命错误!命令执行失败: {cmd}\n回显信息: {output}"
        log_and_print(error_msg, level=logging.ERROR)
        raise RuntimeError("规则注入中断,已触发熔断保护。")

    return output
def run_telnet_session():
    """Log in to the router over Telnet and refresh the ip6tables proxy chain.

    Steps, in order:
      1. Log in and wait for the shell prompt.
      2. Create (if missing) and flush MY_PROXY_TMP, then fill it from
         CORE_RULES.
      3. Insert MY_PROXY_TMP at the top of FORWARD, remove the old
         MY_PROXY_RULES chain, and rename MY_PROXY_TMP to MY_PROXY_RULES.
      4. Make sure FORWARD ends with a catch-all DROP rule.

    Any exception is logged and terminates the process with exit status 1.
    The Telnet connection is closed on both the success and failure paths.
    """
    try:
        log_and_print(f"开始执行无感更新任务,连接路由器 {ROUTER_IP}...")
        # The context manager closes the socket even if a step below raises.
        with telnetlib.Telnet(ROUTER_IP, TELNET_PORT, timeout=15) as tn:
            tn.read_until(b"Login:", timeout=5)
            tn.write(USERNAME.encode('ascii') + b"\n")
            tn.read_until(b"Password:", timeout=5)
            tn.write(PASSWORD.encode('ascii') + b"\n")
            # read_until() returns partial data on timeout, so confirm the
            # prompt really arrived before sending firewall commands.
            login_reply = tn.read_until(PROMPT, timeout=5)
            if not login_reply.endswith(PROMPT):
                raise RuntimeError("登录失败,未收到命令提示符。")
            log_and_print("登录成功,开始构建影子链...")

            # Creating the chain fails harmlessly when it already exists.
            execute_cmd(tn, "ip6tables -N MY_PROXY_TMP 2>/dev/null", fatal_on_error=False)
            execute_cmd(tn, "ip6tables -F MY_PROXY_TMP", fatal_on_error=True)

            for cmd in CORE_RULES:
                execute_cmd(tn, cmd, fatal_on_error=True)
            log_and_print("影子链构建完成。")

            log_and_print("正在执行原子级无感切换...")
            # Hook the new chain in first, then retire the old one; removal
            # steps are best effort because the old chain may not exist.
            execute_cmd(tn, "ip6tables -I FORWARD 1 -j MY_PROXY_TMP", fatal_on_error=True)
            execute_cmd(tn, "ip6tables -D FORWARD -j MY_PROXY_RULES 2>/dev/null", fatal_on_error=False)
            execute_cmd(tn, "ip6tables -F MY_PROXY_RULES 2>/dev/null", fatal_on_error=False)
            execute_cmd(tn, "ip6tables -X MY_PROXY_RULES 2>/dev/null", fatal_on_error=False)
            execute_cmd(tn, "ip6tables -E MY_PROXY_TMP MY_PROXY_RULES", fatal_on_error=True)
            log_and_print("自定义防火墙链双缓冲更新完毕。")

            log_and_print("正在校验主链兜底 DROP 规则...")
            # Append the DROP rule only when the check (-C) does not find it.
            execute_cmd(
                tn,
                "ip6tables -C FORWARD -j DROP 2>/dev/null || ip6tables -A FORWARD -j DROP",
                fatal_on_error=False,
            )
            log_and_print("兜底拦截规则确认生效。")

            tn.write(b"exit\n")
        log_and_print("所有任务顺利完成,退出会话。")
    except Exception as e:
        log_and_print(f"脚本执行终止: {e}", level=logging.ERROR)
        sys.exit(1)
# Script entry point: run the update only when the file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__": run_telnet_session()
|