欢迎您访问程序员文章站!本站旨在为大家提供并分享程序员的计算机编程知识。
您现在的位置是: 首页  >  后端开发

PHP 实现多进程(以多进程模拟多线程)的一例跨平台代码

程序员文章站 2022-04-15 15:05:53
...
  // Command used to launch the PHP interpreter for child processes.
  define('DIR_PHP_EXEC', 'php');
  // Script that every child process re-executes: this file itself.
  define('DIR_MAIN_EXEC', __FILE__);
  // Directory for the lock files. The system temporary directory is used
  // instead of a hard-coded '/tmp' so the example also works where '/tmp'
  // does not exist (e.g. Windows).
  define('DIR_TMP', sys_get_temp_dir());
  require_once('my_process.php');
  /**
   * Example task: prints "111 <param>" once per second.
   */
  class pp extends my_process_base {
      /**
       * @param mixed $param Value passed through run_task(); echoed on every line.
       */
      public function run($param = null) {
          // NOTE(review): the loop header was truncated when this listing was
          // extracted from the web page; the bound of 4 is an assumed demo
          // value - confirm against the original article.
          for ($i = 0; $i < 4; $i++) {
              echo "111 $param\n";
              sleep(1);
          }
      }
  }
  // Creates the global process manager; when this script was started as a
  // child, init_my_process() also runs the requested task.
  init_my_process();

  $obj = $GLOBALS['gal_obj_process_m'];
  if ($obj->is_main()) {
      // Start one 'pp' child per parameter, in this order.
      foreach (array('a', 'b', 'c', 'd') as $task_param) {
          $obj->run_task('pp', $task_param);
      }
      // Limit the polling loop to 10 rounds, then poll the children.
      $obj->set_max_run(10);
      $obj->run();
  }
  ?>
复制代码

2、进程管理类(即上例中通过 require_once 引入的 my_process.php)

  1. /**

  2. * @copyright 2007 movivi
  3. * @author 徐智 *
  4. * $Id: getPage.php 11 2007-09-21 02:15:01Z fred $
  5. * modify by bbs.it-home.org
  6. */
  // Fallback values for settings the including script may define beforehand.
  if (!defined('DIR_PHP_EXEC')) define('DIR_PHP_EXEC', 'php');
  //if (!defined('DIR_MAIN_EXEC')) define('DIR_MAIN_EXEC', '');
  // An empty DIR_TMP would make every lock path start at the filesystem root
  // (DIR_TMP . DIRECTORY_SEPARATOR . name), so fall back to the system
  // temporary directory instead.
  if (!defined('DIR_TMP')) define('DIR_TMP', sys_get_temp_dir());
  /********/
  /* Initialization: names of the key=value arguments passed to child processes */
  define('CMD_MAIN_PROCESS_KEY', 'main_process_key');
  define('CMD_CHILD_PROCESS_NAME', 'child_process_name');
  define('CMD_CHILD_PROCESS_PARAM', 'child_process_param');
  /**
   * Sets up the two globals used by this library and, in a child process,
   * runs the requested task right away.
   *
   * $GLOBALS['gal_obj_cmd'] receives the parsed command line and
   * $GLOBALS['gal_obj_process_m'] receives the process manager. The manager
   * is created with the value of the main-process-key argument, or with ''
   * when that argument is absent.
   */
  function init_my_process() {
      $args = new my_cmd_argv();
      $GLOBALS['gal_obj_cmd'] = $args;

      $main_key = $args->get_value(CMD_MAIN_PROCESS_KEY);
      if ($main_key === false) {
          $main_key = '';
      }

      $manager = new my_process_m($main_key);
      $GLOBALS['gal_obj_process_m'] = $manager;

      if ($manager->is_main()) {
          return;
      }
      $manager->run();
  }
  /**
   * Base class for a task executed in a child PHP process.
   *
   * Extend this class and put the work to perform in run().
   */
  abstract class my_process_base
  {
      /**
       * Both arguments are accepted but not used by this base implementation.
       *
       * @param bool   $auto_run
       * @param string $name
       */
      public function __construct($auto_run = true, $name = '')
      {
      }

      /**
       * Writes the "@end" line when the object is destroyed; the manager's
       * polling loop looks for this marker at the end of a child's output.
       */
      public function __destruct()
      {
          print "@end\n";
      }

      /**
       * The task body, supplied by the subclass.
       *
       * @param mixed $param
       */
      abstract public function run($param = null);
  }
  /**
   * Parses command-line arguments of the form key=value into a lookup table.
   *
   * A token without '=' is stored under its own text with an empty-string value.
   */
  class my_cmd_argv {
      /** @var array Map of argument name => argument value. */
      private $cmd_argv = array();

      /**
       * Reads the arguments from $_SERVER['argv'].
       */
      public function __construct() {
          // $_SERVER['argv'] may be missing (e.g. outside the command line).
          $argv = (isset($_SERVER['argv']) && is_array($_SERVER['argv'])) ? $_SERVER['argv'] : array();
          $count = count($argv);
          // Index 0 is the script name, so parsing starts at index 1.
          for ($i = 1; $i < $count; $i++) {
              // A limit of 2 keeps any further '=' inside the value, e.g. the
              // padding at the end of a base64 string.
              $cmd = explode('=', $argv[$i], 2);
              $this->cmd_argv[$cmd[0]] = isset($cmd[1]) ? $cmd[1] : '';
          }
      }

      /**
       * Tells whether an argument with the given name was passed.
       *
       * @param string $key
       * @return bool
       */
      public function get_key($key) {
          return isset($this->cmd_argv[$key]);
      }

      /**
       * Returns the value of an argument.
       *
       * @param string $key
       * @return string|false The value, or false when the argument is absent.
       */
      public function get_value($key) {
          return isset($this->cmd_argv[$key]) ? $this->cmd_argv[$key] : false;
      }
  }
  /**
   * Multi-process manager for PHP scripts run from the console.
   *
   * The main process starts children with popen() and polls their output;
   * a child process instantiates the requested task class and runs it.
   * Signals (locks) are implemented with files under DIR_TMP.
   */
  class my_process_m {
      /**
       * @var array $task_list
       * Main process: pipe handles of the started children.
       * Child process: the task objects that were run.
       */
      private $task_list = array();
      /** @var array Open lock files: md5(path) => array(handle, path). */
      private $lock_list = array();
      /** @var array|null array(lock key, handle or 0), set by the constructor. */
      private $lock = null;
      /** @var bool True when this instance was created without a main-process key. */
      private $is_main = false;
      /** @var int Upper bound on the number of polling rounds in run(). */
      private $max_run = 3600000;

      /**
       * Closes the handle of one lock_list entry and deletes its file.
       *
       * @param mixed $entry Expected to be array(handle, path); anything else is ignored.
       */
      private function close_lock_entry($entry) {
          if (!is_array($entry)) {
              return;
          }
          // Close before deleting: an open file cannot be removed on every platform.
          if (isset($entry[0]) && is_resource($entry[0])) {
              fclose($entry[0]);
          }
          if (isset($entry[1]) && is_file($entry[1])) {
              unlink($entry[1]);
          }
      }

      /**
       * Releases one lock (by key) or, without a key, every lock held.
       *
       * @param string|null $key
       * @return bool Always true.
       */
      private function release_lock($key = null) {
          if (!is_null($key)) {
              $id = md5($this->build_lock_id($key));
              if (isset($this->lock_list[$id])) {
                  $this->close_lock_entry($this->lock_list[$id]);
                  unset($this->lock_list[$id]);
              }
              return true;
          }
          // Each entry is array(handle, path), so it has to be unpacked the
          // same way as in the keyed branch; testing the entry itself with
          // is_resource() would never close or delete anything.
          foreach ($this->lock_list as $entry) {
              $this->close_lock_entry($entry);
          }
          $this->lock_list = array();
          return true;
      }

      /**
       * Releases one task (by key) or, without a key, every task.
       *
       * @param int|string|null $key
       * @return bool Always true.
       */
      private function release_task($key = null) {
          if (!is_null($key)) {
              // An unknown key releases nothing; it must not fall through to
              // the release-everything path.
              if (isset($this->task_list[$key])) {
                  if (is_resource($this->task_list[$key])) {
                      pclose($this->task_list[$key]);
                  }
                  unset($this->task_list[$key]);
              }
              return true;
          }
          foreach ($this->task_list as $h) {
              if (is_resource($h)) {
                  pclose($h);
              }
          }
          $this->task_list = array();
          return true;
      }

      /**
       * Builds the path of the lock file that belongs to a key.
       *
       * @param string $key
       * @return string
       */
      private function build_lock_id($key) {
          return DIR_TMP . DIRECTORY_SEPARATOR . $key . '_sem.lock';
      }

      /**
       * Child side: instantiates the class named on the command line and
       * calls its run() with the decoded parameter.
       *
       * @throws InvalidArgumentException When the named class does not exist.
       */
      protected function run_child_process() {
          $class = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_NAME);
          $param = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_PARAM);
          if (!is_string($class) || $class === '' || !class_exists($class)) {
              throw new InvalidArgumentException('Unknown child process class: ' . var_export($class, true));
          }
          // NOTE(review): unserialize() runs on data taken from the command
          // line. That is only safe while the arguments are produced by
          // run_task(); do not feed this from untrusted input.
          $param = $param == '' ? null : unserialize(base64_decode(trim($param)));
          $obj = new $class();
          $obj->run($param);
          // Keeps the task object referenced until the task list is released.
          $this->task_list[] = $obj;
      }

      /**
       * @param string $lock '' creates a main-process manager with a freshly
       *                     generated lock key; any other value marks this
       *                     instance as a child that belongs to that key.
       */
      public function __construct($lock = '') {
          if ($lock === '') {
              $this->is_main = true;
              $key = md5(uniqid()) . '_main.my_process';
              $lock = array($key, $this->get($key));
          } else {
              $this->is_main = false;
              $lock = array($lock, 0);
          }
          $this->lock = $lock;
      }

      /**
       * Releases every lock and every task still held.
       */
      public function __destruct() {
          $this->release_lock();
          $this->release_task();
      }

      /**
       * Stop all processes. Not implemented: the body is empty.
       */
      public function stop_all() {
      }

      /**
       * Whether this instance is the main process.
       *
       * @return bool
       */
      public function is_main() {
          return $this->is_main;
      }

      /**
       * Whether a signal (lock file) already exists for the key.
       *
       * @param string $key
       * @return bool
       */
      public function exist($key) {
          return file_exists($this->build_lock_id($key));
      }

      /**
       * Opens (creating it if needed) the lock file for a key.
       *
       * @param string $key
       * @param int $max_acquire Maximum number of blocked requests; currently unused.
       * @return resource|false The file handle to use as signal ID; false when
       *                        this instance already holds the key or the file
       *                        could not be opened.
       */
      public function get($key, $max_acquire = 5) {
          $fn = $this->build_lock_id($key);
          $id_key = md5($fn);
          if (isset($this->lock_list[$id_key])) {
              return false;
          }
          $id = fopen($fn, 'a+');
          if ($id) {
              $this->lock_list[$id_key] = array($id, $fn);
          }
          return $id;
      }

      /**
       * Releases the signal of a key: closes its handle and deletes its file.
       *
       * @param string $key
       * @return bool Always true.
       */
      public function remove($key) {
          return $this->release_lock($key);
      }

      /**
       * Takes an exclusive lock on a signal.
       *
       * @param resource $id Signal ID returned by get()
       * @param bool $block Whether to wait until the lock becomes available
       * @return bool Result of flock().
       */
      public function acquire($id, $block = false) {
          return flock($id, $block ? LOCK_EX : (LOCK_EX | LOCK_NB));
      }

      /**
       * Unlocks a signal previously locked with acquire().
       *
       * @param resource $id
       * @return bool Result of flock().
       */
      public function release($id) {
          return flock($id, LOCK_UN);
      }

      /**
       * Starts a child process that runs the given task class.
       *
       * The child executes DIR_MAIN_EXEC with the class name, the
       * base64-encoded serialized parameter and the main lock key passed as
       * key=value arguments.
       *
       * @param string $process_name Name of the task class
       * @param mixed $param Serializable value handed to the task's run()
       * @return bool False when the process could not be started.
       */
      public function run_task($process_name, $param = null) {
          // Every argument is escaped as a whole so spaces or shell
          // metacharacters in a path or class name cannot alter the command.
          $cmd = DIR_PHP_EXEC . ' -f ' . escapeshellarg(DIR_MAIN_EXEC) . ' --'
              . ' ' . escapeshellarg(CMD_CHILD_PROCESS_NAME . '=' . $process_name)
              . ' ' . escapeshellarg(CMD_CHILD_PROCESS_PARAM . '=' . base64_encode(serialize($param)))
              . ' ' . escapeshellarg(CMD_MAIN_PROCESS_KEY . '=' . $this->lock[0]);
          $h = popen($cmd, 'r');
          if ($h === false) {
              // A failed start is not queued; run() would otherwise read from a non-resource.
              return false;
          }
          $this->task_list[] = $h;
          return true;
      }

      /**
       * Main process: polls every child's output once per second until all
       * children have finished or max_run rounds have passed.
       * Child process: runs the requested task.
       *
       * @param bool $auto_run When false, the main process polls only once.
       */
      public function run($auto_run = true) {
          if (!$this->is_main) {
              $this->run_child_process();
              return;
          }
          $id = 0;
          do {
              // Number of children that were still registered in this round.
              $c = 0;
              foreach ($this->task_list as $k => $h) {
                  $c++;
                  $msg = fread($h, 8000);
                  if ($msg === false) {
                      $msg = '';
                  }
                  // A child is done when its output ends with "@end\n", or
                  // when its pipe reached EOF without the marker (e.g. the
                  // child died); otherwise it would be polled until max_run.
                  if (substr($msg, -5, 4) === '@end' || feof($h)) {
                      echo "end process:[$k][$id] echo \n{$msg} \n";
                      $this->release_task($k);
                  } else {
                      echo "process:[$k][$id] echo \n{$msg} \n";
                  }
              }
              sleep(1);
              // NOTE(review): this condition was truncated in the published
              // listing; it is restored from the otherwise unused max_run
              // limit and the per-round counter $c.
          } while ($auto_run && $id++ < $this->max_run && $c > 0);
      }

      /**
       * Sets the maximum number of polling rounds for run().
       *
       * @param int $max
       */
      public function set_max_run($max = 1000) {
          $this->max_run = $max;
      }
  }
  224. ?>
复制代码