使用expect

脚本

将下面的内容写入 exp 文件,并赋予执行权限(例如 chmod +x exp):

#!/usr/bin/expect
# Log in to the router over Telnet and install the IPv6 FORWARD rules,
# unless the FORWARD chain listing already shows a DROP rule.

# Maximum time, in seconds, that each expect command waits for its pattern.
set timeout 30

# Start the Telnet session.
spawn telnet 192.168.1.1 23

# Wait for the login prompt.
expect "Login:"
send "root\r"

# Wait for the password prompt.
expect "Password:"
send "Zte521\r"

# Wait for the shell prompt, then list the current FORWARD chain.
expect "#"
send "ip6tables -L FORWARD --line-numbers\r"

expect {
    "DROP" {
        # The listing already contains a DROP rule, so the rules are
        # considered installed: just log out.
        send "exit\r"
    }
    "#" {
        # The prompt came back and no DROP was seen before it, so the rules
        # are missing. Patterns are tried in order, so DROP wins whenever it
        # is present in the listing. Each command waits for the prompt
        # before the next one is sent.
        send "ip6tables -A FORWARD -i br0 -o ppp0 -j ACCEPT\r"
        expect "#"
        send "ip6tables -A FORWARD -i ppp0 -m state --state ESTABLISHED,RELATED -j ACCEPT\r"
        expect "#"
        send "ip6tables -I FORWARD -p tcp --dport 443 -d ::be24:11ff:fe16:ceb1/::ffff:ffff:ffff:ffff -j ACCEPT\r"
        expect "#"
        send "ip6tables -I FORWARD -p udp --dport 8090 -d ::86a9:3eff:fe70:7209/::ffff:ffff:ffff:ffff -j ACCEPT\r"
        expect "#"
        send "ip6tables -A FORWARD -j DROP\r"
        expect "#"
        send "ip6tables -L FORWARD --line-numbers\r"
        expect "#"
        send "exit\r"
    }
    timeout {
        # Neither DROP nor the prompt arrived: the session is not in a known
        # state, so do not push rules blindly.
        send_user "\nTimed out waiting for the ip6tables listing; no rules were changed.\n"
        exit 1
    }
}

# Wait for the remote side to close the connection so the final exit
# command is actually delivered before this script terminates.
expect eof

定时

可以借助青龙面板的定时任务,定期执行上面的脚本。

Docker 部署示例(进入青龙容器后,安装脚本所需的 expect 和 busybox-extras):

# Open an interactive bash shell inside the container named "qinglong".
docker exec -it qinglong bash
# The next two commands are typed inside that container shell.
# Refresh the apk package index.
apk update
# Install expect and busybox-extras (presumably for the telnet client that
# the expect script spawns -- confirm for your image).
apk add expect busybox-extras

使用 Python

#!/usr/bin/env python3
import telnetlib
import time
import sys
import logging
from logging.handlers import RotatingFileHandler

# ==========================================
# 1. Basic configuration
# ==========================================
ROUTER_IP, TELNET_PORT = "192.168.1.1", 23
USERNAME, PASSWORD = "root", "Zte521"
PROMPT = b"#"
LOG_FILE = "/var/log/apply_iptables_rules.log"

# ==========================================
# 2. Core rules (written into the temporary shadow chain MY_PROXY_TMP)
# ==========================================
# Important: these rules only ACCEPT traffic; never put a DROP in this list.
# Each entry is the match part of one rule, in the order it is appended.
_RULE_MATCHES = [
    "-i br0 -o ppp0",
    "-i ppp0 -m state --state ESTABLISHED,RELATED",
]
# Per-service rules: (protocol, destination port, destination address).
# The destination is matched under the mask ::ffff:ffff:ffff:ffff, i.e. only
# the low 64 bits of the address are compared.
_RULE_MATCHES += [
    f"-p {proto} --dport {port} -d {address}/::ffff:ffff:ffff:ffff"
    for proto, port, address in (
        ("tcp", "443", "::be24:11ff:fe16:ceb1"),
        ("tcp", "443", "::40e4:bff:feea:2201"),
        ("udp", "8090", "::be24:11ff:fe76:16f9"),
        ("udp", "8092", "::be24:11ff:fe76:16f9"),
        ("udp", "20000:30000", "::be24:11ff:fe76:16f9"),
        ("tcp", "443", "::be24:11ff:fe76:16f9"),
        ("udp", "8090", "::86a9:3eff:fe70:7209"),
        ("tcp", "920", "::86a9:3eff:fe70:7209"),
    )
]
CORE_RULES = [
    f"ip6tables -A MY_PROXY_TMP {match} -j ACCEPT" for match in _RULE_MATCHES
]

# ==========================================
# 3. Logging setup (rotating file, so the log cannot fill up the disk)
# ==========================================
log_formatter = logging.Formatter(
    '%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
log_handler = RotatingFileHandler(
    LOG_FILE,
    maxBytes=2 * 1024 * 1024,  # rotate once the file reaches 2 MiB
    backupCount=3,             # keep three rotated files
    encoding='utf-8',
)
log_handler.setFormatter(log_formatter)

# Configure the root logger: INFO and above, with this file handler as the
# only handler (any previously attached handlers are removed first).
logger = logging.getLogger()
logger.setLevel(logging.INFO)
if logger.hasHandlers():
    logger.handlers.clear()
logger.addHandler(log_handler)

def log_and_print(message, level=logging.INFO):
    """Print *message* to stdout and record it in the log at *level*.

    Args:
        message: Text to show and to log.
        level: A ``logging`` level constant (default ``logging.INFO``).

    Every level is forwarded to the module-level ``logger``; previously only
    INFO and ERROR were logged and any other level was printed but silently
    dropped from the log file.
    """
    print(message)
    logger.log(level, message)

# ==========================================
# 4. 核心执行逻辑
# ==========================================
def execute_cmd(tn, cmd, fatal_on_error=True):
    """Run one shell command over the Telnet session and return its output.

    Args:
        tn: An open ``telnetlib.Telnet`` connection sitting at a shell prompt.
        cmd: The command line to send (must be ASCII-encodable).
        fatal_on_error: When true, raise if the command times out or if its
            output contains a known error keyword. When false, failures are
            tolerated (used for best-effort cleanup commands).

    Returns:
        The decoded text received up to the next prompt, stripped of
        surrounding whitespace. This normally includes the echoed command.

    Raises:
        RuntimeError: If ``fatal_on_error`` is true and the prompt did not
            come back in time or an error keyword was found in the output.
    """
    # Discard anything still buffered so it is not mistaken for this
    # command's output.
    tn.read_very_eager()
    tn.write(cmd.encode('ascii') + b"\n")

    # read_until() does not raise on timeout; it returns whatever arrived.
    # A result that does not end with the prompt therefore means the command
    # never returned control within the timeout.
    raw_output = tn.read_until(PROMPT, timeout=5)
    output = raw_output.decode('ascii', errors='ignore').strip()
    time.sleep(0.1)

    if not raw_output.endswith(PROMPT):
        timeout_msg = f"命令执行超时,未等到提示符: {cmd}\n回显信息: {output}"
        log_and_print(timeout_msg, level=logging.ERROR)
        if fatal_on_error:
            raise RuntimeError("规则注入中断,已触发熔断保护。")
        return output

    if fatal_on_error:
        error_keywords = ["error", "bad", "failed", "invalid", "no chain", "not found", "command not found"]
        lowered = output.lower()
        if any(keyword in lowered for keyword in error_keywords):
            error_msg = f"致命错误!命令执行失败: {cmd}\n回显信息: {output}"
            log_and_print(error_msg, level=logging.ERROR)
            raise RuntimeError("规则注入中断,已触发熔断保护。")
    return output

def run_telnet_session():
    """Log in to the router and rebuild the custom ip6tables chain.

    The new rules are first written to a temporary chain (MY_PROXY_TMP). A
    jump to it is inserted at the top of FORWARD before the jump to the old
    chain (MY_PROXY_RULES) is removed. The old chain is then deleted and the
    temporary chain is renamed to MY_PROXY_RULES. Finally a catch-all DROP is
    appended to FORWARD if one is not already present.

    Any exception is logged and the process exits with status 1. The Telnet
    connection is closed on both the success and the failure path.
    """
    try:
        log_and_print(f"开始执行无感更新任务,连接路由器 {ROUTER_IP}...")
        # NOTE: telnetlib is deprecated and slated for removal in Python 3.13.
        with telnetlib.Telnet(ROUTER_IP, TELNET_PORT, timeout=15) as tn:
            tn.read_until(b"Login:", timeout=5)
            tn.write(USERNAME.encode('ascii') + b"\n")
            tn.read_until(b"Password:", timeout=5)
            tn.write(PASSWORD.encode('ascii') + b"\n")

            # read_until() returns partial data on timeout instead of raising,
            # so confirm the shell prompt really arrived before claiming a
            # successful login and sending firewall commands.
            login_reply = tn.read_until(PROMPT, timeout=5)
            if not login_reply.endswith(PROMPT):
                raise RuntimeError("登录失败: 未等到命令提示符,请检查用户名和密码。")
            log_and_print("登录成功,开始构建影子链...")

            # Step 1: build the shadow chain (MY_PROXY_TMP). Creating it may
            # fail if it already exists, so that command is best-effort; the
            # flush guarantees it starts empty.
            execute_cmd(tn, "ip6tables -N MY_PROXY_TMP 2>/dev/null", fatal_on_error=False)
            execute_cmd(tn, "ip6tables -F MY_PROXY_TMP", fatal_on_error=True)

            for cmd in CORE_RULES:
                execute_cmd(tn, cmd, fatal_on_error=True)
            log_and_print("影子链构建完成。")

            # Step 2: switch over. The jump to the new chain is inserted
            # before the jump to the old chain is removed, so FORWARD always
            # references one of them.
            log_and_print("正在执行原子级无感切换...")
            execute_cmd(tn, "ip6tables -I FORWARD 1 -j MY_PROXY_TMP", fatal_on_error=True)
            execute_cmd(tn, "ip6tables -D FORWARD -j MY_PROXY_RULES 2>/dev/null", fatal_on_error=False)

            # Step 3: clean up the old chain and rename the shadow chain to
            # the permanent name.
            execute_cmd(tn, "ip6tables -F MY_PROXY_RULES 2>/dev/null", fatal_on_error=False)
            execute_cmd(tn, "ip6tables -X MY_PROXY_RULES 2>/dev/null", fatal_on_error=False)
            execute_cmd(tn, "ip6tables -E MY_PROXY_TMP MY_PROXY_RULES", fatal_on_error=True)
            log_and_print("自定义防火墙链双缓冲更新完毕。")

            # Step 4: make sure the catch-all DROP exists in FORWARD.
            log_and_print("正在校验主链兜底 DROP 规则...")
            # If the -C check fails (no such rule), the rule is appended with
            # -A. Errors from the check are hidden and the command is
            # non-fatal, so it never trips the abort logic.
            execute_cmd(tn, "ip6tables -C FORWARD -j DROP 2>/dev/null || ip6tables -A FORWARD -j DROP", fatal_on_error=False)
            log_and_print("兜底拦截规则确认生效。")

            # Log out; the with-block closes the connection afterwards.
            tn.write(b"exit\n")
        log_and_print("所有任务顺利完成,退出会话。")

    except Exception as e:
        # Top-level boundary: report the failure and exit non-zero.
        log_and_print(f"脚本执行终止: {e}", level=logging.ERROR)
        sys.exit(1)


if __name__ == "__main__":
    run_telnet_session()

注意:运行时会出现如下弃用警告。telnetlib 计划在 Python 3.13 中被移除,届时需要改用其他 Telnet 实现:

apply_firewall_rules.py:2: DeprecationWarning: 'telnetlib' is deprecated and slated for removal in Python 3.13
import telnetlib