1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
| import json import smtplib import requests from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from tencentcloud.common import credential from tencentcloud.common.profile.client_profile import ClientProfile from tencentcloud.common.profile.http_profile import HttpProfile from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException from tencentcloud.vpc.v20170312 import vpc_client, models
def update_ip2security_group(SecretId, SecretKey, security_group_id, account_subject, office_ip):
    """Fetch a security group's policy version and rewrite its rule for a new IP.

    Queries the policies of ``security_group_id`` through the Tencent Cloud
    VPC client (region ``ap-guangzhou``), reports the account, version and IP
    through ``handle_info``, prints the current policy set, and then calls
    ``replace_security_policy`` with the fetched version and ``office_ip``.

    Args:
        SecretId: API secret id used to build the SDK credential.
        SecretKey: API secret key used to build the SDK credential.
        security_group_id: Id of the security group to inspect and update.
        account_subject: Label of the account, used only for the printed report.
        office_ip: Value forwarded as the new rule's CIDR block.

    Returns:
        None. A ``TencentCloudSDKException`` raised along the way is printed
        rather than propagated.
    """
    sdk_credential = credential.Credential(SecretId, SecretKey)

    http_settings = HttpProfile()
    http_settings.endpoint = "vpc.tencentcloudapi.com"

    client_settings = ClientProfile()
    client_settings.httpProfile = http_settings

    vpc = vpc_client.VpcClient(sdk_credential, "ap-guangzhou", client_settings)
    try:
        query = models.DescribeSecurityGroupPoliciesRequest()
        query.from_json_string(json.dumps({"SecurityGroupId": security_group_id}))

        raw_reply = vpc.DescribeSecurityGroupPolicies(query).to_json_string()
        # Decode the reply once; both the version and the printed rules come from it.
        policy_set = json.loads(raw_reply)['SecurityGroupPolicySet']
        current_version = policy_set['Version']
        handle_info(account_subject, current_version, office_ip)

        print("当前规则如下:\n", policy_set)

        replace_security_policy(SecretId, SecretKey, security_group_id, current_version, office_ip)
    except TencentCloudSDKException as err:
        print(err)
def replace_security_policy(SecretId, SecretKey, security_group_id, version, office_ip):
    """Replace the ingress rule at policy index 0 with an allow-all rule for ``office_ip``.

    Note:
        This method modifies a rule by its index within the security group.
        If a new rule is added, the indexes of the previously added rules
        all increase by one. Given that SDK requirement, either:

        1. record the previous IP before using this method and work from
           that record, or
        2. keep exactly one rule in the security group being modified.

        Either approach is sufficient.

    Args:
        SecretId: API secret id used to build the SDK credential.
        SecretKey: API secret key used to build the SDK credential.
        security_group_id: Id of the security group whose rule is replaced.
        version: Policy-set version sent along with the replacement request.
        office_ip: Value used as the rule's ``CidrBlock``.

    Returns:
        None. The SDK response is printed; a ``TencentCloudSDKException`` is
        printed rather than propagated.
    """
    sdk_credential = credential.Credential(SecretId, SecretKey)

    http_settings = HttpProfile()
    http_settings.endpoint = "vpc.tencentcloudapi.com"

    client_settings = ClientProfile()
    client_settings.httpProfile = http_settings

    vpc = vpc_client.VpcClient(sdk_credential, "ap-guangzhou", client_settings)

    # The single ingress rule that takes the place of whatever sits at index 0.
    ingress_rule = {
        "PolicyIndex": 0,
        "Protocol": "ALL",
        "Port": "ALL",
        "CidrBlock": office_ip,
        "Action": "ACCEPT",
        "PolicyDescription": "办公室IP",
        "ModifyTime": "",
    }
    payload = {
        "SecurityGroupId": security_group_id,
        "SecurityGroupPolicySet": {
            "Version": version,
            "Ingress": [ingress_rule],
        },
    }

    try:
        request = models.ReplaceSecurityGroupPolicyRequest()
        request.from_json_string(json.dumps(payload))

        reply = vpc.ReplaceSecurityGroupPolicy(request)
        print(reply.to_json_string())
        print("规则修改执行完毕!")
    except TencentCloudSDKException as err:
        print(err)
def handle_info(account_subject, version, ip_address):
    """Print a one-line summary of the account, policy version and new IP.

    Args:
        account_subject: Label of the account being modified.
        version: Security-group policy version in use.
        ip_address: IP address the rule is being changed to.
    """
    summary = (
        f"当前执行修改的账户为:{account_subject},"
        f"安全组修改版本为:{version},"
        f"修改后的IP地址为:{ip_address}"
    )
    print(summary)
def send_mail(old_ip, new_ip):
    """Email a notice that the office IP changed from ``old_ip`` to ``new_ip``.

    Settings used below:
        smtp_server: SMTP server address (Tencent enterprise mail here).
        sender_email: Sender address.
        sender_password: SMTP authorization code of the sender mailbox
            (obtainable from the web client's security settings).
        receiver_email_list: List of recipient addresses.
        mail_subject: Subject line.
        message: Plain-text body.

    Args:
        old_ip: Previously recorded IP, shown in the message body.
        new_ip: Newly detected IP, shown in the message body.

    Returns:
        None. Sending is best-effort: any exception is printed, not raised.
    """
    smtp_server = "smtp.exmail.qq.com"
    sender_email = 'mail@qq.com'
    sender_password = "xxxxxxxx"
    receiver_email_list = ['zhangsan@qq.com', 'lisi@qq.com']
    mail_subject = " Office IP Changed!"

    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = ','.join(receiver_email_list)
    msg['Subject'] = mail_subject

    message = f"The old ip is {old_ip},The new ip is {new_ip} !"
    msg.attach(MIMEText(message, 'plain'))

    try:
        # The context manager sends QUIT and closes the socket even when
        # login or sendmail raises, so a failed attempt no longer leaks
        # the connection.
        with smtplib.SMTP_SSL(smtp_server, 465) as server:
            server.login(sender_email, sender_password)
            server.sendmail(sender_email, receiver_email_list, msg.as_string())
            print("邮件发送成功")
    except Exception as e:
        # Deliberately broad: a mail failure must not stop the rest of the run.
        print("邮件发送失败:", e)
def send_wxwork(old_ip, new_ip):
    """Post an IP-change notice to the WeCom (WeChat Work) group webhook.

    Args:
        old_ip: Previously recorded IP, shown in the markdown message.
        new_ip: Newly detected IP, shown in the markdown message.

    Returns:
        None. The webhook's response text is printed. Sending is best-effort:
        a network error or timeout is printed instead of raised, matching how
        ``send_mail`` treats delivery failures.
    """
    wx_work_hook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
    notice1 = {
        "msgtype": "markdown",
        "markdown": {
            "content": "# IP变动通知\n"
                       f">旧IP:<font color=\"comment\">{old_ip}</font>\n"
                       f">新IP:<font color=\"info\">{new_ip}</font>\n"
        }
    }

    try:
        # Without a timeout requests may block indefinitely on a stalled connection.
        response = requests.post(wx_work_hook, json=notice1, timeout=10)
    except requests.RequestException as err:
        print("企业微信通知发送失败:", err)
        return
    print(response.text)
if __name__ == '__main__':
    # Workflow:
    # 1. Fetch the office's public IP from an external service.
    # 2. Compare it with the IP stored in the local ``office_ip`` file; when
    #    they match, stop without doing anything else.
    # 3. Otherwise store the new IP and send the e-mail and WeCom notices.
    # 4. Call the Tencent Cloud SDK to update the office-IP security-group
    #    rule under every configured account.
    import ipaddress

    Secret_dict = {
        "aa": {
            "account_subject": "阿里巴巴",
            "SecretId": "SecretId",
            "SecretKey": "SecretKey",
            "security_group_id": "security_group_id"
        },
        "bb": {
            "account_subject": "腾讯",
            "SecretId": "SecretId",
            "SecretKey": "SecretKey",
            "security_group_id": "security_group_id"
        }
    }

    # Bounded wait plus status check: an error page must never be mistaken
    # for an IP address.
    res = requests.get('http://icanhazip.com/', timeout=10)
    res.raise_for_status()
    net_ip = res.text.strip()
    # Raises ValueError when the body is not a valid IP, so malformed text is
    # never written into a firewall rule.
    ipaddress.ip_address(net_ip)

    # A missing file (first run) is treated as "no previous IP". Stripping
    # keeps a trailing newline in the file from forcing a false mismatch.
    try:
        with open('office_ip', 'r') as f:
            file_ip = f.read().strip()
    except FileNotFoundError:
        file_ip = ''

    if net_ip == file_ip:
        print(f"IP一致!当前IP为:{net_ip}")
        # Exit status 1 is kept for the "unchanged" case in case a scheduler
        # or wrapper relies on it; SystemExit works without the site module.
        raise SystemExit(1)

    print("IP不一致")
    with open('office_ip', 'w') as f:
        f.write(net_ip)

    send_mail(file_ip, net_ip)
    send_wxwork(file_ip, net_ip)

    # Dicts preserve insertion order, so accounts are processed as listed above.
    for account in Secret_dict.values():
        update_ip2security_group(
            account['SecretId'],
            account['SecretKey'],
            account['security_group_id'],
            account['account_subject'],
            net_ip,
        )
|