| import json import smtplib import requests from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from tencentcloud.common import credential from tencentcloud.common.profile.client_profile import ClientProfile from tencentcloud.common.profile.http_profile import HttpProfile from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException from tencentcloud.vpc.v20170312 import vpc_client, models
def update_ip2security_group(SecretId, SecretKey, security_group_id, account_subject, office_ip): cred = credential.Credential(SecretId, SecretKey) httpProfile = HttpProfile() httpProfile.endpoint = "vpc.tencentcloudapi.com"
clientProfile = ClientProfile() clientProfile.httpProfile = httpProfile
client = vpc_client.VpcClient(cred, "ap-guangzhou", clientProfile) try: req = models.DescribeSecurityGroupPoliciesRequest() params = { "SecurityGroupId": security_group_id } req.from_json_string(json.dumps(params))
resp = client.DescribeSecurityGroupPolicies(req) json_req = resp.to_json_string() security_group_policy_version = json.loads(json_req)['SecurityGroupPolicySet']['Version'] handle_info(account_subject, security_group_policy_version, office_ip)
print("当前规则如下:\n", json.loads(json_req)['SecurityGroupPolicySet'])
replace_security_policy(SecretId, SecretKey, security_group_id, security_group_policy_version, office_ip)
except TencentCloudSDKException as err: print(err)
def replace_security_policy(SecretId, SecretKey, security_group_id, version, office_ip): """ 注意: 此方法是基于安全组规则索引来修改规则,如果增加了新的规则,则之前的添加的规则索引都会+1, 考虑到以上SDK规则要求: 1.建议在使用此方法时先記錄上一次的IP信息,根據上一次IP記錄, 2.使要修改的安全组下只有一条规则即可用该方法。 以上办法任选其一! """ cred = credential.Credential(SecretId, SecretKey) httpProfile = HttpProfile() httpProfile.endpoint = "vpc.tencentcloudapi.com"
clientProfile = ClientProfile() clientProfile.httpProfile = httpProfile
client = vpc_client.VpcClient(cred, "ap-guangzhou", clientProfile) try: req = models.ReplaceSecurityGroupPolicyRequest() params = { "SecurityGroupId": security_group_id, "SecurityGroupPolicySet": { "Version": version, "Ingress": [ { "PolicyIndex": 0, "Protocol": "ALL", "Port": "ALL", "CidrBlock": office_ip, "Action": "ACCEPT", "PolicyDescription": "办公室IP", "ModifyTime": "" } ] } } req.from_json_string(json.dumps(params))
resp = client.ReplaceSecurityGroupPolicy(req) print(resp.to_json_string()) print("规则修改执行完毕!")
except TencentCloudSDKException as err: print(err)
def handle_info(account_subject, version, ip_address): print(f"当前执行修改的账户为:{account_subject},安全组修改版本为:{version},修改后的IP地址为:{ip_address}")
def send_mail(old_ip, new_ip): """ smtp_server : 邮件SMTP服务器地址【这里使用的腾讯企业邮件服务器】 sender_email:发件人地址 sender_password:发件人邮箱授权码【可登录网页端安全设置里获取smtp授权码】 receiver_email_list: 收件人邮箱列表 mail_subject:邮件主题 message:邮件内容正文 """ smtp_server = "smtp.exmail.qq.com" sender_email = 'mail@qq.com' sender_password = "xxxxxxxx" receiver_email_list = ['zhangsan@qq.com', 'lisi@qq.com'] receiver_email = receiver_email_list mail_subject = " Office IP Changed!"
msg = MIMEMultipart() msg['From'] = sender_email msg['To'] = ','.join(receiver_email) msg['Subject'] = mail_subject
message = f"The old ip is {old_ip},The new ip is {new_ip} !" msg.attach(MIMEText(message, 'plain'))
try: server = smtplib.SMTP_SSL(smtp_server, 465) server.login(sender_email, sender_password) server.sendmail(sender_email, receiver_email_list, msg.as_string()) print("邮件发送成功") server.quit() except Exception as e: print("邮件发送失败:", e)
def send_wxwork(old_ip, new_ip): wx_work_hook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" notice1 = { "msgtype": "markdown", "markdown": { "content": "# IP变动通知\n" f">旧IP:<font color=\"comment\">{old_ip}</font>\n" f">新IP:<font color=\"info\">{new_ip}</font>\n" } }
response = requests.post(wx_work_hook, json=notice1) print(response.text)
if __name__ == '__main__': """ 1.通过接口获取办公室公网IP 2.通过获取的IP与旧文件IP比对是否一致,一致则不执行后续操作 3.不一致则发送邮件及企业微信通知, 4.调用腾讯云SDK更新不同账户下办公室IP安全组规则 """ Secret_dict = { "aa": { "account_subject": "阿里巴巴", "SecretId": "SecretId", "SecretKey": "SecretKey", "security_group_id": "security_group_id" }, "bb": { "account_subject": "腾讯", "SecretId": "SecretId", "SecretKey": "SecretKey", "security_group_id": "security_group_id" } }
aa_secretId = Secret_dict['aa']['SecretId'] aa_secretKey = Secret_dict['aa']['SecretKey'] aa_security_group_id = Secret_dict['aa']['security_group_id'] aa_account_subject = Secret_dict['aa']['account_subject']
bb_secretId = Secret_dict['bb']['SecretId'] bb_secretKey = Secret_dict['bb']['SecretKey'] bb_security_group_id = Secret_dict['bb']['security_group_id'] bb_account_subject = Secret_dict['bb']['account_subject']
res = requests.get('http://icanhazip.com/)') net_ip = res.text.strip('\n')
with open('office_ip', 'r') as f: file_ip = f.read()
if net_ip == file_ip: print(f"IP一致!当前IP为:{net_ip}") exit(1)
print("IP不一致") with open('office_ip', 'w') as f: f.seek(0) f.write(net_ip)
send_mail(file_ip, net_ip) send_wxwork(file_ip, net_ip)
update_ip2security_group(aa_secretId, aa_secretKey, aa_security_group_id, aa_account_subject, net_ip) update_ip2security_group(bb_secretId, bb_secretKey, bb_security_group_id, bb_account_subject, net_ip)