欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  php教程

PHP CLI 多进程执行启动脚本第二版

程序员文章站 2022-03-09 23:17:45
...

对于Linux的Shell编程不熟,用PHP写了个可以在终端和运行的多进程并行框架。功能很简单,能基本满足Linux下同时批量处理一些耗时的工作(修正第一版上的一些错误,和作一些优化) 第一版的地址:http://www.oschina.net/code/snippet_779765_44045 无 #!/usr/

对于 Linux 的 Shell 编程不熟,用 PHP 写了个可以在终端和运行的多进程并行框架。功能很简单,能基本满足 Linux 下同时批量处理一些耗时的工作(修正第一版上的一些错误,和作一些优化)
第一版的地址:http://www.oschina.net/code/snippet_779765_44045 PHP CLI 多进程执行启动脚本第二版
#!/usr/bin/env php

 *
 * bat-test.php 脚本内容如下:
 159);
}

function b($line){
	do{
		bat::notify("我是显示传递的参数 \$line = $line");
		usleep(500000);
	}while(mt_rand(100, 999) > 359);
}

function c(){
	global $x;
	bat::notify("多个任务之间的初始变量值不受影响, \$x = $x");
	bat::notify("我是暂停 9 秒时间测试");
	sleep(9);
	bat::notify("我是出错代码 5 测试");
	exit(5);
}

?>
 */

/** 确保这个脚本只能运行在 SHELL 中 */
if(substr(php_sapi_name(), 0, 3) !== 'cli'){
	die("This Programe can only be run in CLI mode.\n");
}

if(!is_callable('pcntl_fork') || !is_callable('msg_send')){
	bat::message("本程序需要 pcntl, sysvmsg 扩展,但您的系统没有安装!", 2);
	exit(5);
}

class bat{
	static private
			$max = 3,
			$total = 0,
			$running = 0,
			$failure = 0,
			$finished = 0,
			$daemon = false,
			$tasks = array(),
			$msg, $msgs = array(),
			$logfile = "/tmp/bat.php.log",
			$childs, $start, $split,
			$get, $parent, $wait;

	static function init(){
		set_time_limit(0);
		error_reporting(8106 & E_ALL);
		ini_set('display_errors', 'Off');
		set_error_handler(array(__CLASS__, 'error'), E_ALL);
		set_exception_handler(array(__CLASS__, 'exception'));
		register_shutdown_function(array(__CLASS__, 'shutdown'));

		self::$start = time();
		self::$split = str_repeat('=', 512);
		self::$parent = msg_get_queue(getmypid());
	}

	static function main($inc){

		self::init();

		ob_get_level() && ob_end_clean();

		if($_SERVER["argc"] == 1){
			if($inc) return true;
			self::usage();
		}

		$i = 0;
		$daemon = false;
		$files = array();

		while(++$i = 1) continue;
						self::message("进程数量应为正整数", 2);
						self::end(8);
					}
					self::message("未指定进程数量", 2);
					self::end(7);
				case "-l":
				case "--log":
				case "--logfile":
					if(++$i  0)self::$max = $max;
	}

	static function set_logfile($log){
		if(is_dir($log)){
			$log .= "/bat.php.log";
		}
		if(is_file($log)){
			if(is_writable($log)){
				self::$logfile = $log;
			}
		}else{
			if(is_writable(dirname($log))){
				self::$logfile = $log;
			}
		}
	}

	static private function run_exit($cid, $status){
		if(!pcntl_wifexited($status) || pcntl_wexitstatus($status)){
			$msg = sprintf("%-6d%s %s", $cid, date("H:i:s"), '进程异常退出');
			if(pcntl_wifexited($status)){
				$msg .= ',错误代码:' . pcntl_wexitstatus($status);
			}
			self::$failure++;
		}else{
			$msg = sprintf("%-6d%s %s", $cid, date("H:i:s"), '进程执行完毕');
			self::$finished++;
		}
		unset(self::$childs[$cid]);
		self::$daemon || self::flush($msg, 4);
		self::$running--;
	}

	static private function run_wait($cid){
		static $childs = array();
		if(self::$wait){
			if(self::$wait != $cid){
#				$childs[$cid] = time();
				msg_send(self::$childs[$cid], 1, 'start');
			}
		}else{
			foreach(self::$childs as $cid => $t){
				msg_send($t, 1, 'start');
#				$childs[$cid] = time();
			}
		}
		self::$wait = $cid;
label_time:
		$nomsg_interval = time() + 6;
label_wait:
		$time = time();
		if(msg_receive(self::$parent, 0, $typ, 8192, $msg, false, MSG_NOERROR | MSG_IPC_NOWAIT)){
			if($typ == 3){
				self::$daemon || self::flush($msg, $time);
			}else{
				if($typ == 1){
					$msg = sprintf("%-6d%s %s", $msg, date("H:i:s"), '进程启动');
					self::$daemon || self::flush($msg, 3);
				}elseif($typ == 4){
					if($cid = pcntl_waitpid($msg, $status, WNOHANG) and $cid > 0){
						self::run_exit($cid, $status);
						return;
					}
				}elseif($typ == 5 && is_callable("bat_diy_notify")){
					bat_diy_notify($msg);
				}
			}
		}else{
			if(self::$daemon){
				if($cid = pcntl_wait($status, WNOHANG) and $cid > 0){
					self::run_exit($cid, $status);
					return;
				}else{
					sleep(6);
					goto label_time;
				}
			}elseif(self::flush(null, 5)){
				echo "\33[0;0H"; $lines = intval(`tput lines`);
				echo "\33[K运行时长:", self::run_time(), ' ', date("Y-m-d H:i:s", self::$start), ' - ', date("Y-m-d H:i:s"), "\33[$lines;0H";
			}
			usleep(200000);
		}
		if($nomsg_interval  $t){
				if($cid = pcntl_waitpid($msg, $status, WNOHANG) and $cid > 0){
					self::run_exit($cid, $status);
				}
			}
			if(self::$max > self::$running) return;
			goto label_time;
		}
		goto label_wait;
	}

	static function notify($msg, $diy = false){
		if($diy){
			if(self::$get){
				msg_send(self::$parent, 5, $msg, false);
			}else{
				bat_diy_notify((string)$msg);
			}
		}else{
			if(self::$daemon){
#				msg_send(self::$parent, 5, self::$msg, false);
			}else{
				msg_send(self::$parent, 3, self::$msg . date("H:i:s ") . $msg, false);
			}
		}
	}

	static function message($msg, $code = 0){
		if(self::$daemon){
			return true;
		}
		switch($code){
		case 0:
			echo "\33[37m提示:\33[0m", $msg, "\n";
			break;
		case 1:
			echo "\33[33m警告:\33[0m", $msg, "\n";
			break;
		case 2:
			echo "\33[31m错误:\33[0m", $msg, "\n";
			break;
		}
	}

	static function confirm($msg = "确定要继续执行"){
		if(self::$daemon)return true;# 后台默认
		echo $msg, "(yes/no)?: "; # 暂这样
		return "yes\n" == fgets(STDIN);
	}

	static function usage(){
		$bat = __CLASS__;

		echo ""
		, "Usage:\n"
		, "	$_ENV[_] [options] [-f | --file] \n"
		, "Options:\n"
		, "	-h | --help		显示本帮助信息\n"
		, "	-v | --version		查看程序版本信息\n"
		, "\n"
		, "	-m | --max 	同时执行进程数量,默认 ", self::$max, " 个\n"
		, "	-l | --log 	错误记录日志文件,默认 ", self::$logfile, "\n"
		, "	-d | --daemon		启动后台进程执行,不会占用终端和输出内容\n"
		, "Information:\n"
		, "	脚本中调用 $bat::run(fun[, arg]) 来添加任务\n"
		, "	fun 为要执行的函数名;arg 为传递给这个函数的参数,可省\n"
		, "\n"
		, "	脚本中调用 $bat::start() 来启动子进程执行上面添加的任务\n"
		, "	在子进程中,通过调用 $bat::notify(msg) 发送要显示的信息给父进程\n"
		, "	在子进程中,程序执行发生错误,要让主进程统计为失败需用 exit(num) 非零返回\n"
		, "\n"
		, "	可以在主进程中提供一个 bat_diy_notify(msg) 函数来记录子进程传回的数据\n"
		, "	子进程调用 $bat::notify(msg, true) 传递数据给父进程,区别是多了 true 参数\n"
		, "	注:msg 只能为字符串,且长度不要太长,以免多余的部分被系统丢弃\n"
		;
		self::$daemon = true;
		exit();
	}

	static function version(){
		return "Version: 0.3 by huye\n";
	}

	static private function inc($file){
		include $file;
	}

	static private function end($code = 0){
		if($code || self::$daemon){
#			echo self::$total, " 任务完成\n";
			self::$daemon = true;
			exit($code);
		}

		$cols = intval(`tput cols`);
		$lines = intval(`tput lines`);

		if(self::$total){
			self::flush("执行完毕.", 2);
			echo "\33[$lines;{$cols}H\33[1C\n\n";
		}

		if(is_file(self::$logfile) && filemtime(self::$logfile) >= self::$start){
			echo "\33[K发生错误:", self::$logfile, "\n";
		}
		echo "\33[K运行时长:", self::run_time(), ' ', date("Y-m-d H:i:s", self::$start), ' - ', date("Y-m-d H:i:s"), "\n";
		echo "\33[K执行完毕:已完成任务 ", self::$finished, " 个", self::$failure ? ",失败 " . self::$failure . " 个" : "", self::$total ? "(共 " . self::$total . " 个)" : "", "。\n";
	}

	static private function flush($msg, $time){
		$cols = intval(`tput cols`);
		$lines = intval(`tput lines`);
		if($msg){
			$_max = $cols;
			foreach(explode("\n", $msg) as $msg){
				if($cols  $x){
							$_max -= $x;
							if(++$i >= $l)break;
							$x = strlen($z = $tmp[$i]) / 3 * 2;
							if($_max > $x){
								$_max -= $x;
								$i++; continue;
							}elseif($_max  $lines){
				array_splice(self::$msgs, 0, $_max - $lines);
			}elseif($lines > $_max){
				$split = str_repeat("\n\33[K", $lines - $_max) . $split;
			}

			echo "\33[K", implode("\n\33[K", self::$msgs), "\n", $split, "\n";
		}

		$msg = "已完成任务 " . self::$finished . " 个";
		if(self::$failure) $msg .= ",失败 " . self::$failure . " 个";
		if(self::$total) $msg .= "(共 " . self::$total . " 个)";
		echo str_repeat(' ', $cols - strlen(preg_replace("#[\xe0-\xef][\x80-\xbf]{2}#", "**", $msg))), $msg, "\33[$lines;0H";
	}

	static function run_time(){
		$consume = time()
				 - self::$start;

		$str = "";
		if($consume >= 86400){
			$str = floor($consume / 86400) . "天";
			$consume = $consume % 86400;
			$zero = true;
		}
		if($consume >= 3600){
			$str .= floor($consume / 3600) . "时";
			$consume = $consume % 3600;
			$zero = true;
		}elseif($consume > 0 && isset($zero)){
			unset($zero);
			$str .= "零";
		}
		if($consume >= 60){
			$str .= floor($consume / 60) . "分";
			$consume = $consume % 60;
			$zero = true;
		}elseif($consume > 0 && isset($zero)){
			unset($zero);
			$str .= "零";
		}
		if($consume > 0){
			$str .= $consume . "秒";
		}elseif($str == ""){
			$str = "0秒";
		}

		return $str;
	}

	static function error($no, $err, $file, $line){
		if(error_reporting()){
			$log = $no & 1032 ? 'M' : ($no & 514 ? 'W' : ($no & 2048 ? 'M' : 'E'));
			$log = "[" . date("m-d H:i:s") . "] $log $line $file $err\n";
			file_put_contents(self::$logfile, $log, FILE_APPEND);
		}
	}

	static function shutdown(){
		if($last = error_get_last() and 85 & $last['type']){
			self::error($last['type'], $last['message'], $last['file'], $last['line']);
			self::$get || self::end();
			self::$daemon = true;
		}
		if(self::$get){
			msg_send(self::$parent, 4, getmypid(), false); # 通知父进程结束
			self::$parent = self::$get; # 同时也防止上一行通知失败
			ob_end_clean();
		}else{
			self::end();
		}
		msg_remove_queue(self::$parent);
	}

	static function exception($e){
		self::error($e->getCode(), $e->getMessage(), $e->getFile(), $e->getLine());
		exit($e->getCode());
	}
}

bat::main(debug_backtrace());