欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

memcached探索之thread model(2)

程序员文章站 2022-06-17 16:48:24
...

6. 下面我们看看主线程(dispatcher_thread)的事件处理设置。在memcached.c的main函数中: /* create unix mode sockets after dropping privileges */ if (settings.socketpath != NULL) { errno = 0; if (server_socket_unix(settings.socketpath, ...

6. 下面我们看看主线程(dispatcher_thread)的事件处理设置,相关代码位于memcached.c的main函数中:

/*
 * Excerpt from the body of main(): create the listening socket(s), drop
 * privileges, then enter the main thread's event loop.
 */
/* create unix mode sockets after dropping privileges */
if (settings.socketpath != NULL) {
errno = 0;
if (server_socket_unix(settings.socketpath,settings.access)) {
vperror("failed to listen on UNIX socket: %s", settings.socketpath);
exit(EX_OSERR);
}
}

/* create the listening socket, bind it, and init */
/* The TCP/UDP sockets are only created when no UNIX socket path is set. */
if (settings.socketpath == NULL) {
int udp_port;

/*
 * If MEMCACHED_PORT_FILENAME is set, the file "<name>.lck" is opened for
 * appending, passed to server_socket(), and renamed to <name> once both
 * socket set-up calls below have completed.
 */
const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
char temp_portnumber_filename[PATH_MAX];
FILE *portnumber_file = NULL;

if (portnumber_filename != NULL) {
snprintf(temp_portnumber_filename,
sizeof(temp_portnumber_filename),
"%s.lck", portnumber_filename);

portnumber_file = fopen(temp_portnumber_filename, "a");
/* A failed open is only reported; portnumber_file stays NULL and start-up continues. */
if (portnumber_file == NULL) {
fprintf(stderr, "Failed to open \"%s\": %s\n",
temp_portnumber_filename, strerror(errno));
}
}

/* A TCP port of 0 skips the TCP listener entirely. */
errno = 0;
if (settings.port && server_socket(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}

/*
* initialization order: first create the listening sockets
* (may need root on low ports), then drop root if needed,
* then daemonise if needed, then init libevent (in some cases
* descriptors created by libevent wouldn't survive forking).
*/

/* NOTE(review): udp_port is assigned here but never read in this excerpt. */
udp_port = settings.udpport ? settings.udpport : settings.port;

/* create the UDP listening socket and bind it */
errno = 0;
if (settings.udpport && server_socket(settings.udpport, udp_transport,
portnumber_file)) {
vperror("failed to listen on UDP port %d", settings.udpport);
exit(EX_OSERR);
}

/* Close the temporary port file and move it to its final name. */
if (portnumber_file) {
fclose(portnumber_file);
rename(temp_portnumber_filename, portnumber_filename);
}
}
/* Drop privileges no longer needed */
drop_privileges();
/* enter the event loop */
event_base_loop(main_base, 0); // event loop of the main thread (dispatcher_thread)



7. 继续跟踪server_socket函数(memcached.c中)

/**
 * Create a socket and bind it to a specific port number
 *
 * Every address returned by getaddrinfo() for settings.inter is tried in
 * turn. UDP sockets are handed to the worker threads through
 * dispatch_conn_new(); TCP sockets are put into the listening state and
 * registered on main_base through conn_new(), then prepended to the
 * listen_conn list.
 *
 * @param port the port number to bind to (-1 is mapped to 0)
 * @param transport the transport protocol (TCP / UDP)
 * @param portnumber_file A filepointer to write the port numbers to
 *        when they are successfully added to the list of ports we
 *        listen on. May be NULL, in which case nothing is written.
 * @return 0 if at least one address was bound and no fatal error occurred;
 *         1 if getaddrinfo() failed, bind() failed with an error other than
 *         EADDRINUSE, listen() failed, or no address could be bound.
 *         The process exits if conn_new() fails for a TCP socket.
 */
static int server_socket(int port, enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;
    struct linger ling = {0, 0};
    struct addrinfo *ai;
    struct addrinfo *next;
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    int flags = 1;

    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

    /* -1 is mapped to port 0 before being handed to getaddrinfo(). */
    if (port == -1) {
        port = 0;
    }
    snprintf(port_buf, sizeof(port_buf), "%d", port);
    error = getaddrinfo(settings.inter, port_buf, &hints, &ai);
    if (error != 0) {
        if (error != EAI_SYSTEM)
            fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
        else
            perror("getaddrinfo()");
        return 1;
    }

    for (next = ai; next; next = next->ai_next) {
        conn *listen_conn_add;

        /* create the socket for this address */
        if ((sfd = new_socket(next)) == -1) {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            continue;
        }

#ifdef IPV6_V6ONLY
        if (next->ai_family == AF_INET6) {
            error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
            if (error != 0) {
                perror("setsockopt");
                close(sfd);
                continue;
            }
        }
#endif

        /* socket options; failures here are reported but are not fatal */
        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
        if (IS_UDP(transport)) {
            maximize_sndbuf(sfd);
        } else {
            error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");
        }

        /* bind: EADDRINUSE only skips this address, any other error is fatal */
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            if (errno != EADDRINUSE) {
                perror("bind()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            close(sfd);
            continue;
        } else {
            success++;

            /* TCP only: start listening */
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                perror("listen()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }

            /* Record the port actually bound, as reported by getsockname(). */
            if (portnumber_file != NULL &&
                (next->ai_addr->sa_family == AF_INET ||
                 next->ai_addr->sa_family == AF_INET6)) {
                union {
                    struct sockaddr_in in;
                    struct sockaddr_in6 in6;
                } my_sockaddr;
                socklen_t len = sizeof(my_sockaddr);
                if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
                    if (next->ai_addr->sa_family == AF_INET) {
                        fprintf(portnumber_file, "%s INET: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in.sin_port));
                    } else {
                        fprintf(portnumber_file, "%s INET6: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in6.sin6_port));
                    }
                }
            }
        }

        if (IS_UDP(transport)) {
            /*
             * UDP needs no accept(), so the socket is dispatched straight
             * to the worker threads, once per thread.
             */
            int c;

            for (c = 0; c < settings.num_threads; c++) {
                /* this is guaranteed to hit all threads because we round-robin */
                dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
            }
        } else {
            /*
             * TCP: the main (dispatcher) thread registers the listening
             * socket on main_base through conn_new(), with initial state
             * conn_listening and a persistent read event.
             * NOTE(review): per the accompanying article, a readable
             * listening socket then drives the connection state machine,
             * which accepts the client and hands it to a worker through
             * dispatch_conn_new(); that code is not visible here - confirm.
             */
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}


8. 看看UDP和TCP模式下dispatcher_thread都会调用的dispatch_conn_new函数(在thread.c中)

/*
* Dispatches a new connection to another thread. This is only ever called
* from the main thread, either during initialization (for UDP) or because
* of an incoming connection.
*/

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();
int tid = (last_thread + 1) % settings.num_threads; //轮询的方式找worker_thread

LIBEVENT_THREAD *thread = threads + tid;

last_thread = tid;

item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
//push conn到worker_thread的CQ中
cq_push(thread->new_conn_queue, item);

MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
if (write(thread->notify_send_fd, "", 1) != 1) {
perror("Writing to thread notify pipe");
}
}


9. 再看看unix domain socket模式下主线程的事件处理设置(即上面第6步中调用的server_socket_unix函数):

/**
 * Create a UNIX domain socket bound to `path`, put it into the listening
 * state and register it on main_base as the listening connection.
 *
 * @param path        filesystem path of the socket; NULL or a path that does
 *                    not fit into sockaddr_un.sun_path is rejected.
 * @param access_mask permission bits for the socket file; only the low nine
 *                    bits (0777) are used, applied through a temporary umask.
 * @return 0 on success, 1 on failure (invalid path, socket creation, bind()
 *         or listen() error). For an over-long path errno is set to
 *         ENAMETOOLONG. The process exits if conn_new() fails.
 */
static int server_socket_unix(const char *path, int access_mask) {
    int sfd;
    struct linger ling = {0, 0};
    struct sockaddr_un addr;
    struct stat tstat;
    int flags = 1;
    int old_umask;

    if (!path) {
        return 1;
    }

    /*
     * Reject a path that would not fit (including its terminating NUL)
     * instead of relying on assert(): with NDEBUG the assert disappears and
     * a silently truncated path would be bound. Checked before the socket
     * is created so nothing needs to be cleaned up.
     */
    if (strlen(path) >= sizeof(addr.sun_path)) {
        fprintf(stderr, "UNIX socket path too long: %s\n", path);
        errno = ENAMETOOLONG;
        return 1;
    }

    if ((sfd = new_socket_unix()) == -1) {
        return 1;
    }

    /*
     * Clean up a previous socket file if we left it around
     */
    if (lstat(path, &tstat) == 0) {
        if (S_ISSOCK(tstat.st_mode))
            unlink(path);
    }

    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
    setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));

    /*
     * the memset call clears nonstandard fields in some implementations
     * that otherwise mess things up.
     */
    memset(&addr, 0, sizeof(addr));

    addr.sun_family = AF_UNIX;
    /* The length was validated above, so this copy never truncates. */
    strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);

    /* Restrict the mode of the socket file created by bind() to access_mask. */
    old_umask = umask( ~(access_mask&0777));
    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        perror("bind()");
        close(sfd);
        umask(old_umask);
        return 1;
    }
    umask(old_umask);

    if (listen(sfd, settings.backlog) == -1) {
        perror("listen()");
        close(sfd);
        return 1;
    }

    /* As in the TCP case, the listening socket is registered via conn_new(). */
    if (!(listen_conn = conn_new(sfd, conn_listening,
                                 EV_READ | EV_PERSIST, 1,
                                 local_transport, main_base))) {
        fprintf(stderr, "failed to create listening connection\n");
        exit(EXIT_FAILURE);
    }

    return 0;
}