[vhffs-dev] [1272] The master is now able to parse slaves' messages.

[ Thread Index | Date Index | More vhffs.org/vhffs-dev Archives ]


Revision: 1272
Author:   gradator
Date:     2008-10-09 00:07:54 +0200 (Thu, 09 Oct 2008)

Log Message:
-----------
The master is now able to parse slaves' messages. Slaves are now able to send messages to the master.

Modified Paths:
--------------
    trunk/vhffs-fssync/vhffsfssync_master.c
    trunk/vhffs-fssync/vhffsfssync_slave.c


Modified: trunk/vhffs-fssync/vhffsfssync_master.c
===================================================================
--- trunk/vhffs-fssync/vhffsfssync_master.c	2008-10-08 15:48:44 UTC (rev 1271)
+++ trunk/vhffs-fssync/vhffsfssync_master.c	2008-10-08 22:07:54 UTC (rev 1272)
@@ -2,6 +2,7 @@
 
 #define DEBUG_NET 0
 #define DEBUG_INOTIFY 0
+#define DEBUG_EVENTS 1
 
 #include <unistd.h>
 #include <stdio.h>
@@ -69,20 +70,20 @@
 
 /* -- network stuff -- */
 #define VHFFSFSSYNC_NET_MESSAGE_FILE_CHUNK 65536
+#define VHFFSFSSYNC_NET_RECV_BUF_LEN 65536
 
 GList *vhffsfssync_conns;
 
 typedef struct {
 	int fd;
 	struct sockaddr_in sockaddr;
+	uint32_t order; // set to 0 when a connection is accepted; NOTE(review): where it is consumed is not visible in this diff
+
+	char buf[VHFFSFSSYNC_NET_RECV_BUF_LEN]; // bytes received on fd that have not been parsed yet
+	uint32_t buf_len; // number of valid bytes currently held in buf
+	char *buf_cur; // position inside buf where the next read() stores its data
+
 	GList *messages;
-	uint32_t order;
-//	char *recvbuf;
-//	ssize_t recvbuf_cur;
-//	ssize_t recvbuf_size;
-//	char *sendbuf;
-//	ssize_t sendbuf_cur;
-//	ssize_t sendbuf_size;
 } vhffsfssync_conn;
 
 
@@ -173,6 +174,8 @@
 int vhffsfssync_net_remove_file(vhffsfssync_conn *conn, char *pathname);
 void vhffsfssync_net_broadcast_file(char *pathname);
 int vhffsfssync_net_send(vhffsfssync_conn *conn);
+int vhffsfssync_net_recv_event(vhffsfssync_conn *conn, char *event);
+int vhffsfssync_net_parse(vhffsfssync_conn *conn);
 
 
 /* ----------------------------------------- */
@@ -687,7 +690,83 @@
 #endif
 
 
+int vhffsfssync_net_recv_event(vhffsfssync_conn *conn, char *event)  {  // handle one received event: '\0'-separated arguments, closed by an empty argument; returns 0
+	char *cur, *args[10];
+	int argc;
 
+	argc = 0;
+	do {
+		for(cur = event ; *cur++ != '\0' ; );  // leave cur just past the '\0' that ends the current argument
+		args[argc++] = event;
+		event = cur;
+	} while(*event && argc < 10);  // stop on the empty argument, or once the 10 slots of args are used
+	if(!argc) return -1;  // NOTE(review): the loop body always runs once, so argc is at least 1 here
+	// args[0] is the event name, the remaining entries are its parameters
+#if DEBUG_EVENTS
+	int i;
+	for(i = 0 ; i < argc ; i++) {
+		printf("%s ", args[i]);
+	}
+	printf("\n");
+#endif
+	// dispatch on the event name
+	if(!strcmp(args[0], "fulltree")) {
+		// the client requested a full tree of all available files
+		// (nothing is sent back in this revision, the branch is empty)
+	}
+	else if(!strcmp(args[0], "hello")) {
+		// nice to meet you
+	}
+	else {
+		fprintf(stderr, "conn %d, received unhandled event: %s\n", conn->fd, args[0]);
+	}
+
+	return 0;
+}
+
+
+int vhffsfssync_net_parse(vhffsfssync_conn *conn)  {  // cut conn->buf into "\0\0"-terminated events and hand each one to vhffsfssync_net_recv_event(); always returns 0
+	char *cur, *end;
+
+	cur = conn->buf;
+	end = conn->buf + conn->buf_len; //beware: end can be outside the buffer, you should NOT read *end
+
+	// every pass either consumes one complete event or reaches end with an unterminated tail
+
+	while(cur < end)  {
+		char *begin;
+		// find "\0\0"
+		for(begin = cur ; ( cur < end && *cur++ != '\0' ) || ( cur < end && *cur++ != '\0' ) ; );
+		// the terminator must lie inside [begin, cur): never read before begin, never reuse the previous event's last '\0'
+		if( cur - begin >= 2 && !*(cur-2) && !*(cur-1) ) {
+			vhffsfssync_net_recv_event(conn, begin);
+			begin = cur;
+		}
+
+		if(cur == end)  {
+			register uint32_t len;
+			len = end - begin;
+			// len is the number of bytes that follow the last complete event
+
+			// buffer is full and we didn't manage to fetch everything
+			if(len == VHFFSFSSYNC_NET_RECV_BUF_LEN)  {
+				fprintf(stderr, "The buffer is not large enough, throwing away the content\n");
+				conn->buf_cur = conn->buf;
+				conn->buf_len = 0;
+				break;
+			}
+
+			// move the unparsed bytes to the begin of the buffer; both ranges are inside conn->buf and may overlap, hence memmove()
+			memmove(conn->buf, begin, len);
+			conn->buf_cur = conn->buf + len;
+			conn->buf_len = len;
+			break;
+		}
+	}
+	return 0;
+}
+
+
 /* -- inotify stuff -- */
 
 int vhffsfssync_add_watch(int inotifyfd, const char *pathname, uint32_t mask)  {
@@ -1278,12 +1357,8 @@
 					conn->fd = newfd;
 					conn->sockaddr = addr;
 					conn->order = 0;
-					//conn->recvbuf = NULL;
-					//conn->recvbuf_cur = 0;
-					//conn->recvbuf_size = 0;
-					//conn->sendbuf = NULL;
-					//conn->sendbuf_cur = 0;
-					//conn->sendbuf_size = 0;
+					conn->buf_len = 0;
+					conn->buf_cur = conn->buf;
 					conn->messages = NULL;
 					vhffsfssync_conns = g_list_append(vhffsfssync_conns, conn);
 #if DEBUG_NET
@@ -1325,12 +1400,11 @@
 				vhffsfssync_conn *conn = conns->data;
 				conns = g_list_next(conns);
 
-				/* data to read ?, give give ! */
+				/* data to read ?, gimme gimme ! */
 				if( FD_ISSET(conn->fd, &readfs)  )  {
-					char buf[VHFFSFSSYNC_BUF_LEN];
 					ssize_t len;
 
-					len = read(conn->fd, buf, VHFFSFSSYNC_BUF_LEN);
+					len = read(conn->fd, conn->buf_cur, VHFFSFSSYNC_NET_RECV_BUF_LEN - (conn->buf_cur - conn->buf) );
 					if(len < 0)  {
 						switch(errno)  {
 							case EAGAIN:
@@ -1345,16 +1419,14 @@
 						vhffsfssync_net_conn_disable(conn);
 					}
 					else {
-						char *plop = malloc(len+1);
-						memcpy(plop, buf, len);
-						plop[len] = '\0';
-						printf("%s", plop);
-						free(plop);
+						//printf("Read %d\n", len);
+						conn->buf_len += len;
+						vhffsfssync_net_parse(conn);
 					}
 				}
 
 				/* try to send more data */
-				if(conn && conn->messages && FD_ISSET(conn->fd, &writefs)  )  {
+				if(conn->messages && FD_ISSET(conn->fd, &writefs)  )  {
 					vhffsfssync_net_send(conn);
 				}
 			}

Modified: trunk/vhffs-fssync/vhffsfssync_slave.c
===================================================================
--- trunk/vhffs-fssync/vhffsfssync_slave.c	2008-10-08 15:48:44 UTC (rev 1271)
+++ trunk/vhffs-fssync/vhffsfssync_slave.c	2008-10-08 22:07:54 UTC (rev 1272)
@@ -1,8 +1,8 @@
 #define _FILE_OFFSET_BITS 64
 #define _ATFILE_SOURCE
 
-#define DEBUG_NET 0
-#define DEBUG_EVENTS 0
+#define DEBUG_NET 1
+#define DEBUG_EVENTS 1
 
 #include <unistd.h>
 #include <stdio.h>
@@ -26,27 +26,132 @@
 
 /* -- network stuff -- */
 // huge buffer size reduce syscalls
-#define VHFFSFSSYNC_NET_BUF_LEN 262144
+#define VHFFSFSSYNC_NET_RECV_BUF_LEN 262144
 
 typedef struct {
 	int fd;
 	struct sockaddr_in sockaddr;
 
-	char buf[VHFFSFSSYNC_NET_BUF_LEN];
+	char buf[VHFFSFSSYNC_NET_RECV_BUF_LEN]; // receive buffer, filled by read() on fd in the main loop
 	uint32_t buf_len;
 	char *buf_cur;
 
 	FILE *chunk_file;
 	size_t chunk_stilltoread;
+
+	GList *messages; // queue of vhffsfssync_net_message entries waiting to be written to fd
 } vhffsfssync_conn;
 
-// protos
+
+typedef struct {
+	char *data_buffer; // heap buffer holding the bytes to send; free()d together with the message
+	ssize_t data_len; // total number of bytes in data_buffer
+	ssize_t data_cur; // number of bytes of data_buffer already written to the socket
+} vhffsfssync_net_message;
+
+// network protos
+int vhffsfssync_net_send_data(vhffsfssync_conn *conn, char *data, ssize_t len);
+void vhffsfssync_net_destroy_message(vhffsfssync_conn *conn, vhffsfssync_net_message *msg);
+int vhffsfssync_net_send(vhffsfssync_conn *conn);
+
+// events protos
 int vhffsfssync_remove(char *pathname);
 int vhffsfssync_mkdir(char *pathname, mode_t mode);
 int vhffsfssync_event(vhffsfssync_conn *conn, char *event);
 int vhffsfssync_parse(vhffsfssync_conn *conn);
 
+/* ------------------------------------------------------------ */
 
+/* our events use \0 as a delimiter, a double \0 is the end of the event */
+static inline ssize_t vhffsfssync_net_event_len(char *data)  {  // returns the size in bytes of the event at data, both closing \0 included
+	ssize_t len = 0;
+	do {
+		len += strlen(data+len);  //glibc strlen() is incredibly fast, we use it as much as possible
+		len++;  // step over the \0 that ends this argument
+	} while( *(data+len) );  // an empty argument means the event is over
+	len++;  // count the closing \0 as well
+	return len;
+}
+// static: a plain C99 "inline" definition emits no symbol, so a call that is not inlined would fail to link
+static inline int vhffsfssync_net_send_event(vhffsfssync_conn *conn, char *data)  {  // queue the event stored at data, see vhffsfssync_net_send_data() for ownership and return value
+	return vhffsfssync_net_send_data(conn, data, vhffsfssync_net_event_len(data));
+}
+
+// !!!!!! once queued, the buffer is freed when the message has been sent, DON'T send static string and DON'T free() the data yourself
+int vhffsfssync_net_send_data(vhffsfssync_conn *conn, char *data, ssize_t len)  {  // append len bytes of data to conn's send queue; returns 0, or -1 when nothing was queued (data is then left untouched)
+	vhffsfssync_net_message *msg;
+	if(!conn || !data || len <= 0) return -1;
+	msg = malloc(sizeof(vhffsfssync_net_message));
+	if(!msg) return -1;  // out of memory: report it instead of dereferencing NULL
+	msg->data_buffer = data;
+	msg->data_len = len;
+	msg->data_cur = 0;
+	conn->messages = g_list_append(conn->messages, msg);
+	return 0;
+}
+
+
+void vhffsfssync_net_destroy_message(vhffsfssync_conn *conn, vhffsfssync_net_message *msg) {  // unlink msg from conn's send queue and release it
+	conn->messages = g_list_remove(conn->messages, msg);
+	free(msg->data_buffer);  // the payload is released together with the message
+	free(msg);
+}
+
+
+int vhffsfssync_net_send(vhffsfssync_conn *conn)  {  // write queued messages to conn->fd until the queue is empty or the socket stops accepting data
+	GList *msgs;
+	gboolean full = FALSE;
+	// returns 0, or -1 when conn is unusable or write() fails with something other than EAGAIN/EINTR
+	if(!conn || conn->fd < 0) return -1;
+#if DEBUG_NET
+	printf("--------------------------------------------------\n");
+	printf("conn: %d, to: %s\n", conn->fd, inet_ntoa(conn->sockaddr.sin_addr));
+#endif
+	while(!full  &&  (msgs = g_list_first(conn->messages)) )  {
+		vhffsfssync_net_message *msg = msgs->data;
+		ssize_t written;
+		ssize_t lentowrite;
+
+#if DEBUG_NET
+		printf("  buffer: %zd bytes, %zd already written\n", msg->data_len, msg->data_cur);  // %zd: the arguments are ssize_t, not int
+#endif
+		/* try to empty the buffer */
+		lentowrite = msg->data_len - msg->data_cur;
+		written = write(conn->fd, msg->data_buffer + msg->data_cur, lentowrite);
+		if(written < 0) {
+			switch(errno)  {
+				case EAGAIN:
+				case EINTR:
+#if DEBUG_NET
+					printf("=====> EAGAIN on write()\n");
+#endif
+					full = TRUE;  // nothing was written, keep the message and leave the loop
+					break;
+				default:
+					fprintf(stderr, "write() failed on socket %d: %s\n", conn->fd, strerror(errno));
+					return -1;
+			}
+		}
+		else {
+			msg->data_cur += written;
+#if DEBUG_NET
+			printf("    %zd bytes written on %zd bytes\n", written, lentowrite);  // %zd: the arguments are ssize_t, not int
+#endif
+			/* the buffer is not empty yet (but the SendQ into the kernel is) */
+			if(written < lentowrite) {
+				full = TRUE;
+			}
+			/* buffer is now empty */
+			else {
+				vhffsfssync_net_destroy_message(conn, msg);
+			}
+		}
+	}
+
+	return 0;
+}
+
+
 /* ---------------------------------------- */
 int vhffsfssync_remove(char *pathname)  {
 	struct stat st;
@@ -216,7 +321,7 @@
 		}
 	}
 	else if(!strcmp(args[0], "hello")) {
-		// hello too !
+		// nice to meet you
 	}
 	else {
 		fprintf(stderr, "Received unhandled event: %s\n", args[0]);
@@ -254,7 +359,7 @@
 				//printf("Not parsed %d\n", len);
 
 				// buffer is full and we didn't manage to fetch everything
-				if(len == VHFFSFSSYNC_NET_BUF_LEN)  {
+				if(len == VHFFSFSSYNC_NET_RECV_BUF_LEN)  {
 					fprintf(stderr, "The buffer is not large enough, throwing away the content\n");
 					conn->buf_cur = conn->buf;
 					conn->buf_len = 0;
@@ -326,15 +431,40 @@
 	port = atoi(argv[3]);
 	signal(SIGPIPE, SIG_IGN);
 	conn = malloc(sizeof(vhffsfssync_conn));
+	conn->fd = -1;
+	conn->buf_len = 0;
+	conn->buf_cur = conn->buf;
+	conn->chunk_stilltoread = 0;
+	conn->chunk_file = NULL;
+	conn->messages = NULL;
 
 	/* -- main loop -- */
 	while(1)  {
+		/* clean the previous connection */
+		if(conn->fd >= 0) {
+			shutdown(conn->fd, SHUT_RDWR);
+			close(conn->fd);
+		}
 		conn->fd = -1;
+
 		conn->buf_len = 0;
 		conn->buf_cur = conn->buf;
 		conn->chunk_stilltoread = 0;
+
+		if(conn->chunk_file)  {
+			fclose(conn->chunk_file);
+		}
 		conn->chunk_file = NULL;
 
+		if(conn->messages) {
+			GList *msg;
+			while( (msg = g_list_first(conn->messages)) ) {
+				vhffsfssync_net_destroy_message(conn, (vhffsfssync_net_message*)msg->data);
+			}
+		}
+		conn->messages = NULL;
+
+
 		/* connect */
 		inet_aton(host, &conn->sockaddr.sin_addr);
 		conn->sockaddr.sin_family = AF_INET;
@@ -349,11 +479,7 @@
 		if( connect(conn->fd, (struct sockaddr*)&conn->sockaddr, sizeof(conn->sockaddr)) < 0 )  {
 
 			fprintf(stderr, "connect() failed: %s\n", strerror(errno));
-			shutdown(conn->fd, SHUT_RDWR);
-			close(conn->fd);
-			conn->fd = -1;
-			sleep(1);
-			continue;
+			goto disconnected;
 		}
 
 		/* set newfd to non-blocking */
@@ -364,7 +490,15 @@
 		}
 
 		/* connected */
-		while(conn->fd >= 0)  {
+		vhffsfssync_net_send_event(conn, g_strdup_printf("hello%c", '\0') );
+		vhffsfssync_net_send_event(conn, g_strdup_printf("fulltree%c", '\0') );
+
+		//char *grosbuf=malloc(10485760);
+		//memset(grosbuf, 'D', 10485760);
+		//vhffsfssync_net_send_data(conn, grosbuf, 10485760);
+
+		/* -- the real main loop starts here -- */
+		while(1)  {
 			int max_fd = 0;
 			fd_set readfs;
 			fd_set writefs;
@@ -375,6 +509,7 @@
 			FD_ZERO(&writefs);
 
 			FD_SET(conn->fd, &readfs);
+			if(conn->messages) FD_SET(conn->fd, &writefs);
 			if(conn->fd > max_fd) max_fd = conn->fd;
 
 			//tv.tv_sec = 3600;
@@ -390,11 +525,11 @@
 				}
 			}
 			if(ret > 0)  {
-				/* data to read ?, give give ! */
+				/* data to read ?, gimme gimme ! */
 				if(FD_ISSET(conn->fd, &readfs)  )  {
 					ssize_t len;
 
-					len = read(conn->fd, conn->buf_cur, VHFFSFSSYNC_NET_BUF_LEN - (conn->buf_cur - conn->buf) );
+					len = read(conn->fd, conn->buf_cur, VHFFSFSSYNC_NET_RECV_BUF_LEN - (conn->buf_cur - conn->buf) );
 					if(len < 0)  {
 						switch(errno)  {
 							case EAGAIN:
@@ -402,15 +537,11 @@
 								break;
 							default:
 								fprintf(stderr, "read() failed on socket %d: %s\n", conn->fd, strerror(errno));
-								conn->fd = -1;
-								break;
+								goto disconnected;
 						}
 					}
 					else if(len == 0) {
-#if DEBUG_NET
-						printf("Byebye %s... (used fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
-#endif
-						conn->fd = -1;
+						goto disconnected;
 					}
 					else {
 						//printf("Read %d\n", len);
@@ -418,8 +549,21 @@
 						vhffsfssync_parse(conn);
 					}
 				}
+
+				/* data to send ?, send send ! */
+				if(conn->messages && FD_ISSET(conn->fd, &writefs)  )  {
+					if( vhffsfssync_net_send(conn) ) {
+						goto disconnected;
+					}
+				}
 			}
 		}
+// YES, I am evil, I am using goto and I don't care about !
+disconnected:
+#if DEBUG_NET
+		printf("Byebye %s... (used fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
+#endif
+		sleep(1);
 	}
 
 	return 0;


Mail converted by MHonArc 2.6.19+ http://listengine.tuxfamily.org/