[vhffs-dev] [1265] getevents.c -> vhffsfssync_master.c ; client.c -> vhffsfssync_slave.c |
[ Thread Index |
Date Index
| More vhffs.org/vhffs-dev Archives
]
Revision: 1265
Author: gradator
Date: 2008-10-07 13:13:12 +0200 (Tue, 07 Oct 2008)
Log Message:
-----------
getevents.c -> vhffsfssync_master.c ; client.c -> vhffsfssync_slave.c
Added Paths:
-----------
trunk/vhffs-fssync/vhffsfssync_master.c
trunk/vhffs-fssync/vhffsfssync_slave.c
Removed Paths:
-------------
trunk/vhffs-fssync/client.c
trunk/vhffs-fssync/getevents.c
Deleted: trunk/vhffs-fssync/client.c
===================================================================
--- trunk/vhffs-fssync/client.c 2008-10-07 11:10:20 UTC (rev 1264)
+++ trunk/vhffs-fssync/client.c 2008-10-07 11:13:12 UTC (rev 1265)
@@ -1,431 +0,0 @@
-#define _FILE_OFFSET_BITS 64
-#define _ATFILE_SOURCE
-
-#define DEBUG_NET 0
-#define DEBUG_EVENTS 0
-
-#include <unistd.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <time.h>
-#include <sys/time.h>
-#include <fcntl.h>
-#include <dirent.h>
-#include <string.h>
-#include <errno.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <glib.h>
-#include <sys/select.h>
-#include <signal.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <netinet/in.h>
-
-
-/* -- network stuff -- */
-// a huge buffer reduces the number of read() syscalls
-#define VHFFSFSSYNC_NET_BUF_LEN 262144
-
-typedef struct { /* state of the single connection to the master */
- int fd; /* socket descriptor; main() sets it to -1 when not connected */
- struct sockaddr_in sockaddr; /* address of the master, filled in by main() */
-
- char buf[VHFFSFSSYNC_NET_BUF_LEN]; /* receive buffer shared by text events and binary chunk data */
- uint32_t buf_len; /* number of valid bytes currently held in buf */
- char *buf_cur; /* position inside buf where the next read() stores its data */
-
- FILE *chunk_file; /* file receiving the current binary chunk, NULL when there is none */
- size_t chunk_stilltoread; /* bytes of the binary chunk still expected; 0 means text mode */
-} vhffsfssync_conn;
-
-// protos
-int vhffsfssync_remove(char *pathname);
-int vhffsfssync_mkdir(char *pathname, mode_t mode);
-int vhffsfssync_event(vhffsfssync_conn *conn, char *event);
-int vhffsfssync_parse(vhffsfssync_conn *conn);
-
-
-/* ---------------------------------------- */
-/*
- * Delete `pathname` from the filesystem. A directory is emptied recursively
- * before being removed with rmdir(); anything else is unlink()ed (lstat() is
- * used, so a symbolic link is removed itself, not its target).
- *
- * Failures of opendir(), rmdir() and unlink() are reported on stderr but do
- * not change the return value.
- *
- * Returns 0, or -1 when lstat() fails for a reason other than ENOENT
- * (an entry that does not exist is treated as already removed).
- */
-int vhffsfssync_remove(char *pathname) {
-    struct stat st;
-    struct dirent *dir;
-    const char *sep;
-    size_t len;
-    DIR *d;
-
-    if( lstat(pathname, &st) < 0 ) {
-        if(errno == ENOENT) return 0;
-        fprintf(stderr, "cannot lstat() '%s': %s\n", pathname, strerror(errno));
-        return -1;
-    }
-
-    if( !S_ISDIR(st.st_mode) ) {
-        if( unlink(pathname) < 0) {
-            fprintf(stderr, "cannot unlink() '%s': %s\n", pathname, strerror(errno));
-        }
-        return 0;
-    }
-
-    d = opendir(pathname);
-    if(!d) {
-        fprintf(stderr, "cannot opendir() '%s': %s\n", pathname, strerror(errno));
-        return 0;
-    }
-
-    /* the separator is the same for every entry, compute it once */
-    len = strlen(pathname);
-    sep = (len > 0 && pathname[len-1] == '/') ? "" : "/";
-
-    while( (dir = readdir(d)) ) {
-        char *path;
-
-        if( !strcmp(dir->d_name, ".") || !strcmp(dir->d_name, "..") ) continue;
-
-        path = g_strdup_printf("%s%s%s", pathname, sep, dir->d_name);
-        vhffsfssync_remove(path);
-        /* memory returned by g_strdup_printf() must go back through g_free() */
-        g_free(path);
-    }
-    closedir(d);
-
-    if( rmdir(pathname) < 0) {
-        fprintf(stderr, "cannot rmdir() '%s': %s\n", pathname, strerror(errno));
-    }
-
-    return 0;
-}
-
-
-/*
- * Create every directory of the relative path `pathname` ("mkdir -p").
- *
- * pathname: path whose components are separated by '/'. It is split IN PLACE:
- *           every '/' is overwritten with '\0'. At most 64 components are
- *           handled, the rest of a longer path is ignored.
- * mode:     permissions given to each directory that has to be created.
- *
- * Each component is created with mkdirat() relative to its parent and then
- * opened, so the next component is resolved from that descriptor. The result
- * of mkdirat() is deliberately ignored (the directory usually exists already);
- * a real problem shows up as a failure of the following openat().
- *
- * Returns 0 on success, -1 if a component could not be opened.
- */
-int vhffsfssync_mkdir(char *pathname, mode_t mode) {
-    char *cur, *dirs[64];
-    int i, fd, fd_, argc, ret;
-
-    argc = 0;
-    cur = pathname;
-    while(*cur != '\0' && argc < 64) {
-        for( ; *cur != '/' && *cur != '\0' ; cur++ );
-        dirs[argc++] = pathname;
-        if( *cur == '/' ) {
-            *cur = '\0';
-            pathname = ++cur;
-        }
-    }
-
-    ret = 0;
-    fd = AT_FDCWD;
-    for(i = 0 ; i < argc ; i++) {
-        int openat_errno;
-
-        mkdirat(fd, dirs[i], mode);
-        fd_ = openat(fd, dirs[i], O_RDONLY);
-        /* keep the error of openat(): the close() below may overwrite errno */
-        openat_errno = errno;
-
-        /* AT_FDCWD is negative, so only real descriptors are closed here */
-        if(fd >= 0) close(fd);
-        fd = fd_;
-        if(fd < 0) {
-            fprintf(stderr, "openat() failed on %s: %s\n", dirs[i], strerror(openat_errno));
-            ret = -1;
-            break;
-        }
-    }
-    if(fd >= 0) close(fd);
-    return ret;
-}
-
-
-/* Report an event that does not carry the fields its command requires. Always returns -1. */
-static int vhffsfssync_event_malformed(const char *name, int argc) {
-    fprintf(stderr, "Received malformed event '%s' (%d field(s))\n", name, argc);
-    return -1;
-}
-
-
-/* Parse a non-negative decimal number that must fill the whole string. Returns -1 when it does not. */
-static long long vhffsfssync_event_number(const char *str) {
-    char *endptr;
-    long long value;
-
-    errno = 0;
-    value = strtoll(str, &endptr, 10);
-    if(errno || endptr == str || *endptr != '\0' || value < 0) return -1;
-    return value;
-}
-
-
-/*
- * Execute one event line received from the master.
- *
- * conn:  connection state; a "write" event arms conn->chunk_stilltoread and
- *        conn->chunk_file so that the caller switches to binary mode.
- * event: the line without its trailing '\n'. Fields are separated by \x1F and
- *        the string is split IN PLACE (separators become '\0'). At most 10
- *        fields are considered.
- *
- * Known commands: remove, create, mkdir, symlink, move, write, hello.
- * An unknown command terminates the process with exit(1).
- *
- * Returns 0 when the event has been handled (system call failures are only
- * reported on stderr), -1 when the event is empty or lacks required fields.
- */
-int vhffsfssync_event(vhffsfssync_conn *conn, char *event) {
-    char *cur, *args[10];
-    int argc;
-
-    argc = 0;
-    cur = event;
-    while(*cur != '\0' && argc < 10) {
-        for( ; *cur != '\x1F' && *cur != '\0' ; cur++ );
-        args[argc++] = event;
-        if( *cur == '\x1F' ) {
-            *cur = '\0';
-            event = ++cur;
-        }
-    }
-
-    if(!argc) return -1;
-
-#if DEBUG_EVENTS
-    {
-        int i;
-        for(i = 0 ; i < argc ; i++) {
-            printf("%s ", args[i]);
-        }
-        printf("\n");
-    }
-#endif
-
-    /* args[] is only filled up to argc: every branch checks the count before indexing */
-    if(!strcmp(args[0], "remove")) {
-        if(argc < 2) return vhffsfssync_event_malformed(args[0], argc);
-        vhffsfssync_remove(args[1]);
-    }
-    else if(!strcmp(args[0], "create")) {
-        char *pathname;
-        int fd;
-
-        if(argc < 2) return vhffsfssync_event_malformed(args[0], argc);
-        pathname = args[1];
-
-        fd = open(pathname, O_CREAT|O_WRONLY|O_TRUNC, 0644);
-        if(fd >= 0) {
-            close(fd);
-        }
-        else {
-            fprintf(stderr, "open() failed on %s: %s\n", pathname, strerror(errno));
-        }
-    }
-    else if(!strcmp(args[0], "mkdir")) {
-        if(argc < 2) return vhffsfssync_event_malformed(args[0], argc);
-        vhffsfssync_mkdir(args[1], 0755);
-    }
-    else if(!strcmp(args[0], "symlink")) {
-        char *from, *to;
-
-        if(argc < 3) return vhffsfssync_event_malformed(args[0], argc);
-        from = args[1];
-        to = args[2];
-        if( symlink(to, from) < 0 ) {
-            fprintf(stderr, "symlink() failed from %s to %s: %s\n", from, to, strerror(errno));
-        }
-    }
-    else if(!strcmp(args[0], "move")) {
-        char *from, *to;
-
-        if(argc < 3) return vhffsfssync_event_malformed(args[0], argc);
-        from = args[1];
-        to = args[2];
-        if( rename(from, to) < 0 ) {
-            fprintf(stderr, "rename() failed from %s to %s: %s\n", from, to, strerror(errno));
-        }
-    }
-    else if(!strcmp(args[0], "write")) {
-        char *pathname;
-        long long offset, size;
-        int fd;
-
-        if(argc < 4) return vhffsfssync_event_malformed(args[0], argc);
-        pathname = args[1];
-        offset = vhffsfssync_event_number(args[2]);
-        size = vhffsfssync_event_number(args[3]);
-        if(offset < 0 || size < 0) return vhffsfssync_event_malformed(args[0], argc);
-
-        /* armed even if the file cannot be opened: the chunk still has to be consumed */
-        conn->chunk_stilltoread = (size_t)size;
-
-        if(size > 0) {
-            fd = open(pathname, O_CREAT|O_WRONLY, 0644);
-            if(fd >= 0) {
-                conn->chunk_file = fdopen(fd, "w");
-                if(conn->chunk_file) {
-                    if( fseeko(conn->chunk_file, (off_t)offset, SEEK_SET) < 0 ) {
-                        fprintf(stderr, "fseeko() on %lld failed on file %s: %s\n", offset, pathname, strerror(errno));
-                        fclose(conn->chunk_file);
-                        conn->chunk_file = NULL;
-                    }
-                }
-                else {
-                    fprintf(stderr, "fdopen() failed on %s: %s\n", pathname, strerror(errno));
-                    close(fd);
-                }
-            }
-            else {
-                fprintf(stderr, "open() failed on %s: %s\n", pathname, strerror(errno));
-            }
-        }
-        else {
-            /* an empty chunk only (re)creates the file empty */
-            fd = open(pathname, O_CREAT|O_WRONLY|O_TRUNC, 0644);
-            if(fd >= 0) {
-                close(fd);
-            }
-            else {
-                fprintf(stderr, "open() failed on %s: %s\n", pathname, strerror(errno));
-            }
-        }
-    }
-    else if(!strcmp(args[0], "hello")) {
-        // hello too !
-    }
-    else {
-        fprintf(stderr, "Received unhandled event: %s\n", args[0]);
-        exit(1);
-    }
-
-    return 0;
-}
-
-
-/*
- * Consume the bytes currently held in conn->buf.
- *
- * The stream alternates between two modes:
- *  - text mode (conn->chunk_stilltoread == 0): '\n'-terminated event lines,
- *    each handed to vhffsfssync_event();
- *  - binary mode: conn->chunk_stilltoread raw bytes, written to
- *    conn->chunk_file when one is open and skipped otherwise.
- *
- * On return conn->buf_len and conn->buf_cur describe what is left in the
- * buffer: an incomplete event line is moved to the start of the buffer so the
- * next read() can append to it. A line that fills the whole buffer without a
- * '\n' is thrown away.
- *
- * Always returns 0.
- */
-int vhffsfssync_parse(vhffsfssync_conn *conn) {
-    char *cur, *end;
-
-    cur = conn->buf;
-    end = conn->buf + conn->buf_len; // beware: end can be one past the buffer, *end must never be read
-
-    while(cur < end) {
-
-        // text mode
-        if(!conn->chunk_stilltoread) {
-            char *begin;
-            for(begin = cur ; cur < end && *cur != '\n' ; cur++ );
-
-            /* the scan stopped before end, so it stopped on a '\n' */
-            if( cur < end ) {
-                *cur = '\0';
-                vhffsfssync_event(conn, begin);
-                begin = ++cur;
-            }
-
-            if(cur == end) {
-                uint32_t len = end - begin;
-
-                // buffer is full and we didn't manage to fetch everything
-                if(len == VHFFSFSSYNC_NET_BUF_LEN) {
-                    fprintf(stderr, "The buffer is not large enough, throwing away the content\n");
-                    conn->buf_cur = conn->buf;
-                    conn->buf_len = 0;
-                    break;
-                }
-
-                // move the unparsed data to the start of the buffer;
-                // source and destination may overlap, hence memmove()
-                memmove(conn->buf, begin, len);
-                conn->buf_cur = conn->buf + len;
-                conn->buf_len = len;
-                break;
-            }
-        }
-
-        // binary mode (receiving a chunk)
-        else {
-            size_t canread = MIN(conn->chunk_stilltoread, (size_t)(end-cur));
-            conn->chunk_stilltoread -= canread;
-#if DEBUG_EVENTS
-            printf("binary mode: read: %d stilltoread: %d\n", canread, conn->chunk_stilltoread);
-#endif
-            if(conn->chunk_file) {
-                size_t len = fwrite(cur, 1, canread, conn->chunk_file);
-#if DEBUG_EVENTS
-                printf(" written=%d\n", len);
-#endif
-                if(len != canread) {
-                    /* the bytes are consumed anyway, otherwise the stream would lose its framing */
-                    fprintf(stderr, "fwrite() failed: %s\n", strerror(errno));
-                }
-                if(!conn->chunk_stilltoread) {
-#if DEBUG_EVENTS
-                    printf(" closing file\n");
-#endif
-                    fclose(conn->chunk_file);
-                    conn->chunk_file = NULL;
-                }
-            }
-            cur += canread;
-
-            // all the data have been read, resetting buffer
-            if(cur == end) {
-                conn->buf_cur = conn->buf;
-                conn->buf_len = 0;
-                break;
-            }
-        }
-    }
-    return 0;
-}
-
-
-/*
- * Slave entry point: <root> <host> <port>
- *
- * chdir()s to <root>, then connects to the master at <host>:<port> (<host> is
- * a dotted IPv4 address) and applies every event it receives. When the
- * connection is lost it reconnects forever, waiting one second after a failed
- * connect().
- *
- * Returns -1 on a start-up error; otherwise it never returns.
- */
-int main(int argc, char *argv[]) {
-
-    char *root, *host;
-    int flags, port;
-    vhffsfssync_conn *conn;
-
-    if(argc < 4) {
-        fprintf(stderr, "usage: %s root host port\n", argc > 0 ? argv[0] : "vhffsfssync");
-        return -1;
-    }
-
-    /* chdir() to the filesystem to write the data */
-    root = argv[1];
-    if( chdir(root) < 0) {
-        fprintf(stderr, "cannot chdir() to %s: %s\n", root, strerror(errno));
-        return -1;
-    }
-
-    /* -- network stuff -- */
-    host = argv[2];
-    port = atoi(argv[3]);
-    signal(SIGPIPE, SIG_IGN);
-    conn = malloc(sizeof(vhffsfssync_conn));
-    if(!conn) {
-        fprintf(stderr, "malloc() failed: %s\n", strerror(errno));
-        return -1;
-    }
-    conn->fd = -1;
-    conn->chunk_file = NULL;
-
-    memset(&conn->sockaddr, 0, sizeof(conn->sockaddr));
-    if( !inet_aton(host, &conn->sockaddr.sin_addr) ) {
-        fprintf(stderr, "invalid address: %s\n", host);
-        free(conn);
-        return -1;
-    }
-    conn->sockaddr.sin_family = AF_INET;
-    conn->sockaddr.sin_port = htons(port);
-
-    /* -- main loop -- */
-    while(1) {
-        /* a chunk cut by a disconnection will never be completed, release its file */
-        if(conn->chunk_file) {
-            fclose(conn->chunk_file);
-        }
-
-        conn->fd = -1;
-        conn->buf_len = 0;
-        conn->buf_cur = conn->buf;
-        conn->chunk_stilltoread = 0;
-        conn->chunk_file = NULL;
-
-        /* connect */
-        conn->fd = socket(conn->sockaddr.sin_family, SOCK_STREAM, IPPROTO_IP);
-        if(conn->fd < 0) {
-            fprintf(stderr, "socket() failed: %s\n", strerror(errno));
-            return -1;
-        }
-
-        if( connect(conn->fd, (struct sockaddr*)&conn->sockaddr, sizeof(conn->sockaddr)) < 0 ) {
-
-            fprintf(stderr, "connect() failed: %s\n", strerror(errno));
-            shutdown(conn->fd, SHUT_RDWR);
-            close(conn->fd);
-            conn->fd = -1;
-            sleep(1);
-            continue;
-        }
-
-        /* set the socket to non-blocking */
-        flags = fcntl(conn->fd, F_GETFL);
-        if(flags >= 0) {
-            flags |= O_NONBLOCK;
-            fcntl(conn->fd, F_SETFL, flags);
-        }
-
-        /* connected */
-        while(conn->fd >= 0) {
-            int max_fd = 0;
-            fd_set readfs;
-            fd_set writefs;
-            int ret;
-
-            FD_ZERO(&readfs);
-            FD_ZERO(&writefs);
-
-            FD_SET(conn->fd, &readfs);
-            if(conn->fd > max_fd) max_fd = conn->fd;
-
-            ret = select(max_fd + 1, &readfs, &writefs, NULL, NULL);
-            if(ret < 0) {
-                switch(errno) {
-                    case EAGAIN:
-                    case EINTR:
-                        break;
-                    default:
-                        fprintf(stderr, "select() failed: %s\n", strerror(errno));
-                }
-            }
-            if(ret > 0) {
-                /* data to read ?, give give ! */
-                if(FD_ISSET(conn->fd, &readfs) ) {
-                    ssize_t len;
-
-                    len = read(conn->fd, conn->buf_cur, VHFFSFSSYNC_NET_BUF_LEN - (conn->buf_cur - conn->buf) );
-                    if(len < 0) {
-                        switch(errno) {
-                            case EAGAIN:
-                            case EINTR:
-                                break;
-                            default:
-                                fprintf(stderr, "read() failed on socket %d: %s\n", conn->fd, strerror(errno));
-                                /* release the descriptor, a new one is created on reconnection */
-                                close(conn->fd);
-                                conn->fd = -1;
-                                break;
-                        }
-                    }
-                    else if(len == 0) {
-#if DEBUG_NET
-                        printf("Byebye %s... (used fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
-#endif
-                        close(conn->fd);
-                        conn->fd = -1;
-                    }
-                    else {
-                        conn->buf_len += len;
-                        vhffsfssync_parse(conn);
-                    }
-                }
-            }
-        }
-    }
-
-    return 0;
-}
Deleted: trunk/vhffs-fssync/getevents.c
===================================================================
--- trunk/vhffs-fssync/getevents.c 2008-10-07 11:10:20 UTC (rev 1264)
+++ trunk/vhffs-fssync/getevents.c 2008-10-07 11:13:12 UTC (rev 1265)
@@ -1,1331 +0,0 @@
-#define _FILE_OFFSET_BITS 64
-
-#define DEBUG_NET 0
-#define DEBUG_INOTIFY 0
-
-#include <unistd.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <time.h>
-#include <sys/time.h>
-#include <fcntl.h>
-#include <dirent.h>
-#include <string.h>
-#include <errno.h>
-#include <sys/inotify.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <glib.h>
-#include <sys/select.h>
-#include <signal.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <netinet/in.h>
-#include <sys/sendfile.h>
-
-
-/* TODO: check that a file name does not contain a \x1F, otherwise the client is guaranteed to segfault */
-/* convert \x1F to a space when emitting the event? */
-
-/* -- inotify stuff -- */
-
-#define VHFFSFSSYNC_BUF_LEN 4096
-#define VHFFSFSSYNC_WATCH_MASK (IN_ATTRIB|IN_CLOSE_WRITE|IN_CREATE|IN_DELETE|IN_DELETE_SELF|IN_MODIFY|IN_MOVE_SELF|IN_MOVED_FROM|IN_MOVED_TO|IN_DONT_FOLLOW|IN_ONLYDIR) /* parenthesised so the mask stays one operand inside larger expressions */
-// Not used: IN_ACCESS, IN_CLOSE_NOWRITE, IN_OPEN
-
-// the maximum number of files simultaneously opened
-// huge values offer better performance
-// actually it is MAX = VHFFSFSSYNC_MAX_OPENFILES + number of clients
-#define VHFFSFSSYNC_MAX_OPENFILES 512
-
-// each monitor entry is associated with a path, we need to keep it to compute the path
-//char **vhffsfssync_wd_to_path = NULL;
-//int vhffsfssync_wd_to_path_len = 0; // number of allocated paths
-GHashTable *vhffsfssync_wd_to_path; /* inotify watch descriptor (int*) -> watched path (char*) */
-GHashTable *vhffsfssync_path_to_wd; /* watched path (char*) -> inotify watch descriptor (int*); holds the same pointers as the table above */
-
-// return a timestamp in ms (it loops for 100000 sec)
-/*inline int vhffsfssync_timestamp() {
- struct timeval tv;
- gettimeofday(&tv, NULL);
- return (tv.tv_sec%100000)*1000+tv.tv_usec/1000;
-}*/
-
-struct vhffsfssync_cookie { /* NOTE(review): presumably remembers an IN_MOVED_FROM event until its IN_MOVED_TO arrives; the code using it is not visible here - confirm */
- uint32_t id;
- char *from;
- gboolean isdir;
-};
-static struct vhffsfssync_cookie vhffsfssync_cookie;
-
-int vhffsfssync_openfiles; /* number of streams currently open through vhffsfssync_net_file_open() */
-
-// protos
-int vhffsfssync_add_watch(int inotifyfd, const char *pathname, uint32_t mask);
-int vhffsfssync_del_watch(int inotifyfd, const char *pathname, int wd);
-int vhffsfssync_add_watch_recursively(int inotifyfd, const char *pathname, uint32_t mask);
-int vhffsfssync_manage_event_create(int inotifyfd, char *pathname, gboolean sendfile);
-int vhffsfssync_manage_event(int inotifyfd, struct inotify_event *event);
-int vhffsfssync_fake_events_recursively(int inotifyfd, char *pathname);
-
-
-/* -- network stuff -- */
-#define VHFFSFSSYNC_NET_MESSAGE_FILE_CHUNK 65536
-
-GList *vhffsfssync_conns;
-
-typedef struct { /* one client connection and its queue of outgoing messages */
- int fd; /* client socket; vhffsfssync_net_conn_disable() sets it to -1 */
- struct sockaddr_in sockaddr; /* peer address */
- GList *messages; /* pending messages, kept sorted with vhffsfssync_net_message_insert_compare() */
- uint32_t order; /* value stamped on the next message, incremented for each new message */
-// char *recvbuf;
-// ssize_t recvbuf_cur;
-// ssize_t recvbuf_size;
-// char *sendbuf;
-// ssize_t sendbuf_cur;
-// ssize_t sendbuf_size;
-} vhffsfssync_conn;
-
-
-/* message - generic */
-typedef unsigned short int msg_family_t; /* discriminator stored first in every message struct */
-enum {
- VHFFSFSSYNC_NET_MESSAGE_UNSPEC=0,
- VHFFSFSSYNC_NET_MESSAGE_DATA, /* the message is a vhffsfssync_net_message_data (memory buffer) */
- VHFFSFSSYNC_NET_MESSAGE_FILE /* the message is a vhffsfssync_net_message_file (file sent in chunks) */
-};
-
-/* message - priorities (the lowest value is sent first) */
-enum {
- VHFFSFSSYNC_NET_PRIO_HIGHEST=100, // values < 100 are used internally (0 and 1 in this file), never set anything below 100
- VHFFSFSSYNC_NET_PRIO_HIGH, // values >= 1000 are used for files, don't use them either
- VHFFSFSSYNC_NET_PRIO_MEDIUM,
- VHFFSFSSYNC_NET_PRIO_LOW,
- VHFFSFSSYNC_NET_PRIO_LOWEST
-};
-
-
-#define __VHFFSFSSYNC_NET_MESSAGE_COMMON(msg_prefix) \
- msg_family_t msg_prefix##family; \
- uint32_t msg_prefix##priority; \
- uint32_t msg_prefix##order
-/* The macro above declares the three leading fields shared by every message struct; the macro below sums their sizes for the padding arrays. NOTE(review): the sum ignores any alignment padding the compiler may insert - confirm the specialised structs really have the intended size. */
-#define __VHFFSFSSYNC_NET_MESSAGE_COMMON_SIZE ( sizeof (msg_family_t) + sizeof(uint32_t) + sizeof(uint32_t) )
-/* Generic view of a message: only the common fields are accessed through it, msg_data reserves the room used by the specialised layouts. */
-typedef struct {
- __VHFFSFSSYNC_NET_MESSAGE_COMMON(msg_);
- char msg_data[200];
-} vhffsfssync_net_message;
-
-/* message - common data */
-typedef struct { /* a memory buffer to write to the socket */
- __VHFFSFSSYNC_NET_MESSAGE_COMMON(data_);
- char *data_buffer; /* owned by the message, free()d by vhffsfssync_net_destroy_data() */
- ssize_t data_len; /* total number of bytes in data_buffer */
- ssize_t data_cur; /* number of bytes already written to the socket */
-
- /* pad to size of `vhffsfssync_net_message' */
- unsigned char sin_zero[ sizeof(vhffsfssync_net_message)
- - __VHFFSFSSYNC_NET_MESSAGE_COMMON_SIZE
- - sizeof(char*)
- - sizeof(ssize_t)
- - sizeof(ssize_t) ];
-} vhffsfssync_net_message_data;
-
-/* message - file */
-typedef struct { /* a file sent to the client chunk by chunk */
- __VHFFSFSSYNC_NET_MESSAGE_COMMON(file_);
- FILE *file_stream; /* open stream, or NULL when it was closed to spare a descriptor (reopened on demand) */
- off_t file_offset; /* current position in the file, handed to sendfile() */
- off_t file_size; /* size of the file when the message was queued */
- off_t file_chunksize; /* size of the chunk being sent, -1 when no chunk is in progress */
- off_t file_chunkcur; /* bytes of the current chunk already sent */
- char *file_pathname; /* private copy of the path, free()d with the message */
-
- /* pad to size of `vhffsfssync_net_message' */
- unsigned char sin_zero[ sizeof(vhffsfssync_net_message)
- - __VHFFSFSSYNC_NET_MESSAGE_COMMON_SIZE
- - sizeof(FILE*)
- - sizeof(off_t)
- - sizeof(off_t)
- - sizeof(off_t)
- - sizeof(off_t)
- - sizeof(char*) ];
-} vhffsfssync_net_message_file;
-
-
-// protos
-void vhffsfssync_net_conn_disable(vhffsfssync_conn *conn);
-void vhffsfssync_net_conn_destroy(vhffsfssync_conn *conn);
-inline vhffsfssync_net_message *vhffsfssync_net_new_message(vhffsfssync_conn *conn, msg_family_t family, uint32_t priority);
-gint vhffsfssync_net_message_insert_compare(gconstpointer a, gconstpointer b);
-inline void vhffsfssync_net_destroy_message(vhffsfssync_conn *conn, vhffsfssync_net_message *msg);
-int vhffsfssync_net_send_data(vhffsfssync_conn *conn, char *data, ssize_t len, uint32_t priority);
-void vhffsfssync_net_destroy_data(vhffsfssync_conn *conn, vhffsfssync_net_message_data *datamsg);
-inline int vhffsfssync_net_send_string(vhffsfssync_conn *conn, char *data, uint32_t priority) ;
-void vhffsfssync_net_broadcast_string(char *data, uint32_t priority);
-int vhffsfssync_net_send_file(vhffsfssync_conn *conn, char *pathname);
-void vhffsfssync_net_destroy_file(vhffsfssync_conn *conn, vhffsfssync_net_message_file *filemsg);
-FILE *vhffsfssync_net_file_open(vhffsfssync_conn *conn, const char *pathname, const char *mode);
-int vhffsfssync_net_file_close(vhffsfssync_conn *conn, FILE *stream);
-int vhffsfssync_net_remove_file(vhffsfssync_conn *conn, char *pathname);
-void vhffsfssync_net_broadcast_file(char *pathname);
-int vhffsfssync_net_send(vhffsfssync_conn *conn);
-
-
-/* ----------------------------------------- */
-/*
- * Close the socket of `conn` (when it is open), mark the connection as
- * disconnected with fd = -1 and destroy every message still queued on it.
- * The connection object itself is kept.
- */
-void vhffsfssync_net_conn_disable(vhffsfssync_conn *conn) {
-    GList *head;
-
-    if(conn->fd >= 0) {
-#if DEBUG_NET
-        printf("Byebye %s... (used fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
-#endif
-        shutdown(conn->fd, SHUT_RDWR);
-        close(conn->fd);
-    }
-    conn->fd = -1;
-
-    /* destroying a message unlinks it, so always restart from the head of the list */
-    for(head = g_list_first(conn->messages) ; head ; head = g_list_first(conn->messages)) {
-        vhffsfssync_net_destroy_message(conn, (vhffsfssync_net_message*)head->data );
-    }
-}
-
-
-/*
- * Get rid of a connection for good: its socket is closed and its queue
- * emptied, it is taken out of the global list of connections, and the
- * object is freed. `conn` must not be used afterwards.
- */
-void vhffsfssync_net_conn_destroy(vhffsfssync_conn *conn) {
-    vhffsfssync_net_conn_disable(conn);
-    vhffsfssync_conns = g_list_remove(vhffsfssync_conns, conn);
-    free(conn);
-}
-
-
-/* -- network stuff -- */
-/*
- * Allocate a message for `conn` and fill in its common fields.
- *
- * family:   VHFFSFSSYNC_NET_MESSAGE_* value telling which specialised struct
- *           the caller is going to use.
- * priority: sort key of the message, the lowest value is sent first.
- *
- * The message receives the current value of conn->order, which is then
- * incremented. Every other byte of the message is zero. The message is NOT
- * inserted into conn->messages, the caller does that.
- *
- * Never returns NULL: the callers dereference the result right away, so the
- * process is terminated when memory is exhausted.
- */
-inline vhffsfssync_net_message *vhffsfssync_net_new_message(vhffsfssync_conn *conn, msg_family_t family, uint32_t priority) {
-    vhffsfssync_net_message *msg;
-
-    msg = calloc( 1, sizeof(vhffsfssync_net_message) );
-    if(!msg) {
-        fprintf(stderr, "calloc() failed: %s\n", strerror(errno));
-        exit(-1);
-    }
-    msg->msg_family = family;
-    msg->msg_priority = priority;
-    msg->msg_order = conn->order++;
-    return msg;
-}
-
-
-/*
- * GCompareFunc used to keep a queue of messages sorted.
- *
- * Returns a negative value when `a` must be sent before `b`, a positive value
- * when it must be sent after, 0 when they are equivalent. The lowest priority
- * value goes first; messages of equal priority keep their creation order.
- */
-gint vhffsfssync_net_message_insert_compare(gconstpointer a, gconstpointer b) {
-    const vhffsfssync_net_message *first = (const vhffsfssync_net_message*)a;
-    const vhffsfssync_net_message *second = (const vhffsfssync_net_message*)b;
-
-    // lowest priority is preferred
-    // Priorities span the whole uint32_t range (file messages use their size),
-    // so they are compared explicitly: a subtraction would come out with the
-    // wrong sign once two priorities are 2^31 or more apart.
-    if(first->msg_priority != second->msg_priority)
-        return first->msg_priority < second->msg_priority ? -1 : 1;
-
-    // lowest order is preferred
-    // The order is a counter that may wrap around: the wrapped difference read
-    // as a signed value still ranks two messages created less than 2^31 apart.
-    return (gint)(int32_t)(first->msg_order - second->msg_order);
-}
-
-
-/*
- * Unlink `msg` from the queue of `conn` and release it, using the destructor
- * that matches its family.
- *
- * A message whose family is neither DATA nor FILE owns nothing besides
- * itself: it is unlinked and freed here. Leaving it in the queue would make
- * any loop that drains the queue spin on it forever.
- */
-inline void vhffsfssync_net_destroy_message(vhffsfssync_conn *conn, vhffsfssync_net_message *msg) {
-    if(msg->msg_family == VHFFSFSSYNC_NET_MESSAGE_DATA) {
-        vhffsfssync_net_destroy_data(conn, (vhffsfssync_net_message_data*)msg);
-    }
-    else if(msg->msg_family == VHFFSFSSYNC_NET_MESSAGE_FILE) {
-        vhffsfssync_net_destroy_file(conn, (vhffsfssync_net_message_file*)msg);
-    }
-    else {
-        conn->messages = g_list_remove(conn->messages, msg);
-        free(msg);
-    }
-}
-
-
-// !!!!!! the buffer is freed when the message has been sent, DON'T send static string and DON'T free() the data yourself
-/*
- * Queue `len` bytes of `data` for `conn`.
- *
- * data:     heap buffer (free()-compatible). Ownership is taken in every
- *           case: the buffer is freed once the message is destroyed, and it
- *           is freed right away when the request is rejected, since callers
- *           are told never to free it themselves.
- * priority: sort key of the message, the lowest value is sent first.
- *
- * Returns 0 when the message has been queued, -1 when conn or data is NULL or
- * len is not positive.
- */
-int vhffsfssync_net_send_data(vhffsfssync_conn *conn, char *data, ssize_t len, uint32_t priority) {
-    vhffsfssync_net_message_data *msg;
-
-    if(!conn || !data || len <= 0) {
-        free(data);
-        return -1;
-    }
-
-    msg = (vhffsfssync_net_message_data*)vhffsfssync_net_new_message(conn, VHFFSFSSYNC_NET_MESSAGE_DATA, priority);
-    msg->data_buffer = data;
-    msg->data_len = len;
-    msg->data_cur = 0;
-    conn->messages = g_list_insert_sorted(conn->messages, msg, vhffsfssync_net_message_insert_compare);
-
-    return 0;
-}
-
-
-/*
- * Drop a data message: it is unlinked from the queue of `conn`, then its
- * buffer and the message itself are freed.
- */
-void vhffsfssync_net_destroy_data(vhffsfssync_conn *conn, vhffsfssync_net_message_data *datamsg) {
-    char *payload = datamsg->data_buffer;
-
-    conn->messages = g_list_remove(conn->messages, (vhffsfssync_net_message*)datamsg);
-    free(datamsg);
-    free(payload);
-}
-
-
-/*
- * Queue the NUL-terminated string `data` (terminator not included) for `conn`.
- * Ownership of `data` follows the rules of vhffsfssync_net_send_data().
- *
- * Returns the result of vhffsfssync_net_send_data(), or -1 when data is NULL
- * (which happens when the strdup() of a caller failed).
- */
-inline int vhffsfssync_net_send_string(vhffsfssync_conn *conn, char *data, uint32_t priority) {
-    if(!data) return -1;
-    return vhffsfssync_net_send_data(conn, data, strlen(data), priority);
-}
-
-
-/*
- * Queue a private copy of the string `data` on every connection, with the
- * given priority, then free `data` itself.
- */
-void vhffsfssync_net_broadcast_string(char *data, uint32_t priority) {
-    GList *node = g_list_first(vhffsfssync_conns);
-
-    while(node) {
-        vhffsfssync_conn *target = node->data;
-
-        /* step to the next node before handing the copy over */
-        node = g_list_next(node);
-        vhffsfssync_net_send_string(target, strdup(data), priority);
-    }
-
-    free(data);
-}
-
-
-// prototype is simple, files are always the lowest of the lowest priority messages
-/*
- * Queue the regular file `pathname` for `conn`.
- *
- * A transfer of the same path that is already queued is cancelled, a
- * "create" event is queued with medium priority, then the file message is
- * inserted with a priority equal to the file size, clamped to
- * [1000, 4294967295], so that small files are sent first.
- *
- * Returns 0 when the file has been queued, -1 on a NULL argument, when
- * lstat() fails, when the path is not a regular file or when it cannot be
- * opened.
- */
-int vhffsfssync_net_send_file(vhffsfssync_conn *conn, char *pathname) {
-    vhffsfssync_net_message_file *filemsg;
-    struct stat info;
-    uint32_t ceiling = -1; // 4294967295
-    uint32_t prio;
-    FILE *fp;
-
-    if(!conn || !pathname) return -1;
-
-    if( lstat(pathname, &info) < 0 ) {
-        fprintf(stderr, "lstat() failed on %s: %s\n", pathname, strerror(errno));
-        return -1;
-    }
-
-    /* only copy regular files */
-    if(! S_ISREG(info.st_mode) ) {
-        fprintf(stderr, "%s is not a regular file\n", pathname);
-        return -1;
-    }
-
-    fp = vhffsfssync_net_file_open(conn, pathname, "r");
-    if(!fp) return -1;
-
-    // an older transfer of this file is obsolete
-    vhffsfssync_net_remove_file(conn, pathname);
-    vhffsfssync_net_send_string( conn, g_strdup_printf("create\x1F%s\n", pathname) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
-
-    // the file size is the priority, kept out of the range reserved for other messages
-    prio = MAX(MIN(info.st_size, ceiling),1000);
-
-    filemsg = (vhffsfssync_net_message_file*)vhffsfssync_net_new_message(conn, VHFFSFSSYNC_NET_MESSAGE_FILE, prio);
-    filemsg->file_pathname = strdup(pathname);
-    filemsg->file_stream = fp;
-    filemsg->file_size = info.st_size;
-    filemsg->file_offset = 0;
-    filemsg->file_chunkcur = 0;
-    filemsg->file_chunksize = -1;
-    conn->messages = g_list_insert_sorted(conn->messages, filemsg, vhffsfssync_net_message_insert_compare);
-
-    return 0;
-}
-
-
-/*
- * Drop a file message: it is unlinked from the queue of `conn`, its stream
- * is closed and its memory freed.
- *
- * When a chunk has been announced to the client but not entirely sent, the
- * missing bytes are queued as zeros with priority 0, because the client
- * expects exactly the announced number of bytes.
- */
-void vhffsfssync_net_destroy_file(vhffsfssync_conn *conn, vhffsfssync_net_message_file *filemsg) {
-    conn->messages = g_list_remove(conn->messages, (vhffsfssync_net_message*)filemsg);
-
-    /* we need to finish the chunk anyway */
-    if(filemsg->file_chunksize > 0) {
-        off_t len = filemsg->file_chunksize - filemsg->file_chunkcur;
-        if(len > 0) {
-            /* calloc() hands the zeros out directly and its result is checked before use */
-            char *data = calloc(1, (size_t)len);
-            if(data) {
-                vhffsfssync_net_send_data(conn, data, len, 0);
-            }
-            else {
-                fprintf(stderr, "calloc() failed, cannot complete the chunk of %s: %s\n", filemsg->file_pathname, strerror(errno));
-            }
-        }
-    }
-
-    vhffsfssync_net_file_close(conn, filemsg->file_stream);
-    free(filemsg->file_pathname);
-    free(filemsg);
-}
-
-
-/*
- * fopen() wrapper that keeps the count of open files.
- *
- * When the count exceeds VHFFSFSSYNC_MAX_OPENFILES after the open, the
- * stream of one queued file message of `conn` is closed: the queue is walked
- * from its tail and the first file message that has a stream and no chunk in
- * progress is chosen.
- *
- * Returns the new stream, or NULL on a NULL argument or when fopen() fails.
- */
-FILE *vhffsfssync_net_file_open(vhffsfssync_conn *conn, const char *pathname, const char *mode) {
-    FILE *fp;
-    GList *node;
-
-    if(!conn || !pathname || !mode)
-        return NULL;
-
-    fp = fopen(pathname, mode);
-    if(!fp) {
-        fprintf(stderr, "fopen() failed on %s: %s\n", pathname, strerror(errno));
-        return fp;
-    }
-
-    vhffsfssync_openfiles++;
-    if(vhffsfssync_openfiles <= VHFFSFSSYNC_MAX_OPENFILES)
-        return fp;
-
-    // too many open files: give back the stream of the message at the far end of the queue
-    for(node = g_list_last(conn->messages) ; node ; node = g_list_previous(node)) {
-        vhffsfssync_net_message *candidate = node->data;
-        vhffsfssync_net_message_file *victim;
-
-        if(candidate->msg_family != VHFFSFSSYNC_NET_MESSAGE_FILE)
-            continue;
-
-        victim = (vhffsfssync_net_message_file*)candidate;
-        if(!victim->file_stream || victim->file_chunksize >= 0)
-            continue;
-
-        vhffsfssync_net_file_close(conn, victim->file_stream);
-        victim->file_stream = NULL;
-        break;
-    }
-
-    return fp;
-}
-
-
-/*
- * fclose() wrapper that keeps the count of open files.
- *
- * Returns -1 when stream is NULL, otherwise the result of fclose().
- *
- * The counter is decremented whatever fclose() returns: the stream is
- * released even when fclose() reports an error, so its descriptor no longer
- * counts against VHFFSFSSYNC_MAX_OPENFILES.
- */
-int vhffsfssync_net_file_close(vhffsfssync_conn *conn, FILE *stream) {
-    int r;
-
-    if(!stream)
-        return -1;
-
-    r = fclose(stream);
-    if( r ) {
-        fprintf(stderr, "fclose() failed: %s\n", strerror(errno));
-    }
-    vhffsfssync_openfiles--;
-    return r;
-}
-
-
-/* Queue the file `pathname` on every connection of the global list. */
-void vhffsfssync_net_broadcast_file(char *pathname) {
-    GList *node = g_list_first(vhffsfssync_conns);
-
-    while(node) {
-        vhffsfssync_conn *target = node->data;
-
-        node = g_list_next(node);
-        vhffsfssync_net_send_file(target, pathname);
-    }
-}
-
-
-/*
- * Cancel every queued transfer of `pathname` on `conn`.
- * Always returns 0.
- */
-int vhffsfssync_net_remove_file(vhffsfssync_conn *conn, char *pathname) {
-    GList *node = g_list_first(conn->messages);
-
-    while(node) {
-        vhffsfssync_net_message *candidate = node->data;
-        vhffsfssync_net_message_file *filemsg;
-
-        /* the node may be unlinked below, so move on first */
-        node = g_list_next(node);
-
-        if(candidate->msg_family != VHFFSFSSYNC_NET_MESSAGE_FILE)
-            continue;
-
-        filemsg = (vhffsfssync_net_message_file*)candidate;
-        if(strcmp(filemsg->file_pathname, pathname) != 0)
-            continue;
-
-        vhffsfssync_net_destroy_file(conn, filemsg);
-    }
-
-    return 0;
-}
-
-
-int vhffsfssync_net_send(vhffsfssync_conn *conn) {
- GList *msgs;
- gboolean full = FALSE;
-/* Send the queued messages of conn, always taking the head of the sorted list, until the queue is empty, the connection gets disabled, or a write could not be completed (`full`). Returns -1 when conn is NULL or has no socket, 0 otherwise. */
- if(!conn || conn->fd < 0) return -1;
-#if DEBUG_NET
- printf("--------------------------------------------------\n");
- printf("conn: %d, to: %s\n", conn->fd, inet_ntoa(conn->sockaddr.sin_addr));
-#endif
- while(!full && conn->fd >= 0 && (msgs = g_list_first(conn->messages)) ) {
- vhffsfssync_net_message *msg = msgs->data;
-#if DEBUG_NET
- printf(" family: %d , priority: %d , order: %d\n", msg->msg_family, msg->msg_priority, msg->msg_order);
-#endif
- // data
- if(msg->msg_family == VHFFSFSSYNC_NET_MESSAGE_DATA) {
- vhffsfssync_net_message_data *datamsg;
- ssize_t written;
- ssize_t lentowrite;
-
- // a message that has been started must not be overtaken by another one before it is complete: give it the highest priority (0)
- msg->msg_priority = 0;
-
- datamsg = (vhffsfssync_net_message_data*)msg;
-#if DEBUG_NET
- printf(" buffer: %d bytes, %d already written\n", datamsg->data_len, datamsg->data_cur);
-#endif
- /* try to empty the buffer */
- lentowrite = datamsg->data_len - datamsg->data_cur;
- written = write(conn->fd, datamsg->data_buffer + datamsg->data_cur, lentowrite);
- if(written < 0) {
- switch(errno) {
- case EAGAIN:
- case EINTR:
-#if DEBUG_NET
- printf("=====> EAGAIN on write()\n");
-#endif
- full = TRUE;
- break;
- default:
- fprintf(stderr, "write() failed on socket %d: %s\n", conn->fd, strerror(errno));
- vhffsfssync_net_conn_disable(conn); /* also destroys every queued message, msg included */
- }
- }
- else {
- datamsg->data_cur += written;
-#if DEBUG_NET
- printf(" %d bytes written on %d bytes\n", written, lentowrite);
-#endif
- /* partial write: keep the rest of the buffer for the next call */
- if(written < lentowrite) {
- full = TRUE;
- }
- /* buffer is now empty */
- else {
- vhffsfssync_net_destroy_data(conn, datamsg);
- }
- }
- }
-
- // file
- else if(msg->msg_family == VHFFSFSSYNC_NET_MESSAGE_FILE) {
- vhffsfssync_net_message_file *filemsg;
- ssize_t written;
- off_t lentowrite;
-
- filemsg = (vhffsfssync_net_message_file*)msg;
-#if DEBUG_NET
- printf(" file: %s, offset = %lld, size = %lld\n", filemsg->file_pathname, (long long int)filemsg->file_offset, (long long int)filemsg->file_size);
-#endif
- /* new chunk */
- if(filemsg->file_chunksize < 0) {
- if(!filemsg->file_stream) { /* the stream was closed to spare a descriptor, reopen it */
- filemsg->file_stream = vhffsfssync_net_file_open(conn, filemsg->file_pathname, "r");
- if(!filemsg->file_stream) {
- vhffsfssync_net_destroy_file(conn, filemsg);
- break;
- }
- }
-
- lentowrite = filemsg->file_chunksize = MIN(VHFFSFSSYNC_NET_MESSAGE_FILE_CHUNK, filemsg->file_size - filemsg->file_offset);
- filemsg->file_chunkcur = 0;
- // the chunk must follow its header without interruption: priority 1 places this message right behind the priority-0 header queued on the next line
- msg->msg_priority = 1;
- vhffsfssync_net_send_string(conn, g_strdup_printf("write\x1F%s\x1F%lld\x1F%lld\n", filemsg->file_pathname, (long long int)filemsg->file_offset, (long long int)filemsg->file_chunksize) , 0);
- // restart from the head of the list, where the header that has just been queued now is
- continue;
- }
- /* the previous chunk was partially sent */
- else {
- lentowrite = filemsg->file_chunksize - filemsg->file_chunkcur;
- }
-
- /* try to send the file */
- written = sendfile(conn->fd, fileno(filemsg->file_stream), &filemsg->file_offset, lentowrite);
- if(written < 0) {
- switch(errno) {
- case EAGAIN:
- case EINTR:
-#if DEBUG_NET
- printf("=====> EAGAIN on sendfile()\n");
-#endif
- full = TRUE;
- break;
- default:
- fprintf(stderr, "sendfile() failed from file %s to socket %d: %s\n", filemsg->file_pathname, conn->fd, strerror(errno));
- vhffsfssync_net_conn_disable(conn); /* also destroys every queued message, msg included */
- }
- }
- else {
-#if DEBUG_NET
- printf(" %d bytes written, we are at offset %lld\n", written, filemsg->file_offset);
-#endif
- filemsg->file_chunkcur += written;
-
- /* nothing left to read from the file, or the whole file has been sent */
- if( written == 0 || filemsg->file_offset == filemsg->file_size ) {
- vhffsfssync_net_destroy_file(conn, filemsg);
- }
-
- /* the chunk is not fully sent yet */
- else if(written < lentowrite) {
- full = TRUE;
- }
-
- /* the chunk is sent */
- else if(written == lentowrite) {
- uint32_t maxprio = -1; // 4294967295
-
- filemsg->file_chunksize = -1;
- filemsg->file_chunkcur = 0;
-
- // between two chunks the file goes back to a priority based on what is left to send, and is re-sorted accordingly
- msg->msg_priority = MAX(MIN(filemsg->file_size - filemsg->file_offset, maxprio),1000);
- conn->messages = g_list_remove(conn->messages, msg);
- conn->messages = g_list_insert_sorted(conn->messages, msg, vhffsfssync_net_message_insert_compare);
- }
- }
- }
-
- // unknown message family: leave the loop instead of spinning on it
- else full = TRUE;
- }
-
- return 0;
-}
-
-
-#if 0 /* disabled: older send path built on the sendbuf fields that are commented out in vhffsfssync_conn */
-int vhffsfssync_net_write(vhffsfssync_conn *conn, char *buffer, ssize_t len) {
-/* Two uses: with a buffer and len > 0, write it (or append what could not be written to conn->sendbuf); with buffer == NULL and len == 0, try to flush conn->sendbuf. Returns the number of bytes written by this call, or -1 when the arguments match neither use. */
- ssize_t written = -1;
-
- /* new data to send */
- if(buffer && len > 0) {
- written = 0;
- // buffer is empty, try to send the data directly (it avoids copying the data twice)
- if(conn->sendbuf_size == 0) {
- written = write(conn->fd, buffer, len);
- if(written < 0) {
- switch(errno) {
- case EAGAIN:
- case EINTR:
- printf("=====> EAGAIN on write()\n");
- break;
- default:
- fprintf(stderr, "write() failed on socket %d: %s\n", conn->fd, strerror(errno));
- }
- written = 0;
- }
- }
-
- printf("%d bytes written on %d bytes\n", written, len);
-
- /* this is the case if the buffer is not empty, or if the write failed, or if the write didn't write everything */
- if(written < len) {
- ssize_t lentobuf = len-written;
- printf("buffering %d bytes\n", lentobuf);
-
- /* As you may have noticed, the buffer is only growing as long as it still
- * contains data to be sent, here we try to deal with that, as it needs data
- * to be copied, we hope that it is not going to happen very often
- */
-
- conn->sendbuf = realloc(conn->sendbuf, conn->sendbuf_size+lentobuf);
- memcpy(conn->sendbuf+conn->sendbuf_size, buffer+written, lentobuf);
- conn->sendbuf_size += lentobuf;
- printf("buffer size = %d\n", conn->sendbuf_size);
- }
- }
-
- /* try to empty the buffer */
- if(!buffer && len == 0 && conn->sendbuf_size > 0) {
- ssize_t lentowrite = conn->sendbuf_size - conn->sendbuf_cur;
- written = write(conn->fd, conn->sendbuf + conn->sendbuf_cur, lentowrite);
- if(written < 0) {
- switch(errno) {
- case EAGAIN:
- case EINTR:
- printf("=====> EAGAIN on write()\n");
- break;
- default:
- fprintf(stderr, "write() failed on socket %d: %s\n", conn->fd, strerror(errno));
- }
- written = 0;
- }
-
- if(written > 0) {
- printf("%d bytes written on %d bytes\n", written, lentowrite);
-
- /* the buffer is not empty yet */
- if(written < lentowrite) {
- conn->sendbuf_cur += written;
- }
- /* buffer is now empty */
- else {
- printf("buffer is now empty\n");
- free(conn->sendbuf);
- conn->sendbuf = NULL;
- conn->sendbuf_cur = 0;
- conn->sendbuf_size = 0;
- }
- }
- }
-
- return written;
-}
-#endif
-
-
-
-/* -- inotify stuff -- */
-
-/*
- * Start watching `pathname` with inotify and record the watch in both
- * lookup tables (descriptor -> path and path -> descriptor).
- *
- * Returns the watch descriptor, -1 when the path is already watched, or the
- * negative result of inotify_add_watch() when it fails. The process exits
- * with -1 if the two tables end up with different sizes.
- */
-int vhffsfssync_add_watch(int inotifyfd, const char *pathname, uint32_t mask) {
-    int wd;
-    int *wd_copy;
-    char *path_copy;
-
-    if( g_hash_table_lookup(vhffsfssync_path_to_wd, pathname) != NULL )
-        return -1;
-
-    wd = inotify_add_watch(inotifyfd, pathname, mask);
-    if(wd < 0) {
-        if(errno == ENOSPC)
-            fprintf(stderr, "Maximum number of watches reached, consider adding more...\n");
-        return wd;
-    }
-
-    /* the same two heap objects are stored in both tables */
-    wd_copy = g_new(int, 1);
-    *wd_copy = wd;
-    path_copy = g_strdup(pathname);
-    g_hash_table_insert(vhffsfssync_wd_to_path, wd_copy, path_copy);
-    g_hash_table_insert(vhffsfssync_path_to_wd, path_copy, wd_copy);
-
-#if DEBUG_INOTIFY
-    printf("+ %d %s\n", wd, pathname);
-#endif
-
-    /* both tables must always describe the same set of watches */
-    if(g_hash_table_size(vhffsfssync_wd_to_path) != g_hash_table_size(vhffsfssync_path_to_wd))
-        exit(-1);
-
-    return wd;
-}
-
-
-int vhffsfssync_del_watch(int inotifyfd, const char *pathname, int wd) {
-/* Stop watching a directory, designated EITHER by its descriptor (pathname == NULL and wd > 0) OR by its path (pathname != NULL and wd == 0); any other combination returns -1. Returns -1 as well when the watch is unknown, otherwise the result of inotify_rm_watch(). */
- if(!pathname && wd > 0) {
- pathname = (char*)g_hash_table_lookup(vhffsfssync_wd_to_path, &wd); /* NOTE(review): pathname now points to memory owned by the tables; whether the removals below free it depends on the destroy notifiers set where the tables are created (not visible here) - confirm */
- // this wd has already been deleted
- if(!pathname) return -1;
- }
- else if(pathname && wd == 0) {
- int *_wd;
- _wd = g_hash_table_lookup(vhffsfssync_path_to_wd, pathname);
- // this wd has already been deleted
- if(!_wd) return -1;
- wd = *_wd; /* keep a copy by value, the stored int is about to leave the tables */
- }
- else return -1;
-#if DEBUG_INOTIFY
- printf("- %d %s\n", wd, pathname);
-#endif
- g_hash_table_remove(vhffsfssync_path_to_wd, pathname);
- g_hash_table_remove(vhffsfssync_wd_to_path, &wd);
- if(g_hash_table_size(vhffsfssync_wd_to_path) != g_hash_table_size(vhffsfssync_path_to_wd)) { /* the two tables must stay in step, anything else is treated as fatal */
- exit(-1);
- }
- return inotify_rm_watch(inotifyfd, wd);
-}
-
-
-int vhffsfssync_modify_watch(int inotifyfd, const char *from, const char *to) {
-
- int wd;
- int *_wd;
- char *_to;
-
- _wd = g_hash_table_lookup(vhffsfssync_path_to_wd, from);
- if(!_wd) return -1;
- wd = *_wd;
-
- g_hash_table_remove(vhffsfssync_path_to_wd, from);
- g_hash_table_remove(vhffsfssync_wd_to_path, _wd);
-
- _wd = g_new(int, 1);
- *_wd = wd;
- _to = strdup(to);
- g_hash_table_insert(vhffsfssync_wd_to_path, _wd, _to);
- g_hash_table_insert(vhffsfssync_path_to_wd, _to, _wd);
-
-#if DEBUG_INOTIFY
- printf("= %d %s -> %s\n", wd, from, to);
-#endif
- return 0;
-}
-
-
-int vhffsfssync_add_watch_recursively(int inotifyfd, const char *pathname, uint32_t mask) {
-
- int wd;
- DIR *d;
- struct dirent *dir;
-
- wd = vhffsfssync_add_watch(inotifyfd, pathname, mask);
- if(wd < 0) return wd;
-
- d = opendir(pathname);
- if(d) {
- while( (dir = readdir(d)) ) {
- if(dir->d_type == DT_DIR && strcmp(dir->d_name, ".") && strcmp(dir->d_name, "..") ) {
- char *path;
- char *sep = "/";
- if( pathname[strlen(pathname)-1] == '/' ) sep = "";
- path = g_strdup_printf("%s%s%s", pathname, sep, dir->d_name);
- wd = vhffsfssync_add_watch_recursively(inotifyfd, path, mask);
- free(path);
- }
- }
- closedir(d);
- }
-
- return wd;
-}
-
-
-int vhffsfssync_manage_event_remove(int inotifyfd, char *pathname) {
- GList *conns;
- int *_wd;
-
-#if DEBUG_INOTIFY
- printf("==> REMOVE %s\n", pathname);
-#endif
-
- if( (_wd = g_hash_table_lookup(vhffsfssync_path_to_wd, pathname)) ) {
- vhffsfssync_del_watch(inotifyfd, NULL, *_wd);
- }
-
- /* connections */
- for(conns = g_list_first(vhffsfssync_conns) ; conns ; ) {
- vhffsfssync_conn *conn = conns->data;
- conns = g_list_next(conns);
- vhffsfssync_net_remove_file(conn, pathname);
- vhffsfssync_net_send_string(conn, g_strdup_printf("remove\x1F%s\n", pathname) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
- }
- return 0;
-}
-
-
-int vhffsfssync_manage_event_create(int inotifyfd, char *pathname, gboolean sendfile) {
- struct stat st;
-
- if(! lstat(pathname, &st) ) {
-
- if( S_ISREG(st.st_mode) ) {
-#if DEBUG_INOTIFY
- printf("==> CREATE %s\n", pathname);
-#endif
- vhffsfssync_net_broadcast_string( g_strdup_printf("create\x1F%s\n", pathname) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
- if(sendfile && st.st_size > 0) {
- vhffsfssync_net_broadcast_file(pathname);
- }
- }
-
- if( S_ISDIR(st.st_mode) ) {
-#if DEBUG_INOTIFY
- printf("==> MKDIR %s\n", pathname);
-#endif
- vhffsfssync_add_watch(inotifyfd, pathname, VHFFSFSSYNC_WATCH_MASK);
- vhffsfssync_net_broadcast_string( g_strdup_printf("mkdir\x1F%s\n", pathname) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
- /* there is a short delay between the mkdir() and the add_watch(),
- we need to send events about the data which have already been written */
- vhffsfssync_fake_events_recursively( inotifyfd, pathname );
- }
-
- else if( S_ISLNK(st.st_mode) ) {
- char *linkto;
- int ret;
- linkto = malloc(st.st_size +1);
- ret = readlink(pathname, linkto, st.st_size);
- if( ret >= 0 ) {
- linkto[st.st_size] = '\0';
-#if DEBUG_INOTIFY
- printf("==> SYMLINK %s -> %s\n", pathname, linkto);
-#endif
- vhffsfssync_net_broadcast_string( g_strdup_printf("symlink\x1F%s\x1F%s\n", pathname, linkto) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
- }
- free(linkto);
- if(ret < 0) {
- if(errno == ENOENT) {
- // file already disappeared (common for temporary files)
- vhffsfssync_manage_event_remove(inotifyfd, pathname);
- } else {
- fprintf(stderr, "cannot readlink() '%s': %s\n", pathname, strerror(errno));
- return -1;
- }
- }
-
- }
- /* we don't need other file types (chr, block, fifo, socket, ...) */
- }
- else {
- if(errno == ENOENT) {
- // file already disappeared (common for temporary files)
- vhffsfssync_manage_event_remove(inotifyfd, pathname);
- } else {
- fprintf(stderr, "cannot lstat() '%s': %s\n", pathname, strerror(errno));
- return -1;
- }
- }
-
- return 0;
-}
-
-
-int vhffsfssync_manage_event(int inotifyfd, struct inotify_event *event) {
-
- char *dirpath, *pathname;
-#if DEBUG_INOTIFY
- printf("wd=%d mask=%x cookie=%d len=%d", event->wd, event->mask, event->cookie, event->len);
- if(event->len > 0) printf(" name=%s", event->name);
- printf("\n");
-#endif
-
- if(event->wd < 0) {
- fprintf(stderr, "Maximum number of events reached, some events are lost\n");
- return -1;
- }
-
- dirpath = (char*)g_hash_table_lookup(vhffsfssync_wd_to_path, &event->wd);
- // this wd has been deleted
- if(!dirpath) return -1;
-
- if(event->len > 0) {
- pathname = g_strdup_printf("%s/%s", dirpath, event->name);
- } else {
- pathname = strdup(dirpath);
- }
-
- // this event is not waiting for a cookie, delete file if necessary (IN_MOVED_FROM not followed with IN_MOVED_TO)
- if( !(event->mask & IN_MOVED_TO) && vhffsfssync_cookie.id ) {
-
- vhffsfssync_manage_event_remove(inotifyfd, vhffsfssync_cookie.from);
- vhffsfssync_cookie.id = 0;
- free(vhffsfssync_cookie.from);
- }
-
- // useless at the moment (chmod, chown, touch, ...)
- if( event->mask & IN_ATTRIB ) {
-#if DEBUG_INOTIFY
- printf("IN_ATTRIB\n");
-#endif
-
- // new file, directory, or symlink
- } else if( event->mask & IN_CREATE ) {
-#if DEBUG_INOTIFY
- printf("IN_CREATE\n");
-#endif
- vhffsfssync_manage_event_create(inotifyfd, pathname, FALSE);
-
- // deleted file, directory or symlink
- } else if( event->mask & IN_DELETE ) {
-#if DEBUG_INOTIFY
- printf("IN_DELETE\n");
-#endif
- vhffsfssync_manage_event_remove(inotifyfd, pathname);
-
- // watch deleted, not used
- } else if( event->mask & IN_DELETE_SELF ) {
-#if DEBUG_INOTIFY
- printf("IN_DELETE_SELF\n");
-#endif
- // We don't send REMOVE here because the dir can be deleted before the
- // event was added, in this case the add_watch failed to monitor this dir
- // and we'll not receive a IN_DELETE_SELF for it
- //
- //implicit deletion (IN_IGNORED will follow this event)
- //vhffsfssync_del_watch(inotifyfd, event->wd);
-
- // file modified
- } else if( event->mask & IN_MODIFY ) {
-#if DEBUG_INOTIFY
- printf("IN_MODIFY\n");
- /* we can send the data here */
- printf("==> SEND %s\n", pathname);
-#endif
- // file modified and closed
- } else if( event->mask & IN_CLOSE_WRITE ) {
-#if DEBUG_INOTIFY
- printf("IN_CLOSE_WRITE\n");
- /* we must send the data here */
- printf("==> SEND %s\n", pathname);
-#endif
- vhffsfssync_net_broadcast_file(pathname);
-
- // watch moved, not used
- } else if( event->mask & IN_MOVE_SELF ) {
-#if DEBUG_INOTIFY
- printf("IN_MOVE_SELF\n");
-#endif
- // not needed (we can rely on IN_MOVED_FROM and IN_MOVED_TO)
-
- // file/symlink/directory moved
- //
- // only from: delete the file/symlink/directory
- // only to: create the file/symlink/directory
- // both: mv the file/symlink/directory
- //
- } else if( event->mask & IN_MOVED_FROM ) {
-#if DEBUG_INOTIFY
- printf("IN_MOVED_FROM\n");
-#endif
- // set the cookie
- vhffsfssync_cookie.id = event->cookie;
- vhffsfssync_cookie.from = strdup(pathname);
- vhffsfssync_cookie.isdir = !!( event->mask & IN_ISDIR );
-
- } else if( event->mask & IN_MOVED_TO ) {
-#if DEBUG_INOTIFY
- printf("IN_MOVED_TO\n");
-#endif
- // mv
- if(vhffsfssync_cookie.id == event->cookie) {
-#if DEBUG_INOTIFY
- printf("==> MOVE %s -> %s (used cookie %d)\n", vhffsfssync_cookie.from, pathname, vhffsfssync_cookie.id);
-#endif
- if( vhffsfssync_cookie.isdir )
- vhffsfssync_modify_watch(inotifyfd, vhffsfssync_cookie.from, pathname);
-
- vhffsfssync_net_broadcast_string( g_strdup_printf("move\x1F%s\x1F%s\n", vhffsfssync_cookie.from, pathname) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
- vhffsfssync_cookie.id = 0;
- free(vhffsfssync_cookie.from);
- }
- // create
- else {
- vhffsfssync_manage_event_create(inotifyfd, pathname, TRUE);
- }
-
- // watch deleted, clean it
- } else if( event->mask & IN_IGNORED ) {
-#if DEBUG_INOTIFY
- printf("IN_IGNORED\n");
-#endif
- vhffsfssync_del_watch(inotifyfd, NULL, event->wd);
-
- // this event is not handled, this should not happen
- } else {
-#if DEBUG_INOTIFY
- printf("OOOOOOOPPPSS!!!!!\n");
-#endif
- }
-
- free(pathname);
- return 0;
-}
-
-
-int vhffsfssync_fake_events_recursively(int inotifyfd, char *pathname) {
- DIR *d;
- struct dirent *dir;
-
- d = opendir(pathname);
- if(d) {
- while( (dir = readdir(d)) ) {
- if( strcmp(dir->d_name, ".") && strcmp(dir->d_name, "..") ) {
- char *path;
- char *sep = "/";
-
- if( pathname[strlen(pathname)-1] == '/' ) sep = "";
- path = g_strdup_printf("%s%s%s", pathname, sep, dir->d_name);
-
- // recursivity is done through vhffsfssync_manage_event_create()
- // which calls this function
- vhffsfssync_manage_event_create(inotifyfd, path, TRUE);
- free(path);
- }
- }
- closedir(d);
- }
-
- return 0;
-}
-
-
-int main(int argc, char *argv[]) {
-
- char *root;
- int inotifyfd, flags;
- int wd;
-
- int listenfd, opt;
- uint32_t bindaddr;
- uint16_t bindport;
- struct sockaddr_in src;
-
-
- /* chdir() to the filesystem to monitor */
- root = argv[1];
- if( chdir(root) < 0) {
- fprintf(stderr, "cannot chdir() to %s: %s\n", root, strerror(errno));
- return -1;
- }
- root = ".";
-#if DEBUG_INOTIFY
- printf("Monitoring %s\n", root);
-#endif
-
- /* -- inotify stuff -- */
-
- vhffsfssync_wd_to_path = g_hash_table_new_full(g_int_hash, g_int_equal, g_free, g_free);
- vhffsfssync_path_to_wd = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, NULL);
- vhffsfssync_cookie.id = 0;
- vhffsfssync_cookie.from = NULL;
- vhffsfssync_openfiles = 0;
-
- inotifyfd = inotify_init();
-
- /* set inotifyfd to non-blocking */
- flags = fcntl(inotifyfd, F_GETFL);
- if(flags >= 0) {
- flags |= O_NONBLOCK;
- fcntl(inotifyfd, F_SETFL, flags);
- }
-
- wd = vhffsfssync_add_watch_recursively(inotifyfd, root, VHFFSFSSYNC_WATCH_MASK);
- if(wd < 0) {
- fprintf(stderr, "Maximum number of watches probably reached, consider adding more or fixing what is being wrong before running me again (strace is your friend)... byebye!\n");
- return -1;
- }
-
-
- /* -- network stuff -- */
- vhffsfssync_conns = NULL;
-
- signal(SIGPIPE, SIG_IGN);
-
- /* listening for network connections */
- if( (listenfd = socket(AF_INET, SOCK_STREAM, 0) ) < 0) {
- fprintf(stderr, "socket() failed: %s\n", strerror(errno));
- return -1;
- }
-
- /* set listenfd to non-blocking */
- flags = fcntl(listenfd, F_GETFL);
- if(flags >= 0) {
- flags |= O_NONBLOCK;
- fcntl(listenfd, F_SETFL, flags);
- }
-
- /* add the ability to listen on a TIME_WAIT */
- opt = 1;
- if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt) ) ) {
- fprintf(stderr, "setsockopt() failed on socket %d: %s\n", listenfd, strerror(errno));
- }
-
- bindaddr = INADDR_ANY;
- bindport = 4567;
- src.sin_addr.s_addr = bindaddr;
- src.sin_family = AF_INET;
- src.sin_port = htons(bindport);
- if( bind(listenfd, (struct sockaddr*)&src, sizeof(src) ) < 0) {
- fprintf(stderr, "bind() failed on socket %d: %s\n", listenfd, strerror(errno));
- return -1;
- }
-
- if( listen(listenfd, SOMAXCONN) < 0) {
- fprintf(stderr, "listen() failed on socket %d: %s\n", listenfd, strerror(errno));
- return -1;
- }
-
-#if DEBUG_NET
- printf("Listening on %s:%d\n", inet_ntoa(src.sin_addr), bindport);
-#endif
-
- /* -- main loop -- */
- while(1) {
- int max_fd = 0;
- fd_set readfs;
- fd_set writefs;
- //struct timeval tv;
- GList *conns;
- int ret;
-
- FD_ZERO(&readfs);
- FD_ZERO(&writefs);
-
- /* inotify events */
- FD_SET(inotifyfd, &readfs);
- if(inotifyfd > max_fd) max_fd = inotifyfd;
-
- /* new connections */
- FD_SET(listenfd, &readfs);
- if(listenfd > max_fd) max_fd = listenfd;
-
- /* connections */
- for(conns = g_list_first(vhffsfssync_conns) ; conns ; ) {
- vhffsfssync_conn *conn = conns->data;
- conns = g_list_next(conns);
-
- /* this connnection was disabled, destroy it */
- if(conn->fd < 0) {
- vhffsfssync_net_conn_destroy(conn);
- continue;
- }
-
- FD_SET(conn->fd, &readfs);
- if(conn->messages) FD_SET(conn->fd, &writefs);
- if(conn->fd > max_fd) max_fd = conn->fd;
- }
-
- //tv.tv_sec = 3600;
- //tv.tv_usec = 0;
- ret = select(max_fd + 1, &readfs, &writefs, NULL, NULL);
- if(ret < 0) {
- switch(errno) {
- case EAGAIN:
- case EINTR:
- break;
- default:
- fprintf(stderr, "select() failed: %s\n", strerror(errno));
- }
- }
- if(ret > 0) {
- /* inotify events */
- if( FD_ISSET(inotifyfd, &readfs) ) {
- char buf[VHFFSFSSYNC_BUF_LEN];
- ssize_t len;
-
- len = read(inotifyfd, buf, VHFFSFSSYNC_BUF_LEN);
- if(len < 0) {
- switch(errno) {
- case EAGAIN:
- case EINTR:
- break;
- default:
- fprintf(stderr, "read() failed on inotify fd(%d): %s\n", inotifyfd, strerror(errno));
- }
- }
- else {
- char *cur = buf;
- while(len > 0) {
- int register next;
- struct inotify_event *ie;
-
- ie = (struct inotify_event*)cur;
- vhffsfssync_manage_event( inotifyfd, ie );
- next = sizeof(struct inotify_event);
- next += ie->len;
- len -= next;
- cur += next;
- }
- }
- }
-
- /* new connections */
- if( FD_ISSET(listenfd, &readfs) ) {
- struct sockaddr_in addr;
- int newfd, flags;
- socklen_t addr_len;
- vhffsfssync_conn *conn;
- //struct hostent *ent;
-
- addr_len = sizeof(addr);
- newfd = accept(listenfd, (struct sockaddr*)&addr, &addr_len);
- if(newfd <= 0) {
- switch(errno) {
- case EAGAIN:
- case EINTR:
- break;
- default:
- fprintf(stderr, "accept() failed on listen fd(%d): %s\n", listenfd, strerror(errno));
- }
- }
- else {
- //char *grosbuf;
- //grosbuf=malloc(104857600);
- //memset(grosbuf, 'A', 104857600);
-
- // We don't need the reverse DNS, the code here is for learning purpose
- //ent = gethostbyaddr((const void*)&addr.sin_addr, sizeof(sizeof(addr.sin_addr)), AF_INET);
- //if(ent) printf("And you are %s\n", ent->h_name);
-
- /* set newfd to non-blocking */
- flags = fcntl(newfd, F_GETFL);
- if(flags >= 0) {
- flags |= O_NONBLOCK;
- fcntl(newfd, F_SETFL, flags);
- }
-
- /* register the connection */
- conn = malloc(sizeof(vhffsfssync_conn));
- conn->fd = newfd;
- conn->sockaddr = addr;
- conn->order = 0;
- //conn->recvbuf = NULL;
- //conn->recvbuf_cur = 0;
- //conn->recvbuf_size = 0;
- //conn->sendbuf = NULL;
- //conn->sendbuf_cur = 0;
- //conn->sendbuf_size = 0;
- conn->messages = NULL;
- vhffsfssync_conns = g_list_append(vhffsfssync_conns, conn);
-#if DEBUG_NET
- printf("Welcome %s ! (using fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
-#endif
- vhffsfssync_net_send_string(conn, strdup("hello\n"), VHFFSFSSYNC_NET_PRIO_HIGHEST);
-/*
- grosbuf=malloc(10485760);
- memset(grosbuf, 'A', 10485760);
- vhffsfssync_net_send_data(conn, grosbuf, 1048576, 3);
-
- grosbuf=malloc(10485760);
- memset(grosbuf, 'B', 10485760);
- vhffsfssync_net_send_data(conn, grosbuf, 1048576, 2);
-
- grosbuf=malloc(10485760);
- memset(grosbuf, 'C', 10485760);
- vhffsfssync_net_send_data(conn, grosbuf, 1048576, 1);
-
- grosbuf=malloc(10485760);
- memset(grosbuf, 'D', 10485760);
- vhffsfssync_net_send_data(conn, grosbuf, 1048576, 0);
-*/
- //vhffsfssync_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 2);
- //vhffsfssync_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 2);
- //vhffsfssync_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 0);
- //vhffsfssync_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 1);
- //vhffsfssync_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 1);
- //vhffsfssync_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 0);
- //vhffsfssync_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 2);
- //vhffsfssync_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 0);
- //vhffsfssync_net_write(conn, "Salut!\n", strlen("Salut!\n"));
- //vhffsfssync_net_write(conn, grosbuf, 104857600);
- }
- }
-
- /* connections */
- for(conns = g_list_first(vhffsfssync_conns) ; conns ; ) {
- vhffsfssync_conn *conn = conns->data;
- conns = g_list_next(conns);
-
- /* data to read ?, give give ! */
- if( FD_ISSET(conn->fd, &readfs) ) {
- char buf[VHFFSFSSYNC_BUF_LEN];
- ssize_t len;
-
- len = read(conn->fd, buf, VHFFSFSSYNC_BUF_LEN);
- if(len < 0) {
- switch(errno) {
- case EAGAIN:
- case EINTR:
- break;
- default:
- fprintf(stderr, "read() failed on socket %d: %s\n", conn->fd, strerror(errno));
- vhffsfssync_net_conn_disable(conn);
- }
- }
- else if(len == 0) {
- vhffsfssync_net_conn_disable(conn);
- }
- else {
- char *plop = malloc(len+1);
- memcpy(plop, buf, len);
- plop[len] = '\0';
- printf("%s", plop);
- free(plop);
- }
- }
-
- /* try to send more data */
- if(conn && conn->messages && FD_ISSET(conn->fd, &writefs) ) {
- vhffsfssync_net_send(conn);
- }
- }
- }
- }
-
- return 0;
-}
Copied: trunk/vhffs-fssync/vhffsfssync_master.c (from rev 1264, trunk/vhffs-fssync/getevents.c)
===================================================================
--- trunk/vhffs-fssync/vhffsfssync_master.c (rev 0)
+++ trunk/vhffs-fssync/vhffsfssync_master.c 2008-10-07 11:13:12 UTC (rev 1265)
@@ -0,0 +1,1331 @@
+#define _FILE_OFFSET_BITS 64
+
+#define DEBUG_NET 0
+#define DEBUG_INOTIFY 0
+
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <sys/time.h>
+#include <fcntl.h>
+#include <dirent.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/inotify.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <glib.h>
+#include <sys/select.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/sendfile.h>
+
+
+/* TODO: check that a file name does not contain a \x1F, otherwise the client is guaranteed to segfault */
+/* convert the \x1F to a space when the event is emitted? */
+
+/* -- inotify stuff -- */
+
+#define VHFFSFSSYNC_BUF_LEN 4096
+// inotify events requested on every watched directory.
+// The OR-chain is parenthesized so that it stays a single operand wherever the macro is expanded (next to &, ==, ~, ...).
+#define VHFFSFSSYNC_WATCH_MASK (IN_ATTRIB|IN_CLOSE_WRITE|IN_CREATE|IN_DELETE|IN_DELETE_SELF|IN_MODIFY|IN_MOVE_SELF|IN_MOVED_FROM|IN_MOVED_TO|IN_DONT_FOLLOW|IN_ONLYDIR)
+// Not used: IN_ACCESS, IN_CLOSE_NOWRITE, IN_OPEN
+
+// the maximum number of files simultaneously opened
+// huge values offer better performance
+// actually it is MAX = VHFFSFSSYNC_MAX_OPENFILES + number of clients
+#define VHFFSFSSYNC_MAX_OPENFILES 512
+
+// each monitor entry is associated with a path, we need to keep it to compute the path
+//char **vhffsfssync_wd_to_path = NULL;
+//int vhffsfssync_wd_to_path_len = 0; // number of allocated paths
+GHashTable *vhffsfssync_wd_to_path;
+GHashTable *vhffsfssync_path_to_wd;
+
+// return a timestamp in ms (it loops for 100000 sec)
+/*inline int vhffsfssync_timestamp() {
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ return (tv.tv_sec%100000)*1000+tv.tv_usec/1000;
+}*/
+
+// Pending half of a rename: remembers an IN_MOVED_FROM event until the matching
+// IN_MOVED_TO shows up (see the IN_MOVED_FROM / IN_MOVED_TO handling in vhffsfssync_manage_event()).
+struct vhffsfssync_cookie {
+ uint32_t id; // inotify cookie of the pending IN_MOVED_FROM, 0 when nothing is pending
+ char *from; // strdup()'ed source pathname, freed when the cookie is consumed or dropped
+ gboolean isdir; // TRUE when the IN_MOVED_FROM event carried IN_ISDIR
+};
+// single file-wide slot, so only one move can be pending at a time
+static struct vhffsfssync_cookie vhffsfssync_cookie;
+
+int vhffsfssync_openfiles;
+
+// protos
+int vhffsfssync_add_watch(int inotifyfd, const char *pathname, uint32_t mask);
+int vhffsfssync_del_watch(int inotifyfd, const char *pathname, int wd);
+int vhffsfssync_add_watch_recursively(int inotifyfd, const char *pathname, uint32_t mask);
+int vhffsfssync_manage_event_create(int inotifyfd, char *pathname, gboolean sendfile);
+int vhffsfssync_manage_event(int inotifyfd, struct inotify_event *event);
+int vhffsfssync_fake_events_recursively(int inotifyfd, char *pathname);
+
+
+/* -- network stuff -- */
+#define VHFFSFSSYNC_NET_MESSAGE_FILE_CHUNK 65536
+
+GList *vhffsfssync_conns;
+
+// One accepted network connection together with its queue of outgoing messages
+typedef struct {
+ int fd; // socket descriptor; set to -1 by vhffsfssync_net_conn_disable()
+ struct sockaddr_in sockaddr; // peer address (only read by the DEBUG_NET traces in the code visible here)
+ GList *messages; // pending outgoing messages, inserted sorted with vhffsfssync_net_message_insert_compare()
+ uint32_t order; // stamped on each new message then incremented; tie-break between equal priorities
+// char *recvbuf;
+// ssize_t recvbuf_cur;
+// ssize_t recvbuf_size;
+// char *sendbuf;
+// ssize_t sendbuf_cur;
+// ssize_t sendbuf_size;
+} vhffsfssync_conn;
+
+
+/* message - generic */
+// The family tag is the first field of every message struct; it tells which concrete
+// struct a vhffsfssync_net_message pointer must be cast to.
+typedef unsigned short int msg_family_t;
+enum {
+ VHFFSFSSYNC_NET_MESSAGE_UNSPEC=0, // no handler for this value in vhffsfssync_net_destroy_message()
+ VHFFSFSSYNC_NET_MESSAGE_DATA, // handled as vhffsfssync_net_message_data (in-memory buffer)
+ VHFFSFSSYNC_NET_MESSAGE_FILE // handled as vhffsfssync_net_message_file (file content)
+};
+
+/* message - priorities */
+// A numerically lower value is placed earlier in the queue (see vhffsfssync_net_message_insert_compare()).
+// vhffsfssync_net_send() re-tags the message it is currently transmitting with 0 (data) or 1 (file chunk).
+enum {
+ VHFFSFSSYNC_NET_PRIO_HIGHEST=100, // values < 100 may be used internally, please don't set anything below 100 or die!
+ VHFFSFSSYNC_NET_PRIO_HIGH, // values >= 1000 are used for files, don't use them too
+ VHFFSFSSYNC_NET_PRIO_MEDIUM, // priority used for the event strings (create, mkdir, remove, move, ...)
+ VHFFSFSSYNC_NET_PRIO_LOW,
+ VHFFSFSSYNC_NET_PRIO_LOWEST
+};
+
+
+// Declares the three header fields shared by every message struct;
+// msg_prefix is pasted in front of each field name (msg_family, data_family, file_family, ...).
+#define __VHFFSFSSYNC_NET_MESSAGE_COMMON(msg_prefix) \
+ msg_family_t msg_prefix##family; \
+ uint32_t msg_prefix##priority; \
+ uint32_t msg_prefix##order
+
+// Sum of the sizeof of those three fields, used to compute the padding of the concrete structs.
+// NOTE(review): this is a plain sum, alignment padding between the fields is not counted - confirm the padded structs really match.
+#define __VHFFSFSSYNC_NET_MESSAGE_COMMON_SIZE ( sizeof (msg_family_t) + sizeof(uint32_t) + sizeof(uint32_t) )
+
+// Generic view of a message: the common header followed by 200 opaque bytes.
+// vhffsfssync_net_new_message() allocates sizeof() of this struct; the data/file
+// variants are accessed by casting a pointer to it.
+typedef struct {
+ __VHFFSFSSYNC_NET_MESSAGE_COMMON(msg_);
+ char msg_data[200];
+} vhffsfssync_net_message;
+
+/* message - common data */
+// In-memory buffer queued by vhffsfssync_net_send_data()
+typedef struct {
+ __VHFFSFSSYNC_NET_MESSAGE_COMMON(data_);
+ char *data_buffer; // heap buffer owned by the message, free()'d in vhffsfssync_net_destroy_data()
+ ssize_t data_len; // total number of bytes in data_buffer
+ ssize_t data_cur; // number of bytes already written to the socket
+
+ /* pad to size of `vhffsfssync_net_message' */
+ /* NOTE(review): the subtraction below ignores alignment padding, so sizeof() of this
+    struct may differ from sizeof(vhffsfssync_net_message) - confirm on the target ABI */
+ unsigned char sin_zero[ sizeof(vhffsfssync_net_message)
+ - __VHFFSFSSYNC_NET_MESSAGE_COMMON_SIZE
+ - sizeof(char*)
+ - sizeof(ssize_t)
+ - sizeof(ssize_t) ];
+} vhffsfssync_net_message_data;
+
+/* message - file */
+// File transfer queued by vhffsfssync_net_send_file(); the content is sent chunk by chunk
+typedef struct {
+ __VHFFSFSSYNC_NET_MESSAGE_COMMON(file_);
+ FILE *file_stream; // open stream on the file; may be reset to NULL by vhffsfssync_net_file_open() to save descriptors, reopened when the next chunk starts
+ off_t file_offset; // current read offset, passed to (and advanced by) sendfile()
+ off_t file_size; // st_size of the file at the time it was queued
+ off_t file_chunksize; // size of the chunk being sent, -1 when no chunk has been started
+ off_t file_chunkcur; // number of bytes of the current chunk already sent
+ char *file_pathname; // strdup()'ed pathname, free()'d in vhffsfssync_net_destroy_file()
+
+ /* pad to size of `vhffsfssync_net_message' */
+ /* NOTE(review): the subtraction below ignores alignment padding, so sizeof() of this
+    struct may differ from sizeof(vhffsfssync_net_message) - confirm on the target ABI */
+ unsigned char sin_zero[ sizeof(vhffsfssync_net_message)
+ - __VHFFSFSSYNC_NET_MESSAGE_COMMON_SIZE
+ - sizeof(FILE*)
+ - sizeof(off_t)
+ - sizeof(off_t)
+ - sizeof(off_t)
+ - sizeof(off_t)
+ - sizeof(char*) ];
+} vhffsfssync_net_message_file;
+
+
+// protos
+void vhffsfssync_net_conn_disable(vhffsfssync_conn *conn);
+void vhffsfssync_net_conn_destroy(vhffsfssync_conn *conn);
+inline vhffsfssync_net_message *vhffsfssync_net_new_message(vhffsfssync_conn *conn, msg_family_t family, uint32_t priority);
+gint vhffsfssync_net_message_insert_compare(gconstpointer a, gconstpointer b);
+inline void vhffsfssync_net_destroy_message(vhffsfssync_conn *conn, vhffsfssync_net_message *msg);
+int vhffsfssync_net_send_data(vhffsfssync_conn *conn, char *data, ssize_t len, uint32_t priority);
+void vhffsfssync_net_destroy_data(vhffsfssync_conn *conn, vhffsfssync_net_message_data *datamsg);
+inline int vhffsfssync_net_send_string(vhffsfssync_conn *conn, char *data, uint32_t priority) ;
+void vhffsfssync_net_broadcast_string(char *data, uint32_t priority);
+int vhffsfssync_net_send_file(vhffsfssync_conn *conn, char *pathname);
+void vhffsfssync_net_destroy_file(vhffsfssync_conn *conn, vhffsfssync_net_message_file *filemsg);
+FILE *vhffsfssync_net_file_open(vhffsfssync_conn *conn, const char *pathname, const char *mode);
+int vhffsfssync_net_file_close(vhffsfssync_conn *conn, FILE *stream);
+int vhffsfssync_net_remove_file(vhffsfssync_conn *conn, char *pathname);
+void vhffsfssync_net_broadcast_file(char *pathname);
+int vhffsfssync_net_send(vhffsfssync_conn *conn);
+
+
+/* ----------------------------------------- */
+/*
+ * Closes the socket of a connection (if it is still open), marks the
+ * connection as disabled with fd = -1 and destroys every queued message.
+ * The connection itself stays allocated and registered in the global list.
+ */
+void vhffsfssync_net_conn_disable(vhffsfssync_conn *conn) {
+    GList *head;
+
+    if(conn->fd >= 0) {
+#if DEBUG_NET
+        printf("Byebye %s... (used fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
+#endif
+        shutdown(conn->fd, SHUT_RDWR);
+        close(conn->fd);
+    }
+    conn->fd = -1;
+
+    /* each destroy unlinks the head (and may queue filler data), so always restart from the first node */
+    for(head = g_list_first(conn->messages) ; head ; head = g_list_first(conn->messages)) {
+        vhffsfssync_net_destroy_message(conn, (vhffsfssync_net_message*)head->data);
+    }
+}
+
+
+/*
+ * Releases a connection completely: closes it and drops its messages,
+ * unregisters it from vhffsfssync_conns, then frees the structure.
+ * conn must not be used after this call.
+ */
+void vhffsfssync_net_conn_destroy(vhffsfssync_conn *conn) {
+    /* disabling only touches the connection's own fd and queue, not the global list */
+    vhffsfssync_net_conn_disable(conn);
+    vhffsfssync_conns = g_list_remove(vhffsfssync_conns, conn);
+    free(conn);
+}
+
+
+/* -- network stuff -- */
+/*
+ * Allocates a message and fills its common header (family, priority and the
+ * next order number of the connection). The message is NOT queued here and
+ * the family-specific fields are left uninitialized; the caller completes it.
+ * The result of malloc() is not checked.
+ */
+inline vhffsfssync_net_message *vhffsfssync_net_new_message(vhffsfssync_conn *conn, msg_family_t family, uint32_t priority) {
+    vhffsfssync_net_message *fresh = malloc( sizeof(vhffsfssync_net_message) );
+
+    fresh->msg_family = family;
+    fresh->msg_priority = priority;
+    fresh->msg_order = conn->order;
+    conn->order++;
+
+    return fresh;
+}
+
+
+/*
+ * GCompareFunc used with g_list_insert_sorted() to keep conn->messages ordered.
+ * Returns a negative value when a must be sent before b, a positive value
+ * when it must be sent after.
+ *
+ * Priorities are absolute uint32_t values ranging from 0 up to 4294967295
+ * (file priorities are derived from the file size), so they are compared
+ * explicitly: returning their difference as a gint flips the sign as soon as
+ * the two values are 2^31 or more apart.
+ */
+gint vhffsfssync_net_message_insert_compare(gconstpointer a, gconstpointer b) {
+    const vhffsfssync_net_message *first = a;
+    const vhffsfssync_net_message *second = b;
+
+    // lowest priority is preferred
+    if(first->msg_priority != second->msg_priority)
+        return first->msg_priority < second->msg_priority ? -1 : 1;
+
+    // lowest order is preferred; the order is a running counter, the wrapping
+    // difference is kept on purpose so that ordering survives a counter wrap
+    return (gint)(first->msg_order - second->msg_order);
+}
+
+
+/*
+ * Unlinks a message from the queue of conn and releases it, dispatching on
+ * the message family.
+ *
+ * A message of an unknown family is unlinked and freed as well: the drain
+ * loop of vhffsfssync_net_conn_disable() keeps destroying the head of the
+ * queue until the queue is empty, and would never terminate on a message
+ * that stays linked.
+ */
+inline void vhffsfssync_net_destroy_message(vhffsfssync_conn *conn, vhffsfssync_net_message *msg) {
+    switch(msg->msg_family) {
+        case VHFFSFSSYNC_NET_MESSAGE_DATA:
+            vhffsfssync_net_destroy_data(conn, (vhffsfssync_net_message_data*)msg);
+            break;
+
+        case VHFFSFSSYNC_NET_MESSAGE_FILE:
+            vhffsfssync_net_destroy_file(conn, (vhffsfssync_net_message_file*)msg);
+            break;
+
+        default:
+            /* nothing family-specific to release, only the message itself */
+            conn->messages = g_list_remove(conn->messages, msg);
+            free(msg);
+            break;
+    }
+}
+
+
+// !!!!!! the buffer is freed when the message has been sent, DON'T send static string and DON'T free() the data yourself
+//
+// Queues len bytes of data on conn with the given priority.
+// Ownership of data is taken in every case: when the request is rejected
+// (NULL conn, NULL data or len <= 0) the buffer is released here, because
+// callers hand over freshly allocated strings and never look at the result.
+// Returns 0 when the message has been queued, -1 when it has been rejected.
+int vhffsfssync_net_send_data(vhffsfssync_conn *conn, char *data, ssize_t len, uint32_t priority) {
+    vhffsfssync_net_message_data *msg;
+
+    if(!conn || !data || len <= 0) {
+        free(data);
+        return -1;
+    }
+
+    msg = (vhffsfssync_net_message_data*)vhffsfssync_net_new_message(conn, VHFFSFSSYNC_NET_MESSAGE_DATA, priority);
+    msg->data_buffer = data;
+    msg->data_len = len;
+    msg->data_cur = 0;
+    conn->messages = g_list_insert_sorted(conn->messages, msg, vhffsfssync_net_message_insert_compare);
+
+    return 0;
+}
+
+
+/*
+ * Releases a data message: frees the buffer it owns, unlinks the message
+ * from the queue of conn and frees the message itself.
+ */
+void vhffsfssync_net_destroy_data(vhffsfssync_conn *conn, vhffsfssync_net_message_data *datamsg) {
+    vhffsfssync_net_message *generic = (vhffsfssync_net_message*)datamsg;
+
+    free(datamsg->data_buffer);
+    conn->messages = g_list_remove(conn->messages, generic);
+    free(generic);
+}
+
+
+/*
+ * Queues a NUL-terminated, heap-allocated string (the terminator is not sent).
+ * Same ownership rules as vhffsfssync_net_send_data().
+ * A NULL string is rejected with -1 instead of being handed to strlen():
+ * vhffsfssync_net_broadcast_string() passes the result of strdup() unchecked.
+ */
+inline int vhffsfssync_net_send_string(vhffsfssync_conn *conn, char *data, uint32_t priority) {
+    if(!data) return -1;
+    return vhffsfssync_net_send_data(conn, data, strlen(data), priority);
+}
+
+
+/*
+ * Queues a private copy of data on every registered connection with the
+ * given priority, then frees the original string: data must be
+ * heap-allocated and must not be used by the caller afterwards.
+ */
+void vhffsfssync_net_broadcast_string(char *data, uint32_t priority) {
+    GList *node;
+
+    /* queueing a message never modifies the list of connections, so a plain walk is enough */
+    for(node = g_list_first(vhffsfssync_conns) ; node ; node = g_list_next(node)) {
+        vhffsfssync_net_send_string((vhffsfssync_conn*)node->data, strdup(data), priority);
+    }
+
+    free(data);
+}
+
+
+// prototype is simple, files are always the lowest of the lowest priority messages
+//
+// Queues the content of the regular file pathname on conn:
+//  - any transfer of the same pathname already queued on conn is cancelled,
+//  - a "create" event string is queued with medium priority,
+//  - a file message is queued with a priority equal to the file size,
+//    clamped to [1000, 4294967295], so that small files go first.
+// Returns 0 on success, -1 when an argument is NULL, lstat() fails, the
+// path is not a regular file or the file cannot be opened.
+int vhffsfssync_net_send_file(vhffsfssync_conn *conn, char *pathname) {
+    const uint32_t maxprio = -1; // 4294967295
+    vhffsfssync_net_message_file *filemsg;
+    struct stat info;
+    uint32_t priority;
+    FILE *fp;
+
+    if(!conn || !pathname) {
+        return -1;
+    }
+
+    if( lstat(pathname, &info) < 0 ) {
+        fprintf(stderr, "lstat() failed on %s: %s\n", pathname, strerror(errno));
+        return -1;
+    }
+
+    /* only copy regular files */
+    if(! S_ISREG(info.st_mode) ) {
+        fprintf(stderr, "%s is not a regular file\n", pathname);
+        return -1;
+    }
+
+    fp = vhffsfssync_net_file_open(conn, pathname, "r");
+    if(!fp) {
+        return -1;
+    }
+
+    // if the file is being sent, cancel it
+    vhffsfssync_net_remove_file(conn, pathname);
+    //printf("%d SENDING FILE %s\n", conn->fd, pathname);
+    vhffsfssync_net_send_string( conn, g_strdup_printf("create\x1F%s\n", pathname) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
+
+    // the size of the file is the priority (small files are sent with more priority)
+    // but don't set the priority too low, low value can be used for anything else
+    priority = MAX(MIN(info.st_size, maxprio),1000);
+
+    filemsg = (vhffsfssync_net_message_file*)vhffsfssync_net_new_message(conn, VHFFSFSSYNC_NET_MESSAGE_FILE, priority);
+    filemsg->file_pathname = strdup(pathname);
+    filemsg->file_stream = fp;
+    filemsg->file_size = info.st_size;
+    filemsg->file_offset = 0;
+    filemsg->file_chunksize = -1; // no chunk started yet
+    filemsg->file_chunkcur = 0;
+    conn->messages = g_list_insert_sorted(conn->messages, filemsg, vhffsfssync_net_message_insert_compare);
+
+    return 0;
+}
+
+
+/*
+ * Releases a file message: unlinks it from the queue of conn, closes its
+ * stream and frees its pathname and the message itself.
+ *
+ * When a chunk has been announced but not completely sent, the missing
+ * bytes are queued as zeros with priority 0, so that the byte count
+ * announced for the chunk is still honoured on the wire.
+ */
+void vhffsfssync_net_destroy_file(vhffsfssync_conn *conn, vhffsfssync_net_message_file *filemsg) {
+    conn->messages = g_list_remove(conn->messages, (vhffsfssync_net_message*)filemsg);
+
+    /* we need to finish the chunk anyway */
+    if(filemsg->file_chunksize > 0) {
+        off_t len = filemsg->file_chunksize - filemsg->file_chunkcur;
+        if(len > 0) {
+            /* calloc() hands back the zero filler directly and its result is checked:
+               the previous malloc() + memset() pair crashed on allocation failure */
+            char *filler = calloc(1, len);
+            if(filler) {
+                vhffsfssync_net_send_data(conn, filler, len, 0);
+            }
+            else {
+                fprintf(stderr, "cannot allocate %lld bytes to finish the chunk of %s\n", (long long int)len, filemsg->file_pathname);
+            }
+        }
+    }
+
+    vhffsfssync_net_file_close(conn, filemsg->file_stream);
+    free(filemsg->file_pathname);
+    free(filemsg);
+}
+
+
+/*
+ * fopen() wrapper which keeps count of the streams opened for transfers.
+ *
+ * When the count goes above VHFFSFSSYNC_MAX_OPENFILES, one stream is closed
+ * to compensate: the queue of conn is walked from its last message backwards
+ * and the first file message found with an open stream and no chunk in
+ * progress gets its stream closed and reset to NULL.
+ *
+ * Returns the new stream, or NULL when an argument is NULL or fopen() fails.
+ */
+FILE *vhffsfssync_net_file_open(vhffsfssync_conn *conn, const char *pathname, const char *mode) {
+    FILE *fp;
+    GList *node;
+
+    if(!conn || !pathname || !mode)
+        return NULL;
+
+    fp = fopen(pathname, mode);
+    if(!fp) {
+        fprintf(stderr, "fopen() failed on %s: %s\n", pathname, strerror(errno));
+        return NULL;
+    }
+
+    vhffsfssync_openfiles++;
+
+    // do we need to free a file fd ?
+    if(vhffsfssync_openfiles <= VHFFSFSSYNC_MAX_OPENFILES)
+        return fp;
+
+    // we prefer to fclose() the fd of the msg with the lowest priority
+    for(node = g_list_last(conn->messages) ; node ; node = g_list_previous(node)) {
+        vhffsfssync_net_message *generic = node->data;
+        vhffsfssync_net_message_file *candidate;
+
+        if(generic->msg_family != VHFFSFSSYNC_NET_MESSAGE_FILE)
+            continue;
+
+        candidate = (vhffsfssync_net_message_file*)generic;
+
+        /* skip files which are already closed or which have a chunk in progress */
+        if(!candidate->file_stream || candidate->file_chunksize >= 0)
+            continue;
+
+        vhffsfssync_net_file_close(conn, candidate->file_stream);
+        candidate->file_stream = NULL;
+        break;
+    }
+
+    return fp;
+}
+
+
+/*
+ * fclose() wrapper, counterpart of vhffsfssync_net_file_open().
+ *
+ * Returns -1 when stream is NULL, otherwise the result of fclose().
+ * The open-file counter is decremented even when fclose() reports an error:
+ * the stream is released in both cases, so leaving the counter untouched on
+ * failure made it drift upward for good.
+ * conn is not used.
+ */
+int vhffsfssync_net_file_close(vhffsfssync_conn *conn, FILE *stream) {
+    int r;
+
+    (void)conn;
+
+    if(!stream)
+        return -1;
+
+    r = fclose(stream);
+    vhffsfssync_openfiles--;
+    if( r ) {
+        fprintf(stderr, "fclose() failed: %s\n", strerror(errno));
+    }
+    return r;
+}
+
+
+/*
+ * Queues the file pathname on every registered connection.
+ * The result of each vhffsfssync_net_send_file() call is ignored.
+ */
+void vhffsfssync_net_broadcast_file(char *pathname) {
+    GList *node;
+
+    /* queueing a file only changes the queue of the connection, never the list of connections */
+    for(node = g_list_first(vhffsfssync_conns) ; node ; node = g_list_next(node)) {
+        vhffsfssync_net_send_file((vhffsfssync_conn*)node->data, pathname);
+    }
+}
+
+
+/*
+ * Cancels every file message queued on conn whose pathname equals pathname.
+ * Always returns 0.
+ */
+int vhffsfssync_net_remove_file(vhffsfssync_conn *conn, char *pathname) {
+    GList *node = g_list_first(conn->messages);
+
+    while(node) {
+        vhffsfssync_net_message *msg = node->data;
+        /* fetch the successor first: destroying the message unlinks the current node */
+        GList *following = g_list_next(node);
+
+        if(msg->msg_family == VHFFSFSSYNC_NET_MESSAGE_FILE
+           && strcmp(((vhffsfssync_net_message_file*)msg)->file_pathname, pathname) == 0) {
+            //printf("%d CANCELLING %s\n", conn->fd, pathname);
+            vhffsfssync_net_destroy_file(conn, (vhffsfssync_net_message_file*)msg);
+        }
+
+        node = following;
+    }
+
+    return 0;
+}
+
+
+/*
+ * Flush as much of the pending message queue of `conn` as the socket accepts.
+ *
+ * The loop always works on the first element of conn->messages and stops when
+ * the socket would block ("full"), when the connection gets disabled
+ * (conn->fd < 0), or when the queue is empty.
+ *
+ * Two message families are handled:
+ *  - DATA: an in-memory buffer written with write(); destroyed once fully sent.
+ *  - FILE: a file sent chunk by chunk with sendfile(); each chunk is announced
+ *    by a "write" text command queued with priority 0.
+ *
+ * Returns -1 if `conn` is NULL or already disabled, 0 otherwise (including
+ * when a write error disabled the connection during the call).
+ */
+int vhffsfssync_net_send(vhffsfssync_conn *conn) {
+ GList *msgs;
+ gboolean full = FALSE;
+
+ if(!conn || conn->fd < 0) return -1;
+#if DEBUG_NET
+ printf("--------------------------------------------------\n");
+ printf("conn: %d, to: %s\n", conn->fd, inet_ntoa(conn->sockaddr.sin_addr));
+#endif
+ while(!full && conn->fd >= 0 && (msgs = g_list_first(conn->messages)) ) {
+ vhffsfssync_net_message *msg = msgs->data;
+#if DEBUG_NET
+ printf(" family: %d , priority: %d , order: %d\n", msg->msg_family, msg->msg_priority, msg->msg_order);
+#endif
+ // data
+ if(msg->msg_family == VHFFSFSSYNC_NET_MESSAGE_DATA) {
+ vhffsfssync_net_message_data *datamsg;
+ ssize_t written;
+ ssize_t lentowrite;
+
+ // we need to make sure that the message will not be truncated, we set the current message to the highest priority
+ msg->msg_priority = 0;
+
+ datamsg = (vhffsfssync_net_message_data*)msg;
+#if DEBUG_NET
+ printf(" buffer: %d bytes, %d already written\n", datamsg->data_len, datamsg->data_cur);
+#endif
+ /* try to empty the buffer */
+ lentowrite = datamsg->data_len - datamsg->data_cur;
+ written = write(conn->fd, datamsg->data_buffer + datamsg->data_cur, lentowrite);
+ if(written < 0) {
+ switch(errno) {
+ // EINTR is handled like EAGAIN: give up for now, the caller retries later
+ case EAGAIN:
+ case EINTR:
+#if DEBUG_NET
+ printf("=====> EAGAIN on write()\n");
+#endif
+ full = TRUE;
+ break;
+ default:
+ fprintf(stderr, "write() failed on socket %d: %s\n", conn->fd, strerror(errno));
+ vhffsfssync_net_conn_disable(conn);
+ }
+ }
+ else {
+ datamsg->data_cur += written;
+#if DEBUG_NET
+ printf(" %d bytes written on %d bytes\n", written, lentowrite);
+#endif
+ /* the buffer is not empty yet (but the SendQ into the kernel is) */
+ if(written < lentowrite) {
+ full = TRUE;
+ }
+ /* buffer is now empty */
+ else {
+ vhffsfssync_net_destroy_data(conn, datamsg);
+ }
+ }
+ }
+
+ // file
+ else if(msg->msg_family == VHFFSFSSYNC_NET_MESSAGE_FILE) {
+ vhffsfssync_net_message_file *filemsg;
+ ssize_t written;
+ off_t lentowrite;
+
+ filemsg = (vhffsfssync_net_message_file*)msg;
+#if DEBUG_NET
+ printf(" file: %s, offset = %lld, size = %lld\n", filemsg->file_pathname, (long long int)filemsg->file_offset, (long long int)filemsg->file_size);
+#endif
+ /* new chunk */
+ // a negative file_chunksize means that no chunk is currently in flight
+ if(filemsg->file_chunksize < 0) {
+ if(!filemsg->file_stream) {
+ filemsg->file_stream = vhffsfssync_net_file_open(conn, filemsg->file_pathname, "r");
+ if(!filemsg->file_stream) {
+ vhffsfssync_net_destroy_file(conn, filemsg);
+ // NOTE: this leaves the send loop entirely, the remaining messages wait for the next call
+ break;
+ }
+ }
+
+ lentowrite = filemsg->file_chunksize = MIN(VHFFSFSSYNC_NET_MESSAGE_FILE_CHUNK, filemsg->file_size - filemsg->file_offset);
+ filemsg->file_chunkcur = 0;
+ // we need to make sure that the chunk will not be truncated, we set the current message to the highest priority
+ // (priority 1 here; the "write" header queued just below uses 0 -- presumably so that it is picked before the chunk, TODO confirm against vhffsfssync_net_send_string())
+ msg->msg_priority = 1;
+ vhffsfssync_net_send_string(conn, g_strdup_printf("write\x1F%s\x1F%lld\x1F%lld\n", filemsg->file_pathname, (long long int)filemsg->file_offset, (long long int)filemsg->file_chunksize) , 0);
+ // we need to reset here in order to consider the new priorities
+ continue;
+ }
+ /* the previous chunk was partially sent */
+ else {
+ lentowrite = filemsg->file_chunksize - filemsg->file_chunkcur;
+ }
+
+ /* try to send the file */
+ // sendfile() is given &file_offset, so file_offset is where the next call resumes
+ written = sendfile(conn->fd, fileno(filemsg->file_stream), &filemsg->file_offset, lentowrite);
+ if(written < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+#if DEBUG_NET
+ printf("=====> EAGAIN on sendfile()\n");
+#endif
+ full = TRUE;
+ break;
+ default:
+ fprintf(stderr, "sendfile() failed from file %s to socket %d: %s\n", filemsg->file_pathname, conn->fd, strerror(errno));
+ vhffsfssync_net_conn_disable(conn);
+ }
+ }
+ else {
+#if DEBUG_NET
+ printf(" %d bytes written, we are at offset %lld\n", written, filemsg->file_offset);
+#endif
+ filemsg->file_chunkcur += written;
+
+ /* end of file or file completed */
+ if( written == 0 || filemsg->file_offset == filemsg->file_size ) {
+ vhffsfssync_net_destroy_file(conn, filemsg);
+ }
+
+ /* the chunk is not fully sent yet */
+ else if(written < lentowrite) {
+ full = TRUE;
+ }
+
+ /* the chunk is sent */
+ else if(written == lentowrite) {
+ uint32_t maxprio = -1; // 4294967295
+
+ filemsg->file_chunksize = -1;
+ filemsg->file_chunkcur = 0;
+
+ // reschedule this file to a nicer priority
+ // the new priority is the number of bytes left, clamped to [1000, UINT32_MAX]
+ msg->msg_priority = MAX(MIN(filemsg->file_size - filemsg->file_offset, maxprio),1000);
+ // remove + sorted insert moves the message to the position matching its new priority
+ conn->messages = g_list_remove(conn->messages, msg);
+ conn->messages = g_list_insert_sorted(conn->messages, msg, vhffsfssync_net_message_insert_compare);
+ }
+ }
+ }
+
+ // I don't want to stay in this jail
+ // (unknown message family: stop instead of spinning on the same list head)
+ else full = TRUE;
+ }
+
+ return 0;
+}
+
+
+/*
+ * DISABLED CODE (#if 0): earlier send path based on a single growing send
+ * buffer per connection (conn->sendbuf, conn->sendbuf_cur, conn->sendbuf_size).
+ *
+ * Called with (buffer, len > 0) it tries a direct write() when nothing is
+ * buffered yet and appends whatever could not be written to conn->sendbuf.
+ * Called with (NULL, 0) it tries to flush conn->sendbuf.
+ * Returns the number of bytes written by the last write(), 0 when that write
+ * failed, or -1 when neither mode applied.
+ */
+#if 0
+int vhffsfssync_net_write(vhffsfssync_conn *conn, char *buffer, ssize_t len) {
+
+ ssize_t written = -1;
+
+ /* new data to send */
+ if(buffer && len > 0) {
+ written = 0;
+ // buffer is empty, try to send the data directly (it avoids copying the data twice)
+ if(conn->sendbuf_size == 0) {
+ written = write(conn->fd, buffer, len);
+ if(written < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+ printf("=====> EAGAIN on write()\n");
+ break;
+ default:
+ fprintf(stderr, "write() failed on socket %d: %s\n", conn->fd, strerror(errno));
+ }
+ // any failure is treated as "nothing written", the data gets buffered below
+ written = 0;
+ }
+ }
+
+ printf("%d bytes written on %d bytes\n", written, len);
+
+ /* this is the case if the buffer is not empty, or if the write failed, or if the write didn't write everything */
+ if(written < len) {
+ ssize_t lentobuf = len-written;
+ printf("buffering %d bytes\n", lentobuf);
+
+ /* As you may have noticed, the buffer is only growing as long as it still
+ * contains data to be sent, here we try to deal with that, as it needs data
+ * to be copied, we hope that it is not going to happen very often
+ */
+
+ // NOTE(review): the realloc() result is not checked for NULL
+ conn->sendbuf = realloc(conn->sendbuf, conn->sendbuf_size+lentobuf);
+ memcpy(conn->sendbuf+conn->sendbuf_size, buffer+written, lentobuf);
+ conn->sendbuf_size += lentobuf;
+ printf("buffer size = %d\n", conn->sendbuf_size);
+ }
+ }
+
+ /* try to empty the buffer */
+ if(!buffer && len == 0 && conn->sendbuf_size > 0) {
+ ssize_t lentowrite = conn->sendbuf_size - conn->sendbuf_cur;
+ written = write(conn->fd, conn->sendbuf + conn->sendbuf_cur, lentowrite);
+ if(written < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+ printf("=====> EAGAIN on write()\n");
+ break;
+ default:
+ fprintf(stderr, "write() failed on socket %d: %s\n", conn->fd, strerror(errno));
+ }
+ written = 0;
+ }
+
+ if(written > 0) {
+ printf("%d bytes written on %d bytes\n", written, lentowrite);
+
+ /* the buffer is not empty yet */
+ if(written < lentowrite) {
+ conn->sendbuf_cur += written;
+ }
+ /* buffer is now empty */
+ else {
+ printf("buffer is now empty\n");
+ free(conn->sendbuf);
+ conn->sendbuf = NULL;
+ conn->sendbuf_cur = 0;
+ conn->sendbuf_size = 0;
+ }
+ }
+ }
+
+ return written;
+}
+#endif
+
+
+
+/* -- inotify stuff -- */
+
+/*
+ * Start watching `pathname` and record the watch in both lookup tables.
+ *
+ * vhffsfssync_wd_to_path owns the allocated key (int*) and value (char*);
+ * vhffsfssync_path_to_wd stores the very same two pointers the other way
+ * round and owns nothing, so both tables must always be updated together.
+ *
+ * Returns the watch descriptor, -1 if `pathname` is already watched, or the
+ * negative value returned by inotify_add_watch() on failure.
+ */
+int vhffsfssync_add_watch(int inotifyfd, const char *pathname, uint32_t mask) {
+
+    int wd;
+    int *_wd;
+    char *_pathname;
+    char *stalepath;
+
+    if( g_hash_table_lookup(vhffsfssync_path_to_wd, pathname) ) {
+        return -1;
+    }
+
+    wd = inotify_add_watch(inotifyfd, pathname, mask);
+    if(wd < 0) {
+        if(errno == ENOSPC) {
+            fprintf(stderr, "Maximum number of watches reached, consider adding more...\n");
+        }
+        return wd;
+    }
+
+    // inotify hands back the existing descriptor when the same inode is already
+    // watched under another name. Drop the stale mapping from BOTH tables first:
+    // inserting over it would free the old path while path_to_wd still uses it as a key.
+    stalepath = g_hash_table_lookup(vhffsfssync_wd_to_path, &wd);
+    if(stalepath) {
+        g_hash_table_remove(vhffsfssync_path_to_wd, stalepath);
+        g_hash_table_remove(vhffsfssync_wd_to_path, &wd);
+    }
+
+    _wd = g_new(int, 1);
+    *_wd = wd;
+    _pathname = g_strdup(pathname);
+    g_hash_table_insert(vhffsfssync_wd_to_path, _wd, _pathname);
+    g_hash_table_insert(vhffsfssync_path_to_wd, _pathname, _wd);
+
+#if DEBUG_INOTIFY
+    printf("+ %d %s\n", wd, pathname);
+#endif
+    // sanity check: both tables must describe the same set of watches
+    if(g_hash_table_size(vhffsfssync_wd_to_path) != g_hash_table_size(vhffsfssync_path_to_wd)) {
+        fprintf(stderr, "watch tables are out of sync after adding '%s' (wd %d), aborting\n", pathname, wd);
+        exit(-1);
+    }
+    return wd;
+}
+
+
+/*
+ * Forget a watch, identified EITHER by its descriptor (pathname == NULL and
+ * wd > 0) OR by its path (pathname != NULL and wd == 0). Any other
+ * combination of arguments is rejected with -1.
+ *
+ * Returns -1 when the watch is unknown (already deleted), otherwise the
+ * result of inotify_rm_watch().
+ */
+int vhffsfssync_del_watch(int inotifyfd, const char *pathname, int wd) {
+
+    gboolean by_descriptor = (pathname == NULL && wd > 0);
+    gboolean by_path = (pathname != NULL && wd == 0);
+
+    if(by_descriptor) {
+        // resolve the path from the descriptor
+        pathname = (char*)g_hash_table_lookup(vhffsfssync_wd_to_path, &wd);
+        if(pathname == NULL) return -1;
+    }
+    else if(by_path) {
+        // resolve the descriptor from the path
+        int *stored = g_hash_table_lookup(vhffsfssync_path_to_wd, pathname);
+        if(stored == NULL) return -1;
+        wd = *stored;
+    }
+    else {
+        return -1;
+    }
+
+#if DEBUG_INOTIFY
+    printf("- %d %s\n", wd, pathname);
+#endif
+    // path_to_wd first: removing from wd_to_path is what frees the path string
+    g_hash_table_remove(vhffsfssync_path_to_wd, pathname);
+    g_hash_table_remove(vhffsfssync_wd_to_path, &wd);
+
+    if(g_hash_table_size(vhffsfssync_wd_to_path) != g_hash_table_size(vhffsfssync_path_to_wd)) {
+        exit(-1);
+    }
+
+    return inotify_rm_watch(inotifyfd, wd);
+}
+
+
+/* Accumulator for vhffsfssync_modify_watch_collect(). */
+typedef struct {
+    const char *prefix;   /* "<from>/" */
+    size_t prefix_len;
+    GList *found;         /* keys of vhffsfssync_path_to_wd starting with prefix (not owned) */
+} vhffsfssync_subwatch_search;
+
+
+/* g_hash_table_foreach() callback: collect the watched paths located below the prefix. */
+static void vhffsfssync_modify_watch_collect(gpointer key, gpointer value, gpointer user_data) {
+    vhffsfssync_subwatch_search *search = user_data;
+
+    (void)value;
+    if( !strncmp((const char*)key, search->prefix, search->prefix_len) ) {
+        search->found = g_list_prepend(search->found, key);
+    }
+}
+
+
+/* Re-register the watch known as `oldpath` under `newpath`; takes ownership of `newpath`. */
+static int vhffsfssync_modify_watch_remap(const char *oldpath, char *newpath) {
+    int wd;
+    int *_wd;
+
+    _wd = g_hash_table_lookup(vhffsfssync_path_to_wd, oldpath);
+    if(!_wd) {
+        g_free(newpath);
+        return -1;
+    }
+    wd = *_wd;
+
+    // the second removal frees the old key and the old path (oldpath may be that very string)
+    g_hash_table_remove(vhffsfssync_path_to_wd, oldpath);
+    g_hash_table_remove(vhffsfssync_wd_to_path, &wd);
+
+    _wd = g_new(int, 1);
+    *_wd = wd;
+    g_hash_table_insert(vhffsfssync_wd_to_path, _wd, newpath);
+    g_hash_table_insert(vhffsfssync_path_to_wd, newpath, _wd);
+    return 0;
+}
+
+
+/*
+ * A watched directory was renamed from `from` to `to`: update the path
+ * recorded for its watch AND for every watch located below it. The watch
+ * descriptors themselves do not change, only the paths recorded for them.
+ *
+ * Returns -1 if `from` is not a watched path, 0 otherwise.
+ */
+int vhffsfssync_modify_watch(int inotifyfd, const char *from, const char *to) {
+
+    vhffsfssync_subwatch_search search;
+    GList *cur;
+    char *prefix;
+    size_t from_len;
+
+    if( !g_hash_table_lookup(vhffsfssync_path_to_wd, from) ) return -1;
+
+    // collect the sub-directories first: the tables must not be modified while iterating
+    from_len = strlen(from);
+    prefix = g_strdup_printf("%s/", from);
+    search.prefix = prefix;
+    search.prefix_len = from_len + 1;
+    search.found = NULL;
+    g_hash_table_foreach(vhffsfssync_path_to_wd, vhffsfssync_modify_watch_collect, &search);
+    g_free(prefix);
+
+    // the directory itself (g_strdup: the table releases its strings with g_free)
+    vhffsfssync_modify_watch_remap(from, g_strdup(to));
+
+    // then everything below it: "<from>/rest" becomes "<to>/rest"
+    for(cur = search.found ; cur ; cur = g_list_next(cur)) {
+        const char *oldpath = cur->data;
+        vhffsfssync_modify_watch_remap(oldpath, g_strdup_printf("%s%s", to, oldpath + from_len));
+    }
+    g_list_free(search.found);
+
+#if DEBUG_INOTIFY
+    printf("= %s -> %s\n", from, to);
+#endif
+    return 0;
+}
+
+
+/*
+ * Watch `pathname` and, recursively, every directory below it.
+ *
+ * Returns a negative value when `pathname` itself cannot be watched.
+ * Otherwise returns the result of the last sub-directory processed, or the
+ * descriptor of `pathname` when it has no sub-directory (unchanged behaviour:
+ * main() relies on it to detect that watches ran out).
+ */
+int vhffsfssync_add_watch_recursively(int inotifyfd, const char *pathname, uint32_t mask) {
+
+    int wd;
+    DIR *d;
+    struct dirent *dir;
+
+    wd = vhffsfssync_add_watch(inotifyfd, pathname, mask);
+    if(wd < 0) return wd;
+
+    d = opendir(pathname);
+    if(d) {
+        while( (dir = readdir(d)) ) {
+            char *path;
+            char *sep = "/";
+            gboolean isdir;
+
+            if( !strcmp(dir->d_name, ".") || !strcmp(dir->d_name, "..") ) continue;
+            // DT_UNKNOWN means that the filesystem did not fill d_type in, not that the entry is not a directory
+            if(dir->d_type != DT_DIR && dir->d_type != DT_UNKNOWN) continue;
+
+            if( pathname[strlen(pathname)-1] == '/' ) sep = "";
+            path = g_strdup_printf("%s%s%s", pathname, sep, dir->d_name);
+
+            isdir = (dir->d_type == DT_DIR);
+            if(!isdir) {
+                struct stat st;
+                // lstat(): symlinks to directories are not followed, like with DT_DIR
+                if( !lstat(path, &st) ) isdir = S_ISDIR(st.st_mode);
+            }
+
+            if(isdir) {
+                wd = vhffsfssync_add_watch_recursively(inotifyfd, path, mask);
+            }
+            // g_strdup_printf() memory is released with g_free()
+            g_free(path);
+        }
+        closedir(d);
+    }
+
+    return wd;
+}
+
+
+/*
+ * `pathname` is gone: stop watching it if it was a watched directory, then,
+ * for every connection, drop the pending file transfers of that path and
+ * queue a "remove" command. Always returns 0.
+ */
+int vhffsfssync_manage_event_remove(int inotifyfd, char *pathname) {
+    GList *node;
+    int *watched;
+
+#if DEBUG_INOTIFY
+    printf("==> REMOVE %s\n", pathname);
+#endif
+
+    watched = g_hash_table_lookup(vhffsfssync_path_to_wd, pathname);
+    if(watched != NULL) {
+        vhffsfssync_del_watch(inotifyfd, NULL, *watched);
+    }
+
+    /* connections */
+    node = g_list_first(vhffsfssync_conns);
+    while(node != NULL) {
+        vhffsfssync_conn *conn = node->data;
+        // fetch the successor before touching the connection
+        GList *following = g_list_next(node);
+
+        vhffsfssync_net_remove_file(conn, pathname);
+        vhffsfssync_net_send_string(conn, g_strdup_printf("remove\x1F%s\n", pathname) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
+
+        node = following;
+    }
+    return 0;
+}
+
+
+/*
+ * Something appeared at `pathname`: tell the slaves what it is.
+ *
+ *  - regular file: broadcast "create", and the content as well when
+ *    `sendfile` is set and the file is not empty;
+ *  - directory: watch it, broadcast "mkdir", then fake events for what was
+ *    created inside before the watch was in place;
+ *  - symlink: broadcast "symlink" with its target;
+ *  - anything else is ignored.
+ *
+ * An entry that already disappeared (ENOENT) is handled as a removal.
+ * Returns 0, or -1 when lstat()/readlink() failed for another reason or when
+ * memory ran out.
+ */
+int vhffsfssync_manage_event_create(int inotifyfd, char *pathname, gboolean sendfile) {
+    struct stat st;
+
+    if( lstat(pathname, &st) ) {
+        if(errno == ENOENT) {
+            // file already disappeared (common for temporary files)
+            vhffsfssync_manage_event_remove(inotifyfd, pathname);
+            return 0;
+        }
+        fprintf(stderr, "cannot lstat() '%s': %s\n", pathname, strerror(errno));
+        return -1;
+    }
+
+    if( S_ISREG(st.st_mode) ) {
+#if DEBUG_INOTIFY
+        printf("==> CREATE %s\n", pathname);
+#endif
+        vhffsfssync_net_broadcast_string( g_strdup_printf("create\x1F%s\n", pathname) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
+        if(sendfile && st.st_size > 0) {
+            vhffsfssync_net_broadcast_file(pathname);
+        }
+    }
+
+    else if( S_ISDIR(st.st_mode) ) {
+#if DEBUG_INOTIFY
+        printf("==> MKDIR %s\n", pathname);
+#endif
+        vhffsfssync_add_watch(inotifyfd, pathname, VHFFSFSSYNC_WATCH_MASK);
+        vhffsfssync_net_broadcast_string( g_strdup_printf("mkdir\x1F%s\n", pathname) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
+        /* there is a short delay between the mkdir() and the add_watch(),
+           we need to send events about the data which have already been written */
+        vhffsfssync_fake_events_recursively( inotifyfd, pathname );
+    }
+
+    else if( S_ISLNK(st.st_mode) ) {
+        char *linkto;
+        ssize_t ret;
+        int saved_errno;
+
+        linkto = malloc(st.st_size +1);
+        if(!linkto) {
+            fprintf(stderr, "cannot allocate memory to read symlink '%s'\n", pathname);
+            return -1;
+        }
+
+        ret = readlink(pathname, linkto, st.st_size);
+        // keep errno now, the calls below may overwrite it
+        saved_errno = errno;
+        if( ret >= 0 ) {
+            // readlink() does not terminate the string; terminate at the length actually
+            // read, the link may have been replaced by a shorter one since the lstat()
+            linkto[ret] = '\0';
+#if DEBUG_INOTIFY
+            printf("==> SYMLINK %s -> %s\n", pathname, linkto);
+#endif
+            vhffsfssync_net_broadcast_string( g_strdup_printf("symlink\x1F%s\x1F%s\n", pathname, linkto) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
+        }
+        free(linkto);
+
+        if(ret < 0) {
+            if(saved_errno == ENOENT) {
+                // file already disappeared (common for temporary files)
+                vhffsfssync_manage_event_remove(inotifyfd, pathname);
+            } else {
+                fprintf(stderr, "cannot readlink() '%s': %s\n", pathname, strerror(saved_errno));
+                return -1;
+            }
+        }
+    }
+    /* we don't need other file types (chr, block, fifo, socket, ...) */
+
+    return 0;
+}
+
+
+/*
+ * Translate one inotify event into watch-table updates and messages for the
+ * slaves.
+ *
+ * Renames arrive as two events (IN_MOVED_FROM then IN_MOVED_TO) sharing a
+ * cookie. The source is parked in vhffsfssync_cookie; if the next event is
+ * not an IN_MOVED_TO, the parked path is handled as a removal.
+ *
+ * Returns -1 when the event queue overflowed (event->wd < 0) or when the
+ * watch descriptor is unknown, 0 otherwise.
+ */
+int vhffsfssync_manage_event(int inotifyfd, struct inotify_event *event) {
+
+    char *dirpath, *pathname;
+#if DEBUG_INOTIFY
+    printf("wd=%d mask=%x cookie=%d len=%d", event->wd, event->mask, event->cookie, event->len);
+    if(event->len > 0) printf(" name=%s", event->name);
+    printf("\n");
+#endif
+
+    if(event->wd < 0) {
+        fprintf(stderr, "Maximum number of events reached, some events are lost\n");
+        return -1;
+    }
+
+    dirpath = (char*)g_hash_table_lookup(vhffsfssync_wd_to_path, &event->wd);
+    // this wd has been deleted
+    if(!dirpath) return -1;
+
+    // private copy: dirpath belongs to the table and may be freed by the handlers below
+    if(event->len > 0) {
+        pathname = g_strdup_printf("%s/%s", dirpath, event->name);
+    } else {
+        pathname = g_strdup(dirpath);
+    }
+
+    // this event is not waiting for a cookie, delete file if necessary (IN_MOVED_FROM not followed with IN_MOVED_TO)
+    if( !(event->mask & IN_MOVED_TO) && vhffsfssync_cookie.id ) {
+
+        vhffsfssync_manage_event_remove(inotifyfd, vhffsfssync_cookie.from);
+        vhffsfssync_cookie.id = 0;
+        free(vhffsfssync_cookie.from);
+        // do not leave a dangling pointer behind
+        vhffsfssync_cookie.from = NULL;
+    }
+
+    // useless at the moment (chmod, chown, touch, ...)
+    if( event->mask & IN_ATTRIB ) {
+#if DEBUG_INOTIFY
+        printf("IN_ATTRIB\n");
+#endif
+
+    // new file, directory, or symlink
+    } else if( event->mask & IN_CREATE ) {
+#if DEBUG_INOTIFY
+        printf("IN_CREATE\n");
+#endif
+        vhffsfssync_manage_event_create(inotifyfd, pathname, FALSE);
+
+    // deleted file, directory or symlink
+    } else if( event->mask & IN_DELETE ) {
+#if DEBUG_INOTIFY
+        printf("IN_DELETE\n");
+#endif
+        vhffsfssync_manage_event_remove(inotifyfd, pathname);
+
+    // watch deleted, not used
+    } else if( event->mask & IN_DELETE_SELF ) {
+#if DEBUG_INOTIFY
+        printf("IN_DELETE_SELF\n");
+#endif
+        // We don't send REMOVE here because the dir can be deleted before the
+        // event was added, in this case the add_watch failed to monitor this dir
+        // and we'll not receive a IN_DELETE_SELF for it
+        //
+        // implicit deletion (IN_IGNORED will follow this event)
+
+    // file modified
+    } else if( event->mask & IN_MODIFY ) {
+#if DEBUG_INOTIFY
+        printf("IN_MODIFY\n");
+        /* we can send the data here */
+        printf("==> SEND %s\n", pathname);
+#endif
+    // file modified and closed
+    } else if( event->mask & IN_CLOSE_WRITE ) {
+#if DEBUG_INOTIFY
+        printf("IN_CLOSE_WRITE\n");
+        /* we must send the data here */
+        printf("==> SEND %s\n", pathname);
+#endif
+        vhffsfssync_net_broadcast_file(pathname);
+
+    // watch moved, not used
+    } else if( event->mask & IN_MOVE_SELF ) {
+#if DEBUG_INOTIFY
+        printf("IN_MOVE_SELF\n");
+#endif
+        // not needed (we can rely on IN_MOVED_FROM and IN_MOVED_TO)
+
+    // file/symlink/directory moved
+    //
+    // only from: delete the file/symlink/directory
+    // only to: create the file/symlink/directory
+    // both: mv the file/symlink/directory
+    //
+    } else if( event->mask & IN_MOVED_FROM ) {
+#if DEBUG_INOTIFY
+        printf("IN_MOVED_FROM\n");
+#endif
+        // set the cookie (a previously parked one was flushed above)
+        vhffsfssync_cookie.id = event->cookie;
+        vhffsfssync_cookie.from = strdup(pathname);
+        vhffsfssync_cookie.isdir = !!( event->mask & IN_ISDIR );
+
+    } else if( event->mask & IN_MOVED_TO ) {
+#if DEBUG_INOTIFY
+        printf("IN_MOVED_TO\n");
+#endif
+        // mv
+        if(vhffsfssync_cookie.id == event->cookie) {
+#if DEBUG_INOTIFY
+            printf("==> MOVE %s -> %s (used cookie %d)\n", vhffsfssync_cookie.from, pathname, vhffsfssync_cookie.id);
+#endif
+            if( vhffsfssync_cookie.isdir )
+                vhffsfssync_modify_watch(inotifyfd, vhffsfssync_cookie.from, pathname);
+
+            vhffsfssync_net_broadcast_string( g_strdup_printf("move\x1F%s\x1F%s\n", vhffsfssync_cookie.from, pathname) , VHFFSFSSYNC_NET_PRIO_MEDIUM);
+            vhffsfssync_cookie.id = 0;
+            free(vhffsfssync_cookie.from);
+            vhffsfssync_cookie.from = NULL;
+        }
+        // create
+        else {
+            vhffsfssync_manage_event_create(inotifyfd, pathname, TRUE);
+        }
+
+    // watch deleted, clean it
+    } else if( event->mask & IN_IGNORED ) {
+#if DEBUG_INOTIFY
+        printf("IN_IGNORED\n");
+#endif
+        vhffsfssync_del_watch(inotifyfd, NULL, event->wd);
+
+    // this event is not handled, this should not happen
+    } else {
+#if DEBUG_INOTIFY
+        printf("OOOOOOOPPPSS!!!!!\n");
+#endif
+    }
+
+    // pathname comes from g_strdup()/g_strdup_printf()
+    g_free(pathname);
+    return 0;
+}
+
+
+/*
+ * Report every entry found in directory `pathname` as if a creation event
+ * had just been received for it, with file contents included.
+ *
+ * The recursion is indirect: vhffsfssync_manage_event_create() calls this
+ * function again for each directory it handles. A directory that cannot be
+ * opened is silently skipped. Always returns 0.
+ */
+int vhffsfssync_fake_events_recursively(int inotifyfd, char *pathname) {
+    DIR *d;
+    struct dirent *dir;
+
+    d = opendir(pathname);
+    if(!d) return 0;
+
+    while( (dir = readdir(d)) ) {
+        char *path;
+        char *sep = "/";
+
+        if( !strcmp(dir->d_name, ".") || !strcmp(dir->d_name, "..") ) continue;
+
+        // avoid a double separator when pathname already ends with one
+        if( pathname[strlen(pathname)-1] == '/' ) sep = "";
+        path = g_strdup_printf("%s%s%s", pathname, sep, dir->d_name);
+
+        // recursivity is done through vhffsfssync_manage_event_create()
+        // which calls this function
+        vhffsfssync_manage_event_create(inotifyfd, path, TRUE);
+
+        // g_strdup_printf() memory is released with g_free()
+        g_free(path);
+    }
+    closedir(d);
+
+    return 0;
+}
+
+
+/*
+ * Master daemon: watch the directory given as first argument with inotify and
+ * stream the changes to every slave connected on TCP port 4567.
+ *
+ * A single select() loop multiplexes the inotify descriptor, the listening
+ * socket and the slave connections. Returns -1 on any start-up failure; the
+ * main loop itself never ends.
+ */
+int main(int argc, char *argv[]) {
+
+    char *root;
+    int inotifyfd, flags;
+    int wd;
+
+    int listenfd, opt;
+    uint32_t bindaddr;
+    uint16_t bindport;
+    struct sockaddr_in src;
+
+    if(argc < 2) {
+        fprintf(stderr, "usage: %s <directory to monitor>\n", argc > 0 ? argv[0] : "vhffsfssync_master");
+        return -1;
+    }
+
+    /* chdir() to the filesystem to monitor */
+    root = argv[1];
+    if( chdir(root) < 0) {
+        fprintf(stderr, "cannot chdir() to %s: %s\n", root, strerror(errno));
+        return -1;
+    }
+    // every path handled from now on is relative to the monitored directory
+    root = ".";
+#if DEBUG_INOTIFY
+    printf("Monitoring %s\n", root);
+#endif
+
+    /* -- inotify stuff -- */
+
+    // wd_to_path owns the keys and the values, path_to_wd only borrows the same pointers
+    vhffsfssync_wd_to_path = g_hash_table_new_full(g_int_hash, g_int_equal, g_free, g_free);
+    vhffsfssync_path_to_wd = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, NULL);
+    vhffsfssync_cookie.id = 0;
+    vhffsfssync_cookie.from = NULL;
+    vhffsfssync_openfiles = 0;
+
+    inotifyfd = inotify_init();
+    if(inotifyfd < 0) {
+        fprintf(stderr, "inotify_init() failed: %s\n", strerror(errno));
+        return -1;
+    }
+
+    /* set inotifyfd to non-blocking */
+    flags = fcntl(inotifyfd, F_GETFL);
+    if(flags >= 0) {
+        flags |= O_NONBLOCK;
+        fcntl(inotifyfd, F_SETFL, flags);
+    }
+
+    wd = vhffsfssync_add_watch_recursively(inotifyfd, root, VHFFSFSSYNC_WATCH_MASK);
+    if(wd < 0) {
+        fprintf(stderr, "Maximum number of watches probably reached, consider adding more or fixing what is being wrong before running me again (strace is your friend)... byebye!\n");
+        return -1;
+    }
+
+
+    /* -- network stuff -- */
+    vhffsfssync_conns = NULL;
+
+    // a slave closing its socket must not kill us, write() reports EPIPE instead
+    signal(SIGPIPE, SIG_IGN);
+
+    /* listening for network connections */
+    if( (listenfd = socket(AF_INET, SOCK_STREAM, 0) ) < 0) {
+        fprintf(stderr, "socket() failed: %s\n", strerror(errno));
+        return -1;
+    }
+
+    /* set listenfd to non-blocking */
+    flags = fcntl(listenfd, F_GETFL);
+    if(flags >= 0) {
+        flags |= O_NONBLOCK;
+        fcntl(listenfd, F_SETFL, flags);
+    }
+
+    /* add the ability to listen on a TIME_WAIT */
+    opt = 1;
+    if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt) ) ) {
+        fprintf(stderr, "setsockopt() failed on socket %d: %s\n", listenfd, strerror(errno));
+    }
+
+    bindaddr = INADDR_ANY;
+    bindport = 4567;
+    // do not hand uninitialised padding bytes to bind()
+    memset(&src, 0, sizeof(src));
+    src.sin_addr.s_addr = bindaddr;
+    src.sin_family = AF_INET;
+    src.sin_port = htons(bindport);
+    if( bind(listenfd, (struct sockaddr*)&src, sizeof(src) ) < 0) {
+        fprintf(stderr, "bind() failed on socket %d: %s\n", listenfd, strerror(errno));
+        return -1;
+    }
+
+    if( listen(listenfd, SOMAXCONN) < 0) {
+        fprintf(stderr, "listen() failed on socket %d: %s\n", listenfd, strerror(errno));
+        return -1;
+    }
+
+#if DEBUG_NET
+    printf("Listening on %s:%d\n", inet_ntoa(src.sin_addr), bindport);
+#endif
+
+    /* -- main loop -- */
+    while(1) {
+        int max_fd = 0;
+        fd_set readfs;
+        fd_set writefs;
+        GList *conns;
+        int ret;
+
+        FD_ZERO(&readfs);
+        FD_ZERO(&writefs);
+
+        /* inotify events */
+        FD_SET(inotifyfd, &readfs);
+        if(inotifyfd > max_fd) max_fd = inotifyfd;
+
+        /* new connections */
+        FD_SET(listenfd, &readfs);
+        if(listenfd > max_fd) max_fd = listenfd;
+
+        /* connections */
+        for(conns = g_list_first(vhffsfssync_conns) ; conns ; ) {
+            vhffsfssync_conn *conn = conns->data;
+            // fetch the successor first, the connection may be destroyed below
+            conns = g_list_next(conns);
+
+            /* this connnection was disabled, destroy it */
+            if(conn->fd < 0) {
+                vhffsfssync_net_conn_destroy(conn);
+                continue;
+            }
+
+            FD_SET(conn->fd, &readfs);
+            // only ask for writability when there is something to send
+            if(conn->messages) FD_SET(conn->fd, &writefs);
+            if(conn->fd > max_fd) max_fd = conn->fd;
+        }
+
+        ret = select(max_fd + 1, &readfs, &writefs, NULL, NULL);
+        if(ret < 0) {
+            switch(errno) {
+                case EAGAIN:
+                case EINTR:
+                    break;
+                default:
+                    fprintf(stderr, "select() failed: %s\n", strerror(errno));
+            }
+        }
+        if(ret > 0) {
+            /* inotify events */
+            if( FD_ISSET(inotifyfd, &readfs) ) {
+                char buf[VHFFSFSSYNC_BUF_LEN];
+                ssize_t len;
+
+                len = read(inotifyfd, buf, VHFFSFSSYNC_BUF_LEN);
+                if(len < 0) {
+                    switch(errno) {
+                        case EAGAIN:
+                        case EINTR:
+                            break;
+                        default:
+                            fprintf(stderr, "read() failed on inotify fd(%d): %s\n", inotifyfd, strerror(errno));
+                    }
+                }
+                else {
+                    // the buffer holds a sequence of variable-sized records:
+                    // a struct inotify_event followed by ie->len bytes of name
+                    char *cur = buf;
+                    while(len > 0) {
+                        int next;
+                        struct inotify_event *ie;
+
+                        ie = (struct inotify_event*)cur;
+                        vhffsfssync_manage_event( inotifyfd, ie );
+                        next = sizeof(struct inotify_event);
+                        next += ie->len;
+                        len -= next;
+                        cur += next;
+                    }
+                }
+            }
+
+            /* new connections */
+            if( FD_ISSET(listenfd, &readfs) ) {
+                struct sockaddr_in addr;
+                int newfd;
+                socklen_t addr_len;
+                vhffsfssync_conn *conn;
+
+                addr_len = sizeof(addr);
+                newfd = accept(listenfd, (struct sockaddr*)&addr, &addr_len);
+                // 0 is a valid descriptor, only a negative value is a failure
+                if(newfd < 0) {
+                    switch(errno) {
+                        case EAGAIN:
+                        case EINTR:
+                            break;
+                        default:
+                            fprintf(stderr, "accept() failed on listen fd(%d): %s\n", listenfd, strerror(errno));
+                    }
+                }
+                else if(newfd >= FD_SETSIZE) {
+                    // select() cannot handle this descriptor
+                    fprintf(stderr, "too many connections, fd %d cannot be used with select()\n", newfd);
+                    close(newfd);
+                }
+                else if( !(conn = malloc(sizeof(vhffsfssync_conn))) ) {
+                    fprintf(stderr, "cannot allocate memory for a new connection\n");
+                    close(newfd);
+                }
+                else {
+                    /* set newfd to non-blocking */
+                    flags = fcntl(newfd, F_GETFL);
+                    if(flags >= 0) {
+                        flags |= O_NONBLOCK;
+                        fcntl(newfd, F_SETFL, flags);
+                    }
+
+                    /* register the connection */
+                    conn->fd = newfd;
+                    conn->sockaddr = addr;
+                    conn->order = 0;
+                    conn->messages = NULL;
+                    vhffsfssync_conns = g_list_append(vhffsfssync_conns, conn);
+#if DEBUG_NET
+                    printf("Welcome %s ! (using fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
+#endif
+                    vhffsfssync_net_send_string(conn, strdup("hello\n"), VHFFSFSSYNC_NET_PRIO_HIGHEST);
+                }
+            }
+
+            /* connections */
+            for(conns = g_list_first(vhffsfssync_conns) ; conns ; ) {
+                vhffsfssync_conn *conn = conns->data;
+                conns = g_list_next(conns);
+
+                // A connection disabled earlier in this pass has fd < 0 (that is what
+                // the cleanup at the top of the loop tests for) and FD_ISSET() must not
+                // be called with a negative descriptor.
+                if(conn->fd < 0) continue;
+
+                /* data to read ?, give give ! */
+                if( FD_ISSET(conn->fd, &readfs) ) {
+                    char buf[VHFFSFSSYNC_BUF_LEN];
+                    ssize_t len;
+
+                    len = read(conn->fd, buf, VHFFSFSSYNC_BUF_LEN);
+                    if(len < 0) {
+                        switch(errno) {
+                            case EAGAIN:
+                            case EINTR:
+                                break;
+                            default:
+                                fprintf(stderr, "read() failed on socket %d: %s\n", conn->fd, strerror(errno));
+                                vhffsfssync_net_conn_disable(conn);
+                        }
+                    }
+                    else if(len == 0) {
+                        // the slave closed the connection
+                        vhffsfssync_net_conn_disable(conn);
+                    }
+                    else {
+                        // nothing is expected from the slaves, just echo what they say
+                        printf("%.*s", (int)len, buf);
+                    }
+                }
+
+                /* try to send more data */
+                // the read above may just have disabled the connection
+                if(conn->fd >= 0 && conn->messages && FD_ISSET(conn->fd, &writefs) ) {
+                    vhffsfssync_net_send(conn);
+                }
+            }
+        }
+    }
+
+    return 0;
+}
Copied: trunk/vhffs-fssync/vhffsfssync_slave.c (from rev 1264, trunk/vhffs-fssync/client.c)
===================================================================
--- trunk/vhffs-fssync/vhffsfssync_slave.c (rev 0)
+++ trunk/vhffs-fssync/vhffsfssync_slave.c 2008-10-07 11:13:12 UTC (rev 1265)
@@ -0,0 +1,431 @@
+#define _FILE_OFFSET_BITS 64
+#define _ATFILE_SOURCE
+
+#define DEBUG_NET 0
+#define DEBUG_EVENTS 0
+
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <sys/time.h>
+#include <fcntl.h>
+#include <dirent.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <glib.h>
+#include <sys/select.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+
+
+/* -- network stuff -- */
+// huge buffer size reduce syscalls
+#define VHFFSFSSYNC_NET_BUF_LEN 262144
+
+/* State of the (single) connection from the slave to the master. */
+typedef struct {
+ // socket connected to the master, -1 when not connected
+ int fd;
+ // address of the master
+ struct sockaddr_in sockaddr;
+
+ // receive buffer: socket data is read at buf_cur, buf_len bytes of buf are waiting to be parsed
+ char buf[VHFFSFSSYNC_NET_BUF_LEN];
+ uint32_t buf_len;
+ char *buf_cur;
+
+ // file receiving the current binary chunk (NULL when it could not be opened, the chunk is then skipped)
+ FILE *chunk_file;
+ // bytes of the current binary chunk still expected, 0 means that the stream is in text (command) mode
+ size_t chunk_stilltoread;
+} vhffsfssync_conn;
+
+// protos
+int vhffsfssync_remove(char *pathname);
+int vhffsfssync_mkdir(char *pathname, mode_t mode);
+int vhffsfssync_event(vhffsfssync_conn *conn, char *event);
+int vhffsfssync_parse(vhffsfssync_conn *conn);
+
+
+/* ---------------------------------------- */
+/*
+ * Delete `pathname`, recursively when it is a directory.
+ *
+ * A path that does not exist is not an error. Failures of opendir(),
+ * rmdir() and unlink() are reported on stderr but do not change the result.
+ * Returns -1 only when lstat() fails for a reason other than ENOENT.
+ */
+int vhffsfssync_remove(char *pathname) {
+    struct stat st;
+    DIR *d;
+    struct dirent *dir;
+
+    if( lstat(pathname, &st) ) {
+        if(errno != ENOENT) {
+            fprintf(stderr, "cannot lstat() '%s': %s\n", pathname, strerror(errno));
+            return -1;
+        }
+        // already gone
+        return 0;
+    }
+
+    // files, symlinks, ...
+    if( !S_ISDIR(st.st_mode) ) {
+        if( unlink(pathname) < 0) {
+            fprintf(stderr, "cannot unlink() '%s': %s\n", pathname, strerror(errno));
+        }
+        return 0;
+    }
+
+    // directories: empty them first
+    d = opendir(pathname);
+    if(!d) {
+        fprintf(stderr, "cannot opendir() '%s': %s\n", pathname, strerror(errno));
+        return 0;
+    }
+
+    while( (dir = readdir(d)) ) {
+        char *path;
+        char *sep = "/";
+
+        if( !strcmp(dir->d_name, ".") || !strcmp(dir->d_name, "..") ) continue;
+
+        if( pathname[strlen(pathname)-1] == '/' ) sep = "";
+        path = g_strdup_printf("%s%s%s", pathname, sep, dir->d_name);
+        vhffsfssync_remove(path);
+        // g_strdup_printf() memory is released with g_free()
+        g_free(path);
+    }
+    closedir(d);
+
+    if( rmdir(pathname) < 0) {
+        fprintf(stderr, "cannot rmdir() '%s': %s\n", pathname, strerror(errno));
+    }
+
+    return 0;
+}
+
+
+/*
+ * Create directory `pathname` and any missing parent ("mkdir -p"), every
+ * created directory getting permissions `mode`.
+ *
+ * The path is walked one component at a time with mkdirat()/openat(), each
+ * component being resolved relatively to the previous one.
+ *
+ * `pathname` is modified: its '/' separators are replaced with '\0'.
+ * At most 64 components are handled, deeper ones are ignored.
+ * Returns 0 on success, -1 when a component could not be opened.
+ */
+int vhffsfssync_mkdir(char *pathname, mode_t mode) {
+    char *cur, *dirs[64];
+    int i, fd, fd_, argc, ret;
+
+    /* split the path in place */
+    argc = 0;
+    cur = pathname;
+    while(*cur != '\0' && argc < 64) {
+        for( ; *cur != '/' && *cur != '\0' ; cur++ );
+        dirs[argc++] = pathname;
+        if( *cur == '/' ) {
+            *cur = '\0';
+            pathname = ++cur;
+        }
+    }
+
+    ret = 0;
+    fd = AT_FDCWD;
+    for(i = 0 ; i < argc ; i++) {
+        int saved_errno;
+
+        // the result is not checked: the component usually exists already (EEXIST),
+        // a real failure is caught by the openat() below
+        mkdirat(fd, dirs[i], mode);
+        fd_ = openat(fd, dirs[i], O_RDONLY);
+        // close() may overwrite errno
+        saved_errno = errno;
+
+        // AT_FDCWD is negative and must not be closed
+        if(fd >= 0) close(fd);
+        fd = fd_;
+        if(fd < 0) {
+            fprintf(stderr, "openat() failed on %s: %s\n", dirs[i], strerror(saved_errno));
+            ret = -1;
+            break;
+        }
+    }
+    if(fd >= 0) close(fd);
+    return ret;
+}
+
+
+/*
+ * Execute one text command received from the master.
+ *
+ * `event` is a line without its trailing '\n': a command name followed by
+ * its arguments, separated by the 0x1F character. The line is split in place
+ * (at most 10 fields).
+ *
+ * Commands: remove <path> | create <path> | mkdir <path> |
+ *           symlink <path> <target> | move <from> <to> |
+ *           write <path> <offset> <size> | hello
+ *
+ * "write" announces that <size> bytes of raw data follow on the stream: it
+ * sets conn->chunk_stilltoread and opens conn->chunk_file at <offset>.
+ *
+ * Returns 0, or -1 when the line is empty, misses arguments or carries
+ * invalid numbers. An unknown command terminates the process.
+ */
+int vhffsfssync_event(vhffsfssync_conn *conn, char *event) {
+    char *cur, *args[10];
+    int argc, needed;
+
+    argc = 0;
+    cur = event;
+    while(*cur != '\0' && argc < 10) {
+        for( ; *cur != '\x1F' && *cur != '\0' ; cur++ );
+        args[argc++] = event;
+        if( *cur == '\x1F' ) {
+            *cur = '\0';
+            event = ++cur;
+        }
+    }
+
+    if(!argc) return -1;
+
+#if DEBUG_EVENTS
+    int i;
+    for(i = 0 ; i < argc ; i++) {
+        printf("%s ", args[i]);
+    }
+    printf("\n");
+#endif
+
+    // the line comes from the network: never use an argument that was not received
+    if(!strcmp(args[0], "remove") || !strcmp(args[0], "create") || !strcmp(args[0], "mkdir")) needed = 2;
+    else if(!strcmp(args[0], "symlink") || !strcmp(args[0], "move")) needed = 3;
+    else if(!strcmp(args[0], "write")) needed = 4;
+    else needed = 1;
+
+    if(argc < needed) {
+        fprintf(stderr, "Received truncated event: %s (%d fields, %d expected)\n", args[0], argc, needed);
+        return -1;
+    }
+
+    if(!strcmp(args[0], "remove")) {
+        char *pathname = args[1];
+        vhffsfssync_remove(pathname);
+    }
+    else if(!strcmp(args[0], "create")) {
+        char *pathname = args[1];
+        int fd;
+
+        // create the file, or empty it when it exists
+        fd = open(pathname, O_CREAT|O_WRONLY|O_TRUNC, 0644);
+        if(fd >= 0) {
+            close(fd);
+        }
+        else {
+            fprintf(stderr, "open() failed on %s: %s\n", pathname, strerror(errno));
+        }
+    }
+    else if(!strcmp(args[0], "mkdir")) {
+        char *path = args[1];
+        vhffsfssync_mkdir(path, 0755);
+    }
+    else if(!strcmp(args[0], "symlink")) {
+        char *from = args[1];
+        char *to = args[2];
+        symlink(to, from);
+    }
+    else if(!strcmp(args[0], "move")) {
+        char *from = args[1];
+        char *to = args[2];
+        rename(from, to);
+    }
+    else if(!strcmp(args[0], "write")) {
+        char *pathname = args[1];
+        long long rawoffset = atoll(args[2]);
+        long long rawsize = atoll(args[3]);
+        off_t offset;
+        size_t size;
+        int fd;
+
+        if(rawoffset < 0 || rawsize < 0) {
+            fprintf(stderr, "Received invalid write event for %s: offset %lld, size %lld\n", pathname, rawoffset, rawsize);
+            return -1;
+        }
+        offset = rawoffset;
+        size = rawsize;
+
+        // should not happen: a chunk is still open, do not leak it
+        if(conn->chunk_file) {
+            fclose(conn->chunk_file);
+            conn->chunk_file = NULL;
+        }
+
+        // set even if the file cannot be opened: the data is sent anyway and has to be skipped
+        conn->chunk_stilltoread = size;
+
+        if(size > 0) {
+            fd = open(pathname, O_CREAT|O_WRONLY, 0644);
+            if(fd >= 0) {
+                conn->chunk_file = fdopen(fd, "w");
+                if(conn->chunk_file) {
+                    if( fseeko(conn->chunk_file, offset, SEEK_SET) < 0 ) {
+                        fprintf(stderr, "fseeko() on %lld failed on file %s: %s\n", (long long)offset, pathname, strerror(errno));
+                        fclose(conn->chunk_file);
+                        conn->chunk_file = NULL;
+                    }
+                }
+                else {
+                    fprintf(stderr, "fdopen() failed on %s: %s\n", pathname, strerror(errno));
+                    close(fd);
+                }
+            }
+            else {
+                fprintf(stderr, "open() failed on %s: %s\n", pathname, strerror(errno));
+            }
+        }
+        else {
+            // empty chunk: no data follows, just make sure that the file exists and is empty
+            fd = open(pathname, O_CREAT|O_WRONLY|O_TRUNC, 0644);
+            if(fd >= 0) {
+                close(fd);
+            }
+            else {
+                fprintf(stderr, "open() failed on %s: %s\n", pathname, strerror(errno));
+            }
+        }
+    }
+    else if(!strcmp(args[0], "hello")) {
+        // hello too !
+    }
+    else {
+        fprintf(stderr, "Received unhandled event: %s\n", args[0]);
+        exit(1);
+    }
+
+    return 0;
+}
+
+
+/*
+ * Consume the bytes waiting in conn->buf (conn->buf_len of them).
+ *
+ * The stream alternates between two modes:
+ *  - text mode (conn->chunk_stilltoread == 0): '\n' terminated commands,
+ *    each one handed to vhffsfssync_event();
+ *  - binary mode: conn->chunk_stilltoread raw bytes, written to
+ *    conn->chunk_file when it is open and skipped otherwise.
+ *
+ * An incomplete trailing command is moved to the beginning of the buffer, and
+ * conn->buf_cur / conn->buf_len are updated so that the next read() appends
+ * to it. A command that does not fit in the whole buffer is thrown away.
+ * Always returns 0.
+ */
+int vhffsfssync_parse(vhffsfssync_conn *conn) {
+    char *cur, *end;
+
+    cur = conn->buf;
+    end = conn->buf + conn->buf_len; //beware: end can be outside the buffer, you should NOT read *end
+
+    while(cur < end) {
+
+        // text mode
+        if(!conn->chunk_stilltoread) {
+            char *begin;
+            for(begin = cur ; cur < end && *cur != '\n' ; cur++ );
+
+            // cur == end means that no newline was found: *cur must not be read then
+            if( cur < end && *cur == '\n' ) {
+                *cur = '\0';
+                vhffsfssync_event(conn, begin);
+                begin = ++cur;
+            }
+
+            if(cur == end) {
+                // what is left between begin and end is an incomplete command (possibly empty)
+                register uint32_t len;
+                len = end - begin;
+
+                // buffer is full and we didn't manage to fetch everything
+                if(len == VHFFSFSSYNC_NET_BUF_LEN) {
+                    fprintf(stderr, "The buffer is not large enough, throwing away the content\n");
+                    conn->buf_cur = conn->buf;
+                    conn->buf_len = 0;
+                    break;
+                }
+
+                // copy the data that is not parsed to the begin of the buffer
+                // (memmove: source and destination overlap when the remainder is longer than what was consumed)
+                memmove(conn->buf, begin, len);
+                conn->buf_cur = conn->buf + len;
+                conn->buf_len = len;
+                break;
+            }
+        }
+
+        // binary mode (receiving a chunk)
+        else {
+            size_t canread = MIN(conn->chunk_stilltoread, end-cur);
+            conn->chunk_stilltoread -= canread;
+#if DEBUG_EVENTS
+            printf("binary mode: read: %d stilltoread: %d\n", canread, conn->chunk_stilltoread);
+#endif
+            if(conn->chunk_file) {
+                /* TODO: handle errors and use a bigger nmemb */
+                size_t len = fwrite(cur, 1, canread, conn->chunk_file);
+#if DEBUG_EVENTS
+                printf(" written=%d\n", len);
+#endif
+                if(len != canread) {
+                    /* TODO: Handle errors */
+                }
+                // chunk complete
+                if(!conn->chunk_stilltoread) {
+#if DEBUG_EVENTS
+                    printf(" closing file\n");
+#endif
+                    fclose(conn->chunk_file);
+                    conn->chunk_file = NULL;
+                }
+            }
+            cur += canread;
+
+            // all the data have been read, resetting buffer
+            if(cur == end) {
+                conn->buf_cur = conn->buf;
+                conn->buf_len = 0;
+                break;
+            }
+        }
+    }
+    return 0;
+}
+
+
+/*
+ * Slave daemon: connect to a master and replay into a local directory the
+ * changes it sends.
+ *
+ * Usage: <program> <directory> <master IPv4 address> <master port>
+ *
+ * The outer loop (re)connects forever, retrying every second while the
+ * master cannot be reached; the inner loop reads from the socket and feeds
+ * vhffsfssync_parse(). Returns -1 on a start-up failure.
+ */
+int main(int argc, char *argv[]) {
+
+    char *root, *host;
+    int flags, port;
+    struct in_addr hostaddr;
+    vhffsfssync_conn *conn;
+
+    if(argc < 4) {
+        fprintf(stderr, "usage: %s <directory> <master address> <master port>\n", argc > 0 ? argv[0] : "vhffsfssync_slave");
+        return -1;
+    }
+
+    /* chdir() to the filesystem to write the data */
+    root = argv[1];
+    if( chdir(root) < 0) {
+        fprintf(stderr, "cannot chdir() to %s: %s\n", root, strerror(errno));
+        return -1;
+    }
+    root = ".";
+
+    /* -- network stuff -- */
+    host = argv[2];
+    port = atoi(argv[3]);
+    // inet_aton() returns 0 when the address is not valid
+    if( !inet_aton(host, &hostaddr) ) {
+        fprintf(stderr, "'%s' is not a valid IPv4 address\n", host);
+        return -1;
+    }
+
+    signal(SIGPIPE, SIG_IGN);
+    conn = malloc(sizeof(vhffsfssync_conn));
+    if(!conn) {
+        fprintf(stderr, "cannot allocate memory for the connection\n");
+        return -1;
+    }
+    conn->fd = -1;
+    conn->chunk_file = NULL;
+
+    /* -- main loop -- */
+    while(1) {
+        // the previous connection may have died in the middle of a chunk
+        if(conn->chunk_file) {
+            fclose(conn->chunk_file);
+        }
+
+        conn->fd = -1;
+        conn->buf_len = 0;
+        conn->buf_cur = conn->buf;
+        conn->chunk_stilltoread = 0;
+        conn->chunk_file = NULL;
+
+        /* connect */
+        memset(&conn->sockaddr, 0, sizeof(conn->sockaddr));
+        conn->sockaddr.sin_addr = hostaddr;
+        conn->sockaddr.sin_family = AF_INET;
+        conn->sockaddr.sin_port = htons(port);
+
+        conn->fd = socket(conn->sockaddr.sin_family, SOCK_STREAM, IPPROTO_IP);
+        if(conn->fd < 0) {
+            fprintf(stderr, "socket() failed: %s\n", strerror(errno));
+            return -1;
+        }
+
+        if( connect(conn->fd, (struct sockaddr*)&conn->sockaddr, sizeof(conn->sockaddr)) < 0 ) {
+
+            fprintf(stderr, "connect() failed: %s\n", strerror(errno));
+            shutdown(conn->fd, SHUT_RDWR);
+            close(conn->fd);
+            conn->fd = -1;
+            // master not reachable, try again later
+            sleep(1);
+            continue;
+        }
+
+        /* set newfd to non-blocking */
+        flags = fcntl(conn->fd, F_GETFL);
+        if(flags >= 0) {
+            flags |= O_NONBLOCK;
+            fcntl(conn->fd, F_SETFL, flags);
+        }
+
+        /* connected */
+        while(conn->fd >= 0) {
+            int max_fd = 0;
+            fd_set readfs;
+            fd_set writefs;
+            int ret;
+
+            FD_ZERO(&readfs);
+            FD_ZERO(&writefs);
+
+            FD_SET(conn->fd, &readfs);
+            if(conn->fd > max_fd) max_fd = conn->fd;
+
+            ret = select(max_fd + 1, &readfs, &writefs, NULL, NULL);
+            if(ret < 0) {
+                switch(errno) {
+                    case EAGAIN:
+                    case EINTR:
+                        break;
+                    default:
+                        fprintf(stderr, "select() failed: %s\n", strerror(errno));
+                }
+            }
+            if(ret > 0) {
+                /* data to read ?, give give ! */
+                if(FD_ISSET(conn->fd, &readfs) ) {
+                    ssize_t len;
+
+                    // append after what vhffsfssync_parse() left in the buffer
+                    len = read(conn->fd, conn->buf_cur, VHFFSFSSYNC_NET_BUF_LEN - (conn->buf_cur - conn->buf) );
+                    if(len < 0) {
+                        switch(errno) {
+                            case EAGAIN:
+                            case EINTR:
+                                break;
+                            default:
+                                fprintf(stderr, "read() failed on socket %d: %s\n", conn->fd, strerror(errno));
+                                // release the descriptor, otherwise one is leaked per reconnection
+                                close(conn->fd);
+                                conn->fd = -1;
+                                break;
+                        }
+                    }
+                    else if(len == 0) {
+                        // the master closed the connection
+#if DEBUG_NET
+                        printf("Byebye %s... (used fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
+#endif
+                        close(conn->fd);
+                        conn->fd = -1;
+                    }
+                    else {
+                        conn->buf_len += len;
+                        vhffsfssync_parse(conn);
+                    }
+                }
+            }
+        }
+    }
+
+    return 0;
+}