[vhffs-dev] [1261] Add a new tool for VHFFS, it allows synching file-systems over networks |
[ Thread Index |
Date Index
| More vhffs.org/vhffs-dev Archives
]
Revision: 1261
Author: gradator
Date: 2008-10-07 00:55:19 +0200 (Tue, 07 Oct 2008)
Log Message:
-----------
Add a new tool for VHFFS, it allows synching file-systems over networks
in best-effort !
Designed for downloads repositories it doesn't currently scale a lot if
files are oftenly modified.
It doesn't support uid/gid/mode/acl and will probably never do.
It doesn't support anything else than regular files, directories and
symlink, and will probably never support the others.
It uses inotify(7), as a result it scales a lot even with billions of files
and directories.
This is a monoprocess/monothread software featuring non-blocking I/O and
zerocopy ways, so it reduces to the maximum possible the number of
context-switchs.
Events are permanently scheduled (prioritized, ordered) based on what is
happening.
The network layer scales to tens-gigabits per second.
It is just a great piece of software for synching slaves downloads
servers (mirrors) from a hidden master server.
Added Paths:
-----------
trunk/vhffs-fssync/
trunk/vhffs-fssync/client.c
trunk/vhffs-fssync/getevents.c
Added: trunk/vhffs-fssync/client.c
===================================================================
--- trunk/vhffs-fssync/client.c (rev 0)
+++ trunk/vhffs-fssync/client.c 2008-10-06 22:55:19 UTC (rev 1261)
@@ -0,0 +1,420 @@
+#define _FILE_OFFSET_BITS 64
+#define _ATFILE_SOURCE
+
+#define DEBUG_NET 0
+
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <sys/time.h>
+#include <fcntl.h>
+#include <dirent.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <glib.h>
+#include <sys/select.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+
+
+/* -- network stuff -- */
+// huge buffer size reduce syscalls
+#define COUCOU_NET_BUF_LEN 262144
+
+typedef struct {
+ int fd;
+ struct sockaddr_in sockaddr;
+
+ char buf[COUCOU_NET_BUF_LEN];
+ uint32_t buf_len;
+ char *buf_cur;
+
+ FILE *chunk_file;
+ size_t chunk_stilltoread;
+} coucou_conn;
+
+// protos
+int coucou_remove(char *pathname);
+int coucou_mkdir(char *pathname, mode_t mode);
+int coucou_event(coucou_conn *conn, char *event);
+int coucou_parse(coucou_conn *conn);
+
+
+/* ---------------------------------------- */
+int coucou_remove(char *pathname) {
+ struct stat st;
+
+ if(! lstat(pathname, &st) ) {
+
+ if( S_ISDIR(st.st_mode) ) {
+ DIR *d;
+ struct dirent *dir;
+
+ d = opendir(pathname);
+ if(d) {
+ while( (dir = readdir(d)) ) {
+ if( strcmp(dir->d_name, ".") && strcmp(dir->d_name, "..") ) {
+ char *path;
+ char *sep = "/";
+ if( pathname[strlen(pathname)-1] == '/' ) sep = "";
+ path = g_strdup_printf("%s%s%s", pathname, sep, dir->d_name);
+ coucou_remove(path);
+ free(path);
+ }
+ }
+ closedir(d);
+ if( rmdir(pathname) < 0) {
+ fprintf(stderr, "cannot rmdir() '%s': %s\n", pathname, strerror(errno));
+ }
+ }
+ else {
+ fprintf(stderr, "cannot opendir() '%s': %s\n", pathname, strerror(errno));
+ }
+ }
+ else {
+ if( unlink(pathname) < 0) {
+ fprintf(stderr, "cannot unlink() '%s': %s\n", pathname, strerror(errno));
+ }
+ }
+ }
+ else {
+ if(errno != ENOENT) {
+ fprintf(stderr, "cannot lstat() '%s': %s\n", pathname, strerror(errno));
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+
+int coucou_mkdir(char *pathname, mode_t mode) {
+ char *cur, *dirs[64];
+ int i, fd, fd_, argc;
+
+ argc = 0;
+ cur = pathname;
+ while(*cur != '\0' && argc < 64) {
+ for( ; *cur != '/' && *cur != '\0' ; cur++ );
+ dirs[argc++] = pathname;
+ if( *cur == '/' ) {
+ *cur = '\0';
+ pathname = ++cur;
+ }
+ }
+
+ fd = AT_FDCWD;
+ for(i = 0 ; i < argc ; i++) {
+ mkdirat(fd, dirs[i], 0755);
+ fd_ = openat(fd, dirs[i], 0);
+ if(fd >= 0) close(fd);
+ fd = fd_;
+ if(fd < 0) {
+ fprintf(stderr, "openat() failed on %s: %s\n", dirs[i], strerror(errno));
+ break;
+ }
+ }
+ if(fd >= 0) close(fd);
+ return 0;
+}
+
+
+int coucou_event(coucou_conn *conn, char *event) {
+ char *cur, *args[10];
+ int argc;
+ int i;
+
+ argc = 0;
+ cur = event;
+ while(*cur != '\0' && argc < 10) {
+ for( ; *cur != '\x1F' && *cur != '\0' ; cur++ );
+ args[argc++] = event;
+ if( *cur == '\x1F' ) {
+ *cur = '\0';
+ event = ++cur;
+ }
+ }
+
+ if(!argc) return -1;
+
+ for(i = 0 ; i < argc ; i++) {
+ printf("%s ", args[i]);
+ }
+ printf("\n");
+
+ if(!strcmp(args[0], "remove")) {
+ char *pathname = args[1];
+ coucou_remove(pathname);
+ }
+ else if(!strcmp(args[0], "create")) {
+ char *pathname = args[1];
+ int fd;
+
+ fd = open(pathname, O_CREAT|O_WRONLY|O_TRUNC, 0644);
+ if(fd >= 0) {
+ close(fd);
+ }
+ else {
+ fprintf(stderr, "open() failed on %s: %s\n", pathname, strerror(errno));
+ }
+ }
+ else if(!strcmp(args[0], "mkdir")) {
+ char *path = args[1];
+ coucou_mkdir(path, 0755);
+ }
+ else if(!strcmp(args[0], "symlink")) {
+ char *from = args[1];
+ char *to = args[2];
+ symlink(to, from);
+ }
+ else if(!strcmp(args[0], "move")) {
+ char *from = args[1];
+ char *to = args[2];
+ rename(from, to);
+ }
+ else if(!strcmp(args[0], "write")) {
+ char *pathname = args[1];
+ off_t offset = atoll(args[2]);
+ size_t size = atoll(args[3]);
+ int fd;
+
+ conn->chunk_stilltoread = size;
+
+ //printf("FILE: %s %lld %d\n", pathname, offset, size);
+ if(size > 0) {
+ fd = open(pathname, O_CREAT|O_WRONLY, 0644);
+ if(fd >= 0) {
+ conn->chunk_file = fdopen(fd, "w");
+ if(conn->chunk_file) {
+ if( fseeko(conn->chunk_file, offset, SEEK_SET) < 0 ) {
+ fprintf(stderr, "fseeko() on %lld failed on file %s: %s\n", offset, pathname, strerror(errno));
+ fclose(conn->chunk_file);
+ conn->chunk_file = NULL;
+ }
+ }
+ else {
+ fprintf(stderr, "fdopen() failed on %s: %s\n", pathname, strerror(errno));
+ close(fd);
+ }
+ }
+ else {
+ fprintf(stderr, "open() failed on %s: %s\n", pathname, strerror(errno));
+ }
+ }
+ else {
+ fd = open(pathname, O_CREAT|O_WRONLY|O_TRUNC, 0644);
+ if(fd >= 0) {
+ close(fd);
+ }
+ else {
+ fprintf(stderr, "open() failed on %s: %s\n", pathname, strerror(errno));
+ }
+ }
+ }
+ else if(!strcmp(args[0], "hello")) {
+ // hello too !
+ }
+ else {
+ fprintf(stderr, "Received unhandled event: %s\n", args[0]);
+ exit(1);
+ }
+
+ return 0;
+}
+
+
+int coucou_parse(coucou_conn *conn) {
+ char *cur, *end;
+
+ cur = conn->buf;
+ end = conn->buf + conn->buf_len; //beware: end can be outside the buffer, you should NOT read *end
+
+ //printf("Buffer %d, stilltoread: %d\n", conn->buf_len, conn->chunk_stilltoread);
+
+ while(cur < end) {
+
+ // text mode
+ if(!conn->chunk_stilltoread) {
+ char *begin;
+ for(begin = cur ; cur < end && *cur != '\n' ; cur++ );
+
+ if( *cur == '\n' ) {
+ *cur = '\0';
+ coucou_event(conn, begin);
+ begin = ++cur;
+ }
+
+ if(cur == end) {
+ register uint32_t len;
+ len = end - begin;
+ //printf("Not parsed %d\n", len);
+
+ // buffer is full and we didn't manage to fetch everything
+ if(len == COUCOU_NET_BUF_LEN) {
+ fprintf(stderr, "The buffer is not large enough, throwing away the content\n");
+ conn->buf_cur = conn->buf;
+ conn->buf_len = 0;
+ break;
+ }
+
+ // copy the data that is not parsed to the begin of the buffer
+ memcpy(conn->buf, begin, len);
+ conn->buf_cur = conn->buf + len;
+ conn->buf_len = len;
+ break;
+ }
+ }
+
+ // binary mode (receiving a chunk)
+ else {
+ size_t canread = MIN(conn->chunk_stilltoread, end-cur);
+ conn->chunk_stilltoread -= canread;
+
+ printf("binary mode: read: %d stilltoread: %d\n", canread, conn->chunk_stilltoread);
+ if(conn->chunk_file) {
+ /* TODO: handle errors and use a bigger nmemb */
+ size_t len = fwrite(cur, 1, canread, conn->chunk_file);
+ printf(" written=%d\n", len);
+ if(!conn->chunk_stilltoread) {
+ printf("closing file\n");
+ fclose(conn->chunk_file);
+ conn->chunk_file = NULL;
+ }
+ }
+ cur += canread;
+
+ // all the data have been read, resetting buffer
+ if(cur == end) {
+ conn->buf_cur = conn->buf;
+ conn->buf_len = 0;
+ break;
+ }
+ }
+ }
+ return 0;
+}
+
+
+int main(int argc, char *argv[]) {
+
+ char *root, *host;
+ int flags, port;
+ coucou_conn *conn;
+
+ /* chdir() to the filesystem to write the data */
+ root = argv[1];
+ if( chdir(root) < 0) {
+ fprintf(stderr, "cannot chdir() to %s: %s\n", root, strerror(errno));
+ return -1;
+ }
+ root = ".";
+
+ /* -- network stuff -- */
+ host = argv[2];
+ port = atoi(argv[3]);
+ signal(SIGPIPE, SIG_IGN);
+ conn = malloc(sizeof(coucou_conn));
+
+ /* -- main loop -- */
+ while(1) {
+ conn->fd = -1;
+ conn->buf_len = 0;
+ conn->buf_cur = conn->buf;
+ conn->chunk_stilltoread = 0;
+ conn->chunk_file = NULL;
+
+ /* connect */
+ inet_aton(host, &conn->sockaddr.sin_addr);
+ conn->sockaddr.sin_family = AF_INET;
+ conn->sockaddr.sin_port = htons(port);
+
+ conn->fd = socket(conn->sockaddr.sin_family, SOCK_STREAM, IPPROTO_IP);
+ if(conn->fd < 0) {
+ fprintf(stderr, "socket() failed: %s\n", strerror(errno));
+ return -1;
+ }
+
+ if( connect(conn->fd, (struct sockaddr*)&conn->sockaddr, sizeof(conn->sockaddr)) < 0 ) {
+
+ fprintf(stderr, "connect() failed: %s\n", strerror(errno));
+ shutdown(conn->fd, SHUT_RDWR);
+ close(conn->fd);
+ conn->fd = -1;
+ sleep(1);
+ continue;
+ }
+
+ /* set newfd to non-blocking */
+ flags = fcntl(conn->fd, F_GETFL);
+ if(flags >= 0) {
+ flags |= O_NONBLOCK;
+ fcntl(conn->fd, F_SETFL, flags);
+ }
+
+ /* connected */
+ while(conn->fd >= 0) {
+ int max_fd = 0;
+ fd_set readfs;
+ fd_set writefs;
+ //struct timeval tv;
+ int ret;
+
+ FD_ZERO(&readfs);
+ FD_ZERO(&writefs);
+
+ FD_SET(conn->fd, &readfs);
+ if(conn->fd > max_fd) max_fd = conn->fd;
+
+ //tv.tv_sec = 3600;
+ //tv.tv_usec = 0;
+ ret = select(max_fd + 1, &readfs, &writefs, NULL, NULL);
+ if(ret < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+ break;
+ default:
+ fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ }
+ }
+ if(ret > 0) {
+ /* data to read ?, give give ! */
+ if(FD_ISSET(conn->fd, &readfs) ) {
+ ssize_t len;
+
+ len = read(conn->fd, conn->buf_cur, COUCOU_NET_BUF_LEN - (conn->buf_cur - conn->buf) );
+ if(len < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+ break;
+ default:
+ fprintf(stderr, "read() failed on socket fd: %s\n", strerror(errno));
+ conn->fd = -1;
+ break;
+ }
+ }
+ else if(len == 0) {
+#if DEBUG_NET
+ printf("Byebye %s... (used fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
+#endif
+ conn->fd = -1;
+ }
+ else {
+ //printf("Read %d\n", len);
+ conn->buf_len += len;
+ coucou_parse(conn);
+ }
+ }
+ }
+ }
+ }
+
+ return 0;
+}
Added: trunk/vhffs-fssync/getevents.c
===================================================================
--- trunk/vhffs-fssync/getevents.c (rev 0)
+++ trunk/vhffs-fssync/getevents.c 2008-10-06 22:55:19 UTC (rev 1261)
@@ -0,0 +1,1327 @@
+#define _FILE_OFFSET_BITS 64
+
+#define DEBUG_NET 0
+#define DEBUG_INOTIFY 1
+
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <sys/time.h>
+#include <fcntl.h>
+#include <dirent.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/inotify.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <glib.h>
+#include <sys/select.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/sendfile.h>
+
+
+/* TODO: vérifier qu'un nom de fichier ne contient pas un \x1F, sinon c'est le segfault assuré du client */
+/* convertir les \x1F vers un espace lors de l'émission de l'évent ? */
+
+/* -- inotify stuff -- */
+
+#define COUCOU_BUF_LEN 4096
+#define COUCOU_WATCH_MASK IN_ATTRIB|IN_CLOSE_WRITE|IN_CREATE|IN_DELETE|IN_DELETE_SELF|IN_MODIFY|IN_MOVE_SELF|IN_MOVED_FROM|IN_MOVED_TO|IN_DONT_FOLLOW|IN_ONLYDIR
+// Not used: IN_ACCESS, IN_CLOSE_NOWRITE, IN_OPEN
+
+// the maximum number of files simultaneously opened
+// huge values offer better performance
+// actually it is MAX = COUCOU_MAX_OPENFILES + number of clients
+#define COUCOU_MAX_OPENFILES 512
+
+// each monitor entry is associated with a path, we need to keep it to compute the path
+//char **coucou_wd_to_path = NULL;
+//int coucou_wd_to_path_len = 0; // number of allocated paths
+GHashTable *coucou_wd_to_path;
+GHashTable *coucou_path_to_wd;
+
+// return a timestamp in ms (it loops for 100000 sec)
+/*inline int coucou_timestamp() {
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ return (tv.tv_sec%100000)*1000+tv.tv_usec/1000;
+}*/
+
+struct coucou_cookie {
+ uint32_t id;
+ char *from;
+ gboolean isdir;
+};
+static struct coucou_cookie coucou_cookie;
+
+int coucou_openfiles;
+
+// protos
+int coucou_add_watch(int inotifyfd, const char *pathname, uint32_t mask);
+int coucou_del_watch(int inotifyfd, const char *pathname, int wd);
+int coucou_add_watch_recursively(int inotifyfd, const char *pathname, uint32_t mask);
+int coucou_manage_event_create(int inotifyfd, char *pathname, gboolean sendfile);
+int coucou_manage_event(int inotifyfd, struct inotify_event *event);
+int coucou_fake_events_recursively(int inotifyfd, char *pathname);
+
+
+/* -- network stuff -- */
+#define COUCOU_NET_MESSAGE_FILE_CHUNK 65536
+
+GList *coucou_conns;
+
+typedef struct {
+ int fd;
+ struct sockaddr_in sockaddr;
+ GList *messages;
+ uint32_t order;
+// char *recvbuf;
+// ssize_t recvbuf_cur;
+// ssize_t recvbuf_size;
+// char *sendbuf;
+// ssize_t sendbuf_cur;
+// ssize_t sendbuf_size;
+} coucou_conn;
+
+
+/* message - generic */
+typedef unsigned short int msg_family_t;
+enum {
+ COUCOU_NET_MESSAGE_UNSPEC=0,
+ COUCOU_NET_MESSAGE_DATA,
+ COUCOU_NET_MESSAGE_FILE
+};
+
+/* message - priorities */
+enum {
+ COUCOU_NET_PRIO_HIGHEST=100, // values < 100 may be used internally, please don't set anything below 100 or die!
+ COUCOU_NET_PRIO_HIGH, // values >= 1000 are used for files, don't use them too
+ COUCOU_NET_PRIO_MEDIUM,
+ COUCOU_NET_PRIO_LOW,
+ COUCOU_NET_PRIO_LOWEST
+};
+
+
+#define __COUCOU_NET_MESSAGE_COMMON(msg_prefix) \
+ msg_family_t msg_prefix##family; \
+ uint32_t msg_prefix##priority; \
+ uint32_t msg_prefix##order
+
+#define __COUCOU_NET_MESSAGE_COMMON_SIZE ( sizeof (msg_family_t) + sizeof(uint32_t) + sizeof(uint32_t) )
+
+typedef struct {
+ __COUCOU_NET_MESSAGE_COMMON(msg_);
+ char msg_data[200];
+} coucou_net_message;
+
+/* message - common data */
+typedef struct {
+ __COUCOU_NET_MESSAGE_COMMON(data_);
+ char *data_buffer;
+ ssize_t data_len;
+ ssize_t data_cur;
+
+ /* pad to size of `coucou_net_message' */
+ unsigned char sin_zero[ sizeof(coucou_net_message)
+ - __COUCOU_NET_MESSAGE_COMMON_SIZE
+ - sizeof(char*)
+ - sizeof(ssize_t)
+ - sizeof(ssize_t) ];
+} coucou_net_message_data;
+
+/* message - file */
+typedef struct {
+ __COUCOU_NET_MESSAGE_COMMON(file_);
+ FILE *file_stream;
+ off_t file_offset;
+ off_t file_size;
+ off_t file_chunksize;
+ off_t file_chunkcur;
+ char *file_pathname;
+
+ /* pad to size of `coucou_net_message' */
+ unsigned char sin_zero[ sizeof(coucou_net_message)
+ - __COUCOU_NET_MESSAGE_COMMON_SIZE
+ - sizeof(FILE*)
+ - sizeof(off_t)
+ - sizeof(off_t)
+ - sizeof(off_t)
+ - sizeof(off_t)
+ - sizeof(char*) ];
+} coucou_net_message_file;
+
+
+// protos
+void coucou_net_conn_destroy(coucou_conn *conn);
+inline coucou_net_message *coucou_net_new_message(coucou_conn *conn, msg_family_t family, uint32_t priority);
+gint coucou_net_message_insert_compare(gconstpointer a, gconstpointer b);
+inline void coucou_net_destroy_message(coucou_conn *conn, coucou_net_message *msg);
+int coucou_net_send_data(coucou_conn *conn, char *data, ssize_t len, uint32_t priority);
+void coucou_net_destroy_data(coucou_conn *conn, coucou_net_message_data *datamsg);
+inline int coucou_net_send_string(coucou_conn *conn, char *data, uint32_t priority) ;
+void coucou_net_broadcast_string(char *data, uint32_t priority);
+int coucou_net_send_file(coucou_conn *conn, char *pathname);
+void coucou_net_destroy_file(coucou_conn *conn, coucou_net_message_file *filemsg);
+FILE *coucou_net_file_open(coucou_conn *conn, const char *pathname, const char *mode);
+int coucou_net_file_close(coucou_conn *conn, FILE *stream);
+int coucou_net_remove_file(coucou_conn *conn, char *pathname);
+void coucou_net_broadcast_file(char *pathname);
+int coucou_net_send(coucou_conn *conn);
+
+
+/* ----------------------------------------- */
+
+void coucou_net_conn_destroy(coucou_conn *conn) {
+ GList *msgs;
+
+ coucou_conns = g_list_remove(coucou_conns, conn);
+
+ if(conn->fd >= 0) {
+ shutdown(conn->fd, SHUT_RDWR);
+ close(conn->fd);
+ }
+
+ while( (msgs = g_list_first(conn->messages)) ) {
+ coucou_net_destroy_message(conn, (coucou_net_message*)msgs->data );
+ }
+
+ free(conn);
+}
+
+
+/* -- network stuff -- */
+inline coucou_net_message *coucou_net_new_message(coucou_conn *conn, msg_family_t family, uint32_t priority) {
+ coucou_net_message *msg;
+ msg = malloc( sizeof(coucou_net_message) );
+ msg->msg_family = family;
+ msg->msg_priority = priority;
+ msg->msg_order = conn->order++;
+ return msg;
+}
+
+
+gint coucou_net_message_insert_compare(gconstpointer a, gconstpointer b) {
+ coucou_net_message *first = (coucou_net_message*)a;
+ coucou_net_message *second = (coucou_net_message*)b;
+
+ // lowest priority is preferred
+ if(first->msg_priority != second->msg_priority)
+ return first->msg_priority - second->msg_priority;
+
+ // lowest order is preferred
+ return first->msg_order - second->msg_order;
+}
+
+
+inline void coucou_net_destroy_message(coucou_conn *conn, coucou_net_message *msg) {
+ if(msg->msg_family == COUCOU_NET_MESSAGE_DATA) {
+ coucou_net_destroy_data(conn, (coucou_net_message_data*)msg);
+ }
+ else if(msg->msg_family == COUCOU_NET_MESSAGE_FILE) {
+ coucou_net_destroy_file(conn, (coucou_net_message_file*)msg);
+ }
+}
+
+
+// !!!!!! the buffer is freed when the message has been sent, DON'T send static string and DON'T free() the data yourself
+int coucou_net_send_data(coucou_conn *conn, char *data, ssize_t len, uint32_t priority) {
+ coucou_net_message_data *msg;
+ if(!conn || !data || len <= 0) return -1;
+
+ msg = (coucou_net_message_data*)coucou_net_new_message(conn, COUCOU_NET_MESSAGE_DATA, priority);
+ msg->data_buffer = data;
+ msg->data_len = len;
+ msg->data_cur = 0;
+ conn->messages = g_list_insert_sorted(conn->messages, msg, coucou_net_message_insert_compare);
+
+ // try to send the message right away
+ coucou_net_send(conn);
+ return 0;
+}
+
+
+void coucou_net_destroy_data(coucou_conn *conn, coucou_net_message_data *datamsg) {
+ conn->messages = g_list_remove(conn->messages, (coucou_net_message*)datamsg);
+ free(datamsg->data_buffer);
+ free(datamsg);
+}
+
+
+inline int coucou_net_send_string(coucou_conn *conn, char *data, uint32_t priority) {
+ return coucou_net_send_data(conn, data, strlen(data), priority);
+}
+
+
+void coucou_net_broadcast_string(char *data, uint32_t priority) {
+
+ GList *conns;
+ for(conns = g_list_first(coucou_conns) ; conns ; ) {
+ coucou_conn *conn = conns->data;
+ conns = g_list_next(conns);
+
+ coucou_net_send_string(conn, strdup(data), priority);
+ }
+ free(data);
+}
+
+
+// prototype is simple, files are always the lowest of the lowest priority messages
+int coucou_net_send_file(coucou_conn *conn, char *pathname) {
+ coucou_net_message_file *msg;
+ struct stat st;
+ uint32_t maxprio = -1; // 4294967295
+ FILE *stream;
+
+ if(!conn || !pathname) {
+ return -1;
+ }
+
+ if( lstat(pathname, &st) < 0 ) {
+ fprintf(stderr, "lstat() failed on %s: %s\n", pathname, strerror(errno));
+ return -1;
+ }
+
+ /* only copy regular files */
+ if(! S_ISREG(st.st_mode) ) {
+ fprintf(stderr, "%s is not a regular file\n", pathname);
+ return -1;
+ }
+
+ stream = coucou_net_file_open(conn, pathname, "r");
+ if(!stream) {
+ return -1;
+ }
+
+ // if the file is being sent, cancel it
+ coucou_net_remove_file(conn, pathname);
+ //printf("%d SENDING FILE %s\n", conn->fd, pathname);
+ coucou_net_send_string( conn, g_strdup_printf("create\x1F%s\n", pathname) , COUCOU_NET_PRIO_MEDIUM);
+
+ // the size of the file is the priority (small files are sent with more priority)
+ // but don't set the priority too low, low value can be used for anything else
+ msg = (coucou_net_message_file*)coucou_net_new_message(conn, COUCOU_NET_MESSAGE_FILE, MAX(MIN(st.st_size, maxprio),1000) );
+ msg->file_stream = stream;
+ msg->file_offset = 0;
+ msg->file_pathname = strdup(pathname);
+ msg->file_size = st.st_size;
+ msg->file_chunksize = -1;
+ msg->file_chunkcur = 0;
+ conn->messages = g_list_insert_sorted(conn->messages, msg, coucou_net_message_insert_compare);
+
+ // try to send the file right away
+ coucou_net_send(conn);
+ return 0;
+}
+
+
+void coucou_net_destroy_file(coucou_conn *conn, coucou_net_message_file *filemsg) {
+ conn->messages = g_list_remove(conn->messages, (coucou_net_message*)filemsg);
+
+ /* we need to finish the chunk anyway */
+ if(filemsg->file_chunksize > 0) {
+ off_t len = filemsg->file_chunksize - filemsg->file_chunkcur;
+ if(len > 0) {
+ char *data = malloc(len);
+ memset(data, 0, len);
+ coucou_net_send_data(conn, data, len, 0);
+ }
+ }
+
+ coucou_net_file_close(conn, filemsg->file_stream);
+ free(filemsg->file_pathname);
+ free(filemsg);
+}
+
+
+FILE *coucou_net_file_open(coucou_conn *conn, const char *pathname, const char *mode) {
+ FILE *stream;
+
+ if(!conn || !pathname || !mode)
+ return NULL;
+
+ stream = fopen(pathname, mode);
+ if(!stream) {
+ fprintf(stderr, "fopen() failed on %s: %s\n", pathname, strerror(errno));
+ return stream;
+ }
+
+ coucou_openfiles++;
+ // do we need to free a file fd ?
+ if(coucou_openfiles > COUCOU_MAX_OPENFILES) {
+ GList *msgs;
+ // we prefer to fclose() the fd of the msg with the lowest priority
+ for(msgs = g_list_last(conn->messages) ; msgs ; ) {
+ coucou_net_message *msg = msgs->data;
+ msgs = g_list_previous(msgs);
+
+ if(msg->msg_family == COUCOU_NET_MESSAGE_FILE) {
+ coucou_net_message_file *filemsg = (coucou_net_message_file*)msg;
+
+ if(filemsg->file_stream && filemsg->file_chunksize < 0) {
+ coucou_net_file_close(conn, filemsg->file_stream);
+ filemsg->file_stream = NULL;
+ break;
+ }
+ }
+ }
+ }
+
+ return stream;
+}
+
+
+int coucou_net_file_close(coucou_conn *conn, FILE *stream) {
+ int r;
+
+ if(!stream)
+ return -1;
+
+ r = fclose(stream);
+ if( r ) {
+ fprintf(stderr, "fclose() failed: %s\n", strerror(errno));
+ return r;
+ }
+ coucou_openfiles--;
+ return r;
+}
+
+
+void coucou_net_broadcast_file(char *pathname) {
+
+ GList *conns;
+ for(conns = g_list_first(coucou_conns) ; conns ; ) {
+ coucou_conn *conn = conns->data;
+ conns = g_list_next(conns);
+ coucou_net_send_file(conn, pathname);
+ }
+}
+
+
+int coucou_net_remove_file(coucou_conn *conn, char *pathname) {
+ GList *msgs;
+ for(msgs = g_list_first(conn->messages) ; msgs ; ) {
+ coucou_net_message *msg = msgs->data;
+ msgs = g_list_next(msgs);
+
+ if(msg->msg_family == COUCOU_NET_MESSAGE_FILE) {
+ coucou_net_message_file *filemsg = (coucou_net_message_file*)msg;
+ if(!strcmp(filemsg->file_pathname, pathname)) {
+ //printf("%d CANCELLING %s\n", conn->fd, pathname);
+ coucou_net_destroy_file(conn, filemsg);
+ }
+ }
+ }
+ return 0;
+}
+
+
+int coucou_net_send(coucou_conn *conn) {
+ GList *msgs;
+ gboolean full = FALSE;
+
+ if(conn->fd < 0) return -1;
+#if DEBUG_NET
+ printf("--------------------------------------------------\n");
+ printf("conn: %d, to: %s\n", conn->fd, inet_ntoa(conn->sockaddr.sin_addr));
+#endif
+ while(!full && (msgs = g_list_first(conn->messages)) ) {
+ coucou_net_message *msg = msgs->data;
+#if DEBUG_NET
+ printf(" family: %d , priority: %d , order: %d\n", msg->msg_family, msg->msg_priority, msg->msg_order);
+#endif
+ // data
+ if(msg->msg_family == COUCOU_NET_MESSAGE_DATA) {
+ coucou_net_message_data *datamsg;
+ ssize_t written;
+ ssize_t lentowrite;
+
+ // we need to make sure that the message will not be truncated, we set the current message to the highest priority
+ msg->msg_priority = 0;
+
+ datamsg = (coucou_net_message_data*)msg;
+#if DEBUG_NET
+ printf(" buffer: %d bytes, %d already written\n", datamsg->data_len, datamsg->data_cur);
+#endif
+ /* try to empty the buffer */
+ lentowrite = datamsg->data_len - datamsg->data_cur;
+ written = write(conn->fd, datamsg->data_buffer + datamsg->data_cur, lentowrite);
+ if(written < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+#if DEBUG_NET
+ printf("=====> EAGAIN on write()\n");
+#endif
+ full = TRUE;
+ break;
+ default:
+ fprintf(stderr, "write() failed on socket fd: %s\n", strerror(errno));
+ coucou_net_conn_destroy(conn);
+ }
+ }
+ else {
+ datamsg->data_cur += written;
+#if DEBUG_NET
+ printf(" %d bytes written on %d bytes\n", written, lentowrite);
+#endif
+ /* the buffer is not empty yet (but the SendQ into the kernel is) */
+ if(written < lentowrite) {
+ full = TRUE;
+ }
+ /* buffer is now empty */
+ else {
+ coucou_net_destroy_data(conn, datamsg);
+ }
+ }
+ }
+
+ // file
+ else if(msg->msg_family == COUCOU_NET_MESSAGE_FILE) {
+ coucou_net_message_file *filemsg;
+ ssize_t written;
+ off_t lentowrite;
+
+ filemsg = (coucou_net_message_file*)msg;
+#if DEBUG_NET
+ printf(" file: %s, offset = %lld, size = %lld\n", filemsg->file_pathname, (long long int)filemsg->file_offset, (long long int)filemsg->file_size);
+#endif
+ /* new chunk */
+ if(filemsg->file_chunksize < 0) {
+ if(!filemsg->file_stream) {
+ filemsg->file_stream = coucou_net_file_open(conn, filemsg->file_pathname, "r");
+ if(!filemsg->file_stream) {
+ coucou_net_destroy_file(conn, filemsg);
+ break;
+ }
+ }
+
+ lentowrite = filemsg->file_chunksize = MIN(COUCOU_NET_MESSAGE_FILE_CHUNK, filemsg->file_size - filemsg->file_offset);
+ filemsg->file_chunkcur = 0;
+ // we need to make sure that the chunk will not be truncated, we set the current message to the highest priority
+ msg->msg_priority = 1;
+ coucou_net_send_string(conn, g_strdup_printf("write\x1F%s\x1F%lld\x1F%lld\n", filemsg->file_pathname, (long long int)filemsg->file_offset, (long long int)filemsg->file_chunksize) , 0);
+ // coucou_net_send_string() called coucou_net_send() too, we really need to break here
+ break;
+ }
+ /* the previous chunk was partially sent */
+ else {
+ lentowrite = filemsg->file_chunksize - filemsg->file_chunkcur;
+ }
+
+ /* try to send the file */
+ written = sendfile(conn->fd, fileno(filemsg->file_stream), &filemsg->file_offset, lentowrite);
+ if(written < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+#if DEBUG_NET
+ printf("=====> EAGAIN on sendfile()\n");
+#endif
+ full = TRUE;
+ break;
+ default:
+ fprintf(stderr, "sendfile() failed from file %s to socket: %s\n", filemsg->file_pathname, strerror(errno));
+ coucou_net_conn_destroy(conn);
+ }
+ }
+ else {
+#if DEBUG_NET
+ printf(" %d bytes written, we are at offset %lld\n", written, filemsg->file_offset);
+#endif
+ filemsg->file_chunkcur += written;
+
+ /* end of file or file completed */
+ if( written == 0 || filemsg->file_offset == filemsg->file_size ) {
+ coucou_net_destroy_file(conn, filemsg);
+ }
+
+ /* the chunk is not fully sent yet */
+ else if(written < lentowrite) {
+ full = TRUE;
+ }
+
+ /* the chunk is sent */
+ else if(written == lentowrite) {
+ uint32_t maxprio = -1; // 4294967295
+
+ filemsg->file_chunksize = -1;
+ filemsg->file_chunkcur = 0;
+
+ // reschedule this file to a nicer priority
+ msg->msg_priority = MAX(MIN(filemsg->file_size - filemsg->file_offset, maxprio),1000);
+ conn->messages = g_list_remove(conn->messages, msg);
+ conn->messages = g_list_insert_sorted(conn->messages, msg, coucou_net_message_insert_compare);
+ }
+ }
+ }
+
+ // I don't want to stay in this jail
+ else full = TRUE;
+ }
+
+ return 0;
+}
+
+
+#if 0
+int coucou_net_write(coucou_conn *conn, char *buffer, ssize_t len) {
+
+ ssize_t written = -1;
+
+ /* new data to send */
+ if(buffer && len > 0) {
+ written = 0;
+ // buffer is empty, try to send the data directly (it avoids copying the data twice)
+ if(conn->sendbuf_size == 0) {
+ written = write(conn->fd, buffer, len);
+ if(written < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+ printf("=====> EAGAIN on write()\n");
+ break;
+ default:
+ fprintf(stderr, "write() failed on socket fd: %s\n", strerror(errno));
+ }
+ written = 0;
+ }
+ }
+
+ printf("%d bytes written on %d bytes\n", written, len);
+
+ /* this is the case if the buffer is not empty, or if the write failed, or if the write didn't write everything */
+ if(written < len) {
+ ssize_t lentobuf = len-written;
+ printf("buffering %d bytes\n", lentobuf);
+
+ /* As you may have noticed, the buffer is only growing as long as it still
+ * contains data to be sent, here we try to deal with that, as it needs data
+ * to be copied, we hope that it is not going to happen very often
+ */
+
+ conn->sendbuf = realloc(conn->sendbuf, conn->sendbuf_size+lentobuf);
+ memcpy(conn->sendbuf+conn->sendbuf_size, buffer+written, lentobuf);
+ conn->sendbuf_size += lentobuf;
+ printf("buffer size = %d\n", conn->sendbuf_size);
+ }
+ }
+
+ /* try to empty the buffer */
+ if(!buffer && len == 0 && conn->sendbuf_size > 0) {
+ ssize_t lentowrite = conn->sendbuf_size - conn->sendbuf_cur;
+ written = write(conn->fd, conn->sendbuf + conn->sendbuf_cur, lentowrite);
+ if(written < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+ printf("=====> EAGAIN on write()\n");
+ break;
+ default:
+ fprintf(stderr, "write() failed on socket fd: %s\n", strerror(errno));
+ }
+ written = 0;
+ }
+
+ if(written > 0) {
+ printf("%d bytes written on %d bytes\n", written, lentowrite);
+
+ /* the buffer is not empty yet */
+ if(written < lentowrite) {
+ conn->sendbuf_cur += written;
+ }
+ /* buffer is now empty */
+ else {
+ printf("buffer is now empty\n");
+ free(conn->sendbuf);
+ conn->sendbuf = NULL;
+ conn->sendbuf_cur = 0;
+ conn->sendbuf_size = 0;
+ }
+ }
+ }
+
+ return written;
+}
+#endif
+
+
+
+/* -- inotify stuff -- */
+
+int coucou_add_watch(int inotifyfd, const char *pathname, uint32_t mask) {
+
+ int wd;
+ int *_wd;
+ char *_pathname;
+
+ if( g_hash_table_lookup(coucou_path_to_wd, pathname) ) {
+ return -1;
+ }
+
+ wd = inotify_add_watch(inotifyfd, pathname, mask);
+ if(wd < 0) {
+ if(errno == ENOSPC) {
+ fprintf(stderr, "Maximum number of watches reached, consider adding more...\n");
+ }
+ return wd;
+ }
+
+ _wd = g_new(int, 1);
+ *_wd = wd;
+ _pathname = g_strdup(pathname);
+ g_hash_table_insert(coucou_wd_to_path, _wd, _pathname);
+ g_hash_table_insert(coucou_path_to_wd, _pathname, _wd);
+
+// if(wd >= coucou_wd_to_path_len) {
+// coucou_wd_to_path_len = ( (wd >>10) +1) <<10;
+// coucou_wd_to_path = realloc( coucou_wd_to_path, coucou_wd_to_path_len * sizeof(void*) );
+// }
+// coucou_wd_to_path[wd] = strdup(pathname);
+#if DEBUG_INOTIFY
+ printf("+ %d %s\n", wd, pathname);
+#endif
+ if(g_hash_table_size(coucou_wd_to_path) != g_hash_table_size(coucou_path_to_wd)) {
+ exit(-1);
+ }
+ return wd;
+}
+
+
+int coucou_del_watch(int inotifyfd, const char *pathname, int wd) {
+
+ if(!pathname && wd > 0) {
+ pathname = (char*)g_hash_table_lookup(coucou_wd_to_path, &wd);
+ // this wd has already been deleted
+ if(!pathname) return -1;
+ }
+ else if(pathname && wd == 0) {
+ int *_wd;
+ _wd = g_hash_table_lookup(coucou_path_to_wd, pathname);
+ // this wd has already been deleted
+ if(!_wd) return -1;
+ wd = *_wd;
+ }
+ else return -1;
+#if DEBUG_INOTIFY
+ printf("- %d %s\n", wd, pathname);
+#endif
+ g_hash_table_remove(coucou_path_to_wd, pathname);
+ g_hash_table_remove(coucou_wd_to_path, &wd);
+ if(g_hash_table_size(coucou_wd_to_path) != g_hash_table_size(coucou_path_to_wd)) {
+ exit(-1);
+ }
+ return inotify_rm_watch(inotifyfd, wd);
+}
+
+
+int coucou_modify_watch(int inotifyfd, const char *from, const char *to) {
+
+ int wd;
+ int *_wd;
+ char *_to;
+
+ _wd = g_hash_table_lookup(coucou_path_to_wd, from);
+ if(!_wd) return -1;
+ wd = *_wd;
+
+ g_hash_table_remove(coucou_path_to_wd, from);
+ g_hash_table_remove(coucou_wd_to_path, _wd);
+
+ _wd = g_new(int, 1);
+ *_wd = wd;
+ _to = strdup(to);
+ g_hash_table_insert(coucou_wd_to_path, _wd, _to);
+ g_hash_table_insert(coucou_path_to_wd, _to, _wd);
+
+#if DEBUG_INOTIFY
+ printf("= %d %s -> %s\n", wd, from, to);
+#endif
+ return 0;
+}
+
+
+int coucou_add_watch_recursively(int inotifyfd, const char *pathname, uint32_t mask) {
+
+ int wd;
+ DIR *d;
+ struct dirent *dir;
+
+ wd = coucou_add_watch(inotifyfd, pathname, mask);
+ if(wd < 0) return wd;
+
+ d = opendir(pathname);
+ if(d) {
+ while( (dir = readdir(d)) ) {
+ if(dir->d_type == DT_DIR && strcmp(dir->d_name, ".") && strcmp(dir->d_name, "..") ) {
+ char *path;
+ char *sep = "/";
+ if( pathname[strlen(pathname)-1] == '/' ) sep = "";
+ path = g_strdup_printf("%s%s%s", pathname, sep, dir->d_name);
+ wd = coucou_add_watch_recursively(inotifyfd, path, mask);
+ free(path);
+ }
+ }
+ closedir(d);
+ }
+
+ return wd;
+}
+
+
+int coucou_manage_event_remove(int inotifyfd, char *pathname) {
+ GList *conns;
+ int *_wd;
+
+#if DEBUG_INOTIFY
+ printf("==> REMOVE %s\n", pathname);
+#endif
+
+ if( (_wd = g_hash_table_lookup(coucou_path_to_wd, pathname)) ) {
+ coucou_del_watch(inotifyfd, NULL, *_wd);
+ }
+
+ /* connections */
+ for(conns = g_list_first(coucou_conns) ; conns ; ) {
+ coucou_conn *conn = conns->data;
+ conns = g_list_next(conns);
+ coucou_net_remove_file(conn, pathname);
+ coucou_net_send_string(conn, g_strdup_printf("remove\x1F%s\n", pathname) , COUCOU_NET_PRIO_MEDIUM);
+ }
+ return 0;
+}
+
+
+int coucou_manage_event_create(int inotifyfd, char *pathname, gboolean sendfile) {
+ struct stat st;
+
+ if(! lstat(pathname, &st) ) {
+
+ if( S_ISREG(st.st_mode) ) {
+#if DEBUG_INOTIFY
+ printf("==> CREATE %s\n", pathname);
+#endif
+ coucou_net_broadcast_string( g_strdup_printf("create\x1F%s\n", pathname) , COUCOU_NET_PRIO_MEDIUM);
+ if(sendfile && st.st_size > 0) {
+ coucou_net_broadcast_file(pathname);
+ }
+ }
+
+ if( S_ISDIR(st.st_mode) ) {
+#if DEBUG_INOTIFY
+ printf("==> MKDIR %s\n", pathname);
+#endif
+ coucou_add_watch(inotifyfd, pathname, COUCOU_WATCH_MASK);
+ coucou_net_broadcast_string( g_strdup_printf("mkdir\x1F%s\n", pathname) , COUCOU_NET_PRIO_MEDIUM);
+ /* there is a short delay between the mkdir() and the add_watch(),
+ we need to send events about the data which have already been written */
+ coucou_fake_events_recursively( inotifyfd, pathname );
+ }
+
+ else if( S_ISLNK(st.st_mode) ) {
+ char *linkto;
+ int ret;
+ linkto = malloc(st.st_size +1);
+ ret = readlink(pathname, linkto, st.st_size);
+ if( ret >= 0 ) {
+ linkto[st.st_size] = '\0';
+#if DEBUG_INOTIFY
+ printf("==> SYMLINK %s -> %s\n", pathname, linkto);
+#endif
+ coucou_net_broadcast_string( g_strdup_printf("symlink\x1F%s\x1F%s\n", pathname, linkto) , COUCOU_NET_PRIO_MEDIUM);
+ }
+ free(linkto);
+ if(ret < 0) {
+ if(errno == ENOENT) {
+ // file already disappeared (common for temporary files)
+ coucou_manage_event_remove(inotifyfd, pathname);
+ } else {
+ fprintf(stderr, "cannot readlink() '%s': %s\n", pathname, strerror(errno));
+ return -1;
+ }
+ }
+
+ }
+ /* we don't need other file types (chr, block, fifo, socket, ...) */
+ }
+ else {
+ if(errno == ENOENT) {
+ // file already disappeared (common for temporary files)
+ coucou_manage_event_remove(inotifyfd, pathname);
+ } else {
+ fprintf(stderr, "cannot lstat() '%s': %s\n", pathname, strerror(errno));
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+
+int coucou_manage_event(int inotifyfd, struct inotify_event *event) {
+
+ char *dirpath, *pathname;
+#if DEBUG_INOTIFY
+ printf("wd=%d mask=%x cookie=%d len=%d", event->wd, event->mask, event->cookie, event->len);
+ if(event->len > 0) printf(" name=%s", event->name);
+ printf("\n");
+#endif
+
+ if(event->wd < 0) {
+ fprintf(stderr, "Maximum number of events reached, some events are lost\n");
+ return -1;
+ }
+
+ dirpath = (char*)g_hash_table_lookup(coucou_wd_to_path, &event->wd);
+ // this wd has been deleted
+ if(!dirpath) return -1;
+
+ if(event->len > 0) {
+ pathname = g_strdup_printf("%s/%s", dirpath, event->name);
+ } else {
+ pathname = strdup(dirpath);
+ }
+
+ // this event is not waiting for a cookie, delete file if necessary (IN_MOVED_FROM not followed with IN_MOVED_TO)
+ if( !(event->mask & IN_MOVED_TO) && coucou_cookie.id ) {
+
+ coucou_manage_event_remove(inotifyfd, coucou_cookie.from);
+ coucou_cookie.id = 0;
+ free(coucou_cookie.from);
+ }
+
+ // useless at the moment (chmod, chown, touch, ...)
+ if( event->mask & IN_ATTRIB ) {
+#if DEBUG_INOTIFY
+ printf("IN_ATTRIB\n");
+#endif
+
+ // new file, directory, or symlink
+ } else if( event->mask & IN_CREATE ) {
+#if DEBUG_INOTIFY
+ printf("IN_CREATE\n");
+#endif
+ coucou_manage_event_create(inotifyfd, pathname, FALSE);
+
+ // deleted file, directory or symlink
+ } else if( event->mask & IN_DELETE ) {
+#if DEBUG_INOTIFY
+ printf("IN_DELETE\n");
+#endif
+ coucou_manage_event_remove(inotifyfd, pathname);
+
+ // watch deleted, not used
+ } else if( event->mask & IN_DELETE_SELF ) {
+#if DEBUG_INOTIFY
+ printf("IN_DELETE_SELF\n");
+#endif
+ // We don't send REMOVE here because the dir can be deleted before the
+ // event was added, in this case the add_watch failed to monitor this dir
+ // and we'll not receive a IN_DELETE_SELF for it
+ //
+ //implicit deletion (IN_IGNORED will follow this event)
+ //coucou_del_watch(inotifyfd, event->wd);
+
+ // file modified
+ } else if( event->mask & IN_MODIFY ) {
+#if DEBUG_INOTIFY
+ printf("IN_MODIFY\n");
+ /* we can send the data here */
+ printf("==> SEND %s\n", pathname);
+#endif
+ // file modified and closed
+ } else if( event->mask & IN_CLOSE_WRITE ) {
+#if DEBUG_INOTIFY
+ printf("IN_CLOSE_WRITE\n");
+ /* we must send the data here */
+ printf("==> SEND %s\n", pathname);
+#endif
+ coucou_net_broadcast_file(pathname);
+
+ // watch moved, not used
+ } else if( event->mask & IN_MOVE_SELF ) {
+#if DEBUG_INOTIFY
+ printf("IN_MOVE_SELF\n");
+#endif
+ // not needed (we can rely on IN_MOVED_FROM and IN_MOVED_TO)
+
+ // file/symlink/directory moved
+ //
+ // only from: delete the file/symlink/directory
+ // only to: create the file/symlink/directory
+ // both: mv the file/symlink/directory
+ //
+ } else if( event->mask & IN_MOVED_FROM ) {
+#if DEBUG_INOTIFY
+ printf("IN_MOVED_FROM\n");
+#endif
+ // set the cookie
+ coucou_cookie.id = event->cookie;
+ coucou_cookie.from = strdup(pathname);
+ coucou_cookie.isdir = !!( event->mask & IN_ISDIR );
+
+ } else if( event->mask & IN_MOVED_TO ) {
+#if DEBUG_INOTIFY
+ printf("IN_MOVED_TO\n");
+#endif
+ // mv
+ if(coucou_cookie.id == event->cookie) {
+#if DEBUG_INOTIFY
+ printf("==> MOVE %s -> %s (used cookie %d)\n", coucou_cookie.from, pathname, coucou_cookie.id);
+#endif
+ if( coucou_cookie.isdir )
+ coucou_modify_watch(inotifyfd, coucou_cookie.from, pathname);
+
+ coucou_net_broadcast_string( g_strdup_printf("move\x1F%s\x1F%s\n", coucou_cookie.from, pathname) , COUCOU_NET_PRIO_MEDIUM);
+ coucou_cookie.id = 0;
+ free(coucou_cookie.from);
+ }
+ // create
+ else {
+ coucou_manage_event_create(inotifyfd, pathname, TRUE);
+ }
+
+ // watch deleted, clean it
+ } else if( event->mask & IN_IGNORED ) {
+#if DEBUG_INOTIFY
+ printf("IN_IGNORED\n");
+#endif
+ coucou_del_watch(inotifyfd, NULL, event->wd);
+
+ // this event is not handled, this should not happen
+ } else {
+#if DEBUG_INOTIFY
+ printf("OOOOOOOPPPSS!!!!!\n");
+#endif
+ }
+
+ free(pathname);
+ return 0;
+}
+
+
+int coucou_fake_events_recursively(int inotifyfd, char *pathname) {
+ DIR *d;
+ struct dirent *dir;
+
+ d = opendir(pathname);
+ if(d) {
+ while( (dir = readdir(d)) ) {
+ if( strcmp(dir->d_name, ".") && strcmp(dir->d_name, "..") ) {
+ char *path;
+ char *sep = "/";
+
+ if( pathname[strlen(pathname)-1] == '/' ) sep = "";
+ path = g_strdup_printf("%s%s%s", pathname, sep, dir->d_name);
+
+ // recursivity is done through coucou_manage_event_create()
+ // which calls this function
+ coucou_manage_event_create(inotifyfd, path, TRUE);
+ free(path);
+ }
+ }
+ closedir(d);
+ }
+
+ return 0;
+}
+
+
+int main(int argc, char *argv[]) {
+
+ char *root;
+ int inotifyfd, flags;
+ int wd;
+
+ int listenfd, opt;
+ uint32_t bindaddr;
+ uint16_t bindport;
+ struct sockaddr_in src;
+
+
+ /* chdir() to the filesystem to monitor */
+ root = argv[1];
+ if( chdir(root) < 0) {
+ fprintf(stderr, "cannot chdir() to %s: %s\n", root, strerror(errno));
+ return -1;
+ }
+ root = ".";
+#if DEBUG_INOTIFY
+ printf("Monitoring %s\n", root);
+#endif
+
+ /* -- inotify stuff -- */
+
+ coucou_wd_to_path = g_hash_table_new_full(g_int_hash, g_int_equal, g_free, g_free);
+ coucou_path_to_wd = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, NULL);
+ coucou_cookie.id = 0;
+ coucou_cookie.from = NULL;
+ coucou_openfiles = 0;
+
+ inotifyfd = inotify_init();
+
+ /* set inotifyfd to non-blocking */
+ flags = fcntl(inotifyfd, F_GETFL);
+ if(flags >= 0) {
+ flags |= O_NONBLOCK;
+ fcntl(inotifyfd, F_SETFL, flags);
+ }
+
+ wd = coucou_add_watch_recursively(inotifyfd, root, COUCOU_WATCH_MASK);
+ if(wd < 0) {
+ fprintf(stderr, "Maximum number of watches probably reached, consider adding more or fixing what is being wrong before running me again (strace is your friend)... byebye!\n");
+ return -1;
+ }
+
+
+ /* -- network stuff -- */
+ coucou_conns = NULL;
+
+ signal(SIGPIPE, SIG_IGN);
+
+ /* listening for network connections */
+ if( (listenfd = socket(AF_INET, SOCK_STREAM, 0) ) < 0) {
+ fprintf(stderr, "socket() failed: %s\n", strerror(errno));
+ return -1;
+ }
+
+ /* set listenfd to non-blocking */
+ flags = fcntl(listenfd, F_GETFL);
+ if(flags >= 0) {
+ flags |= O_NONBLOCK;
+ fcntl(listenfd, F_SETFL, flags);
+ }
+
+ /* add the ability to listen on a TIME_WAIT */
+ opt = 1;
+ if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt) ) ) {
+ fprintf(stderr, "setsockopt() failed: %s\n", strerror(errno));
+ }
+
+ bindaddr = INADDR_ANY;
+ bindport = 4567;
+ src.sin_addr.s_addr = bindaddr;
+ src.sin_family = AF_INET;
+ src.sin_port = htons(bindport);
+ if( bind(listenfd, (struct sockaddr*)&src, sizeof(src) ) < 0) {
+ fprintf(stderr, "bind() failed: %s\n", strerror(errno));
+ return -1;
+ }
+
+ if( listen(listenfd, SOMAXCONN) < 0) {
+ fprintf(stderr, "listen() failed: %s\n", strerror(errno));
+ return -1;
+ }
+
+#if DEBUG_NET
+ printf("Listening on %s:%d\n", inet_ntoa(src.sin_addr), bindport);
+#endif
+
+ /* -- main loop -- */
+ while(1) {
+ int max_fd = 0;
+ fd_set readfs;
+ fd_set writefs;
+ //struct timeval tv;
+ GList *conns;
+ int ret;
+
+ FD_ZERO(&readfs);
+ FD_ZERO(&writefs);
+
+ /* inotify events */
+ FD_SET(inotifyfd, &readfs);
+ if(inotifyfd > max_fd) max_fd = inotifyfd;
+
+ /* new connections */
+ FD_SET(listenfd, &readfs);
+ if(listenfd > max_fd) max_fd = listenfd;
+
+ /* connections */
+ for(conns = g_list_first(coucou_conns) ; conns ; ) {
+ coucou_conn *conn = conns->data;
+ conns = g_list_next(conns);
+
+ FD_SET(conn->fd, &readfs);
+ if(conn->messages) FD_SET(conn->fd, &writefs);
+ if(conn->fd > max_fd) max_fd = conn->fd;
+ }
+
+ //tv.tv_sec = 3600;
+ //tv.tv_usec = 0;
+ ret = select(max_fd + 1, &readfs, &writefs, NULL, NULL);
+ if(ret < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+ break;
+ default:
+ fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ }
+ }
+ if(ret > 0) {
+ /* inotify events */
+ if( FD_ISSET(inotifyfd, &readfs) ) {
+ char buf[COUCOU_BUF_LEN];
+ ssize_t len;
+
+ len = read(inotifyfd, buf, COUCOU_BUF_LEN);
+ if(len < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+ break;
+ default:
+ fprintf(stderr, "read() failed on inotify fd: %s\n", strerror(errno));
+ }
+ }
+ else {
+ char *cur = buf;
+ while(len > 0) {
+ int register next;
+ struct inotify_event *ie;
+
+ ie = (struct inotify_event*)cur;
+ coucou_manage_event( inotifyfd, ie );
+ next = sizeof(struct inotify_event);
+ next += ie->len;
+ len -= next;
+ cur += next;
+ }
+ }
+ }
+
+ /* new connections */
+ if( FD_ISSET(listenfd, &readfs) ) {
+ struct sockaddr_in addr;
+ int newfd, flags;
+ socklen_t addr_len;
+ coucou_conn *conn;
+ //struct hostent *ent;
+
+ addr_len = sizeof(addr);
+ newfd = accept(listenfd, (struct sockaddr*)&addr, &addr_len);
+ if(newfd <= 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+ break;
+ default:
+ fprintf(stderr, "accept() failed on listen fd: %s\n", strerror(errno));
+ }
+ }
+ else {
+ //char *grosbuf;
+ //grosbuf=malloc(104857600);
+ //memset(grosbuf, 'A', 104857600);
+
+ // We don't need the reverse DNS, the code here is for learning purpose
+ //ent = gethostbyaddr((const void*)&addr.sin_addr, sizeof(sizeof(addr.sin_addr)), AF_INET);
+ //if(ent) printf("And you are %s\n", ent->h_name);
+
+ /* set newfd to non-blocking */
+ flags = fcntl(newfd, F_GETFL);
+ if(flags >= 0) {
+ flags |= O_NONBLOCK;
+ fcntl(newfd, F_SETFL, flags);
+ }
+
+ /* register the connection */
+ conn = malloc(sizeof(coucou_conn));
+ conn->fd = newfd;
+ conn->sockaddr = addr;
+ conn->order = 0;
+ //conn->recvbuf = NULL;
+ //conn->recvbuf_cur = 0;
+ //conn->recvbuf_size = 0;
+ //conn->sendbuf = NULL;
+ //conn->sendbuf_cur = 0;
+ //conn->sendbuf_size = 0;
+ conn->messages = NULL;
+ coucou_conns = g_list_append(coucou_conns, conn);
+#if DEBUG_NET
+ printf("Welcome %s ! (using fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
+#endif
+ coucou_net_send_data(conn, strdup("hello\n"), strlen("hello\n"), 2);
+/*
+ grosbuf=malloc(10485760);
+ memset(grosbuf, 'A', 10485760);
+ coucou_net_send_data(conn, grosbuf, 1048576, 3);
+
+ grosbuf=malloc(10485760);
+ memset(grosbuf, 'B', 10485760);
+ coucou_net_send_data(conn, grosbuf, 1048576, 2);
+
+ grosbuf=malloc(10485760);
+ memset(grosbuf, 'C', 10485760);
+ coucou_net_send_data(conn, grosbuf, 1048576, 1);
+
+ grosbuf=malloc(10485760);
+ memset(grosbuf, 'D', 10485760);
+ coucou_net_send_data(conn, grosbuf, 1048576, 0);
+*/
+ //coucou_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 2);
+ //coucou_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 2);
+ //coucou_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 0);
+ //coucou_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 1);
+ //coucou_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 1);
+ //coucou_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 0);
+ //coucou_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 2);
+ //coucou_net_send_data(conn, strdup("Salut!\n"), strlen("Salut!\n"), 0);
+ //coucou_net_write(conn, "Salut!\n", strlen("Salut!\n"));
+ //coucou_net_write(conn, grosbuf, 104857600);
+ }
+ }
+
+ /* connections */
+ for(conns = g_list_first(coucou_conns) ; conns ; ) {
+ coucou_conn *conn = conns->data;
+ conns = g_list_next(conns);
+
+ /* data to read ?, give give ! */
+ if( FD_ISSET(conn->fd, &readfs) ) {
+ char buf[COUCOU_BUF_LEN];
+ ssize_t len;
+
+ len = read(conn->fd, buf, COUCOU_BUF_LEN);
+ if(len < 0) {
+ switch(errno) {
+ case EAGAIN:
+ case EINTR:
+ break;
+ default:
+ fprintf(stderr, "read() failed on socket fd: %s\n", strerror(errno));
+ coucou_net_conn_destroy(conn);
+ conn = NULL;
+ }
+ }
+ else if(len == 0) {
+#if DEBUG_NET
+ printf("Byebye %s... (used fd %d)\n", inet_ntoa(conn->sockaddr.sin_addr), conn->fd);
+#endif
+ coucou_net_conn_destroy(conn);
+ conn = NULL;
+ }
+ else {
+ char *plop = malloc(len+1);
+ memcpy(plop, buf, len);
+ plop[len] = '\0';
+ printf("%s", plop);
+ free(plop);
+ }
+ }
+
+ /* try to send more data */
+ if(conn && conn->messages && FD_ISSET(conn->fd, &writefs) ) {
+ coucou_net_send(conn);
+ }
+ }
+ }
+ }
+
+ return 0;
+}