patches/evolution-data-server-03-mail-rlimit.diff
author dcarbery
Fri, 24 Nov 2006 16:37:59 +0000
branch217update
changeset 19096 d542fc2c823e
parent 8398 b983e1f7b702
permissions -rw-r--r--
Merged trunk changes r9797:9829 into 217update branch.

Index: libedataserver/e-msgport.c
===================================================================
RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.c,v
retrieving revision 1.10
diff -u -p -r1.10 e-msgport.c
--- libedataserver/e-msgport.c	26 Jul 2006 18:12:08 -0000	1.10
+++ libedataserver/e-msgport.c	21 Sep 2006 16:49:41 -0000
@@ -486,29 +486,19 @@ em_cache_clear(EMCache *emc)
 }
 
 struct _EMsgPort {
-	EDList queue;
-	int condwait;		/* how many waiting in condwait */
-	union {
-		int pipe[2];	/* On Win32 actually a pair of SOCKETs */
-		struct {
-			int read;
-			int write;
-		} fd;
-	} pipe;
+	GAsyncQueue *queue;
+	EMsg *cache;
+	gint pipe[2];  /* on Win32, actually a pair of SOCKETs */
 #ifdef HAVE_NSS
-	union {
-		PRFileDesc *pipe[2];
-		struct {
-			PRFileDesc *read;
-			PRFileDesc *write;
-		} fd;
-	} prpipe;
-#endif
-	/* @#@$#$ glib stuff */
-	GCond *cond;
-	GMutex *lock;
+	PRFileDesc *prpipe[2];
+#endif
 };
 
+/* message flags */
+enum {
+	MSG_FLAG_SYNC_WITH_PIPE    = 1 << 0,
+	MSG_FLAG_SYNC_WITH_PR_PIPE = 1 << 1
+};
 
 #ifdef HAVE_NSS
 static int
@@ -529,189 +519,237 @@ e_prpipe (PRFileDesc **fds)
 }
 #endif
 
-EMsgPort *e_msgport_new(void)
+static void
+msgport_sync_with_pipe (gint fd)
+{
+	gchar buffer[1];
+
+	while (fd >= 0) {
+		if (E_READ (fd, buffer, 1) > 0)
+			break;
+		else if (!E_IS_STATUS_INTR ()) {
+			g_warning ("%s: Failed to read from pipe: %s",
+				G_STRFUNC, g_strerror (errno));
+			break;
+		}
+	}
+}
+
+#ifdef HAVE_NSS
+static void
+msgport_sync_with_prpipe (PRFileDesc *prfd)
+{
+	gchar buffer[1];
+
+	while (prfd != NULL) {
+		if (PR_Read (prfd, buffer, 1) > 0)
+			break;
+		else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
+			gchar *text = g_alloca (PR_GetErrorTextLength ());
+			PR_GetErrorText (text);
+			g_warning ("%s: Failed to read from NSPR pipe: %s",
+				G_STRFUNC, text);
+			break;
+		}
+	}
+}
+#endif
+
+EMsgPort *
+e_msgport_new (void)
 {
-	EMsgPort *mp;
+	EMsgPort *msgport;
 
-	mp = g_malloc(sizeof(*mp));
-	e_dlist_init(&mp->queue);
-	mp->lock = g_mutex_new();
-	mp->cond = g_cond_new();
-	e_pipe (mp->pipe.pipe);
+	msgport = g_slice_new (EMsgPort);
+	msgport->queue = g_async_queue_new ();
+	msgport->cache = NULL;
+	msgport->pipe[0] = -1;
+	msgport->pipe[1] = -1;
 #ifdef HAVE_NSS
-	e_prpipe (mp->prpipe.pipe);
+	msgport->prpipe[0] = NULL;
+	msgport->prpipe[1] = NULL;
 #endif
-	mp->condwait = 0;
 
-	return mp;
+	return msgport;
 }
 
-void e_msgport_destroy(EMsgPort *mp)
+void
+e_msgport_destroy (EMsgPort *msgport)
 {
-	g_mutex_free(mp->lock);
-	g_cond_free(mp->cond);
-	if (mp->pipe.fd.read != -1) {
-		E_CLOSE(mp->pipe.fd.read);
-		E_CLOSE(mp->pipe.fd.write);
+	g_return_if_fail (msgport != NULL);
+
+	if (msgport->pipe[0] >= 0) {
+		E_CLOSE (msgport->pipe[0]);
+		E_CLOSE (msgport->pipe[1]);
 	}
 #ifdef HAVE_NSS
-	if (mp->prpipe.fd.read) {
-		PR_Close(mp->prpipe.fd.read);
-		PR_Close(mp->prpipe.fd.write);
+	if (msgport->prpipe[0] != NULL) {
+		PR_Close (msgport->prpipe[0]);
+		PR_Close (msgport->prpipe[1]);
 	}
 #endif
-	g_free(mp);
+
+	g_async_queue_unref (msgport->queue);
+	g_slice_free (EMsgPort, msgport);
 }
 
-/* get a fd that can be used to wait on the port asynchronously */
-int e_msgport_fd(EMsgPort *mp)
+int
+e_msgport_fd (EMsgPort *msgport)
 {
-	return mp->pipe.fd.read;
+	gint fd;
+
+	g_return_val_if_fail (msgport != NULL, -1);
+
+	g_async_queue_lock (msgport->queue);
+	fd = msgport->pipe[0];
+	if (fd < 0 && e_pipe (msgport->pipe) == 0)
+		fd = msgport->pipe[0];
+	g_async_queue_unlock (msgport->queue);
+
+	return fd;
 }
 
 #ifdef HAVE_NSS
-PRFileDesc *e_msgport_prfd(EMsgPort *mp)
+PRFileDesc *
+e_msgport_prfd (EMsgPort *msgport)
 {
-	return mp->prpipe.fd.read;
+	PRFileDesc *prfd;
+
+	g_return_val_if_fail (msgport != NULL, NULL);
+
+	g_async_queue_lock (msgport->queue);
+	prfd = msgport->prpipe[0];
+	if (prfd == NULL && e_prpipe (msgport->prpipe) == 0)
+		prfd = msgport->prpipe[0];
+	g_async_queue_unlock (msgport->queue);
+
+	return prfd;
 }
 #endif
 
-void e_msgport_put(EMsgPort *mp, EMsg *msg)
+void
+e_msgport_put (EMsgPort *msgport, EMsg *msg)
 {
+	gint fd;
 #ifdef HAVE_NSS
 	PRFileDesc *prfd;
 #endif
-	ssize_t w;
-	int fd;
-	
-	m(printf("put:\n"));
-	g_mutex_lock(mp->lock);
-	e_dlist_addtail(&mp->queue, &msg->ln);
-	if (mp->condwait > 0) {
-		m(printf("put: condwait > 0, waking up\n"));
-		g_cond_signal(mp->cond);
+
+	g_return_if_fail (msgport != NULL);
+	g_return_if_fail (msg != NULL);
+
+	g_async_queue_lock (msgport->queue);
+
+	msg->flags = 0;
+
+	fd = msgport->pipe[1];
+	while (fd >= 0) {
+		if (E_WRITE (fd, "E", 1) > 0) {
+			msg->flags |= MSG_FLAG_SYNC_WITH_PIPE;
+			break;
+		} else if (!E_IS_STATUS_INTR ()) {
+			g_warning ("%s: Failed to write to pipe: %s",
+				G_STRFUNC, g_strerror (errno));
+			break;
+		}
 	}
-	
-	fd = mp->pipe.fd.write;
-#ifdef HAVE_NSS
-	prfd = mp->prpipe.fd.write;
-#endif
-	g_mutex_unlock(mp->lock);
 
 #ifdef HAVE_NSS
-	if (prfd != NULL) {
-		m(printf("put: have pr pipe, writing notification to it\n"));
-		do {
-			w = PR_Write (prfd, "E", 1);
-		} while (w == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
+	prfd = msgport->prpipe[1];
+	while (prfd != NULL) {
+		if (PR_Write (prfd, "E", 1) > 0) {
+			msg->flags |= MSG_FLAG_SYNC_WITH_PR_PIPE;
+			break;
+		} else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
+			gchar *text = g_alloca (PR_GetErrorTextLength ());
+			PR_GetErrorText (text);
+			g_warning ("%s: Failed to write to NSPR pipe: %s",
+				G_STRFUNC, text);
+			break;
+		}
 	}
 #endif
-	if (fd != -1) {
-		m(printf("put: have pipe, writing notification to it\n"));
-		do {
-			w = E_WRITE (fd, "E", 1);
-		} while (w == -1 && E_IS_STATUS_INTR ());
-	}
 
-	m(printf("put: done\n"));
+	g_async_queue_push_unlocked (msgport->queue, msg);
+	g_async_queue_unlock (msgport->queue);
 }
 
-static void
-msgport_cleanlock(void *data)
+EMsg *
+e_msgport_wait (EMsgPort *msgport)
 {
-	EMsgPort *mp = data;
+	EMsg *msg;
 
-	g_mutex_unlock(mp->lock);
-}
+	g_return_val_if_fail (msgport != NULL, NULL);
 
-EMsg *e_msgport_wait(EMsgPort *mp)
-{
-	EMsg *msg;
+	g_async_queue_lock (msgport->queue);
 
-	m(printf("wait:\n"));
-	g_mutex_lock(mp->lock);
-	while (e_dlist_empty(&mp->queue)) {
-		if (mp->pipe.fd.read != -1) {
-			fd_set rfds;
-			int retry;
-
-			m(printf("wait: waiting on pipe\n"));
-			g_mutex_unlock(mp->lock);
-			do {
-				FD_ZERO(&rfds);
-				FD_SET(mp->pipe.fd.read, &rfds);
-				retry = E_IS_SOCKET_ERROR(select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL)) && E_IS_STATUS_INTR();
-				pthread_testcancel();
-			} while (retry);
-			g_mutex_lock(mp->lock);
-			m(printf("wait: got pipe\n"));
-#ifdef HAVE_NSS
-		} else if (mp->prpipe.fd.read != NULL) {
-			PRPollDesc rfds[1];
-			int retry;
-
-			m(printf("wait: waitng on pr pipe\n"));
-			g_mutex_unlock(mp->lock);
-			do {
-				rfds[0].fd = mp->prpipe.fd.read;
-				rfds[0].in_flags = PR_POLL_READ|PR_POLL_ERR;
-				retry = PR_Poll(rfds, 1, PR_INTERVAL_NO_TIMEOUT) == -1 && PR_GetError() == PR_PENDING_INTERRUPT_ERROR;
-				pthread_testcancel();
-			} while (retry);
-			g_mutex_lock(mp->lock);
-			m(printf("wait: got pr pipe\n"));
-#endif /* HAVE_NSS */
-		} else {
-			m(printf("wait: waiting on condition\n"));
-			mp->condwait++;
-			/* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */
-			pthread_cleanup_push(msgport_cleanlock, mp);
-			g_cond_wait(mp->cond, mp->lock);
-			pthread_cleanup_pop(0);
-			m(printf("wait: got condition\n"));
-			mp->condwait--;
-		}
+	/* check the cache first */
+	if (msgport->cache != NULL) {
+		msg = msgport->cache;
+		/* don't clear the cache */
+		g_async_queue_unlock (msgport->queue);
+		return msg;
 	}
-	msg = (EMsg *)mp->queue.head;
-	m(printf("wait: message = %p\n", msg));
-	g_mutex_unlock(mp->lock);
-	m(printf("wait: done\n"));
+
+	msg = g_async_queue_pop_unlocked (msgport->queue);
+
+	g_assert (msg != NULL);
+
+	/* The message is not actually "removed" from the EMsgPort until
+ 	 * e_msgport_get() is called.  So we cache the popped message. */
+	msgport->cache = msg;
+
+	if (msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
+		msgport_sync_with_pipe (msgport->pipe[0]);
+#ifdef HAVE_NSS
+	if (msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
+		msgport_sync_with_prpipe (msgport->prpipe[0]);
+#endif
+
+	g_async_queue_unlock (msgport->queue);
+
 	return msg;
 }
 
-EMsg *e_msgport_get(EMsgPort *mp)
+EMsg *
+e_msgport_get (EMsgPort *msgport)
 {
 	EMsg *msg;
-	char dummy[1];
-	ssize_t n;
-	
-	g_mutex_lock(mp->lock);
-	msg = (EMsg *)e_dlist_remhead(&mp->queue);
-	if (msg) {
-		if (mp->pipe.fd.read != -1) {
-			do {
-				n = E_READ (mp->pipe.fd.read, dummy, 1);
-			} while (n == -1 && E_IS_STATUS_INTR ());
-		}
+
+	g_return_val_if_fail (msgport != NULL, NULL);
+
+	g_async_queue_lock (msgport->queue);
+
+	/* check the cache first */
+	if (msgport->cache != NULL) {
+		msg = msgport->cache;
+		msgport->cache = NULL;
+		g_async_queue_unlock (msgport->queue);
+		return msg;
+	}
+
+	msg = g_async_queue_try_pop_unlocked (msgport->queue);
+
+	if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
+		msgport_sync_with_pipe (msgport->pipe[0]);
 #ifdef HAVE_NSS
-		if (mp->prpipe.fd.read != NULL) {
-			do {
-				n = PR_Read (mp->prpipe.fd.read, dummy, 1);
-			} while (n == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
-		}
+	if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
+		msgport_sync_with_prpipe (msgport->prpipe[0]);
 #endif
-	}
-	m(printf("get: message = %p\n", msg));
-	g_mutex_unlock(mp->lock);
+
+	g_async_queue_unlock (msgport->queue);
 
 	return msg;
 }
 
-void e_msgport_reply(EMsg *msg)
+void
+e_msgport_reply (EMsg *msg)
 {
-	if (msg->reply_port) {
-		e_msgport_put(msg->reply_port, msg);
-	}
+	g_return_if_fail (msg != NULL);
+
+	if (msg->reply_port)
+		e_msgport_put (msg->reply_port, msg);
+
 	/* else lost? */
 }
 
@@ -1099,7 +1137,7 @@ void e_thread_put(EThread *e, EMsg *msg)
 	switch(e->type) {
 	case E_THREAD_QUEUE:
 		/* if the queue is full, lose this new addition */
-		if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
+		if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
 			e_msgport_put(e->server_port, msg);
 		} else {
 			printf("queue limit reached, dropping new message\n");
@@ -1108,7 +1146,7 @@ void e_thread_put(EThread *e, EMsg *msg)
 		break;
 	case E_THREAD_DROP:
 		/* if the queue is full, lose the oldest (unprocessed) message */
-		if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
+		if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
 			e_msgport_put(e->server_port, msg);
 		} else {
 			printf("queue limit reached, dropping old message\n");
Index: libedataserver/e-msgport.h
===================================================================
RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.h,v
retrieving revision 1.3
diff -u -p -r1.3 e-msgport.h
--- libedataserver/e-msgport.h	3 Dec 2004 03:33:06 -0000	1.3
+++ libedataserver/e-msgport.h	21 Sep 2006 16:49:41 -0000
@@ -56,6 +56,7 @@ typedef struct _EMsgPort EMsgPort;
 typedef struct _EMsg {
 	EDListNode ln;
 	EMsgPort *reply_port;
+	gint flags;
 } EMsg;
 
 EMsgPort *e_msgport_new(void);