[vhffs-dev] [1272] The master is now able to parse slaves messages. |
[ Thread Index |
Date Index
| More vhffs.org/vhffs-dev Archives
]
Revision: 1272
Author: gradator
Date: 2008-10-09 00:07:54 +0200 (Thu, 09 Oct 2008)
Log Message:
-----------
The master is now able to parse slaves messages. Slaves are now able to send messages to master.
Modified Paths:
--------------
trunk/vhffs-fssync/vhffsfssync_master.c
trunk/vhffs-fssync/vhffsfssync_slave.c
Modified: trunk/vhffs-fssync/vhffsfssync_master.c
===================================================================
--- trunk/vhffs-fssync/vhffsfssync_master.c 2008-10-08 15:48:44 UTC (rev 1271)
+++ trunk/vhffs-fssync/vhffsfssync_master.c 2008-10-08 22:07:54 UTC (rev 1272)
@@ -2,6 +2,7 @@
#define DEBUG_NET 0
#define DEBUG_INOTIFY 0
+#define DEBUG_EVENTS 1
#include <unistd.h>
#include <stdio.h>
@@ -69,20 +70,20 @@
/* -- network stuff -- */
#define VHFFSFSSYNC_NET_MESSAGE_FILE_CHUNK 65536
+#define VHFFSFSSYNC_NET_RECV_BUF_LEN 65536
GList *vhffsfssync_conns;
typedef struct {
int fd;
struct sockaddr_in sockaddr;
+ uint32_t order;
+
+ char buf[VHFFSFSSYNC_NET_RECV_BUF_LEN];
+ uint32_t buf_len;
+ char *buf_cur;
+
GList *messages;
- uint32_t order;
-// char *recvbuf;
-// ssize_t recvbuf_cur;
-// ssize_t recvbuf_size;
-// char *sendbuf;
-// ssize_t sendbuf_cur;
-// ssize_t sendbuf_size;
} vhffsfssync_conn;
@@ -173,6 +174,8 @@
int vhffsfssync_net_remove_file(vhffsfssync_conn *conn, char *pathname);
void vhffsfssync_net_broadcast_file(char *pathname);
int vhffsfssync_net_send(vhffsfssync_conn *conn);
+int vhffsfssync_net_recv_event(vhffsfssync_conn *conn, char *event);
+int vhffsfssync_net_parse(vhffsfssync_conn *conn);
/* ----------------------------------------- */
@@ -687,7 +690,83 @@
#endif
+int vhffsfssync_net_recv_event(vhffsfssync_conn *conn, char *event) {
+ char *cur, *args[10];
+ int argc;
+ argc = 0;
+ do {
+ for(cur = event ; *cur++ != '\0' ; );
+ args[argc++] = event;
+ event = cur;
+ } while(*event && argc < 10);
+ if(!argc) return -1;
+
+#if DEBUG_EVENTS
+ int i;
+ for(i = 0 ; i < argc ; i++) {
+ printf("%s ", args[i]);
+ }
+ printf("\n");
+#endif
+
+ if(!strcmp(args[0], "fulltree")) {
+ // the client requested a full tree of all available files
+
+ }
+ else if(!strcmp(args[0], "hello")) {
+ // nice to meet you
+ }
+ else {
+ fprintf(stderr, "conn %d, received unhandled event: %s\n", conn->fd, args[0]);
+ }
+
+ return 0;
+}
+
+
+int vhffsfssync_net_parse(vhffsfssync_conn *conn) {
+ char *cur, *end;
+
+ cur = conn->buf;
+ end = conn->buf + conn->buf_len; //beware: end can be outside the buffer, you should NOT read *end
+
+ //printf("Buffer %d\n", conn->buf_len);
+
+ while(cur < end) {
+ char *begin;
+ // find "\0\0"
+ for(begin = cur ; ( cur < end && *cur++ != '\0' ) || ( cur < end && *cur++ != '\0' ) ; );
+
+ if( !*(cur-2) && !*(cur-1) ) {
+ vhffsfssync_net_recv_event(conn, begin);
+ begin = cur;
+ }
+
+ if(cur == end) {
+ register uint32_t len;
+ len = end - begin;
+ //printf("Not parsed %d\n", len);
+
+ // buffer is full and we didn't manage to fetch everything
+ if(len == VHFFSFSSYNC_NET_RECV_BUF_LEN) {
+ fprintf(stderr, "The buffer is not large enough, throwing away the content\n");
+ conn->buf_cur = conn->buf;
+ conn->buf_len = 0;
+ break;
+ }
+
+ // copy the data that is not parsed to the begin of the buffer
+ memcpy(conn->buf, begin, len);
+ conn->buf_cur = conn->buf + len;
+ conn->buf_len = len;
+ break;
+ }
+ }
+ return 0;
+}
+
+
/* -- inotify stuff -- */
int vhffsfssync_add_watch(int inotifyfd, const char *pathname, uint32_t mask) {
@@ -1278,12 +1357,8 @@
conn->fd = newfd;
conn->sockaddr = addr;
conn->order = 0;
- //conn->recvbuf = NULL;
- //conn->recvbuf_cur = 0;
- //conn->recvbuf_size = 0;
- //conn->sendbuf = NULL;
- //conn->sendbuf_cur = 0;
- //conn->sendbuf_size = 0;
+ conn->buf_len = 0;
+ conn->buf_cur = conn->buf;
conn->messages = NULL;
vhffsfssync_conns = g_list_append(vhffsfssync_conns, conn);
#if DEBUG_NET
@@ -1325,12 +1400,11 @@
vhffsfssync_conn *conn = conns->data;
conns = g_list_next(conns);
- /* data to read ?, give give ! */
+ /* data to read ?, gimme gimme ! */
if( FD_ISSET(conn->fd, &readfs) ) {
- char buf[VHFFSFSSYNC_BUF_LEN];
ssize_t len;
- len = read(conn->fd, buf, VHFFSFSSYNC_BUF_LEN);
+ len = read(conn->fd, conn->buf_cur, VHFFSFSSYNC_NET_RECV_BUF_LEN - (conn->buf_cur - conn->buf) );
if(len < 0) {
switch(errno) {
case EAGAIN:
@@ -1345,16 +1419,14 @@
vhffsfssync_net_conn_disable(conn);
}
else {
- char *plop = malloc(len+1);
- memcpy(plop, buf, len);
- plop[len] = '\0';
- printf("%s", plop);
- free(plop);
+ //printf("Read %d\n", len);
+ conn->buf_len += len;
+ vhffsfssync_net_parse(conn);
}
}
/* try to send more data */
- if(conn && conn->messages && FD_ISSET(conn->fd, &writefs) ) {
+ if(conn->messages && FD_ISSET(conn->fd, &writefs) ) {
vhffsfssync_net_send(conn);
}
}
Modified: trunk/vhffs-fssync/vhffsfssync_slave.c
===================================================================
--- trunk/vhffs-fssync/vhffsfssync_slave.c 2008-10-08 15:48:44 UTC (rev 1271)
+++ trunk/vhffs-fssync/vhffsfssync_slave.c 2008-10-08 22:07:54 UTC (rev 1272)
@@ -1,8 +1,8 @@
#define _FILE_OFFSET_BITS 64
#define _ATFILE_SOURCE
-#define DEBUG_NET 0
-#define DEBUG_EVENTS 0
+#define DEBUG_NET 1
+#define DEBUG_EVENTS 1
#include <unistd.h>
#include <stdio.h>
@@ -26,27 +26,132 @@
/* -- network stuff -- */
// huge buffer size reduce syscalls
-#define VHFFSFSSYNC_NET_BUF_LEN 262144
+#define VHFFSFSSYNC_NET_RECV_BUF_LEN 262144
typedef struct {
int fd;
struct sockaddr_in sockaddr;
- char buf[VHFFSFSSYNC_NET_BUF_LEN];
+ char buf[VHFFSFSSYNC_NET_RECV_BUF_LEN];
uint32_t buf_len;
char *buf_cur;
FILE *chunk_file;
size_t chunk_stilltoread;
+
+ GList *messages;
} vhffsfssync_conn;
-// protos
+
+typedef struct {
+ char *data_buffer;
+ ssize_t data_len;
+ ssize_t data_cur;
+} vhffsfssync_net_message;
+
+// network protos
+int vhffsfssync_net_send_data(vhffsfssync_conn *conn, char *data, ssize_t len);
+void vhffsfssync_net_destroy_message(vhffsfssync_conn *conn, vhffsfssync_net_message *msg);
+int vhffsfssync_net_send(vhffsfssync_conn *conn);
+
+// events protos
int vhffsfssync_remove(char *pathname);
int vhffsfssync_mkdir(char *pathname, mode_t mode);
int vhffsfssync_event(vhffsfssync_conn *conn, char *event);
int vhffsfssync_parse(vhffsfssync_conn *conn);
+/* ------------------------------------------------------------ */
+/* our events use \0 as a delimiter, a double \0 is the end of the event */
+inline ssize_t vhffsfssync_net_event_len(char *data) {
+ ssize_t len = 0;
+ do {
+ len += strlen(data+len); //glibc strlen() is incredibly fast, we use it as much as possible
+ len++;
+ } while( *(data+len) );
+ len++;
+ return len;
+}
+
+inline int vhffsfssync_net_send_event(vhffsfssync_conn *conn, char *data) {
+ return vhffsfssync_net_send_data(conn, data, vhffsfssync_net_event_len(data));
+}
+
+// !!!!!! the buffer is freed when the message has been sent, DON'T send static string and DON'T free() the data yourself
+int vhffsfssync_net_send_data(vhffsfssync_conn *conn, char *data, ssize_t len) {
+ vhffsfssync_net_message *msg;
+ if(!conn || !data || len <= 0) return -1;
+
+ msg = malloc(sizeof(vhffsfssync_net_message));
+ msg->data_buffer = data;
+ msg->data_len = len;
+ msg->data_cur = 0;
+ conn->messages = g_list_append(conn->messages, msg);
+ return 0;
+}
+
+
+void vhffsfssync_net_destroy_message(vhffsfssync_conn *conn, vhffsfssync_net_message *msg) {
+ conn->messages = g_list_remove(conn->messages, msg);
+ free(msg->data_buffer);
+ free(msg);
+}
+
+
+int vhffsfssync_net_send(vhffsfssync_conn *conn) {
+ GList *msgs;
+ gboolean full = FALSE;
+
+ if(!conn || conn->fd < 0) return -1;
+#if DEBUG_NET
+ printf("--------------------------------------------------\n");
+ printf("conn: %d, to: %s\n", conn->fd, inet_ntoa(conn->sockaddr.sin_addr));
+#endif
+ while(!full && (msgs = g_list_first(conn->messages)) ) {
+ vhffsfssync_net_message *msg = msgs->data;
+ ssize_t written;
+ ssize_t lentowrite;
+
+#if DEBUG_NET
+ printf(" buffer: %d bytes, %d already written\n", msg->data_len, msg->data_cur);
+#endif
+ /* try to empty the buffer */
+ lentowrite = msg->data_len - msg->data_cur;
+ written = write(conn->fd, msg->data_buffer + msg->data_cur, lentowrite);
+ if(written < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+#if DEBUG_NET
+ printf("=====> EAGAIN on write()\n");
+#endif
+ full = TRUE;
+ break;
+ default:
+ fprintf(stderr, "write() failed on socket %d: %s\n", conn->fd, strerror(errno));
+ return -1;
+ }
+ }
+ else {
+ msg->data_cur += written;
+#if DEBUG_NET
+ printf(" %d bytes written on %d bytes\n", written, lentowrite);
+#endif
+ /* the buffer is not empty yet (but the SendQ into the kernel is) */
+ if(written < lentowrite) {
+ full = TRUE;
+ }
+ /* buffer is now empty */
+ else {
+ vhffsfssync_net_destroy_message(conn, msg);
+ }
+ }
+ }
+
+ return 0;
+}
+
+
/* ---------------------------------------- */
int vhffsfssync_remove(char *pathname) {
struct stat st;
@@ -216,7 +321,7 @@
}
}
else if(!strcmp(args[0], "hello")) {
- // hello too !
+ // nice to meet you
}
else {
fprintf(stderr, "Received unhandled event: %s\n", args[0]);
@@ -254,7 +359,7 @@
//printf("Not parsed %d\n", len);
// buffer is full and we didn't manage to fetch everything
- if(len == VHFFSFSSYNC_NET_BUF_LEN) {
+ if(len == VHFFSFSSYNC_NET_RECV_BUF_LEN) {
fprintf(stderr, "The buffer is not large enough, throwing away the content\n");
conn->buf_cur = conn->buf;
conn->buf_len = 0;
@@ -326,15 +431,40 @@
port = atoi(argv[3]);
signal(SIGPIPE, SIG_IGN);
conn = malloc(sizeof(vhffsfssync_conn));
+ conn->fd = -1;
+ conn->buf_len = 0;
+ conn->buf_cur = conn->buf;
+ conn->chunk_stilltoread = 0;
+ conn->chunk_file = NULL;
+ conn->messages = NULL;
/* -- main loop -- */
while(1) {
+ /* clean the previous connection */
+ if(conn->fd >= 0) {
+ shutdown(conn->fd, SHUT_RDWR);
+ close(conn->fd);
+ }
conn->fd = -1;
+
conn->buf_len = 0;
conn->buf_cur = conn->buf;
conn->chunk_stilltoread = 0;
+
+ if(conn->chunk_file) {
+ fclose(conn->chunk_file);
+ }
conn->chunk_file = NULL;
+ if(conn->messages) {
+ GList *msg;
+ while( (msg = g_list_first(conn->messages)) ) {
+ vhffsfssync_net_destroy_message(conn, (vhffsfssync_net_message*)msg->data);
+ }
+ }
+ conn->messages = NULL;
+
+
/* connect */
inet_aton(host, &conn->sockaddr.sin_addr);
conn->sockaddr.sin_family = AF_INET;
@@ -349,11 +479,7 @@
if( connect(conn->fd, (struct sockaddr*)&conn->sockaddr, sizeof(conn->sockaddr)) < 0 ) {
fprintf(stderr, "connect() failed: %s\n", strerror(errno));
- shutdown(conn->fd, SHUT_RDWR);
- close(conn->fd);
- conn->fd = -1;
- sleep(1);
- continue;
+ goto disconnected;
}
/* set newfd to non-blocking */
@@ -364,7 +490,15 @@
}
/* connected */
- while(conn->fd >= 0) {
+ vhffsfssync_net_send_event(conn, g_strdup_printf("hello%c", '\0') );
+ vhffsfssync_net_send_event(conn, g_strdup_printf("fulltree%c", '\0') );
+
+ //char *grosbuf=malloc(10485760);
+ //memset(grosbuf, 'D', 10485760);
+ //vhffsfssync_net_send_data(conn, grosbuf, 10485760);
+
+ /* -- the real main loop starts here -- */
+ while(1) {
int max_fd = 0;
fd_set readfs;
fd_set writefs;
@@ -375,6 +509,7 @@
FD_ZERO(&writefs);
FD_SET(conn->fd, &readfs);
+ if(conn->messages) FD_SET(conn->fd, &writefs);
if(conn->fd > max_fd) max_fd = conn->fd;
//tv.tv_sec = 3600;
@@ -390,11 +525,11 @@
}
}
if(ret > 0) {
- /* data to read ?, give give ! */
+ /* data to read ?, gimme gimme ! */
if(FD_ISSET(conn->fd, &readfs) ) {
ssize_t len;
- len = read(conn->fd, conn->buf_cur, VHFFSFSSYNC_NET_BUF_LEN - (conn->buf_cur - conn->buf) );
+ len = read(conn->fd, conn->buf_cur, VHFFSFSSYNC_NET_RECV_BUF_LEN - (conn->buf_cur - conn->buf) );
if(len < 0) {
switch(errno) {
case EAGAIN:
@@ -402,15 +537,11 @@
break;
default:
fprintf(stderr, "read() failed on socket %d: %s\n", conn->fd, strerror(errno));
- conn->fd = -1;
- break;
+ goto disconnected;
}
}
else if(len == 0) {
-#if DEBUG_NET
- printf("Byebye %s... (used fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
-#endif
- conn->fd = -1;
+ goto disconnected;
}
else {
//printf("Read %d\n", len);
@@ -418,8 +549,21 @@
vhffsfssync_parse(conn);
}
}
+
+ /* data to send ?, send send ! */
+ if(conn->messages && FD_ISSET(conn->fd, &writefs) ) {
+ if( vhffsfssync_net_send(conn) ) {
+ goto disconnected;
+ }
+ }
}
}
+// YES, I am evil, I am using goto and I don't care about !
+disconnected:
+#if DEBUG_NET
+ printf("Byebye %s... (used fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
+#endif
+ sleep(1);
}
return 0;