将spawn-fcgi程序管理与fastcgi程序库libfcgi统一封装成库
程序员文章站
2022-07-14 23:29:51
...
将spawn-fcgi进程管理程序与libfcgi c++库集成封装,使库做到自我管理程序启停和fcgi功能,输出以json格式,支持post与get方式。
如输入:http://192.168.1.123/PaymentRecords.cgi/?BillingCycle=201708&DestinatiosAttr=2
使用进程版或线程版的读函数会得到如下信息:strjson={"BillingCycle":"201708","DestinatiosAttr":"2"}(键和值均以带引号的字符串形式输出)
bool readFcgi(string &strjson, unsigned int wait_seconds = 0);
bool readThreadFcgi(string &strjson, unsigned int wait_seconds = 0);
上述读函数还支持超时,即在超时时间内未收到请求则不再阻塞。业务处理完成后,可以把结果拼成json格式,再用相应的writeXXX函数写回去即可。
除上述,也可使用runXXX函数,传入业务函数指针,函数会循环处理请求,但此类函数不支持超时。
SpawnFcgi.h:
/**
* @file SpawnFcgi.h
* @brief SpawnFcgi lib
* @author wangcc3
* @date 2017-8-17
* @version 001
* @copyright Copyright (c) 2017,AsiaInfo(FzOcs)
*/
#ifndef _SPAWNFCGI_H_
#define _SPAWNFCGI_H_
#include <stdlib.h>
#include <string.h>
#include <string>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <map>
#include "fcgi_stdio.h"
#include "fcgi_config.h"
#include "fcgiapp.h"
#include <sys/syscall.h>
namespace BM35{
using std::string;
using std::map;
using std::make_pair;
#define FCGI_MAX_THREAD_NUMS 5
#define gettid() syscall(__NR_gettid)
typedef void (*business_fn)(string &,string &);
/**
 * @brief Combines listening-socket setup with libfcgi request handling.
 *
 * Requests are exposed to the caller as a JSON-like string built from the
 * query string (GET) or the request body (POST); responses are written back
 * with a "Content-type: application/json" header.
 */
class SpawnFcgi {
public:
SpawnFcgi();
~SpawnFcgi();
/// Create, bind and listen on a TCP socket for @p port, move it onto
/// descriptor 0, redirect stdout/stderr to /dev/null and initialise libfcgi.
/// When @p flag is true, descriptors from 3 up to the /dev/null descriptor
/// number are closed. Returns false and records an error message on failure.
bool initFcgi(unsigned short port = 9000, bool flag = true);
/// Accept one request on the single-request slot and store its parameters in
/// @p strjson. A non-zero @p wait_seconds bounds the wait for a connection.
bool readFcgi(string &strjson, unsigned int wait_seconds = 0);
/// Send @p strjson as the response of the single-request slot and finish it.
bool writeFcgi(string &strjson);
/// Loop over requests using the fcgi_stdio interface, calling @p func with
/// the request text and printing what it returns. No timeout support.
bool runFcgi(business_fn func);
/// Reserve one of the FCGI_MAX_THREAD_NUMS request slots for the calling
/// thread (keyed by its thread id). Returns false when all slots are taken.
bool setThreadInfo();
/// Per-thread counterpart of readFcgi(); uses the slot stored for the
/// calling thread's id.
bool readThreadFcgi(string &strjson, unsigned int wait_seconds = 0);
/// Per-thread counterpart of writeFcgi().
bool writeThreadFcgi(string &strjson);
/// Per-thread request loop with a function-local request object; calls
/// @p func for every request. No timeout support.
bool runThreadFcgi(business_fn func);
/// Return the most recently recorded error message.
string getErrorInfo();
private:
int _sockFD;                                  ///< Descriptor returned by socket() in initFcgi().
static const int _stdinFD;                    ///< Descriptor number the listening socket is moved to (defined as 0).
string _errStr;                               ///< Last error message; starts as "no error".
FCGX_Request _request[FCGI_MAX_THREAD_NUMS];  ///< Request slots handed out by setThreadInfo().
FCGX_Request _the_request;                    ///< Request used by readFcgi()/writeFcgi().
map<pid_t, int> _requestMap;                  ///< Thread id -> index into _request.
int _requestCursor;                           ///< Next free index in _request.
/// Convert "k1=v1&k2=v2" text into {"k1":"v1","k2":"v2"}.
bool char2StrJson(char * inputdata,string &strjson);
/// Wait with select() until descriptor _stdinFD is readable or
/// @p wait_seconds elapse; 0 skips the wait and reports success.
bool fcgiTimeOut(unsigned int wait_seconds = 0);
};
} // BM35
#endif // _SPAWNFCGI_H_
SpawnFcgi.cpp:
/**
* @file SpawnFcgi.cpp
* @brief SpawnFcgi lib
* @author wangcc3
* @date 2017-8-17
* @version 001
* @copyright Copyright (c) 2017,AsiaInfo(FzOcs)
*/
#include "SpawnFcgi.h"
namespace BM35{
const int SpawnFcgi::_stdinFD = 0;
/// Start with no socket, the placeholder error text and no thread slots used.
SpawnFcgi::SpawnFcgi()
    : _sockFD(-1),
      _errStr("no error"),
      _requestCursor(0) {
}
// Empty destructor: no descriptor is closed and no FCGX request is released here.
SpawnFcgi::~SpawnFcgi() {
}
bool SpawnFcgi::initFcgi(unsigned short port, bool flag) {
int optval = 1;
int max_fd = 0;
int i = 0;
struct sockaddr_in addr;
addr.sin_family=AF_INET;
addr.sin_port=htons(port);
addr.sin_addr.s_addr=htonl(INADDR_ANY);
if (-1 == (_sockFD = socket(AF_INET, SOCK_STREAM, 0))) {
char cTmp[256] = { 0 };
sprintf(cTmp,"create socket failed! errno=%s",strerror(errno));
_errStr = cTmp;
return false;
}
if (-1 == setsockopt(_sockFD, SOL_SOCKET,
SO_REUSEADDR, &optval,
sizeof(optval))) {
char cTmp[256] = { 0 };
sprintf(cTmp,"set reuseaddr failed! errno=%s",strerror(errno));
_errStr = cTmp;
return false;
}
if(-1 == bind(_sockFD, (struct sockaddr*) &addr,sizeof(addr))) {
char cTmp[256] = { 0 };
sprintf(cTmp,"bind socket failed! errno=%s",strerror(errno));
_errStr = cTmp;
return false;
}
if (-1 == listen(_sockFD, 1024)) {
char cTmp[256] = { 0 };
sprintf(cTmp,"set listen failed! errno=%s",strerror(errno));
_errStr = cTmp;
return false;
}
if(_sockFD != _stdinFD) {
close(_stdinFD);
dup2(_sockFD, _stdinFD);
close(_sockFD);
}
max_fd = open("/dev/null", O_RDWR);
if (-1 != max_fd) {
if (max_fd != STDOUT_FILENO)
dup2(max_fd, STDOUT_FILENO);
if (max_fd != STDERR_FILENO)
dup2(max_fd, STDERR_FILENO);
if (max_fd != STDOUT_FILENO && max_fd != STDERR_FILENO)
close(max_fd);
} else {
_errStr = "couldn't open and redirect stdout/stderr to '/dev/null'";
return false;
}
if(flag) {
for (i = 3; i < max_fd; i++) {
if (i != _stdinFD)
close(i);
}
}
FCGX_Init();
FCGX_InitRequest(&_the_request, 0, 0);
return true;
}
/**
 * @brief Send the response for the request held in the single-request slot.
 *
 * Emits the JSON content-type header followed by @p strjson, then finishes
 * the request. Always returns true.
 */
bool SpawnFcgi::writeFcgi(string &strjson) {
    FCGX_FPrintF(_the_request.out,
                 "Content-type: application/json\r\n\r\n%s",
                 strjson.c_str());
    FCGX_Finish_r(&_the_request);
    return true;
}
/**
 * @brief Accept one request on the single-request slot and expose its
 *        parameters as JSON text.
 *
 * For POST the body (CONTENT_LENGTH bytes) is parsed; for GET the query
 * string is parsed. A missing or empty payload yields "{}".
 *
 * The body is collected in a growable string: CONTENT_LENGTH comes from the
 * client, so it must never be used to index a fixed-size buffer.
 *
 * @param strjson      Receives the JSON text on success.
 * @param wait_seconds Maximum time to wait for a connection; 0 means the
 *                     wait is skipped and the accept call blocks.
 * @return true when a GET/POST request was read; false on timeout, accept
 *         failure, malformed CONTENT_LENGTH, short body or another method.
 *         The reason is available from getErrorInfo().
 */
bool SpawnFcgi::readFcgi(string &strjson, unsigned int wait_seconds) {
    if (!fcgiTimeOut(wait_seconds)) {
        _errStr = "wait time out";
        return false;
    }

    // Release whatever request is still open before accepting the next one.
    FCGX_Finish_r(&_the_request);

    const int rc = FCGX_Accept_r(&_the_request);
    if (rc < 0) {
        char msg[64] = { 0 };
        snprintf(msg, sizeof(msg), "accept failed! errno=%d", rc);
        _errStr = msg;
        return false;
    }

    const char *method = FCGX_GetParam("REQUEST_METHOD", _the_request.envp);
    if (method && strcmp(method, "POST") == 0) {
        strjson = "{}";
        char *contentLength = FCGX_GetParam("CONTENT_LENGTH", _the_request.envp);
        if (contentLength == NULL)
            return true;

        char *end = NULL;
        const long len = strtol(contentLength, &end, 10);
        if (*end) {
            _errStr = "can't parser CONTENT_LENGTH";
            return false;
        }
        if (len <= 0)
            return true;

        string body;
        for (long i = 0; i < len; ++i) {
            const int ch = FCGX_GetChar(_the_request.in);
            if (ch < 0) {
                _errStr = "Error: Not enough bytes received on standard input";
                return false;
            }
            body.push_back(static_cast<char>(ch));
        }
        // char2StrJson takes a non-const pointer but only reads from it.
        char2StrJson(const_cast<char *>(body.c_str()), strjson);
        return true;
    }

    if (method && strcmp(method, "GET") == 0) {
        char *queryString = FCGX_GetParam("QUERY_STRING", _the_request.envp);
        if (queryString)
            char2StrJson(queryString, strjson);
        else
            strjson = "{}";
        return true;
    }

    _errStr = "requets method is null, or not get/post";
    return false;
}
/**
 * @brief Serve requests in a loop through the fcgi_stdio interface.
 *
 * For every accepted request the JSON content-type header is printed, the
 * request parameters (POST body or GET query string) are converted to JSON
 * text, @p func is called with that text, and the string it produces is
 * printed as the response body.
 *
 * The POST body is collected in a growable string: CONTENT_LENGTH comes from
 * the client, so it must never be used to index a fixed-size buffer.
 *
 * @param func Business callback: (request JSON, response JSON out).
 * @return true when the accept loop ends normally; false when a request has
 *         a malformed CONTENT_LENGTH, a short body, or a method other than
 *         GET/POST (reason available from getErrorInfo()).
 */
bool SpawnFcgi::runFcgi(business_fn func) {
    string inJson;
    string outJson;
    while (FCGI_Accept() >= 0) {
        printf("Content-type: application/json\r\n\r\n" );
        const char *method = getenv("REQUEST_METHOD");
        if (method && strcmp(method, "POST") == 0) {
            inJson = "{}";
            char *contentLength = getenv("CONTENT_LENGTH");
            if (contentLength != NULL) {
                char *end = NULL;
                const long len = strtol(contentLength, &end, 10);
                if (*end) {
                    _errStr = "can't parser CONTENT_LENGTH";
                    return false;
                }
                if (len > 0) {
                    string body;
                    for (long i = 0; i < len; ++i) {
                        const int ch = getchar();
                        if (ch < 0) {
                            _errStr = "Error: Not enough bytes received on standard input";
                            return false;
                        }
                        body.push_back(static_cast<char>(ch));
                    }
                    // char2StrJson takes a non-const pointer but only reads from it.
                    char2StrJson(const_cast<char *>(body.c_str()), inJson);
                }
            }
        } else if (method && strcmp(method, "GET") == 0) {
            char *queryString = getenv("QUERY_STRING");
            if (queryString)
                char2StrJson(queryString, inJson);
            else
                inJson = "{}";
        } else {
            _errStr = "requets method is null, or not get/post";
            return false;
        }
        func(inJson, outJson);
        printf("%s", outJson.c_str());
    }
    return true;
}
/**
 * @brief Reserve the next free request slot for the calling thread.
 *
 * The slot index is stored under the caller's thread id and the slot's
 * request object is initialised. The whole update runs under a function-local
 * mutex.
 *
 * @return true when a slot was reserved; false when all
 *         FCGI_MAX_THREAD_NUMS slots are already in use.
 */
bool SpawnFcgi::setThreadInfo() {
    static pthread_mutex_t set_mutex = PTHREAD_MUTEX_INITIALIZER;
    bool registered = false;

    pthread_mutex_lock(&set_mutex);
    if (_requestCursor < FCGI_MAX_THREAD_NUMS) {
        const int slot = _requestCursor;
        _requestMap.insert(make_pair(gettid(), slot));
        FCGX_InitRequest(&_request[slot], 0, 0);
        _requestCursor = slot + 1;
        registered = true;
    } else {
        _errStr ="have achieve max thread nums";
    }
    pthread_mutex_unlock(&set_mutex);

    return registered;
}
/**
 * @brief Per-thread variant of readFcgi(): accept one request on the calling
 *        thread's slot and expose its parameters as JSON text.
 *
 * The calling thread must have reserved a slot with setThreadInfo(). The
 * slot is looked up with find() so that an unregistered thread gets an error
 * instead of silently being mapped to slot 0.
 *
 * The wait-and-accept step is serialised across threads by a function-local
 * mutex. The POST body is collected in a growable string because
 * CONTENT_LENGTH is client-controlled.
 *
 * @param strjson      Receives the JSON text on success; left untouched on
 *                     failure.
 * @param wait_seconds Maximum time to wait for a connection; 0 means the
 *                     wait is skipped and the accept call blocks.
 * @return true when a GET/POST request was read; false otherwise, with the
 *         reason available from getErrorInfo().
 */
bool SpawnFcgi::readThreadFcgi(string &strjson, unsigned int wait_seconds) {
    const map<pid_t, int>::const_iterator entry = _requestMap.find(gettid());
    if (entry == _requestMap.end()) {
        _errStr = "thread not registered, call setThreadInfo first";
        return false;
    }
    FCGX_Request &request = _request[entry->second];

    // Release whatever request is still open before accepting the next one.
    FCGX_Finish_r(&request);

    static pthread_mutex_t accept_mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_lock(&accept_mutex);
    if (!fcgiTimeOut(wait_seconds)) {
        _errStr = "wait time out";
        pthread_mutex_unlock(&accept_mutex);
        return false;
    }
    const int rc = FCGX_Accept_r(&request);
    pthread_mutex_unlock(&accept_mutex);
    if (rc < 0) {
        char msg[64] = { 0 };
        snprintf(msg, sizeof(msg), "accept failed! errno=%d", rc);
        _errStr = msg;
        return false;
    }

    string inJson;
    const char *method = FCGX_GetParam("REQUEST_METHOD", request.envp);
    if (method && strcmp(method, "POST") == 0) {
        inJson = "{}";
        char *contentLength = FCGX_GetParam("CONTENT_LENGTH", request.envp);
        if (contentLength != NULL) {
            char *end = NULL;
            const long len = strtol(contentLength, &end, 10);
            if (*end) {
                _errStr = "can't parser CONTENT_LENGTH";
                return false;
            }
            if (len > 0) {
                string body;
                for (long i = 0; i < len; ++i) {
                    const int ch = FCGX_GetChar(request.in);
                    if (ch < 0) {
                        _errStr = "Error: Not enough bytes received on standard input";
                        return false;
                    }
                    body.push_back(static_cast<char>(ch));
                }
                // char2StrJson takes a non-const pointer but only reads from it.
                char2StrJson(const_cast<char *>(body.c_str()), inJson);
            }
        }
    } else if (method && strcmp(method, "GET") == 0) {
        char *queryString = FCGX_GetParam("QUERY_STRING", request.envp);
        if (queryString)
            char2StrJson(queryString, inJson);
        else
            inJson = "{}";
    } else {
        _errStr = "requets method is null, or not get/post";
        return false;
    }

    strjson = inJson;
    return true;
}
/**
 * @brief Send the response for the request held in the calling thread's slot.
 *
 * Emits the JSON content-type header followed by @p strjson, then finishes
 * the request. The slot is looked up with find() so that a thread which never
 * called setThreadInfo() gets an error instead of writing to slot 0.
 *
 * @return true when the response was written; false when the calling thread
 *         has no reserved slot (reason available from getErrorInfo()).
 */
bool SpawnFcgi::writeThreadFcgi(string &strjson) {
    const map<pid_t, int>::const_iterator entry = _requestMap.find(gettid());
    if (entry == _requestMap.end()) {
        _errStr = "thread not registered, call setThreadInfo first";
        return false;
    }
    FCGX_Request &request = _request[entry->second];
    FCGX_FPrintF(request.out,
                 "Content-type: application/json\r\n\r\n%s",
                 strjson.c_str());
    FCGX_Finish_r(&request);
    return true;
}
/**
 * @brief Per-thread request loop: accept, parse, call @p func, respond.
 *
 * Each calling thread uses its own function-local request object, so no slot
 * reservation is needed. The accept call is serialised across threads by a
 * function-local mutex.
 *
 * The POST body is collected in a growable string because CONTENT_LENGTH is
 * client-controlled. When a request is rejected the accepted connection is
 * finished before returning, so it is not left open.
 *
 * @param func Business callback: (request JSON, response JSON out).
 * @return false when accept fails or a request is rejected (malformed
 *         CONTENT_LENGTH, short body, method other than GET/POST). The loop
 *         does not terminate otherwise.
 */
bool SpawnFcgi::runThreadFcgi(business_fn func) {
    static pthread_mutex_t accept_mutex = PTHREAD_MUTEX_INITIALIZER;

    string inJson;
    string outJson;
    FCGX_Request request;
    FCGX_InitRequest(&request, 0, 0);

    for (;;) {
        pthread_mutex_lock(&accept_mutex);
        const int rc = FCGX_Accept_r(&request);
        pthread_mutex_unlock(&accept_mutex);
        if (rc < 0)
            return false;

        const char *method = FCGX_GetParam("REQUEST_METHOD", request.envp);
        if (method && strcmp(method, "POST") == 0) {
            inJson = "{}";
            char *contentLength = FCGX_GetParam("CONTENT_LENGTH", request.envp);
            if (contentLength != NULL) {
                char *end = NULL;
                const long len = strtol(contentLength, &end, 10);
                if (*end) {
                    _errStr = "can't parser CONTENT_LENGTH";
                    FCGX_Finish_r(&request);
                    return false;
                }
                if (len > 0) {
                    string body;
                    for (long i = 0; i < len; ++i) {
                        const int ch = FCGX_GetChar(request.in);
                        if (ch < 0) {
                            _errStr = "Error: Not enough bytes received on standard input";
                            FCGX_Finish_r(&request);
                            return false;
                        }
                        body.push_back(static_cast<char>(ch));
                    }
                    // char2StrJson takes a non-const pointer but only reads from it.
                    char2StrJson(const_cast<char *>(body.c_str()), inJson);
                }
            }
        } else if (method && strcmp(method, "GET") == 0) {
            char *queryString = FCGX_GetParam("QUERY_STRING", request.envp);
            if (queryString)
                char2StrJson(queryString, inJson);
            else
                inJson = "{}";
        } else {
            _errStr = "requets method is null, or not get/post";
            FCGX_Finish_r(&request);
            return false;
        }

        func(inJson, outJson);
        FCGX_FPrintF(request.out,
                     "Content-type: application/json\r\n\r\n%s",
                     outJson.c_str());
        FCGX_Finish_r(&request);
    }
}
bool SpawnFcgi::char2StrJson(char * inputdata,string &strjson)
{
string tmpStr;
string key;
string value;
int len1 = 0;
int len2 = 0;
string str = inputdata;
strjson = "{";
for(;len1 != string::npos;) {
len1 = str.find('&', 0);
tmpStr=str.substr(0,len1);
if(len1 != string::npos && len1 + 1 != string::npos)
str = str.substr(len1 + 1);
len2=tmpStr.find('=',0);
if(len2 == string::npos)
continue;
key=tmpStr.substr(0,len2);
if(len2 + 1 == string::npos)
continue;
value=tmpStr.substr(len2+1);
strjson = strjson +"\"" + key + "\":\"" + value +"\",";
}
if(strjson.length() > 1)
strjson = strjson.substr(0, strjson.length()-1);
strjson= strjson + "}";
return true;
}
/**
 * @brief Wait until the listening descriptor becomes readable.
 *
 * @param wait_seconds Seconds to wait; 0 skips the wait entirely.
 * @return true when @p wait_seconds is 0 or select() reports the descriptor
 *         readable; false when select() times out or fails. A select() call
 *         interrupted by a signal (EINTR) is retried.
 */
bool SpawnFcgi::fcgiTimeOut(unsigned int wait_seconds) {
    if (wait_seconds == 0)
        return true;

    fd_set readable;
    FD_ZERO(&readable);
    FD_SET(_stdinFD, &readable);

    struct timeval limit;
    limit.tv_sec = wait_seconds;
    limit.tv_usec = 0;

    int ready = 0;
    do {
        ready = select(_stdinFD + 1, &readable, NULL, NULL, &limit);
    } while (ready < 0 && errno == EINTR);

    return ready > 0;
}
// Return a copy of the most recently recorded error message.
string SpawnFcgi::getErrorInfo() {
return _errStr;
}
}
#include "SpawnFcgi.h"
#include <unistd.h>
#include <iostream>
/**
*nginx新增配置:
* location ~ \.cgi$ {
* fastcgi_pass 127.0.0.1:9000;
* fastcgi_index index.cgi;
* fastcgi_param SCRIPT_FILENAME fcgi$fastcgi_script_name;
* include fastcgi_params;
* }
*
*测试工具:
* Fiddler
*
*使用:
* 启动nginx反向代理或tomcat等支持cgi服务器,运行编译完testFcgi程序,
*Fiddler工具模拟post、get请求
*
*/
using namespace std;
using namespace BM35;
/**
 * Demo business callback for the process-based loops.
 *
 * Wraps the request JSON in an object that also carries the serving process
 * id: {"pid":"<pid>","request":<inJson>}. The request is placed under an
 * explicit "request" key because a nested object without a key is not valid
 * JSON.
 *
 * @param inJson  JSON text describing the request parameters.
 * @param outJson Receives the response JSON text.
 */
void myfunc(std::string & inJson, std::string & outJson) {
    char tmp[64] = { 0 };
    const pid_t pid = getpid();
    snprintf(tmp, sizeof(tmp), "{\"pid\":\"%d\",\"request\":", static_cast<int>(pid));
    outJson = tmp + inJson + "}";
}
/**
 * Demo business callback for the thread-based loops.
 *
 * Wraps the request JSON in an object that also carries the serving thread's
 * pthread id: {"tid":"<tid>","request":<inJson>}. The request is placed under
 * an explicit "request" key because a nested object without a key is not
 * valid JSON.
 *
 * @param inJson  JSON text describing the request parameters.
 * @param outJson Receives the response JSON text.
 */
void myfuncT(std::string & inJson, std::string & outJson) {
    char tmp[64] = { 0 };
    const pthread_t tid = pthread_self();
    // pthread_t is not an int, so "%d" is a format mismatch. A C-style cast is
    // used because pthread_t is an integer on some platforms and a pointer on
    // others.
    snprintf(tmp, sizeof(tmp), "{\"tid\":\"%lu\",\"request\":", (unsigned long)tid);
    outJson = tmp + inJson + "}";
}
void * routine(void * arg) {
SpawnFcgi * tmp =(SpawnFcgi *)arg;
tmp->runThreadFcgi(myfuncT);
return NULL;
}
void * routine1(void * arg) {
SpawnFcgi * tmp =(SpawnFcgi *)arg;
tmp->setThreadInfo();
string info;
while(tmp->readThreadFcgi(info)) {
tmp->writeThreadFcgi(info);
}
return NULL;
}
/**
 * Demo driver. Exactly one scenario is compiled in, selected by a macro:
 *   OneProcess         - one process, read/write API
 *   multProcess1       - parent + forked child, read/write API
 *   multProcess2       - parent + forked child, runFcgi loop
 *   multProcessThread1 - parent + child, each with threads using runThreadFcgi
 *   multProcessThread2 - parent + child, each with threads using the
 *                        per-thread read/write API
 *   timeOutTest        - one process, read with a 10 second timeout
 *
 * Every scenario stops with exit status 1 when initFcgi() fails; serving
 * requests without a listening socket cannot work.
 */
int main() {
    SpawnFcgi * fcgi = new SpawnFcgi();
#if defined(OneProcess) // single process
    if(!fcgi->initFcgi(9000)) {
        printf("init failed");
        delete fcgi;
        return 1;
    }
    string info;
    while(fcgi->readFcgi(info)) {
        fcgi->writeFcgi(info);
    }
#elif defined (multProcess1) // multiple processes, read/write API
    pid_t pid;
    if(!fcgi->initFcgi(9000)) {
        printf("init failed");
        delete fcgi;
        return 1;
    }
    pid = fork();
    if(pid == 0) {
        string info;
        while(fcgi->readFcgi(info)) {
            fcgi->writeFcgi(info);
        }
    } else if(pid > 0){
        string info;
        while(fcgi->readFcgi(info)) {
            fcgi->writeFcgi(info);
        }
    } else {
        printf("fork error");
    }
#elif defined (multProcess2) // multiple processes, run loop
    pid_t pid;
    if(!fcgi->initFcgi(9000)) {
        printf("init failed");
        delete fcgi;
        return 1;
    }
    pid = fork();
    if(pid == 0) {
        fcgi->runFcgi(myfunc);
    } else if(pid > 0){
        fcgi->runFcgi(myfunc);
    } else {
        printf("fork error");
    }
#elif defined (multProcessThread1) // multiple threads, run loop
    pid_t pid;
    if(!fcgi->initFcgi(9000)) {
        printf("init failed");
        delete fcgi;
        return 1;
    }
    pid = fork();
    if(pid == 0) {
        // threads of the child process
        pthread_t threads[2];
        for(int i = 0; i < 2; i++) {
            pthread_create(&threads[i], NULL, routine, fcgi);
        }
        for(int i = 0; i < 2; i++) {
            // not needed if the threads call pthread_detach(pthread_self())
            pthread_join(threads[i], NULL); // wait, so no thread is left unjoined
        }
    } else if(pid > 0){
        // threads of the parent process
        pthread_t threads[3];
        for(int i = 0; i < 3; i++) {
            pthread_create(&threads[i], NULL, routine, fcgi);
        }
        for(int i = 0; i < 3; i++) {
            // not needed if the threads call pthread_detach(pthread_self())
            pthread_join(threads[i], NULL); // wait, so no thread is left unjoined
        }
    } else {
        printf("fork error");
    }
#elif defined (multProcessThread2) // multiple threads, read/write API
    pid_t pid;
    if(!fcgi->initFcgi(9000)) {
        printf("init failed");
        delete fcgi;
        return 1;
    }
    pid = fork();
    if(pid == 0) {
        // threads of the child process
        pthread_t threads[2];
        for(int i = 0; i < 2; i++) {
            pthread_create(&threads[i], NULL, routine1, fcgi);
        }
        for(int i = 0; i < 2; i++) {
            // not needed if the threads call pthread_detach(pthread_self())
            pthread_join(threads[i], NULL); // wait, so no thread is left unjoined
        }
    } else if(pid > 0){
        // threads of the parent process
        pthread_t threads[3];
        for(int i = 0; i < 3; i++) {
            pthread_create(&threads[i], NULL, routine1, fcgi);
        }
        for(int i = 0; i < 3; i++) {
            // not needed if the threads call pthread_detach(pthread_self())
            pthread_join(threads[i], NULL); // wait, so no thread is left unjoined
        }
    } else {
        printf("fork error");
    }
#elif defined (timeOutTest) // timeout test
    if(!fcgi->initFcgi(9000)) {
        printf("init failed");
        delete fcgi;
        return 1;
    }
    string info;
    /* thread variant
    fcgi->setThreadInfo();
    while(1) {
        if(fcgi->readThreadFcgi(info, 10)) {
            printf("go--\n");
            fcgi->writeThreadFcgi(info);
        }
        std::cout<<"---"<<std::endl;
        printf("timeout\n");
    }
    */
    // process variant
    while(1) {
        if(fcgi->readFcgi(info, 10)) {
            printf("go--\n");
            fcgi->writeFcgi(info);
        } else {
            // Report the failure only when the read really failed; the reason
            // may be a timeout or a rejected request.
            printf("read failed: %s\n", fcgi->getErrorInfo().c_str());
        }
        std::cout<<"---"<<std::endl;
    }
#endif
    delete fcgi;
    return 0;
}
makefile:
# Builds the testFcgi demo from the library source and the example driver.
TARGETS=testFcgi
CC=g++
CFLAGS=-g
OBJECTS=SpawnFcgi.o TestExample.o
# Selects which scenario of the example driver is compiled in.
DEFINE=-DtimeOutTest
LD_USR_LIBS=-L/usr/local/lib/ -lfcgi -lpthread
INCLUDE=

all: $(TARGETS)

# Link step: $@ is the target name (testFcgi).
testFcgi: $(OBJECTS)
	$(CC) $(CFLAGS) -o $@ $(OBJECTS) $(LD_USR_LIBS)

# Compile step: $< is the .cpp source, $@ the matching .o file.
.cpp.o:
	$(CC) $(CFLAGS) $(DEFINE) $(INCLUDE) -c $< -o $@

.PHONY: all clean
clean:
	rm -f $(OBJECTS) $(TARGETS)
上一篇: Install FastCGI Development Kit
下一篇: python 装饰器