续上篇IPC-Pipes
上篇的example3中,我用named pipe(FIFO)来实现client和server的通讯。
因为client每次发完一个包后,都会sleep(2),因此几乎没有资源冲突和同步的问题。
可是,若是将sleep(2)去掉,会出现什么结果呢?
咱们加入如下这个函数,在每次接受到包以后,比较接收到的包和原始发送的包。
static int is_pkt_ok(struct cs_packet_t *pkt_recv, struct cs_packet_t *pkt_sent)
{
if (pkt_recv->cli_pid != pkt_sent->cli_pid)
return 0;
if ( strcmp(pkt_recv->action, pkt_sent->action) )
return 0;
char *p = pkt_recv->data;
char *q = pkt_sent->data;
while (*p != '\0')
{
if ( toupper(*p) != toupper(*q) )
return 0;
p++;
q++;
}
return 1;
}
结果会发现,若是没有sleep(2),即,client发送包的速度很快,server处的处理就不对了。
缘由是:
1. 若是procA打开一个fifo来read, procB打开同一个fifo来写,而且直接关掉,此时,procA将读到0个字节。也就是说,writer端关闭proc,reader端会读到0个字节(其实,这是个比较方便的特性,由于reader就有办法知道远方的writer是否关闭了pipe)。如下将有个示例程序证实这一点。
2. 在server端,对于每一个packet新建一个线程来处理,这就有可能出现如下状况:一个线程处理完包,write回结果,但尚未关闭pipe时,另一个线程启动,这个线程处理的是同一个client的包,因而它将与上一个线程共用一个FIFO来发送结果。因而就出现了冲突。client正等着接受第二个线程处理的结果,此时第一个线程关闭pipe的writer端的这个操做,也会使得client中的read再也不block而返回0字节。
其实,当多个线程访问同一个资源而互相又不知道时,你基本上已经有一个bug了。
此处,FIFO是共有资源,两个线程同时访问,但又不加控制,结果不言而喻。
解决方案1:
client若是读到0字节,说明这是一个无效的read,从新read。
结果:出现奇怪的现象。要么server和client卡住,要么client一直在读pipe,可是每次读的结果都是0个字节。此时,server程序的VSZ高的出奇,以下:
22728 0.1 0.3 3124528 3684 pts/1 S+ 22:49 0:00 ./server
这个方案应该是解决了以上问题,出现奇怪现象的缘由应该是虚拟内存使用太高了。虚拟内存使用太高的缘由是,不断的建立线程,在线程结束后,主线程没有pthread_join它,致使该线程相关的资源不能被操做系统回收,一旦这样的线程多了以后,虚拟内存就使用太高,致使程序没法继续运行了。
解决方案2:
server中添加pthread_join,等待该处理线程结束,回首资源。这样确定没有问题,可是,全部的处理就串行化了。可见这个server设计的有烂!
不过这个比解决方案1好点,由于它至少解决了问题。app
解决方案3:函数
利用thread attibutes,将处理packet的线程设置为detached thread。这样,当线程结束时,系统会自动回收线程的资源。这个方案比2要好一点。 oop
pthread_attr_t attr;this
pthread_t thread;操作系统
pthread_attr_init(&attr);.net
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);线程
pthread_create(&thread, &attr, &thread_function, NULL);设计
pthread_attr_destroy(&attr);
解决方案4:
对每一个packet进行线程的建立和销毁,简直有点蠢!
server能够将packet放在一个FIFO中,而处理packet的线程不断从FIFO中取出packet进行处理。
对FIFO的操做应该由某个函数进行操做,这个函数应该是re-entrant的,因此要注意资源的同步。
处理packet的线程数量,若是是一个,那么在client较少的状况下应该没有问题。由于一个线程来处理几个client的请求仍是说的过去的;若是是多个,那么就要注意了,由于几个处理线程拥有共同的资源,即,命名管道,须要进行仔细的处理,防止出现资源竞争。
如下代码实现server中的packet queue。client代码无需修改。
另外,目前代码中有如下几个地方须要改进。
1. client增长uer command interface,由此能够写脚本或者c程序来控制client的数量和行为。
2. server中的packet要增长同步机制,避免竞争状态出现。
3. server中增长线程池机制。
4. server中增长对client的管理线程,主要用于管理server->client的pipe,避免处理packet的线程反复开启和关闭同一个client的FIFO。
5. packet queue的长度要能够根据目前负载动态改变。
(下一步先实现client中的user command interface)
/**
* server_with_queue.c
*
* client/server using named pipes (FIFO)
* server implementation with a packet queue
**/
#include "common.h"
#include <pthread.h>
#define DEBUG_SERVER 1
#if DEBUG_SERVER
static void print_pkt(struct cs_packet_t *pkt)
{
printf("==========packet=================\n");
printf("client pid = %d \n", pkt->cli_pid);
printf("client action = %s \n", pkt->action);
printf("client data = %s \n", pkt->data);
printf("==========end pkt================\n");
}
#else
static void print_pkt(struct cs_packet_t *pkt)
{
return;
}
#endif
/**packet queue implementation; all operations on packet queue should be synchronized to avoid race condition*/
#define PKT_QUEUE_SIZE 32
/**
* struct: pkt_queue_t -- FIFO queue of (struct cs_packet_t*)
* @size: size of the queue, we use power of 2 size because it make modulus operation easier and faster
* @head : head of this circular queue
* @tail: tail of this circular queue
* @pkt_queue: stores PKT_QUEUE_SIZE elements of type (struct cs_packet_t*)
*
**/
struct pkt_queue_t
{
int size;
int head;
int tail;
struct cs_packet_t **queue;
};
static struct pkt_queue_t pkt_queue; /* global variable for this server */
/**
* function: init_pkt_queue
*
* the return value must be checked
**/
static int init_pkt_queue(struct pkt_queue_t *pkt_queue)
{
pkt_queue->size = PKT_QUEUE_SIZE;
pkt_queue->head = 0;
pkt_queue->tail = 0;
pkt_queue->queue = (struct cs_packet_t **)malloc( pkt_queue->size*sizeof(struct cs_packet_t*) );
if (pkt_queue == NULL)
{
perror("allocating packet queue failed");
return -1;
}
for (int i=0; i<pkt_queue->size; i++)
{
pkt_queue->queue[i] = (struct cs_packet_t*)malloc( sizeof(struct cs_packet_t) );
if (pkt_queue->queue[i] == NULL)
{
perror("allocating packet queue failed");
return -1;
}
}
return 0;
}
static int is_empty(struct pkt_queue_t *pkt_queue)
{
return (pkt_queue->head == pkt_queue->tail);
}
static int is_full(struct pkt_queue_t *pkt_queue)
{
int tmp = (pkt_queue->tail+1) & (pkt_queue->size - 1);
return (pkt_queue->head == tmp);
}
static void clone_packet(struct cs_packet_t *pkt_dst, const struct cs_packet_t *pkt_src)
{
pkt_dst->cli_pid = pkt_src->cli_pid;
strcpy(pkt_dst->action, pkt_src->action);
strcpy(pkt_dst->data, pkt_src->data);
}
/**
* function: in_pkt_queue -- add an element into this FIFO
* @return: 0 for success, -1 for fail
*
* we don't simply put the pointer into the queue, but copies the value of every field
* the return value must be checked
**/
static int in_pkt_queue(struct pkt_queue_t *pkt_queue, const struct cs_packet_t *pkt_to_add)
{
if (is_full(pkt_queue))
{
return -1;
}
clone_packet(pkt_queue->queue[pkt_queue->tail], pkt_to_add);
pkt_queue->tail = (pkt_queue->tail+1) & (pkt_queue->size - 1);
return 0;
}
/**
* function: out_pkt_queue --- remove an element out of the FIFO
* @pkt_dst: the target packet to copy the info to
* @return: 0 for success, -1 for fail(this happens when the FIFO is empty)
*
* the return value must be checked
**/
static int out_pkt_queue(struct pkt_queue_t *pkt_queue, struct cs_packet_t *pkt_dst)
{
if (is_empty(pkt_queue))
{
return -1;
}
clone_packet(pkt_dst, pkt_queue->queue[pkt_queue->head]);
pkt_queue->head = (pkt_queue->head+1)&(pkt_queue->size-1);
return 0;
}
static int destroy_pkt_queue(struct pkt_queue_t *pkt_queue)
{
for (int i=0; i<pkt_queue->size; i++)
{
free(pkt_queue->queue[i]);
}
free(pkt_queue->queue);
return 0;
}
/**
* function: thread_handle_requests
*
* thread which handles client requests, sends back the result to client through cli_pipe_%d pipe and then exits
* client1 --(pkt1)--> | | --> pkt1 (thread1)
* client2 --(pkt2)--> | --> packet queue --> | --> pkt2 (thread2)
* client3 --(pkt3)--> | | --> pkt3 (thread3)
*
* N threads may loop to take the requests out of the queue and then handles it.
* This avoids the overhead of thead creation and destruction.
**/
void *thread_handle_requests(void *arg) /* arg is NULL */
{
struct cs_packet_t *pkt = (struct cs_packet_t*)malloc(sizeof(struct cs_packet_t));
if (pkt == NULL)
{
perror("Not Enough Memory In Heap!");
exit(-1);
}
/* main loop in this thread, never exits voluntarily */
for (;;)
{
/* loop to get a packet from packet queue */
for (;;)
{
if ( out_pkt_queue(&pkt_queue, pkt) < 0 ) /* the queue is empty */
{
sleep(1);
}
else
{
break;
}
}
print_pkt(pkt);
char client_fifo[32]; /* name of this client's fifo, cli_fifo_%d */
int client_fifo_fd;
memset(client_fifo, 0, sizeof(client_fifo));
sprintf(client_fifo, CLIENT_FIFO_NAME, pkt->cli_pid);
client_fifo_fd = open(client_fifo, O_WRONLY);
if (client_fifo_fd == -1)
{
perror("open client pipe failed");
exit(-1);
}
if (!strcmp(pkt->action, "upcase"))
{
char *tmp_char_ptr = pkt->data;
while (*tmp_char_ptr)
{
*tmp_char_ptr = toupper(*tmp_char_ptr);
tmp_char_ptr++;
}
int ret = write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));
if (ret < 0)
perror("write failed \n");
else
printf("<<server>> ---- write %d bytes \n", ret);
}
else if (!strcmp(pkt->action, "downcase"))
{
char *tmp_char_ptr = pkt->data;
while (*tmp_char_ptr)
{
*tmp_char_ptr = tolower(*tmp_char_ptr);
tmp_char_ptr++;
}
int ret = write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));
if (ret < 0)
perror("write failed \n");
else
printf("<<server>> ---- write %d bytes \n", ret);
}
else
{
sprintf(pkt->data, "Action %s not supported", pkt->action);
int ret = write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));
if (ret < 0)
perror("write failed \n");
else
printf("<<server>> ---- write %d bytes \n", ret);
}
close(client_fifo_fd);
}
free(pkt);
return NULL;
}
void main()
{
struct cs_packet_t pkt;
int server_fifo_fd;
pthread_t th;
void *result;
/* make server fifo */
mkfifo(SERVER_FIFO_NAME, 0777);
server_fifo_fd = open(SERVER_FIFO_NAME, O_RDONLY);
if (server_fifo_fd == -1)
{
perror("create server fifo failed");
exit(-1);
}
/* init packet queue */
int ret = init_pkt_queue(&pkt_queue);
if (ret < 0)
{
perror("allocating pkt_queue failed");
exit(-1);
}
/* create a thread to handle requests */
ret = pthread_create(&th, NULL, thread_handle_requests, (void*)&pkt);
if (ret < 0)
{
perror("create thread failed");
exit(-1);
}
/* read from server fifo */
for (;;)
{
int read_res = read(server_fifo_fd, &pkt, sizeof(pkt));
if (read_res > 0)
{
/* add the packet into the packet queue [pkt_queue] */
for (;;)
{
ret = in_pkt_queue(&pkt_queue, &pkt);
if (ret < 0) /* the queue is full */
{
sleep(1);
}
else
{
break;
}
}
} /* if (read_res > 0) */
} /* loop to listen to cs fifo */
destroy_pkt_queue(&pkt_queue);
}
server