1. 在mysql中创建job表,用于储存队列任务,如下:
create table jobs( id auto_increment not null primary key, message text not null, job_status not null default 0 );
message 用来存储任务信息,job_status用来标识任务状态,假设只有两种状态,0:在队列中, 1:已出队列
2. 有一个生产者进程,往job表中放新的数据,进行排队:
insert into jobs(message) values('msg1');
select * from jobs where job_status=0 order by id asc limit 1; update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id
4. 如果没有跨进程的锁,两个消费者进程有可能同时取到重复的消息,导致一个消息被消费多次。这种情况是我们不希望看到的,于是,我们需要实现一个跨进程的锁。
#!/usr/bin/env python2.7 # # -*- coding:utf-8 -*- # # desc : # import logging, time import mysqldb class glock: def __init__(self, db): self.db = db def _execute(self, sql): cursor = self.db.cursor() try: ret = none cursor.execute(sql) if cursor.rowcount != 1: logging.error("multiple rows returned in mysql lock function.") ret = none else: ret = cursor.fetchone() cursor.close() return ret except exception, ex: logging.error("execute sql \"%s\" failed! exception: %s", sql, str(ex)) cursor.close() return none def lock(self, lockstr, timeout): sql = "select get_lock('%s', %s)" % (lockstr, timeout) ret = self._execute(sql) if ret[0] == 0: logging.debug("another client has previously locked '%s'.", lockstr) return false elif ret[0] == 1: logging.debug("the lock '%s' was obtained successfully.", lockstr) return true else: logging.error("error occurred!") return none def unlock(self, lockstr): sql = "select release_lock('%s')" % (lockstr) ret = self._execute(sql) if ret[0] == 0: logging.debug("the lock '%s' the lock is not released(the lock was not established by this thread).", lockstr) return false elif ret[0] == 1: logging.debug("the lock '%s' the lock was released.", lockstr) return true else: logging.error("the lock '%s' did not exist.", lockstr) return none #init logging def init_logging(): sh = logging.streamhandler() logger = logging.getlogger() logger.setlevel(logging.debug) formatter = logging.formatter('%(asctime)s -%(module)s:%(filename)s-l%(lineno)d-%(levelname)s: %(message)s') sh.setformatter(formatter) logger.addhandler(sh)"current log level is : %s",logging.getlevelname(logger.geteffectivelevel())) def main(): init_logging() db = mysqldb.connect(host='localhost', user='root', passwd='') lock_name = 'queue' l = glock(db) ret = l.lock(lock_name, 10) if ret != true: logging.error("can't get lock! exit!") quit() time.sleep(10)"you can do some synchronization work across processes!") ##todo ## you can do something in here ## l.unlock(lock_name) if __name__ == "__main__": main()
l.lock(lock_name, 10) 中,10是表示timeout的时间是10秒,如果10秒还获取不了锁,就返回,执行后面的操作。
select * from jobs where job_status=0 order by id asc limit 1; update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id
测试的时候,启动两个, 结果如下:
[@tj-10-47 test]# ./ 2014-03-14 17:08:40,277 current log level is : debug 2014-03-14 17:08:40,299 the lock 'queue' was obtained successfully. 2014-03-14 17:08:50,299 you can do some synchronization work across processes! 2014-03-14 17:08:50,299 the lock 'queue' the lock was released.
可以看到第一个glock.py是 17:08:50解锁的,下面的glock.py是在17:08:50获取锁的,可以证实这样是完全可行的。
[@tj-10-47 test]# ./ 2014-03-14 17:08:46,873 current log level is : debug 2014-03-14 17:08:50,299 the lock 'queue' was obtained successfully. 2014-03-14 17:09:00,299 you can do some synchronization work across processes! 2014-03-14 17:09:00,300 the lock 'queue' the lock was released. [@tj-10-47 test]#
下一篇: 学车这事还是先缓一缓