需求来源:
由于云服务器的安全组限制了只能在公司IP下远程连接,每次更新安全组时只能通过手动更新,办公室IP是动态IP,每隔三天变一次IP,时间久了就觉得麻烦,所以在查阅了腾讯云文档后发现有安全组的SDK,故而此脚本应运而生~~🤣🤣🤣

脚本说明

1
2
3
4
1.通过接口获取办公室公网IP
2.通过获取的IP与旧文件IP比对是否一致,一致则不执行后续操作
3.不一致则发送邮件及企业微信通知,
4.调用腾讯云SDK更新不同账户下办公室IP安全组规则

完整代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# -*- coding: utf-8 -*-
import json
import smtplib
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.vpc.v20170312 import vpc_client, models


# 查询安全组规则的版本【Version】及入站规则【Ingress】
def update_ip2security_group(SecretId, SecretKey, security_group_id, account_subject, office_ip):
cred = credential.Credential(SecretId, SecretKey)
# 实例化一个http选项,可选的,没有特殊需求可以跳过
httpProfile = HttpProfile()
httpProfile.endpoint = "vpc.tencentcloudapi.com"

# 实例化一个client选项,可选的,没有特殊需求可以跳过
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile

client = vpc_client.VpcClient(cred, "ap-guangzhou", clientProfile)
try:
# 实例化一个请求对象,每个接口都会对应一个request对象
req = models.DescribeSecurityGroupPoliciesRequest()
params = {
"SecurityGroupId": security_group_id
}
req.from_json_string(json.dumps(params))

# 返回的resp是一个DescribeSecurityGroupPoliciesResponse的实例,与请求对象对应
resp = client.DescribeSecurityGroupPolicies(req)
# 输出json格式的字符串回包
json_req = resp.to_json_string()
# 获取当前安全组规则的修改版本
security_group_policy_version = json.loads(json_req)['SecurityGroupPolicySet']['Version']
# policy_index = json.loads(json_req)['SecurityGroupPolicySet']['Ingress'][0]['PolicyIndex']
# 日志打印
handle_info(account_subject, security_group_policy_version, office_ip)

print("当前规则如下:\n", json.loads(json_req)['SecurityGroupPolicySet'])

# 根据安全的顺序的索引来替换
replace_security_policy(SecretId, SecretKey, security_group_id, security_group_policy_version, office_ip)

except TencentCloudSDKException as err:
print(err)


def replace_security_policy(SecretId, SecretKey, security_group_id, version, office_ip):
"""
注意:
此方法是基于安全组规则索引来修改规则,如果增加了新的规则,则之前的添加的规则索引都会+1,
考虑到以上SDK规则要求:
1.建议在使用此方法时先記錄上一次的IP信息,根據上一次IP記錄,
2.使要修改的安全组下只有一条规则即可用该方法。
以上办法任选其一!
"""
cred = credential.Credential(SecretId, SecretKey)
# 实例化一个http选项,可选的,没有特殊需求可以跳过
httpProfile = HttpProfile()
httpProfile.endpoint = "vpc.tencentcloudapi.com"

# 实例化一个client选项,可选的,没有特殊需求可以跳过
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile

client = vpc_client.VpcClient(cred, "ap-guangzhou", clientProfile)
try:
# 实例化一个请求对象,每个接口都会对应一个request对象
req = models.ReplaceSecurityGroupPolicyRequest()
params = {
"SecurityGroupId": security_group_id,
"SecurityGroupPolicySet": {
"Version": version,
"Ingress": [
{
"PolicyIndex": 0,
"Protocol": "ALL",
"Port": "ALL",
"CidrBlock": office_ip,
"Action": "ACCEPT",
"PolicyDescription": "办公室IP",
"ModifyTime": ""
}
]
}
}
req.from_json_string(json.dumps(params))

# 返回的resp是一个ReplaceSecurityGroupPolicyResponse的实例,与请求对象对应
resp = client.ReplaceSecurityGroupPolicy(req)
# 输出json格式的字符串回包
print(resp.to_json_string())
print("规则修改执行完毕!")

except TencentCloudSDKException as err:
print(err)


def handle_info(account_subject, version, ip_address):
print(f"当前执行修改的账户为:{account_subject},安全组修改版本为:{version},修改后的IP地址为:{ip_address}")


def send_mail(old_ip, new_ip):
"""
smtp_server : 邮件SMTP服务器地址【这里使用的腾讯企业邮件服务器】
sender_email:发件人地址
sender_password:发件人邮箱授权码【可登录网页端安全设置里获取smtp授权码】
receiver_email_list: 收件人邮箱列表
mail_subject:邮件主题
message:邮件内容正文
"""
smtp_server = "smtp.exmail.qq.com"
sender_email = 'mail@qq.com'
sender_password = "xxxxxxxx"
receiver_email_list = ['zhangsan@qq.com', 'lisi@qq.com']
receiver_email = receiver_email_list
mail_subject = " Office IP Changed!"

# 设置邮件内容
msg = MIMEMultipart()
msg['From'] = sender_email
msg['To'] = ','.join(receiver_email)
msg['Subject'] = mail_subject

message = f"The old ip is {old_ip},The new ip is {new_ip} !"
# 邮件正文
msg.attach(MIMEText(message, 'plain'))

# 连接到QQ邮箱的SMTP服务器
try:
server = smtplib.SMTP_SSL(smtp_server, 465)
server.login(sender_email, sender_password)
server.sendmail(sender_email, receiver_email_list, msg.as_string())
print("邮件发送成功")
server.quit()
except Exception as e:
print("邮件发送失败:", e)


def send_wxwork(old_ip, new_ip):
wx_work_hook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
notice1 = {
"msgtype": "markdown",
"markdown": {
"content":
"# IP变动通知\n"
f">旧IP:<font color=\"comment\">{old_ip}</font>\n"
f">新IP:<font color=\"info\">{new_ip}</font>\n"
}
}

response = requests.post(wx_work_hook, json=notice1)
print(response.text)


if __name__ == '__main__':
"""
1.通过接口获取办公室公网IP
2.通过获取的IP与旧文件IP比对是否一致,一致则不执行后续操作
3.不一致则发送邮件及企业微信通知,
4.调用腾讯云SDK更新不同账户下办公室IP安全组规则
"""
Secret_dict = {
"aa": {
"account_subject": "阿里巴巴",
"SecretId": "SecretId",
"SecretKey": "SecretKey",
"security_group_id": "security_group_id"
},
"bb": {
"account_subject": "腾讯",
"SecretId": "SecretId",
"SecretKey": "SecretKey",
"security_group_id": "security_group_id"
}
}

aa_secretId = Secret_dict['aa']['SecretId']
aa_secretKey = Secret_dict['aa']['SecretKey']
aa_security_group_id = Secret_dict['aa']['security_group_id']
aa_account_subject = Secret_dict['aa']['account_subject']

bb_secretId = Secret_dict['bb']['SecretId']
bb_secretKey = Secret_dict['bb']['SecretKey']
bb_security_group_id = Secret_dict['bb']['security_group_id']
bb_account_subject = Secret_dict['bb']['account_subject']

# 1.获取IP
res = requests.get('http://icanhazip.com/)')
net_ip = res.text.strip('\n')

# 2.获取旧文件IP
with open('office_ip', 'r') as f:
file_ip = f.read()

# 3.比对IP
if net_ip == file_ip:
print(f"IP一致!当前IP为:{net_ip}")
exit(1)

print("IP不一致")
# 更新IP文件
with open('office_ip', 'w') as f:
f.seek(0) # 调整句柄到开头
f.write(net_ip)

# 发送邮件通知及企业微信通知
send_mail(file_ip, net_ip)
send_wxwork(file_ip, net_ip)

# 修改aa账户办公室IP安全组规则
update_ip2security_group(aa_secretId, aa_secretKey, aa_security_group_id, aa_account_subject, net_ip)
# 修改bb账户办公室IP安全组规则
update_ip2security_group(bb_secretId, bb_secretKey, bb_security_group_id, bb_account_subject, net_ip)

最后

1
以上完整脚本,有需要可自身场景需求修改。