欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

将spawn-fcgi程序管理与fastcgi程序库libfcgi统一封装成库

程序员文章站 2022-07-14 23:29:51
...


将spawn-fcgi进程管理程序与libfcgi c++库集成封装,使库做到自我管理程序启停和fcgi功能,输出以json格式,支持post与get方式。


如输入:http://192.168.1.123/PaymentRecords.cgi/?BillingCycle=201708&DestinatiosAttr=2
使用进程或线程的读函数读取后,会得到信息:strjson 的内容为 {"BillingCycle":"201708","DestinatiosAttr":"2"}(键和值都以带引号的字符串形式输出)
bool readFcgi(string &strjson, unsigned int wait_seconds = 0);
bool readThreadFcgi(string &strjson, unsigned int wait_seconds = 0);
上述读函数还支持超时,即在超时时间内未收到请求时不再阻塞。业务处理完后,可以把结果拼成json格式,再用相应的writeXXX函数写回去即可。
除上述,也可使用runXXX函数,传入业务函数指针,函数会循环处理请求,但此类函数不支持超时。



SpawnFcgi.h:

/**
* @file  SpawnFcgi.h
* @brief SpawnFcgi lib
* @author   wangcc3
* @date     2017-8-17
* @version  001
* @copyright Copyright (c) 2017,AsiaInfo(FzOcs)
*/

#ifndef _SPAWNFCGI_H_
#define _SPAWNFCGI_H_

#include <stdlib.h>
#include <string.h>
#include <string>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <map>
#include "fcgi_stdio.h"
#include "fcgi_config.h"
#include "fcgiapp.h"
#include <sys/syscall.h>

namespace BM35{
using std::string;
using std::map;
using std::make_pair;
/// Number of per-thread request slots; it sizes the _request array of SpawnFcgi.
#define FCGI_MAX_THREAD_NUMS 5
/// Expands to a raw gettid system call, i.e. the id of the calling thread.
#define gettid() syscall(__NR_gettid)
/// Business callback used by the run* loops: it is called with the request
/// JSON as first argument and a string that is sent back as the response body
/// as second argument.
typedef void (*business_fn)(string &,string &);

/**
 * @brief FastCGI endpoint that opens its own listening socket and hands
 *        request parameters to the caller as JSON text.
 *
 * Two families of calls are offered: readFcgi/writeFcgi/runFcgi work on one
 * request object, while setThreadInfo/readThreadFcgi/writeThreadFcgi use one
 * request slot per registered thread. runThreadFcgi works on a request object
 * local to the call. Methods returning bool report failure with false; the
 * most recent failure text is returned by getErrorInfo().
 */
class SpawnFcgi {

public:

    /// Starts with no socket (-1), the error text "no error" and no thread slot in use.
    SpawnFcgi();
    ~SpawnFcgi();
    /**
     * @brief Create the listening TCP socket, install it on descriptor 0 and
     *        initialise libfcgi.
     * @param port TCP port to listen on (all local addresses).
     * @param flag When true, descriptors from 3 up to the one obtained for
     *             /dev/null are closed.
     * @return true on success, false on failure.
     */
    bool initFcgi(unsigned short port = 9000, bool flag = true);
    /**
     * @brief Accept the next request on the single request object and convert
     *        its GET query string or POST body into JSON.
     * @param strjson      Receives the JSON text.
     * @param wait_seconds Seconds to wait for a connection; 0 disables the timeout.
     * @return true when a request was read, false on timeout or error.
     */
    bool readFcgi(string &strjson, unsigned int wait_seconds = 0);
    /// Send @p strjson as an application/json response and finish the request opened by readFcgi().
    bool writeFcgi(string &strjson);
    /// Loop over incoming requests, calling @p func for each one and sending back what it produced.
    bool runFcgi(business_fn func);
    /// Reserve a request slot for the calling thread; fails once FCGI_MAX_THREAD_NUMS slots are taken.
    bool setThreadInfo();
    /// Per-thread counterpart of readFcgi(); works on the calling thread's request slot.
    bool readThreadFcgi(string &strjson, unsigned int wait_seconds = 0);
    /// Per-thread counterpart of writeFcgi(); works on the calling thread's request slot.
    bool writeThreadFcgi(string &strjson);
    /// Per-thread counterpart of runFcgi(); uses a request object local to the call.
    bool runThreadFcgi(business_fn func);
    /// Return the text of the most recently recorded error.
    string getErrorInfo();

private:

    int _sockFD;                                  ///< Descriptor returned by socket(); -1 until initFcgi().
    static const int _stdinFD;                    ///< Descriptor the listening socket is moved to (defined as 0).
    string _errStr;                               ///< Text of the last recorded error.
    FCGX_Request _request[FCGI_MAX_THREAD_NUMS];  ///< Request slots used by the *ThreadFcgi read/write calls.
    FCGX_Request _the_request;                    ///< Request object used by readFcgi()/writeFcgi().
    map<pid_t, int> _requestMap;                  ///< Thread id -> index into _request.
    int _requestCursor;                           ///< Index of the next unused entry of _request.

    /// Convert "k1=v1&k2=v2" text into a flat JSON object whose values are strings.
    bool char2StrJson(char * inputdata,string &strjson);
    /// Wait until descriptor _stdinFD is readable; true immediately when @p wait_seconds is 0.
    bool fcgiTimeOut(unsigned int wait_seconds = 0);
};

} // BM35

#endif // _SPAWNFCGI_H_


SpawnFcgi.cpp:

/**
* @file  SpawnFcgi.cpp
* @brief SpawnFcgi lib
* @author   wangcc3
* @date     2017-8-17
* @version  001
* @copyright Copyright (c) 2017,AsiaInfo(FzOcs)
*/

#include "SpawnFcgi.h"

namespace BM35{
// Descriptor number onto which initFcgi() duplicates the listening socket.
const int SpawnFcgi::_stdinFD = 0;

/**
 * @brief Build an idle instance: no socket yet, the placeholder error text
 *        "no error", and no per-thread request slot handed out.
 */
SpawnFcgi::SpawnFcgi()
    : _sockFD(-1),
      _errStr("no error"),
      _requestCursor(0) {
}

/**
 * @brief Destructor; performs no explicit work. Members are destroyed by
 *        their own destructors.
 */
SpawnFcgi::~SpawnFcgi() {}

bool SpawnFcgi::initFcgi(unsigned short port, bool flag) {
    int optval = 1;
    int max_fd = 0;
    int i = 0;
    struct sockaddr_in addr;
    addr.sin_family=AF_INET;
    addr.sin_port=htons(port);
    addr.sin_addr.s_addr=htonl(INADDR_ANY);

    if (-1 == (_sockFD = socket(AF_INET, SOCK_STREAM, 0))) {
        char cTmp[256] = { 0 };
        sprintf(cTmp,"create socket failed! errno=%s",strerror(errno));
        _errStr = cTmp;
        return false;
    }

    if (-1 == setsockopt(_sockFD, SOL_SOCKET,
                         SO_REUSEADDR, &optval,
                         sizeof(optval))) {
        char cTmp[256] = { 0 };
        sprintf(cTmp,"set reuseaddr failed! errno=%s",strerror(errno));
        _errStr = cTmp;
        return false;
    }

    if(-1 == bind(_sockFD, (struct sockaddr*) &addr,sizeof(addr))) {
        char cTmp[256] = { 0 };
        sprintf(cTmp,"bind socket failed! errno=%s",strerror(errno));
        _errStr = cTmp;
        return false;
    }

    if (-1 == listen(_sockFD, 1024)) {
        char cTmp[256] = { 0 };
        sprintf(cTmp,"set listen failed! errno=%s",strerror(errno));
        _errStr = cTmp;
        return false;
    }

    if(_sockFD != _stdinFD) {
        close(_stdinFD);
        dup2(_sockFD, _stdinFD);
        close(_sockFD);
    }

    max_fd = open("/dev/null", O_RDWR);
    if (-1 != max_fd) {
        if (max_fd != STDOUT_FILENO)
            dup2(max_fd, STDOUT_FILENO);
        if (max_fd != STDERR_FILENO)
            dup2(max_fd, STDERR_FILENO);
        if (max_fd != STDOUT_FILENO && max_fd != STDERR_FILENO)
            close(max_fd);
    } else {
        _errStr = "couldn't open and redirect stdout/stderr to '/dev/null'";
        return false;
    }

    if(flag) {
        for (i = 3; i < max_fd; i++) {
            if (i != _stdinFD)
                close(i);
        }
    }

    FCGX_Init();
    FCGX_InitRequest(&_the_request, 0, 0);

    return true;
}

/**
 * @brief Send @p strjson as the response to the request opened by readFcgi()
 *        and finish that request.
 *
 * @param strjson Response body; sent after a "Content-type: application/json"
 *                header.
 * @return true when the response was handed to libfcgi without error; false
 *         when no request is open or the write failed (see getErrorInfo()).
 */
bool SpawnFcgi::writeFcgi(string &strjson) {
    // NOTE(review): relies on libfcgi leaving the output stream pointer NULL
    // while no request is open — confirm against the fcgiapp implementation.
    if (_the_request.out == NULL) {
        _errStr = "no accepted request to write to";
        return false;
    }

    const int written = FCGX_FPrintF(_the_request.out,
                                     "Content-type: application/json\r\n\r\n%s",
                                     strjson.c_str());
    // Finish the request even when the write failed so it is always released.
    FCGX_Finish_r(&_the_request);

    if (written < 0) {
        _errStr = "write response failed";
        return false;
    }
    return true;
}

/**
 * @brief Accept the next request and return its parameters as JSON text.
 *
 * For POST the request body (CONTENT_LENGTH bytes) is parsed, for GET the
 * QUERY_STRING; both are expected in "k1=v1&k2=v2" form. A POST without a
 * body and a GET without a query string yield "{}".
 *
 * The body is collected in a growing string rather than a fixed-size stack
 * buffer, so a body of any length is handled without overrunning memory.
 *
 * @param strjson      Receives the JSON text on success; untouched on failure.
 * @param wait_seconds Seconds to wait for a connection; 0 disables the timeout.
 * @return true when a request was read; false on timeout, accept failure,
 *         malformed CONTENT_LENGTH, short body, or a method other than
 *         GET/POST (see getErrorInfo()).
 */
bool SpawnFcgi::readFcgi(string &strjson, unsigned int wait_seconds) {
    if (!fcgiTimeOut(wait_seconds)) {
        _errStr = "wait time out";
        return false;
    }

    // Finish the previous request before accepting the next one.
    FCGX_Finish_r(&_the_request);

    const int rc = FCGX_Accept_r(&_the_request);
    if (rc < 0) {
        char cTmp[256] = { 0 };
        snprintf(cTmp, sizeof(cTmp), "accept failed! errno=%d", rc);
        _errStr = cTmp;
        return false;
    }

    const char *method = FCGX_GetParam("REQUEST_METHOD", _the_request.envp);
    if (method && strcmp(method, "POST") == 0) {
        long len = 0;
        const char *contentLength = FCGX_GetParam("CONTENT_LENGTH", _the_request.envp);
        if (contentLength != NULL) {
            char *end = NULL;
            len = strtol(contentLength, &end, 10);
            if (*end) {
                _errStr = "can't parser CONTENT_LENGTH";
                return false;
            }
        }
        if (len <= 0) {
            strjson = "{}";
            return true;
        }

        // CONTENT_LENGTH comes from the client: grow the buffer as bytes
        // actually arrive instead of trusting it to fit a fixed array.
        string body;
        for (long i = 0; i < len; i++) {
            const int ch = FCGX_GetChar(_the_request.in);
            if (ch < 0) {
                _errStr = "Error: Not enough bytes received on standard input";
                return false;
            }
            body.push_back(static_cast<char>(ch));
        }
        // char2StrJson only reads its input; the cast just satisfies its
        // non-const parameter type.
        char2StrJson(const_cast<char *>(body.c_str()), strjson);
        return true;
    }

    if (method && strcmp(method, "GET") == 0) {
        char *queryString = FCGX_GetParam("QUERY_STRING", _the_request.envp);
        if (queryString)
            char2StrJson(queryString, strjson);
        else
            strjson = "{}";
        return true;
    }

    _errStr = "requets method is null, or not get/post";
    return false;
}

/**
 * @brief Serve requests in a loop through the fcgi_stdio wrappers, calling
 *        @p func for each request.
 *
 * Each request is converted to JSON (POST body or GET query string in
 * "k1=v1&k2=v2" form, "{}" when empty), passed to @p func together with an
 * empty output string, and whatever @p func stored there is sent back after
 * a "Content-type: application/json" header.
 *
 * The request body is collected in a growing string, so its length is not
 * limited by a fixed buffer, and both strings are fresh for every request so
 * a response can never carry over to the next client.
 *
 * @param func Business callback; must not be NULL.
 * @return true when the accept loop ends normally; false when @p func is
 *         NULL or a request is malformed (bad CONTENT_LENGTH, short body,
 *         method other than GET/POST), in which case serving stops.
 */
bool SpawnFcgi::runFcgi(business_fn func) {
    if (func == NULL) {
        _errStr = "business function is null";
        return false;
    }

    while (FCGI_Accept() >= 0) {
        string inJson;
        string outJson;

        printf("Content-type: application/json\r\n\r\n" );

        const char *method = getenv("REQUEST_METHOD");
        if (method && strcmp(method, "POST") == 0) {
            long len = 0;
            const char *contentLength = getenv("CONTENT_LENGTH");
            if (contentLength != NULL) {
                char *end = NULL;
                len = strtol(contentLength, &end, 10);
                if (*end) {
                    _errStr = "can't parser CONTENT_LENGTH";
                    return false;
                }
            }
            if (len > 0) {
                // CONTENT_LENGTH is client-controlled: never copy the body
                // into a fixed-size array.
                string body;
                for (long i = 0; i < len; i++) {
                    const int ch = getchar();
                    if (ch < 0) {
                        _errStr = "Error: Not enough bytes received on standard input";
                        return false;
                    }
                    body.push_back(static_cast<char>(ch));
                }
                // char2StrJson only reads its input.
                char2StrJson(const_cast<char *>(body.c_str()), inJson);
            } else {
                inJson = "{}";
            }
        } else if (method && strcmp(method, "GET") == 0) {
            char *queryString = getenv("QUERY_STRING");
            if (queryString)
                char2StrJson(queryString, inJson);
            else
                inJson = "{}";
        } else {
            _errStr = "requets method is null, or not get/post";
            return false;
        }

        func(inJson, outJson);
        printf("%s", outJson.c_str());
    }
    return true;
}

/**
 * @brief Reserve a request slot for the calling thread.
 *
 * Must be called by a thread before it uses readThreadFcgi() or
 * writeThreadFcgi(). Calling it again from a thread that already owns a slot
 * succeeds without consuming another one.
 *
 * @return true when the calling thread owns a slot after the call; false
 *         when all FCGI_MAX_THREAD_NUMS slots are taken or the slot could
 *         not be initialised (see getErrorInfo()).
 */
bool SpawnFcgi::setThreadInfo() {
    static pthread_mutex_t set_mutex = PTHREAD_MUTEX_INITIALIZER;
    const pid_t tid = static_cast<pid_t>(gettid());
    bool ok = true;

    pthread_mutex_lock(&set_mutex);
    if (_requestMap.find(tid) != _requestMap.end()) {
        // Already registered: keep the existing slot instead of burning a
        // new one that the map would never point at.
    } else if (_requestCursor >= FCGI_MAX_THREAD_NUMS) {
        _errStr = "have achieve max thread nums";
        ok = false;
    } else if (FCGX_InitRequest(&_request[_requestCursor], 0, 0) != 0) {
        _errStr = "init thread request failed";
        ok = false;
    } else {
        _requestMap.insert(make_pair(tid, _requestCursor));
        _requestCursor++;
    }
    pthread_mutex_unlock(&set_mutex);

    return ok;
}

/**
 * @brief Accept the next request on the calling thread's slot and return its
 *        parameters as JSON text.
 *
 * The calling thread must have registered with setThreadInfo(). Waiting and
 * accepting are serialised between threads by a mutex. POST bodies and GET
 * query strings are expected in "k1=v1&k2=v2" form; an empty body or missing
 * query string yields "{}".
 *
 * The body is collected in a growing string rather than a fixed-size stack
 * buffer, so a body of any length is handled without overrunning memory.
 *
 * @param strjson      Receives the JSON text on success; untouched on failure.
 * @param wait_seconds Seconds to wait for a connection; 0 disables the timeout.
 * @return true when a request was read; false when the thread is not
 *         registered, on timeout, accept failure, malformed CONTENT_LENGTH,
 *         short body, or a method other than GET/POST (see getErrorInfo()).
 */
bool SpawnFcgi::readThreadFcgi(string &strjson, unsigned int wait_seconds) {
    // Look the slot up without operator[]: that would silently insert slot 0
    // for an unregistered thread and modify the shared map on a read path.
    // NOTE(review): this lookup is not synchronised with the insert done in
    // setThreadInfo(); register all threads before any of them starts serving.
    const map<pid_t, int>::const_iterator slot =
        _requestMap.find(static_cast<pid_t>(gettid()));
    if (slot == _requestMap.end()) {
        _errStr = "thread not registered, call setThreadInfo first";
        return false;
    }
    FCGX_Request &request = _request[slot->second];

    // Finish the previous request of this thread before accepting a new one.
    FCGX_Finish_r(&request);

    static pthread_mutex_t accept_mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_lock(&accept_mutex);
    if (!fcgiTimeOut(wait_seconds)) {
        _errStr = "wait time out";
        pthread_mutex_unlock(&accept_mutex);
        return false;
    }
    const int rc = FCGX_Accept_r(&request);
    pthread_mutex_unlock(&accept_mutex);

    if (rc < 0) {
        char cTmp[256] = { 0 };
        snprintf(cTmp, sizeof(cTmp), "accept failed! errno=%d", rc);
        _errStr = cTmp;
        return false;
    }

    string inJson;
    const char *method = FCGX_GetParam("REQUEST_METHOD", request.envp);
    if (method && strcmp(method, "POST") == 0) {
        long len = 0;
        const char *contentLength = FCGX_GetParam("CONTENT_LENGTH", request.envp);
        if (contentLength != NULL) {
            char *end = NULL;
            len = strtol(contentLength, &end, 10);
            if (*end) {
                _errStr = "can't parser CONTENT_LENGTH";
                return false;
            }
        }
        if (len > 0) {
            // CONTENT_LENGTH is client-controlled: never copy the body into
            // a fixed-size array.
            string body;
            for (long i = 0; i < len; i++) {
                const int ch = FCGX_GetChar(request.in);
                if (ch < 0) {
                    _errStr = "Error: Not enough bytes received on standard input";
                    return false;
                }
                body.push_back(static_cast<char>(ch));
            }
            // char2StrJson only reads its input.
            char2StrJson(const_cast<char *>(body.c_str()), inJson);
        } else {
            inJson = "{}";
        }
    } else if (method && strcmp(method, "GET") == 0) {
        char *queryString = FCGX_GetParam("QUERY_STRING", request.envp);
        if (queryString)
            char2StrJson(queryString, inJson);
        else
            inJson = "{}";
    } else {
        _errStr = "requets method is null, or not get/post";
        return false;
    }

    strjson = inJson;
    return true;
}

/**
 * @brief Send @p strjson as the response to the request the calling thread
 *        opened with readThreadFcgi(), then finish that request.
 *
 * @param strjson Response body; sent after a "Content-type: application/json"
 *                header.
 * @return true when the response was handed to libfcgi without error; false
 *         when the thread is not registered, has no open request, or the
 *         write failed (see getErrorInfo()).
 */
bool SpawnFcgi::writeThreadFcgi(string &strjson) {
    // find() instead of operator[]: an unregistered thread must not be
    // mapped onto slot 0, which may belong to another thread.
    const map<pid_t, int>::const_iterator slot =
        _requestMap.find(static_cast<pid_t>(gettid()));
    if (slot == _requestMap.end()) {
        _errStr = "thread not registered, call setThreadInfo first";
        return false;
    }
    FCGX_Request &request = _request[slot->second];

    // NOTE(review): relies on libfcgi leaving the output stream pointer NULL
    // while no request is open — confirm against the fcgiapp implementation.
    if (request.out == NULL) {
        _errStr = "no accepted request to write to";
        return false;
    }

    const int written = FCGX_FPrintF(request.out,
                                     "Content-type: application/json\r\n\r\n%s",
                                     strjson.c_str());
    // Finish the request even when the write failed so it is always released.
    FCGX_Finish_r(&request);

    if (written < 0) {
        _errStr = "write response failed";
        return false;
    }
    return true;
}

/**
 * @brief Serve requests in a loop on a request object local to this call,
 *        calling @p func for each request. Intended to be run by several
 *        threads at once; accepting is serialised by a mutex.
 *
 * Each request is converted to JSON (POST body or GET query string in
 * "k1=v1&k2=v2" form, "{}" when empty), passed to @p func together with an
 * empty output string, and whatever @p func stored there is sent back after
 * a "Content-type: application/json" header.
 *
 * The request body is collected in a growing string, so its length is not
 * limited by a fixed buffer. The open request is finished on every error
 * return so it is not left dangling.
 *
 * @param func Business callback; must not be NULL.
 * @return This function only returns on failure, with false: @p func is
 *         NULL, the request could not be initialised, accept failed, or a
 *         request was malformed (see getErrorInfo()).
 */
bool SpawnFcgi::runThreadFcgi(business_fn func) {
    if (func == NULL) {
        _errStr = "business function is null";
        return false;
    }

    FCGX_Request request;
    if (FCGX_InitRequest(&request, 0, 0) != 0) {
        _errStr = "FCGX_InitRequest failed";
        return false;
    }

    for (;;)
    {
        static pthread_mutex_t accept_mutex = PTHREAD_MUTEX_INITIALIZER;

        pthread_mutex_lock(&accept_mutex);
        const int rc = FCGX_Accept_r(&request);
        pthread_mutex_unlock(&accept_mutex);

        if (rc < 0) {
            char cTmp[256] = { 0 };
            snprintf(cTmp, sizeof(cTmp), "accept failed! errno=%d", rc);
            _errStr = cTmp;
            return false;
        }

        // Fresh strings per request: nothing from an earlier request can
        // leak into this response.
        string inJson;
        string outJson;
        const char *failure = NULL;

        const char *method = FCGX_GetParam("REQUEST_METHOD", request.envp);
        if (method && strcmp(method, "POST") == 0) {
            long len = 0;
            const char *contentLength = FCGX_GetParam("CONTENT_LENGTH", request.envp);
            if (contentLength != NULL) {
                char *end = NULL;
                len = strtol(contentLength, &end, 10);
                if (*end)
                    failure = "can't parser CONTENT_LENGTH";
            }
            if (failure == NULL && len > 0) {
                // CONTENT_LENGTH is client-controlled: never copy the body
                // into a fixed-size array.
                string body;
                for (long i = 0; i < len; i++) {
                    const int ch = FCGX_GetChar(request.in);
                    if (ch < 0) {
                        failure = "Error: Not enough bytes received on standard input";
                        break;
                    }
                    body.push_back(static_cast<char>(ch));
                }
                if (failure == NULL) {
                    // char2StrJson only reads its input.
                    char2StrJson(const_cast<char *>(body.c_str()), inJson);
                }
            } else if (failure == NULL) {
                inJson = "{}";
            }
        } else if (method && strcmp(method, "GET") == 0) {
            char *queryString = FCGX_GetParam("QUERY_STRING", request.envp);
            if (queryString)
                char2StrJson(queryString, inJson);
            else
                inJson = "{}";
        } else {
            failure = "requets method is null, or not get/post";
        }

        if (failure != NULL) {
            _errStr = failure;
            // The request object is local: finish it before leaving.
            FCGX_Finish_r(&request);
            return false;
        }

        func(inJson, outJson);
        FCGX_FPrintF(request.out,
                     "Content-type: application/json\r\n\r\n%s",
                     outJson.c_str());
        FCGX_Finish_r(&request);
    }
    return true;
}

bool SpawnFcgi::char2StrJson(char * inputdata,string &strjson)
{
    string tmpStr;
    string key;
    string value;
    int len1 = 0;
    int len2 = 0;
    string str = inputdata;
    strjson = "{";

    for(;len1 != string::npos;) {
        len1 = str.find('&', 0);
        tmpStr=str.substr(0,len1);
        if(len1 != string::npos && len1 + 1 !=  string::npos)
            str = str.substr(len1 + 1);
        len2=tmpStr.find('=',0);
        if(len2 == string::npos)
            continue;
        key=tmpStr.substr(0,len2);
        if(len2 + 1 == string::npos)
            continue;
        value=tmpStr.substr(len2+1);
        strjson = strjson +"\"" + key + "\":\"" + value +"\",";
    }

    if(strjson.length() > 1)
        strjson = strjson.substr(0, strjson.length()-1);
    strjson= strjson + "}";

    return true;
}

/**
 * @brief Wait until the listening descriptor (_stdinFD) becomes readable.
 *
 * @param wait_seconds Maximum time to wait in seconds; 0 means "do not wait
 *                     here" and the function reports success immediately.
 * @return true when no wait was requested or the descriptor became readable;
 *         false when select() timed out or failed. A select() interrupted by
 *         a signal (EINTR) is retried.
 */
bool SpawnFcgi::fcgiTimeOut(unsigned int wait_seconds) {
    if (wait_seconds == 0)
        return true;

    struct timeval remaining;
    remaining.tv_sec = wait_seconds;
    remaining.tv_usec = 0;

    fd_set readable;
    FD_ZERO(&readable);
    FD_SET(_stdinFD, &readable);

    int ready = 0;
    for (;;) {
        ready = select(_stdinFD + 1, &readable, NULL, NULL, &remaining);
        const bool interrupted = (ready < 0 && errno == EINTR);
        if (!interrupted)
            break;
    }

    // -1 is a select() failure, 0 is a timeout; anything else means readable.
    return !(ready == -1 || ready == 0);
}

/**
 * @brief Return a copy of the text describing the most recently recorded
 *        error ("no error" right after construction).
 */
string SpawnFcgi::getErrorInfo() {
    string lastError(_errStr);
    return lastError;
}

}



例子程序TestExample.cpp

#include "SpawnFcgi.h"
#include <unistd.h>
#include <iostream>
/**
*nginx新增配置:
* location ~ \.cgi$ {
*            fastcgi_pass 127.0.0.1:9000;
*            fastcgi_index index.cgi;
*            fastcgi_param SCRIPT_FILENAME fcgi$fastcgi_script_name;
*            include fastcgi_params;
*        }
*
*测试工具:
*    Fiddler
*
*使用:
*    启动nginx反向代理或tomcat等支持cgi服务器,运行编译完testFcgi程序,
*Fiddler工具模拟post、get请求
*
*/

using namespace std;
using namespace BM35;

/**
 * Demo business callback for the process-based loops.
 *
 * Wraps the request into a valid JSON object that also names the serving
 * process: {"pid":"<pid>","request":<inJson>}. An empty inJson is treated as
 * the empty object so that the result is always well-formed.
 *
 * @param inJson  JSON text of the request (left unchanged).
 * @param outJson Receives the response JSON.
 */
void myfunc(std::string & inJson, std::string & outJson) {
    char tmp[64] = { 0 };
    snprintf(tmp, sizeof(tmp), "{\"pid\":\"%d\",\"request\":",
             static_cast<int>(getpid()));
    outJson = tmp + (inJson.empty() ? std::string("{}") : inJson) + "}";
}

/**
 * Demo business callback for the thread-based loops.
 *
 * Wraps the request into a valid JSON object that also names the serving
 * thread: {"tid":"<thread id>","request":<inJson>}. An empty inJson is
 * treated as the empty object so that the result is always well-formed.
 *
 * @param inJson  JSON text of the request (left unchanged).
 * @param outJson Receives the response JSON.
 */
void myfuncT(std::string & inJson, std::string & outJson) {
    char tmp[64] = { 0 };
    // pthread_t is not an int: print it through unsigned long with a matching
    // conversion. A C-style cast is used on purpose because pthread_t is an
    // integer on some platforms and a pointer on others.
    snprintf(tmp, sizeof(tmp), "{\"tid\":\"%lu\",\"request\":",
             (unsigned long) pthread_self());
    outJson = tmp + (inJson.empty() ? std::string("{}") : inJson) + "}";
}

void * routine(void * arg) {
    SpawnFcgi * tmp =(SpawnFcgi *)arg;
    tmp->runThreadFcgi(myfuncT);
    return NULL;
}

void * routine1(void * arg) {

    SpawnFcgi * tmp =(SpawnFcgi *)arg;
    tmp->setThreadInfo();
    string info;
    while(tmp->readThreadFcgi(info)) {
        tmp->writeThreadFcgi(info);
    }
    return NULL;
}

int main() {
    SpawnFcgi * fcgi = new SpawnFcgi();

#if defined(OneProcess) //单进程
    if(!fcgi->initFcgi(9000))
        printf("init failed");

    string info;

    while(fcgi->readFcgi(info)) {

        fcgi->writeFcgi(info);
    }
#elif defined (multProcess1)  // 单进程或多进程
    pid_t pid;

    if(!fcgi->initFcgi(9000))
        printf("init failed");

    pid = fork();
    if(pid == 0) {
        string info;
        while(fcgi->readFcgi(info)) {
            fcgi->writeFcgi(info);
        }
    } else if(pid > 0){
        string info;
        while(fcgi->readFcgi(info)) {
            fcgi->writeFcgi(info);
        }
    } else {
        printf("fork error");
    }
#elif defined (multProcess2)  // 单进程或多进程
    pid_t pid;

    if(!fcgi->initFcgi(9000))
        printf("init failed");

    pid = fork();
    if(pid == 0) {
        fcgi->runFcgi(myfunc);
    } else if(pid > 0){
        fcgi->runFcgi(myfunc);
    } else {
        printf("fork error");
    }
#elif defined (multProcessThread1)  // 多线程
    pid_t pid;

    if(!fcgi->initFcgi(9000))
        printf("init failed");

    pid = fork();
    if(pid == 0) {
        // 子进程线程
        pthread_t threads[2];
        for(int i = 0; i <2;i++) {
            pthread_create(&threads[i], NULL, routine, fcgi);
        }
        for(int i = 0; i < 2; i++) {
            //若用pthread_detach(pthread_self())了,下面不用等了
            pthread_join(threads[i], NULL); //防止僵尸线程,等待
        }

    } else if(pid > 0){
        // 父进程线程
        pthread_t threads[3];
        for(int i = 0; i <3;i++) {
            pthread_create(&threads[i], NULL, routine, fcgi);
        }
        for(int i = 0; i < 3; i++) {
            //若用pthread_detach(pthread_self())了,下面不用等了
            pthread_join(threads[i], NULL); //防止僵尸线程,等待
        }

    } else {
        printf("fork error");
    }
#elif defined (multProcessThread2)  // 多线程
    pid_t pid;

    if(!fcgi->initFcgi(9000))
        printf("init failed");

    pid = fork();
    if(pid == 0) {
        // 子进程线程
        pthread_t threads[2];
        for(int i = 0; i <2;i++) {
            pthread_create(&threads[i], NULL, routine1, fcgi);
        }
        for(int i = 0; i < 2; i++) {
            //若用pthread_detach(pthread_self())了,下面不用等了
            pthread_join(threads[i], NULL); //防止僵尸线程,等待
        }

    } else if(pid > 0){
        // 父进程线程
        pthread_t threads[3];
        for(int i = 0; i <3;i++) {
            pthread_create(&threads[i], NULL, routine1, fcgi);
        }
        for(int i = 0; i < 3; i++) {
            //若用pthread_detach(pthread_self())了,下面不用等了
            pthread_join(threads[i], NULL); //防止僵尸线程,等待
        }

    } else {
        printf("fork error");
    }
#elif defined (timeOutTest)  // 超时测试
    if(!fcgi->initFcgi(9000))
        printf("init failed");
    string info;
    /*线程的
    fcgi->setThreadInfo();
    while(1) {
        if(fcgi->readThreadFcgi(info, 10)) {
             printf("go--\n");
            fcgi->writeThreadFcgi(info);
        }
        std::cout<<"---"<<std::endl;
        printf("timeout\n");

    }
    */
    //进程的
    while(1) {
        if(fcgi->readFcgi(info, 10)) {
             printf("go--\n");
            fcgi->writeFcgi(info);
        }
        std::cout<<"---"<<std::endl;
        printf("timeout\n");

    }

#endif
    delete fcgi;
    return 0;
}


makefile:

# Build the testFcgi demo from SpawnFcgi.cpp and TestExample.cpp.
# DEFINE selects the serving model compiled into TestExample.cpp
# (OneProcess, multProcess1, multProcess2, multProcessThread1,
# multProcessThread2 or timeOutTest).
TARGETS=testFcgi
CC=g++
# Compiler flags only; the output option is given explicitly in each rule.
CFLAGS=-g
OBJECTS=SpawnFcgi.o TestExample.o
DEFINE=-DtimeOutTest
LD_USR_LIBS=-L/usr/local/lib/ -lfcgi -lpthread
INCLUDE=
all: $(TARGETS)

testFcgi: $(OBJECTS)
	$(CC) $(CFLAGS) -o $@ $(OBJECTS) $(LD_USR_LIBS)

# Both objects must be rebuilt when the shared header changes.
$(OBJECTS): SpawnFcgi.h

.cpp.o:
	$(CC) $(CFLAGS) $(DEFINE) $(INCLUDE) -c $< -o $@

.PHONY: all clean

clean:
	rm -rf $(OBJECTS) $(TARGETS)