[vhffs-dev] [1277] Greatly improved the responsiveness of the master: received events are now deferred into a "slippery" dynamically allocated buffer when the server load is too high.

[ Thread Index | Date Index | More vhffs.org/vhffs-dev Archives ]


Revision: 1277
Author:   gradator
Date:     2008-10-13 18:15:31 +0200 (Mon, 13 Oct 2008)

Log Message:
-----------
Greatly improved the responsiveness of the master: received events are now deferred into a "slippery" dynamically allocated buffer when the server load is too high.

Modified Paths:
--------------
    trunk/vhffs-fssync/vhffsfssync_master.c
    trunk/vhffs-fssync/vhffsfssync_slave.c


Modified: trunk/vhffs-fssync/vhffsfssync_master.c
===================================================================
--- trunk/vhffs-fssync/vhffsfssync_master.c	2008-10-11 21:33:04 UTC (rev 1276)
+++ trunk/vhffs-fssync/vhffsfssync_master.c	2008-10-13 16:15:31 UTC (rev 1277)
@@ -70,6 +70,7 @@
 
 
 /* -- network stuff -- */
+// huge buffer size reduce syscalls
 #define VHFFSFSSYNC_NET_MESSAGE_FILE_CHUNK 65536
 #define VHFFSFSSYNC_NET_RECV_BUF_LEN 65536
 
@@ -80,15 +81,20 @@
 	struct sockaddr_in sockaddr;
 	uint32_t order;
 
-	char buf[VHFFSFSSYNC_NET_RECV_BUF_LEN];
-	uint32_t buf_len;
-	char *buf_cur;
+	/* TODO: change that to a dynamically allocated buffer */
+	char recv_buf[VHFFSFSSYNC_NET_RECV_BUF_LEN];
+	uint32_t recv_buf_len;
+	char *recv_buf_cur;
+	char **delayedevents;
+	uint32_t delayedevents_begin;
+	uint32_t delayedevents_end;
 
 	GList *messages;
+	uint32_t messages_num;
 
 	GList *fullviewtree;
 	GList *fullviewcur;
-	int fullviewtimerset;
+	int8_t fullviewtimerset;
 } vhffsfssync_conn;
 
 
@@ -181,6 +187,7 @@
 int vhffsfssync_net_send(vhffsfssync_conn *conn);
 int vhffsfssync_net_recv_event(vhffsfssync_conn *conn, char *event);
 int vhffsfssync_net_parse(vhffsfssync_conn *conn);
+int vhffsfssync_net_parse_delayed(vhffsfssync_conn *conn);
 int vhffsfssync_net_fullview(vhffsfssync_conn *conn, char *pathname);
 void vhffsfssync_net_fullview_alarmsignal(int signo);
 
@@ -266,6 +273,7 @@
 	msg->data_len = len;
 	msg->data_cur = 0;
 	conn->messages = g_list_insert_sorted(conn->messages, msg, vhffsfssync_net_message_insert_compare);
+	conn->messages_num++;
 
 	return 0;
 }
@@ -273,6 +281,7 @@
 
 void vhffsfssync_net_destroy_data(vhffsfssync_conn *conn, vhffsfssync_net_message_data *datamsg)  {
 	conn->messages = g_list_remove(conn->messages, (vhffsfssync_net_message*)datamsg);
+	conn->messages_num--;
 	free(datamsg->data_buffer);
 	free(datamsg);
 }
@@ -372,6 +381,7 @@
 	msg->file_chunksize = -1;
 	msg->file_chunkcur = 0;
 	conn->messages = g_list_insert_sorted(conn->messages, msg, vhffsfssync_net_message_insert_compare);
+	conn->messages_num++;
 
 	return 0;
 }
@@ -379,6 +389,7 @@
 
 void vhffsfssync_net_destroy_file(vhffsfssync_conn *conn, vhffsfssync_net_message_file *filemsg)  {
 	conn->messages = g_list_remove(conn->messages, (vhffsfssync_net_message*)filemsg);
+	conn->messages_num--;
 
 	/* we need to finish the chunk anyway */
 	if(filemsg->file_chunksize > 0) {
@@ -728,16 +739,16 @@
 #if DEBUG_EVENTS
 	int i;
 	for(i = 0 ; i < argc ; i++) {
-		printf("%s ", args[i]);
+		fprintf(stderr, "%s ", args[i]);
 	}
-	printf("\n");
+	fprintf(stderr, "\n");
 #endif
 
 	if(!strcmp(args[0], "get")) {
 		char *pathname = args[1];
 		struct stat st;
 
-		printf("> %s\n", pathname);
+		//printf("> %s\n", pathname);
 
 		if(! lstat(pathname, &st) )  {
 
@@ -761,7 +772,7 @@
 	}
 	else if(!strcmp(args[0], "fulltree")) {
 		// the client requested a full tree of all available files
-		vhffsfssync_net_fullview(conn, ".");
+		if(!conn->fullviewtree) vhffsfssync_net_fullview(conn, ".");
 /*		GList *lst;
 
 		for( lst = g_list_first(conn->fullviewtree) ; lst ; lst = g_list_next(lst) )  {
@@ -788,10 +799,11 @@
 int vhffsfssync_net_parse(vhffsfssync_conn *conn)  {
 	char *cur, *end;
 
-	cur = conn->buf;
-	end = conn->buf + conn->buf_len; //beware: end can be outside the buffer, you should NOT read *end
+	//fprintf(stderr, "Buffer %d\n", conn->recv_buf_len);
 
-	//printf("Buffer %d\n", conn->buf_len);
+	/* parse the buffer */
+	cur = conn->recv_buf;
+	end = conn->recv_buf + conn->recv_buf_len; //beware: end can be outside the buffer, you should NOT read *end
 
 	while(cur < end)  {
 		char *begin;
@@ -799,34 +811,100 @@
 		for(begin = cur ; ( cur < end && *cur++ != '\0' ) || ( cur < end && *cur++ != '\0' ) ; );
 
 		if( !*(cur-2) && !*(cur-1) ) {
-			vhffsfssync_net_recv_event(conn, begin);
+			if(!conn->delayedevents  &&  conn->messages_num < 20) {
+				vhffsfssync_net_recv_event(conn, begin);
+			}
+			else {
+ 				// system is overloaded, queue this event in delayed events
+				if( !(conn->delayedevents_end & 0x03FF) ) {
+					//printf("==> %d events, %d allocated\n", conn->delayedevents_end, ( (conn->delayedevents_end >>10) +1) <<10);
+					conn->delayedevents = realloc( conn->delayedevents, (((conn->delayedevents_end >>10) +1) <<10) * sizeof(char*) );
+				}
+				conn->delayedevents[conn->delayedevents_end] = malloc(cur - begin);
+				memcpy(conn->delayedevents[conn->delayedevents_end], begin, cur - begin);
+				conn->delayedevents_end++;
+			}
 			begin = cur;
 		}
 
 		if(cur == end)  {
 			register uint32_t len;
 			len = end - begin;
-			//printf("Not parsed %d\n", len);
+			//fprintf(stderr, "Not parsed %d\n", len);
+//			fprintf(stderr, "buffer = %lu, cur = %lu, end = %lu, begin = %lu\n", (unsigned long)conn->recv_buf, (unsigned long)cur, (unsigned long)end, (unsigned long)begin);
 
+/*			int i;
+			char *toto;
+			for(toto = begin, i = 0 ; i < 1000 && toto < end ; toto++, i++) {
+				fprintf(stderr, "%x ", *toto);
+			}
+			fprintf(stderr, "\n"); */
+
 			// buffer is full and we didn't manage to fetch everything
 			if(len == VHFFSFSSYNC_NET_RECV_BUF_LEN)  {
 				fprintf(stderr, "The buffer is not large enough, throwing away the content\n");
-				conn->buf_cur = conn->buf;
-				conn->buf_len = 0;
+				conn->recv_buf_cur = conn->recv_buf;
+				conn->recv_buf_len = 0;
+				exit(1);
 				break;
 			}
 
 			// copy the data that is not parsed to the begin of the buffer
-			memcpy(conn->buf, begin, len);
-			conn->buf_cur = conn->buf + len;
-			conn->buf_len = len;
+			memcpy(conn->recv_buf, begin, len);
+			conn->recv_buf_cur = conn->recv_buf + len;
+			conn->recv_buf_len = len;
 			break;
 		}
 	}
+
+	//printf("==> finished with %d delayed events\n", conn->delayedevents_end - conn->delayedevents_begin);
 	return 0;
 }
 
 
+int vhffsfssync_net_parse_delayed(vhffsfssync_conn *conn)  {
+
+	//fprintf(stderr, "Delayed %d, begin %d, end %d\n", conn->delayedevents_end - conn->delayedevents_begin, conn->delayedevents_begin, conn->delayedevents_end);
+
+	/* try to parse delayed events */
+	while(conn->delayedevents && conn->messages_num < 20) {
+
+		vhffsfssync_net_recv_event(conn, conn->delayedevents[conn->delayedevents_begin]);
+		free(conn->delayedevents[conn->delayedevents_begin]);
+		conn->delayedevents_begin++;
+
+		if(conn->delayedevents_begin == conn->delayedevents_end) {
+			conn->delayedevents_begin = 0;
+			conn->delayedevents_end = 0;
+			free(conn->delayedevents);
+			conn->delayedevents = NULL;
+			break;
+		}
+
+		if(conn->delayedevents_begin == 0x0400) {
+			void *cur, *end;
+
+			conn->delayedevents_begin = 0;
+			conn->delayedevents_end -= 0x0400;
+
+			cur = conn->delayedevents;
+			end = conn->delayedevents + conn->delayedevents_end;
+			//printf("AA> sizeofvoid: %d, events: %d, grmbl= %d, buffer = %lu, cur = %lu, end = %lu\n", sizeof(char*), conn->delayedevents_end - conn->delayedevents_begin, (conn->delayedevents_end - conn->delayedevents_begin) * sizeof(void*), (unsigned long)conn->delayedevents, (unsigned long)cur, (unsigned long)end);
+ 			for( ; cur < end ; cur += 0x0400*sizeof(void*) ) {
+				//printf("Copie from %lu to %lu, %d bytes\n", (unsigned long)(cur+0x0400*sizeof(void*)), (unsigned long)cur, (cur+0x0400*sizeof(void*))-cur);
+				memcpy(cur, cur + 0x0400*sizeof(void*), 0x0400*sizeof(void*) );
+			}
+
+			//printf("XX: begin = %d, end = %d\n", conn->delayedevents_begin, conn->delayedevents_end);
+			//printf("XX> %d events, %d allocated, %d bytes allocated\n", conn->delayedevents_end, ( ( conn->delayedevents_end >>10) +1) <<10 ,  (((conn->delayedevents_end >>10) +1) <<10) * sizeof(char*)  );
+			conn->delayedevents = realloc( conn->delayedevents, (((conn->delayedevents_end >>10) +1) <<10) * sizeof(char*) );
+		}
+	}
+
+	return 0;
+}
+
+
 int vhffsfssync_net_fullview(vhffsfssync_conn *conn, char *pathname)  {
 
 	/*
@@ -936,27 +1014,31 @@
 			int r;
 			char *path = ldirs->data;
 
-			printf("%s\n", path);
+			//printf("%s\n", path);
 
 			// we can cancel the recursion here at anytime
 //			if( rand() < (RAND_MAX/10)*7 ) {
-			if(conn->fullviewtimerset > 0) {
+ 			if(conn->messages_num < 10 && conn->fullviewtimerset > 0) {
 				// do nothing, this is here because it matchs most of the time
 			}
-			else if(conn->fullviewtimerset < 0) {
-				printf("CANCELLED\n");
-				conn->fullviewcur = NULL;
-				conn->fullviewtimerset = 0;
-				return 1;
-			}
 			else {
-				struct sigaction act;
-				act.sa_handler = vhffsfssync_net_fullview_alarmsignal;
-				sigemptyset(&act.sa_mask);
-				act.sa_flags = SA_RESETHAND;
-				sigaction(SIGALRM, &act, NULL);
-				alarm(1);
-				conn->fullviewtimerset = 1;
+				if(conn->messages_num >= 10 || conn->fullviewtimerset < 0) {
+					//printf("%ld: CANCELLED, nums=%d, timer=%d\n", time(NULL), conn->messages_num, conn->fullviewtimerset);
+					signal(SIGALRM, SIG_IGN);
+					conn->fullviewcur = NULL;
+					conn->fullviewtimerset = 0;
+					return 1;
+				}
+
+				if(conn->fullviewtimerset == 0) {
+					struct sigaction act;
+					act.sa_handler = vhffsfssync_net_fullview_alarmsignal;
+					sigemptyset(&act.sa_mask);
+					act.sa_flags = SA_RESETHAND;
+					sigaction(SIGALRM, &act, NULL);
+					conn->fullviewtimerset = 1;
+					alarm(1);
+				}
 			}
 
 			//printf("A: %p\n", conn->fullviewcur);
@@ -984,9 +1066,9 @@
 		conn->fullviewtree = g_list_delete_link(conn->fullviewtree, g_list_last(conn->fullviewtree) );
 	}
 
-	if(!conn->fullviewtree)  {
-		printf("CLEAN UP SUCCESSFULL\n");
-	}
+/*	if(!conn->fullviewtree)  {
+		printf("%ld: CLEAN UP SUCCESSFULL\n", time(NULL));
+	} */
 	return 0;
 }
 
@@ -1433,7 +1515,6 @@
 		return -1;
 	}
 
-
 	/* -- network stuff -- */
 	vhffsfssync_conns = NULL;
 
@@ -1477,6 +1558,8 @@
 	printf("Listening on %s:%d\n", inet_ntoa(src.sin_addr), bindport);
 #endif
 
+	printf("Ready\n");
+
 	/* -- main loop -- */
 	while(1)  {
 		int max_fd = 0;
@@ -1508,11 +1591,18 @@
 				continue;
 			}
 
+			//printf("%d -> %d\n", conn->fd, conn->messages_num);
+
 			FD_SET(conn->fd, &readfs);
-			if(conn->messages || conn->fullviewtree) FD_SET(conn->fd, &writefs);
+			if( conn->messages
+			    || (conn->fullviewtree && conn->messages_num < 10)
+			    || (conn->delayedevents && conn->messages_num < 20) )
+				FD_SET(conn->fd, &writefs);
 			if(conn->fd > max_fd) max_fd = conn->fd;
 		}
 
+		//printf("%ld: SELECT\n", time(NULL));
+
 		//tv.tv_sec = 3600;
 		//tv.tv_usec = 0;
 		ret = select(max_fd + 1, &readfs, &writefs, NULL, NULL);
@@ -1597,9 +1687,13 @@
 					conn->fd = newfd;
 					conn->sockaddr = addr;
 					conn->order = 0;
-					conn->buf_len = 0;
-					conn->buf_cur = conn->buf;
+					conn->recv_buf_len = 0;
+					conn->recv_buf_cur = conn->recv_buf;
+					conn->delayedevents = NULL;
+					conn->delayedevents_begin = 0;
+					conn->delayedevents_end = 0;
 					conn->messages = NULL;
+					conn->messages_num = 0;
 					conn->fullviewtree = NULL;
 					conn->fullviewcur = NULL;
 					conn->fullviewtimerset = 0;
@@ -1647,7 +1741,7 @@
 				if( FD_ISSET(conn->fd, &readfs)  )  {
 					ssize_t len;
 
-					len = read(conn->fd, conn->buf_cur, VHFFSFSSYNC_NET_RECV_BUF_LEN - (conn->buf_cur - conn->buf) );
+					len = read(conn->fd, conn->recv_buf_cur, VHFFSFSSYNC_NET_RECV_BUF_LEN - (conn->recv_buf_cur - conn->recv_buf) );
 					if(len < 0)  {
 						switch(errno)  {
 							case EAGAIN:
@@ -1662,21 +1756,23 @@
 						vhffsfssync_net_conn_disable(conn);
 					}
 					else {
-						//printf("Read %d\n", len);
-						conn->buf_len += len;
+						//fprintf(stderr, "Read %d\n", len);
+						conn->recv_buf_len += len;
 						vhffsfssync_net_parse(conn);
 					}
 				}
 
 				/* try to send more data */
-				if(conn->messages && FD_ISSET(conn->fd, &writefs)  )  {
+				if( conn->messages  &&  FD_ISSET(conn->fd, &writefs) )
 					vhffsfssync_net_send(conn);
-				}
 
 				/* continue the fullview if needed */
-				if(conn->fullviewtree && FD_ISSET(conn->fd, &writefs)  )  {
+				if( conn->fullviewtree  &&  conn->messages_num < 10 )
 					vhffsfssync_net_fullview(conn, NULL);
-				}
+
+				/* try to parse more data */
+				if( conn->delayedevents  &&  conn->messages_num < 20 )
+					vhffsfssync_net_parse_delayed(conn);
 			}
 		}
 	}

Modified: trunk/vhffs-fssync/vhffsfssync_slave.c
===================================================================
--- trunk/vhffs-fssync/vhffsfssync_slave.c	2008-10-11 21:33:04 UTC (rev 1276)
+++ trunk/vhffs-fssync/vhffsfssync_slave.c	2008-10-13 16:15:31 UTC (rev 1277)
@@ -27,7 +27,7 @@
 /* -- network stuff -- */
 // huge buffer size reduce syscalls
 /* TODO: fullview require a HUGE buffer, change that to a dynamically allocated buffer */
-#define VHFFSFSSYNC_NET_RECV_BUF_LEN 5242880
+#define VHFFSFSSYNC_NET_RECV_BUF_LEN 20971520
 
 typedef struct {
 	int fd;
@@ -40,6 +40,7 @@
 	FILE *chunk_file;
 	size_t chunk_stilltoread;
 
+	/* TODO: replace this linked list to a "slippery" dynamically allocated buffer */
 	GList *messages;
 } vhffsfssync_conn;
 


Mail converted by MHonArc 2.6.19+ http://listengine.tuxfamily.org/