MySQL的UDF编程之非阻塞超时重传
MySQL的UDF(User Defined Function)类似于一种API:用户按照一定的规范用C/C++(或采用C调用规范的语言)编写一组函数(UDF),然后编译成动态链接库,通过CREATE FUNCTION和DROP FUNCTION语句来加载和卸载UDF。UDF被加载后可以像调用MySQL的内置函数一样来调用它,并且服务器在启动时会自动加载原来存在的UDF。
#ifdef standard/* standard is defined, don't use any mysql functions */
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#ifdef __win__
typedef unsigned __int64 ulonglong; /* microsofts 64 bit types */
typedef __int64 longlong;
#else
typedef unsigned long long ulonglong;
typedef long long longlong;
#endif /*__win__*/
#else
#include <my_global.h>
#include <my_sys.h>
#if defined(mysql_server)
#include <m_string.h> /* to get strmov() */
#else
/* when compiled as standalone */
#include <string.h>
#endif
#endif
#include <mysql.h>
#include <m_ctype.h>
#include <m_string.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#ifdef have_dlopen
/*
 * Forward declarations of the three entry points of the http_post UDF:
 * the init hook, the deinit hook and the main function.
 * NOTE(review): the type names udf_init / udf_args are spelled in lower case
 * here; MySQL's headers are expected to declare them as UDF_INIT / UDF_ARGS -
 * confirm the spelling against the headers used by the build.
 */
my_bool http_post_init(udf_init *initid, udf_args *args, char *message);
void http_post_deinit(udf_init *initid);
longlong http_post(udf_init *initid, udf_args *args, char *is_null,char *error);
/*************************************************************************
** http_post_init - init hook of the http_post UDF.
**
** Validates the argument count and, when the request is known to be
** large, pre-allocates a scratch buffer in initid->ptr (released again
** by http_post_deinit).
**
** arguments:
** initid  structure the init function fills; only initid->ptr is touched.
** args    the call's arguments; at least 3 are required
**         (host, port, query string[, body]).
** message buffer that receives an error text when 1 is returned.
**
** return  0 on success, 1 if something goes wrong (the reason is then
**         stored in message).
**
** NOTE(review): the arguments are assumed to arrive as strings; consider
** forcing their arg_type to the string result type here so that numeric
** literals (e.g. a port given as 80) are converted by the server.
**************************************************************************/
my_bool http_post_init(udf_init *initid, udf_args *args, char *message)
{
    size_t needed;

    /* Always start from a defined state so that the deinit hook never
       frees an indeterminate pointer. */
    initid->ptr = NULL;

    if (args->arg_count < 3)
    {
        strcpy(message, "wrong arguments to http_post; ");
        return 1;
    }

    /* At init time args->args[i] is only non-NULL for constant arguments,
       so the request size can only be estimated for constants. */
    if (args->arg_count == 4 && args->args[3] != NULL)
    {
        /* Use the recorded lengths: UDF string arguments are not
           guaranteed to be NUL-terminated, so strlen() is not safe.
           200 bytes of slack cover the request line and the headers. */
        needed = 200 + (size_t) args->lengths[3];
        if (args->args[2] != NULL)
        {
            needed += (size_t) args->lengths[2];
        }

        if (needed > 160000)
        {
            initid->ptr = malloc(needed);
            if (initid->ptr == NULL)
            {
                strcpy(message, "couldn't allocate memory in http_post_init");
                return 1;
            }
        }
    }
    return 0;
}
/****************************************************************************
** http_post_deinit - deinit hook of the http_post UDF.
**
** Releases the buffer stored in initid->ptr (if any) and clears the
** pointer so that a repeated call is harmless.
**
** arguments:
** initid  the structure that was passed to http_post_init; may be NULL,
**         in which case nothing is done.
****************************************************************************/
void http_post_deinit(udf_init *initid)
{
    if (initid == NULL)
    {
        return;
    }
    /* free(NULL) is a no-op, so no separate check of ptr is needed. */
    free(initid->ptr);
    initid->ptr = NULL;
}
/***************************************************************************
** http_post - main function of the http_post UDF, plus its private helpers.
**
** Builds an HTTP POST request and pushes it to host:port over a
** non-blocking TCP socket, retrying a bounded number of times. The
** server's response is not read.
***************************************************************************/

/* Tunables of the non-blocking connect/send retry logic. */
enum
{
    HTTP_POST_MAX_ATTEMPTS = 5,     /* connect+send attempts before giving up */
    HTTP_POST_WAIT_MS = 300,        /* how long one wait for writability lasts */
    HTTP_POST_RETRY_USEC = 20000    /* pause between two attempts */
};

/* Copies a length-delimited UDF argument into dst as a NUL-terminated
   string; returns -1 if it is missing, empty or does not fit. */
static int http_post_copy_text(char *dst, size_t dst_size,
                               const char *src, size_t src_len)
{
    if (src == NULL || src_len == 0 || src_len >= dst_size)
    {
        return -1;
    }
    memcpy(dst, src, src_len);
    dst[src_len] = '\0';
    return 0;
}

/* Parses a decimal TCP port (1..65535); returns -1 on malformed input. */
static int http_post_parse_port(const char *text, unsigned short *port)
{
    char *end = NULL;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE ||
        value < 1 || value > 65535)
    {
        return -1;
    }
    *port = (unsigned short) value;
    return 0;
}

/* Assembles the request into a freshly malloc'ed buffer of exactly
   *request_len bytes (not NUL-terminated); the caller frees it.
   body == NULL means "no body". Returns NULL on allocation failure. */
static char *http_post_build_request(const char *query, size_t query_len,
                                     const char *body, size_t body_len,
                                     size_t *request_len)
{
    /* Method and version tokens are case-sensitive in HTTP. */
    static const char prefix[] = "POST /?";
    static const char version[] = " HTTP/1.1\r\n";
    char tail[64];
    int tail_len;
    size_t total;
    char *request;
    char *cursor;

    if (body != NULL)
    {
        tail_len = snprintf(tail, sizeof tail, "Content-Length:%lu\r\n\r\n",
                            (unsigned long) body_len);
    }
    else
    {
        /* The empty line terminates the header section; without it the
           request is incomplete. */
        tail_len = snprintf(tail, sizeof tail, "\r\n");
        body_len = 0;
    }
    if (tail_len < 0 || (size_t) tail_len >= sizeof tail)
    {
        return NULL;
    }

    total = (sizeof prefix - 1) + query_len + (sizeof version - 1) +
            (size_t) tail_len + body_len;
    request = malloc(total);
    if (request == NULL)
    {
        return NULL;
    }

    /* memcpy instead of "%s": the arguments are length-delimited and are
       not guaranteed to be NUL-terminated. */
    cursor = request;
    memcpy(cursor, prefix, sizeof prefix - 1);
    cursor += sizeof prefix - 1;
    if (query_len > 0)
    {
        memcpy(cursor, query, query_len);
        cursor += query_len;
    }
    memcpy(cursor, version, sizeof version - 1);
    cursor += sizeof version - 1;
    memcpy(cursor, tail, (size_t) tail_len);
    cursor += tail_len;
    if (body_len > 0)
    {
        memcpy(cursor, body, body_len);
    }

    *request_len = total;
    return request;
}

/* Waits up to HTTP_POST_WAIT_MS for fd to become writable. Returns >0 when
   ready, 0 on timeout, <0 on error. poll() is used rather than select()
   because it has no FD_SETSIZE limit on the descriptor value. */
static int http_post_wait_writable(int fd)
{
    struct pollfd pfd;
    int rc;

    pfd.fd = fd;
    pfd.events = POLLOUT;
    do
    {
        pfd.revents = 0;
        rc = poll(&pfd, 1, HTTP_POST_WAIT_MS);
    } while (rc < 0 && errno == EINTR);
    return rc;
}

/* Writes all len bytes to the non-blocking socket, waiting for writability
   whenever the kernel buffer is full. Returns 0 on success, -1 on failure. */
static int http_post_send_all(int fd, const char *data, size_t len)
{
    size_t sent = 0;

    while (sent < len)
    {
        ssize_t n = send(fd, data + sent, len - sent, 0);
        if (n > 0)
        {
            sent += (size_t) n;
            continue;
        }
        if (n < 0 && errno == EINTR)
        {
            continue;
        }
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK) &&
            http_post_wait_writable(fd) > 0)
        {
            continue;
        }
        return -1;
    }
    return 0;
}

/* Performs one complete attempt on a fresh socket: non-blocking connect,
   bounded wait, send. Returns 0 on success, 2 if no socket could be
   created, 5 if the connect timed out, 4 on any other failure. */
static int http_post_attempt(const struct sockaddr_in *addr,
                             const char *request, size_t request_len)
{
    int status = 4;
    int flags;
    int fd;

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1)
    {
        return 2;               /* nothing to close */
    }

    flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
    {
        goto out;
    }

    if (connect(fd, (const struct sockaddr *) addr, sizeof *addr) != 0)
    {
        int so_error = 0;
        socklen_t so_len = sizeof so_error;
        int ready;

        if (errno != EINPROGRESS && errno != EINTR)
        {
            goto out;           /* immediate failure, e.g. unreachable */
        }
        ready = http_post_wait_writable(fd);
        if (ready == 0)
        {
            status = 5;         /* connect did not finish in time */
            goto out;
        }
        if (ready < 0)
        {
            goto out;
        }
        /* Writability alone does not mean success: the outcome of an
           asynchronous connect is reported through SO_ERROR. */
        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_len) != 0 ||
            so_error != 0)
        {
            goto out;
        }
    }

    if (http_post_send_all(fd, request, request_len) == 0)
    {
        status = 0;
    }

out:
    close(fd);
    return status;
}

/***************************************************************************
** http_post
**
** arguments:
** initid  structure filled by http_post_init. initid->ptr is deliberately
**         not used: its capacity is not recorded anywhere, so the request
**         buffer is sized and allocated per call instead.
** args    args[0] = IPv4 address in dotted notation, args[1] = port,
**         args[2] = query string appended after "/?", and - only when
**         exactly 4 arguments are given and it is not NULL - args[3] =
**         request body. All are read as length-delimited strings.
** is_null, error  unused.
**
** return  0  request sent
**         1  invalid or NULL host/port/query argument
**         2  socket() failed
**         3  out of memory while building the request
**         4  connect or send failed on every attempt
**         5  the last attempt timed out waiting for the connection
**
** NOTE(review): no Host header is sent although HTTP/1.1 requires one;
** confirm whether the receiving server needs it.
***************************************************************************/
longlong http_post( udf_init *initid, udf_args *args,
char *is_null __attribute__((unused)),
char *error __attribute__((unused)))
{
    struct sockaddr_in serv_addr;
    char host[64];
    char port_text[8];
    unsigned short port = 0;
    const char *body = NULL;
    size_t body_len = 0;
    char *request;
    size_t request_len = 0;
    int status = 4;
    int attempt;

    (void) initid;

    if (args->arg_count < 3 || args->args[2] == NULL)
    {
        return 1;
    }
    if (http_post_copy_text(host, sizeof host,
                            args->args[0], (size_t) args->lengths[0]) != 0 ||
        http_post_copy_text(port_text, sizeof port_text,
                            args->args[1], (size_t) args->lengths[1]) != 0 ||
        http_post_parse_port(port_text, &port) != 0)
    {
        return 1;
    }

    memset(&serv_addr, 0, sizeof serv_addr);
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(port);
    serv_addr.sin_addr.s_addr = inet_addr(host);
    if (serv_addr.sin_addr.s_addr == INADDR_NONE)
    {
        return 1;
    }

    if (args->arg_count == 4 && args->args[3] != NULL)
    {
        body = args->args[3];
        body_len = (size_t) args->lengths[3];
    }

    request = http_post_build_request(args->args[2], (size_t) args->lengths[2],
                                      body, body_len, &request_len);
    if (request == NULL)
    {
        return 3;
    }

    for (attempt = 0; attempt < HTTP_POST_MAX_ATTEMPTS; attempt++)
    {
        if (attempt > 0)
        {
            usleep(HTTP_POST_RETRY_USEC);
        }
        /* A fresh socket per attempt: the state of a socket after a failed
           connect is unspecified, so it must not be reused. */
        status = http_post_attempt(&serv_addr, request, request_len);
        if (status == 0 || status == 2)
        {
            break;              /* done, or retrying cannot help */
        }
    }

    free(request);
    return status;
}
#endif /* have_dlopen */