[vhffs-dev] [1277] Improved a lot the responsiveness of the master, we are now delaying received events into a "slippery" dynamically allocated buffer if the server load is too important. |
[ Thread Index |
Date Index
| More vhffs.org/vhffs-dev Archives
]
- To: vhffs-dev@xxxxxxxxx
- Subject: [vhffs-dev] [1277] Improved a lot the responsiveness of the master, we are now delaying received events into a "slippery" dynamically allocated buffer if the server load is too important.
- From: subversion@xxxxxxxxxxxxx
- Date: Mon, 13 Oct 2008 18:15:32 +0200
Revision: 1277
Author: gradator
Date: 2008-10-13 18:15:31 +0200 (Mon, 13 Oct 2008)
Log Message:
-----------
Greatly improved the responsiveness of the master: received events are now delayed into a "slippery" dynamically allocated buffer when the server load is too high.
Modified Paths:
--------------
trunk/vhffs-fssync/vhffsfssync_master.c
trunk/vhffs-fssync/vhffsfssync_slave.c
Modified: trunk/vhffs-fssync/vhffsfssync_master.c
===================================================================
--- trunk/vhffs-fssync/vhffsfssync_master.c 2008-10-11 21:33:04 UTC (rev 1276)
+++ trunk/vhffs-fssync/vhffsfssync_master.c 2008-10-13 16:15:31 UTC (rev 1277)
@@ -70,6 +70,7 @@
/* -- network stuff -- */
+// huge buffer size reduce syscalls
#define VHFFSFSSYNC_NET_MESSAGE_FILE_CHUNK 65536
#define VHFFSFSSYNC_NET_RECV_BUF_LEN 65536
@@ -80,15 +81,20 @@
struct sockaddr_in sockaddr;
uint32_t order;
- char buf[VHFFSFSSYNC_NET_RECV_BUF_LEN];
- uint32_t buf_len;
- char *buf_cur;
+ /* TODO: change that to a dynamically allocated buffer */
+ char recv_buf[VHFFSFSSYNC_NET_RECV_BUF_LEN];
+ uint32_t recv_buf_len;
+ char *recv_buf_cur;
+ char **delayedevents;
+ uint32_t delayedevents_begin;
+ uint32_t delayedevents_end;
GList *messages;
+ uint32_t messages_num;
GList *fullviewtree;
GList *fullviewcur;
- int fullviewtimerset;
+ int8_t fullviewtimerset;
} vhffsfssync_conn;
@@ -181,6 +187,7 @@
int vhffsfssync_net_send(vhffsfssync_conn *conn);
int vhffsfssync_net_recv_event(vhffsfssync_conn *conn, char *event);
int vhffsfssync_net_parse(vhffsfssync_conn *conn);
+int vhffsfssync_net_parse_delayed(vhffsfssync_conn *conn);
int vhffsfssync_net_fullview(vhffsfssync_conn *conn, char *pathname);
void vhffsfssync_net_fullview_alarmsignal(int signo);
@@ -266,6 +273,7 @@
msg->data_len = len;
msg->data_cur = 0;
conn->messages = g_list_insert_sorted(conn->messages, msg, vhffsfssync_net_message_insert_compare);
+ conn->messages_num++;
return 0;
}
@@ -273,6 +281,7 @@
void vhffsfssync_net_destroy_data(vhffsfssync_conn *conn, vhffsfssync_net_message_data *datamsg) {
conn->messages = g_list_remove(conn->messages, (vhffsfssync_net_message*)datamsg);
+ conn->messages_num--;
free(datamsg->data_buffer);
free(datamsg);
}
@@ -372,6 +381,7 @@
msg->file_chunksize = -1;
msg->file_chunkcur = 0;
conn->messages = g_list_insert_sorted(conn->messages, msg, vhffsfssync_net_message_insert_compare);
+ conn->messages_num++;
return 0;
}
@@ -379,6 +389,7 @@
void vhffsfssync_net_destroy_file(vhffsfssync_conn *conn, vhffsfssync_net_message_file *filemsg) {
conn->messages = g_list_remove(conn->messages, (vhffsfssync_net_message*)filemsg);
+ conn->messages_num--;
/* we need to finish the chunk anyway */
if(filemsg->file_chunksize > 0) {
@@ -728,16 +739,16 @@
#if DEBUG_EVENTS
int i;
for(i = 0 ; i < argc ; i++) {
- printf("%s ", args[i]);
+ fprintf(stderr, "%s ", args[i]);
}
- printf("\n");
+ fprintf(stderr, "\n");
#endif
if(!strcmp(args[0], "get")) {
char *pathname = args[1];
struct stat st;
- printf("> %s\n", pathname);
+ //printf("> %s\n", pathname);
if(! lstat(pathname, &st) ) {
@@ -761,7 +772,7 @@
}
else if(!strcmp(args[0], "fulltree")) {
// the client requested a full tree of all available files
- vhffsfssync_net_fullview(conn, ".");
+ if(!conn->fullviewtree) vhffsfssync_net_fullview(conn, ".");
/* GList *lst;
for( lst = g_list_first(conn->fullviewtree) ; lst ; lst = g_list_next(lst) ) {
@@ -788,10 +799,11 @@
int vhffsfssync_net_parse(vhffsfssync_conn *conn) {
char *cur, *end;
- cur = conn->buf;
- end = conn->buf + conn->buf_len; //beware: end can be outside the buffer, you should NOT read *end
+ //fprintf(stderr, "Buffer %d\n", conn->recv_buf_len);
- //printf("Buffer %d\n", conn->buf_len);
+ /* parse the buffer */
+ cur = conn->recv_buf;
+ end = conn->recv_buf + conn->recv_buf_len; //beware: end can be outside the buffer, you should NOT read *end
while(cur < end) {
char *begin;
@@ -799,34 +811,100 @@
for(begin = cur ; ( cur < end && *cur++ != '\0' ) || ( cur < end && *cur++ != '\0' ) ; );
if( !*(cur-2) && !*(cur-1) ) {
- vhffsfssync_net_recv_event(conn, begin);
+ if(!conn->delayedevents && conn->messages_num < 20) {
+ vhffsfssync_net_recv_event(conn, begin);
+ }
+ else {
+ // system is overloaded, queue this event in delayed events
+ if( !(conn->delayedevents_end & 0x03FF) ) {
+ //printf("==> %d events, %d allocated\n", conn->delayedevents_end, ( (conn->delayedevents_end >>10) +1) <<10);
+ conn->delayedevents = realloc( conn->delayedevents, (((conn->delayedevents_end >>10) +1) <<10) * sizeof(char*) );
+ }
+ conn->delayedevents[conn->delayedevents_end] = malloc(cur - begin);
+ memcpy(conn->delayedevents[conn->delayedevents_end], begin, cur - begin);
+ conn->delayedevents_end++;
+ }
begin = cur;
}
if(cur == end) {
register uint32_t len;
len = end - begin;
- //printf("Not parsed %d\n", len);
+ //fprintf(stderr, "Not parsed %d\n", len);
+// fprintf(stderr, "buffer = %lu, cur = %lu, end = %lu, begin = %lu\n", (unsigned long)conn->recv_buf, (unsigned long)cur, (unsigned long)end, (unsigned long)begin);
+/* int i;
+ char *toto;
+ for(toto = begin, i = 0 ; i < 1000 && toto < end ; toto++, i++) {
+ fprintf(stderr, "%x ", *toto);
+ }
+ fprintf(stderr, "\n"); */
+
// buffer is full and we didn't manage to fetch everything
if(len == VHFFSFSSYNC_NET_RECV_BUF_LEN) {
fprintf(stderr, "The buffer is not large enough, throwing away the content\n");
- conn->buf_cur = conn->buf;
- conn->buf_len = 0;
+ conn->recv_buf_cur = conn->recv_buf;
+ conn->recv_buf_len = 0;
+ exit(1);
break;
}
// copy the data that is not parsed to the begin of the buffer
- memcpy(conn->buf, begin, len);
- conn->buf_cur = conn->buf + len;
- conn->buf_len = len;
+ memcpy(conn->recv_buf, begin, len);
+ conn->recv_buf_cur = conn->recv_buf + len;
+ conn->recv_buf_len = len;
break;
}
}
+
+ //printf("==> finished with %d delayed events\n", conn->delayedevents_end - conn->delayedevents_begin);
return 0;
}
+int vhffsfssync_net_parse_delayed(vhffsfssync_conn *conn) {
+
+ //fprintf(stderr, "Delayed %d, begin %d, end %d\n", conn->delayedevents_end - conn->delayedevents_begin, conn->delayedevents_begin, conn->delayedevents_end);
+
+ /* try to parse delayed events */
+ while(conn->delayedevents && conn->messages_num < 20) {
+
+ vhffsfssync_net_recv_event(conn, conn->delayedevents[conn->delayedevents_begin]);
+ free(conn->delayedevents[conn->delayedevents_begin]);
+ conn->delayedevents_begin++;
+
+ if(conn->delayedevents_begin == conn->delayedevents_end) {
+ conn->delayedevents_begin = 0;
+ conn->delayedevents_end = 0;
+ free(conn->delayedevents);
+ conn->delayedevents = NULL;
+ break;
+ }
+
+ if(conn->delayedevents_begin == 0x0400) {
+ void *cur, *end;
+
+ conn->delayedevents_begin = 0;
+ conn->delayedevents_end -= 0x0400;
+
+ cur = conn->delayedevents;
+ end = conn->delayedevents + conn->delayedevents_end;
+ //printf("AA> sizeofvoid: %d, events: %d, grmbl= %d, buffer = %lu, cur = %lu, end = %lu\n", sizeof(char*), conn->delayedevents_end - conn->delayedevents_begin, (conn->delayedevents_end - conn->delayedevents_begin) * sizeof(void*), (unsigned long)conn->delayedevents, (unsigned long)cur, (unsigned long)end);
+ for( ; cur < end ; cur += 0x0400*sizeof(void*) ) {
+ //printf("Copie from %lu to %lu, %d bytes\n", (unsigned long)(cur+0x0400*sizeof(void*)), (unsigned long)cur, (cur+0x0400*sizeof(void*))-cur);
+ memcpy(cur, cur + 0x0400*sizeof(void*), 0x0400*sizeof(void*) );
+ }
+
+ //printf("XX: begin = %d, end = %d\n", conn->delayedevents_begin, conn->delayedevents_end);
+ //printf("XX> %d events, %d allocated, %d bytes allocated\n", conn->delayedevents_end, ( ( conn->delayedevents_end >>10) +1) <<10 , (((conn->delayedevents_end >>10) +1) <<10) * sizeof(char*) );
+ conn->delayedevents = realloc( conn->delayedevents, (((conn->delayedevents_end >>10) +1) <<10) * sizeof(char*) );
+ }
+ }
+
+ return 0;
+}
+
+
int vhffsfssync_net_fullview(vhffsfssync_conn *conn, char *pathname) {
/*
@@ -936,27 +1014,31 @@
int r;
char *path = ldirs->data;
- printf("%s\n", path);
+ //printf("%s\n", path);
// we can cancel the recursion here at anytime
// if( rand() < (RAND_MAX/10)*7 ) {
- if(conn->fullviewtimerset > 0) {
+ if(conn->messages_num < 10 && conn->fullviewtimerset > 0) {
// do nothing, this is here because it matchs most of the time
}
- else if(conn->fullviewtimerset < 0) {
- printf("CANCELLED\n");
- conn->fullviewcur = NULL;
- conn->fullviewtimerset = 0;
- return 1;
- }
else {
- struct sigaction act;
- act.sa_handler = vhffsfssync_net_fullview_alarmsignal;
- sigemptyset(&act.sa_mask);
- act.sa_flags = SA_RESETHAND;
- sigaction(SIGALRM, &act, NULL);
- alarm(1);
- conn->fullviewtimerset = 1;
+ if(conn->messages_num >= 10 || conn->fullviewtimerset < 0) {
+ //printf("%ld: CANCELLED, nums=%d, timer=%d\n", time(NULL), conn->messages_num, conn->fullviewtimerset);
+ signal(SIGALRM, SIG_IGN);
+ conn->fullviewcur = NULL;
+ conn->fullviewtimerset = 0;
+ return 1;
+ }
+
+ if(conn->fullviewtimerset == 0) {
+ struct sigaction act;
+ act.sa_handler = vhffsfssync_net_fullview_alarmsignal;
+ sigemptyset(&act.sa_mask);
+ act.sa_flags = SA_RESETHAND;
+ sigaction(SIGALRM, &act, NULL);
+ conn->fullviewtimerset = 1;
+ alarm(1);
+ }
}
//printf("A: %p\n", conn->fullviewcur);
@@ -984,9 +1066,9 @@
conn->fullviewtree = g_list_delete_link(conn->fullviewtree, g_list_last(conn->fullviewtree) );
}
- if(!conn->fullviewtree) {
- printf("CLEAN UP SUCCESSFULL\n");
- }
+/* if(!conn->fullviewtree) {
+ printf("%ld: CLEAN UP SUCCESSFULL\n", time(NULL));
+ } */
return 0;
}
@@ -1433,7 +1515,6 @@
return -1;
}
-
/* -- network stuff -- */
vhffsfssync_conns = NULL;
@@ -1477,6 +1558,8 @@
printf("Listening on %s:%d\n", inet_ntoa(src.sin_addr), bindport);
#endif
+ printf("Ready\n");
+
/* -- main loop -- */
while(1) {
int max_fd = 0;
@@ -1508,11 +1591,18 @@
continue;
}
+ //printf("%d -> %d\n", conn->fd, conn->messages_num);
+
FD_SET(conn->fd, &readfs);
- if(conn->messages || conn->fullviewtree) FD_SET(conn->fd, &writefs);
+ if( conn->messages
+ || (conn->fullviewtree && conn->messages_num < 10)
+ || (conn->delayedevents && conn->messages_num < 20) )
+ FD_SET(conn->fd, &writefs);
if(conn->fd > max_fd) max_fd = conn->fd;
}
+ //printf("%ld: SELECT\n", time(NULL));
+
//tv.tv_sec = 3600;
//tv.tv_usec = 0;
ret = select(max_fd + 1, &readfs, &writefs, NULL, NULL);
@@ -1597,9 +1687,13 @@
conn->fd = newfd;
conn->sockaddr = addr;
conn->order = 0;
- conn->buf_len = 0;
- conn->buf_cur = conn->buf;
+ conn->recv_buf_len = 0;
+ conn->recv_buf_cur = conn->recv_buf;
+ conn->delayedevents = NULL;
+ conn->delayedevents_begin = 0;
+ conn->delayedevents_end = 0;
conn->messages = NULL;
+ conn->messages_num = 0;
conn->fullviewtree = NULL;
conn->fullviewcur = NULL;
conn->fullviewtimerset = 0;
@@ -1647,7 +1741,7 @@
if( FD_ISSET(conn->fd, &readfs) ) {
ssize_t len;
- len = read(conn->fd, conn->buf_cur, VHFFSFSSYNC_NET_RECV_BUF_LEN - (conn->buf_cur - conn->buf) );
+ len = read(conn->fd, conn->recv_buf_cur, VHFFSFSSYNC_NET_RECV_BUF_LEN - (conn->recv_buf_cur - conn->recv_buf) );
if(len < 0) {
switch(errno) {
case EAGAIN:
@@ -1662,21 +1756,23 @@
vhffsfssync_net_conn_disable(conn);
}
else {
- //printf("Read %d\n", len);
- conn->buf_len += len;
+ //fprintf(stderr, "Read %d\n", len);
+ conn->recv_buf_len += len;
vhffsfssync_net_parse(conn);
}
}
/* try to send more data */
- if(conn->messages && FD_ISSET(conn->fd, &writefs) ) {
+ if( conn->messages && FD_ISSET(conn->fd, &writefs) )
vhffsfssync_net_send(conn);
- }
/* continue the fullview if needed */
- if(conn->fullviewtree && FD_ISSET(conn->fd, &writefs) ) {
+ if( conn->fullviewtree && conn->messages_num < 10 )
vhffsfssync_net_fullview(conn, NULL);
- }
+
+ /* try to parse more data */
+ if( conn->delayedevents && conn->messages_num < 20 )
+ vhffsfssync_net_parse_delayed(conn);
}
}
}
Modified: trunk/vhffs-fssync/vhffsfssync_slave.c
===================================================================
--- trunk/vhffs-fssync/vhffsfssync_slave.c 2008-10-11 21:33:04 UTC (rev 1276)
+++ trunk/vhffs-fssync/vhffsfssync_slave.c 2008-10-13 16:15:31 UTC (rev 1277)
@@ -27,7 +27,7 @@
/* -- network stuff -- */
// huge buffer size reduce syscalls
/* TODO: fullview require a HUGE buffer, change that to a dynamically allocated buffer */
-#define VHFFSFSSYNC_NET_RECV_BUF_LEN 5242880
+#define VHFFSFSSYNC_NET_RECV_BUF_LEN 20971520
typedef struct {
int fd;
@@ -40,6 +40,7 @@
FILE *chunk_file;
size_t chunk_stilltoread;
+ /* TODO: replace this linked list to a "slippery" dynamically allocated buffer */
GList *messages;
} vhffsfssync_conn;