监控程序状态和跨多个服务器实施电子邮件通知的最佳方法?

Optimal Approach for Monitoring Program Status and Implementing Email Notifications Across Multiple Servers?

提问人:voidecho 提问时间:11/4/2023 更新时间:11/4/2023 访问量:33

问:

如何设置与多主机上的程序运行状态相关的邮件推送机制?

我想要的是:一封可以部署在多个主机上的传出电子邮件,具有相同的密码或访问令牌,以便 python 或其他脚本可以在运行结束时发送相关状态。

我尝试了 SMTP。但是,SMTP提供商一般要求每个客户端都有单独的访问密码,并且需要在每个主机上单独生成(我尝试了QQ,sina.com 和 163.com),但是这个数字一般是有限的(我的QQ邮箱是5个)。

我想在许多服务器上部署此功能,因此这种方法不是很有用。

我对计算机网络和云服务没有足够的了解,所以我非常感谢您的耐心解释。

我使用以下 python 脚本在单个主机上建立了电子邮件推送机制。(以下代码修改自 。有关详细信息,请参阅此处py_reminder)


def send_email(task, time_start=0.0, args=None, kwargs=None, error='', to=''):
    config = {
        'SMTP': 'smtp.xxxx.com',
        'PORT': 25,
        'ADDRESS': '[email protected]',
        'PASSWORD': 'xxxx',
    }
    server = smtplib.SMTP_SSL(config['SMTP'], config['PORT'])
    server.login(config['ADDRESS'], config['PASSWORD'])

    msg = MIMEMultipart()
    msg['From'] = formataddr(('PyReminder', config['ADDRESS']))
    if to:
        msg['To'] = to
    else:
        msg['To'] = config['TO']

    formatted_time = datetime.now().strftime('%Y-%m-%d %H:%M')
    time_usage = (timer() - time_start) / 60 if time_start else 0
    common = f"""Task: {task}
Time: {formatted_time}
Machine: {gethostname()}
Time Usage: {time_usage:.2f} mins
Args: {args if args else '-'}
Kwargs: {kwargs if kwargs else '-'}
Status: """

    if error:
        msg['X-Priority'] = '2'
        message = common + 'Error! Please check! \n\n%s' % error
        msg['Subject'] = '[PyReminder] Error for %s' % task
    else:
        message = common + 'Complete!'
        msg['Subject'] = '[PyReminder] Completion for %s' % task

    msg.attach(MIMEText(message, 'plain'))
    server.sendmail(config['ADDRESS'], to, msg.as_string())
    server.quit()
    del msg


def monitor(task='Your Task', to='', mute_success=False, disable=False):
    def decorator(func):
        @functools.wraps(func)
        def wrapper_decorator(*args, **kwargs):
            logger = logging.Logger('catch_all')
            ts = timer()
            try:
                value = func(*args, **kwargs)
                if not disable and not mute_success:
                    send_email(task=task, time_start=ts, args=args, kwargs=kwargs, error='', to=to)
                return value
            except Exception as e:
                logger.error(e, exc_info=True)
                if not disable:
                    send_email(task=task, time_start=ts, args=args, kwargs=kwargs, error='%s\n%s\n%s' % sys.exc_info(),
                               to=to)

        return wrapper_decorator

    return decorator


@monitor('test', '[email protected]')
def test():
    sleep(1)
Python 身份验证 电子邮件 自动化 推送通知

评论


答: 暂无答案