欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python 写守护进程代码示例

程序员文章站 2023-11-16 11:51:10
下面是一个简单的python守护进程的例子。实现监控邮箱是否超过容量,如果超限则发邮件的功能。 实现的功能非常简单,只是为了说明下如何用python写守护进程。有了这个,相信其他...

下面是一个简单的python守护进程的例子。实现监控邮箱是否超过容量,如果超限则发邮件的功能。

实现的功能非常简单,只是为了说明下如何用python写守护进程。有了这个,相信其他复杂的守护进程你也可以搞定了。

###################################################################################################################

    import os
    import sys

    class daemonize:
        """ 创建守护进程的基类 """
        def daemonize(self):
            try:
                #this process would create a parent and a child
                pid = os.fork()
                if pid > 0:
                    # take care of the first parent
                    sys.exit(0)
            except oserror, err:
                sys.stderr.write("fork 1 has failed --> %d--[%s]\n" % (err.errno,
                                                                  err.strerror))
                sys.exit(1)

            #change to root
            os.chdir('/')
            #detach from terminal
            os.setsid()
            # file to be created ?
            os.umask(0)
            try:
                # this process creates a parent and a child
                pid = os.fork()
                if pid > 0:
                    print "daemon process pid %d" % pid
                    #bam
                    sys.exit(0)
            except oserror, err:
                sys.stderr.write("fork 2 has failed --> %d--[%s]\n" % (err.errno,
                                                                  err.strerror))
                sys.exit(1)

            sys.stdout.flush()
            sys.stderr.flush()
        def start_daemon(self):
            self.daemonize()
            self.run_daemon()

        def run_daemon(self):
        """override"""
            pass
###################################################################################################################

    from daemonize import daemonize
    from email.mimetext import mimetext
    import os
    import smtplib
    from smtplib import smtpexception
    import time
     class watchfile(daemonize):

        def __init__(self, file_path, size_limit=15728640):
            self.file = os.path.realpath(file_path)
            print '---'
            assert os.path.isfile(self.file), '%s does not exist' % self.file
            print '+++'
            self.userhome = os.getenv('home')
            self.smtpserver = '*your-host*'
            self.recipient_list = ['recipient@domain']
            self.sender = 'sender@domain'
            self.file_size_limit = size_limit
            self.email_body = os.path.join(self.userhome, 'email-msg.txt')
            self.interval = 3600
            self.log_file = os.path.join(self.userhome, 'inboxlog.txt')
        def send_an_email(self):
           """method to send email to the recipients"""
           email_body = open(self.email_body, 'r').read()
           msg = mimetext(email_body)
           msg['subject'] = 'your email inbox has exceeded size !'
           msg['from'] = 'inbox watchdog'
           msg['reply-to'] = none
           msg['to'] = self.recipient_list

           session_obj = smtplib.smtp()
           session_obj.connect(self.smtpserver)
           try:
               session_obj.sendmail(self.sender, self.recipient_list, msg.as_string())
           except smtpexception:
                print "unable to send emails."
           finally:
               session_obj.close()
        def watch(self):
            """method to watch your inbox. this also logs the time when your
               inbox was last checked."""
            current_file_size = os.path.getsize(self.file)
            if current_file_size > self.file_size_limit:
                self.send_an_email()
            f = open(self.log_file, 'a')
            f.write('last checked on : %s' % time.asctime(time.localtime(time.time())))
            f.write('\n')
            f.close()

        def run_daemon(self):
            """over ridden method from daemonize.this starts the daemon."""
            while 1:
                self.watch()
                time.sleep(self.interval)
    if __name__ == '__main__':
        watchdog = watchfile('path-to-your-inbox')
        watchdog.start_daemon()