Merged trunk changes r9797:9829 into 217update branch.
Index: libedataserver/e-msgport.c
===================================================================
RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.c,v
retrieving revision 1.10
diff -u -p -r1.10 e-msgport.c
--- libedataserver/e-msgport.c 26 Jul 2006 18:12:08 -0000 1.10
+++ libedataserver/e-msgport.c 21 Sep 2006 16:49:41 -0000
@@ -486,29 +486,19 @@ em_cache_clear(EMCache *emc)
}
struct _EMsgPort {
- EDList queue;
- int condwait; /* how many waiting in condwait */
- union {
- int pipe[2]; /* On Win32 actually a pair of SOCKETs */
- struct {
- int read;
- int write;
- } fd;
- } pipe;
+ GAsyncQueue *queue;
+ EMsg *cache;
+ gint pipe[2]; /* on Win32, actually a pair of SOCKETs */
#ifdef HAVE_NSS
- union {
- PRFileDesc *pipe[2];
- struct {
- PRFileDesc *read;
- PRFileDesc *write;
- } fd;
- } prpipe;
-#endif
- /* @#@$#$ glib stuff */
- GCond *cond;
- GMutex *lock;
+ PRFileDesc *prpipe[2];
+#endif
};
+/* message flags */
+enum {
+ MSG_FLAG_SYNC_WITH_PIPE = 1 << 0,
+ MSG_FLAG_SYNC_WITH_PR_PIPE = 1 << 1
+};
#ifdef HAVE_NSS
static int
@@ -529,189 +519,237 @@ e_prpipe (PRFileDesc **fds)
}
#endif
-EMsgPort *e_msgport_new(void)
+static void
+msgport_sync_with_pipe (gint fd)
+{
+ gchar buffer[1];
+
+ while (fd >= 0) {
+ if (E_READ (fd, buffer, 1) > 0)
+ break;
+ else if (!E_IS_STATUS_INTR ()) {
+ g_warning ("%s: Failed to read from pipe: %s",
+ G_STRFUNC, g_strerror (errno));
+ break;
+ }
+ }
+}
+
+#ifdef HAVE_NSS
+static void
+msgport_sync_with_prpipe (PRFileDesc *prfd)
+{
+ gchar buffer[1];
+
+ while (prfd != NULL) {
+ if (PR_Read (prfd, buffer, 1) > 0)
+ break;
+ else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
+ gchar *text = g_alloca (PR_GetErrorTextLength ());
+ PR_GetErrorText (text);
+ g_warning ("%s: Failed to read from NSPR pipe: %s",
+ G_STRFUNC, text);
+ break;
+ }
+ }
+}
+#endif
+
+EMsgPort *
+e_msgport_new (void)
{
- EMsgPort *mp;
+ EMsgPort *msgport;
- mp = g_malloc(sizeof(*mp));
- e_dlist_init(&mp->queue);
- mp->lock = g_mutex_new();
- mp->cond = g_cond_new();
- e_pipe (mp->pipe.pipe);
+ msgport = g_slice_new (EMsgPort);
+ msgport->queue = g_async_queue_new ();
+ msgport->cache = NULL;
+ msgport->pipe[0] = -1;
+ msgport->pipe[1] = -1;
#ifdef HAVE_NSS
- e_prpipe (mp->prpipe.pipe);
+ msgport->prpipe[0] = NULL;
+ msgport->prpipe[1] = NULL;
#endif
- mp->condwait = 0;
- return mp;
+ return msgport;
}
-void e_msgport_destroy(EMsgPort *mp)
+void
+e_msgport_destroy (EMsgPort *msgport)
{
- g_mutex_free(mp->lock);
- g_cond_free(mp->cond);
- if (mp->pipe.fd.read != -1) {
- E_CLOSE(mp->pipe.fd.read);
- E_CLOSE(mp->pipe.fd.write);
+ g_return_if_fail (msgport != NULL);
+
+ if (msgport->pipe[0] >= 0) {
+ E_CLOSE (msgport->pipe[0]);
+ E_CLOSE (msgport->pipe[1]);
}
#ifdef HAVE_NSS
- if (mp->prpipe.fd.read) {
- PR_Close(mp->prpipe.fd.read);
- PR_Close(mp->prpipe.fd.write);
+ if (msgport->prpipe[0] != NULL) {
+ PR_Close (msgport->prpipe[0]);
+ PR_Close (msgport->prpipe[1]);
}
#endif
- g_free(mp);
+
+ g_async_queue_unref (msgport->queue);
+ g_slice_free (EMsgPort, msgport);
}
-/* get a fd that can be used to wait on the port asynchronously */
-int e_msgport_fd(EMsgPort *mp)
+int
+e_msgport_fd (EMsgPort *msgport)
{
- return mp->pipe.fd.read;
+ gint fd;
+
+ g_return_val_if_fail (msgport != NULL, -1);
+
+ g_async_queue_lock (msgport->queue);
+ fd = msgport->pipe[0];
+ if (fd < 0 && e_pipe (msgport->pipe) == 0)
+ fd = msgport->pipe[0];
+ g_async_queue_unlock (msgport->queue);
+
+ return fd;
}
#ifdef HAVE_NSS
-PRFileDesc *e_msgport_prfd(EMsgPort *mp)
+PRFileDesc *
+e_msgport_prfd (EMsgPort *msgport)
{
- return mp->prpipe.fd.read;
+ PRFileDesc *prfd;
+
+ g_return_val_if_fail (msgport != NULL, NULL);
+
+ g_async_queue_lock (msgport->queue);
+ prfd = msgport->prpipe[0];
+ if (prfd == NULL && e_prpipe (msgport->prpipe) == 0)
+ prfd = msgport->prpipe[0];
+ g_async_queue_unlock (msgport->queue);
+
+ return prfd;
}
#endif
-void e_msgport_put(EMsgPort *mp, EMsg *msg)
+void
+e_msgport_put (EMsgPort *msgport, EMsg *msg)
{
+ gint fd;
#ifdef HAVE_NSS
PRFileDesc *prfd;
#endif
- ssize_t w;
- int fd;
-
- m(printf("put:\n"));
- g_mutex_lock(mp->lock);
- e_dlist_addtail(&mp->queue, &msg->ln);
- if (mp->condwait > 0) {
- m(printf("put: condwait > 0, waking up\n"));
- g_cond_signal(mp->cond);
+
+ g_return_if_fail (msgport != NULL);
+ g_return_if_fail (msg != NULL);
+
+ g_async_queue_lock (msgport->queue);
+
+ msg->flags = 0;
+
+ fd = msgport->pipe[1];
+ while (fd >= 0) {
+ if (E_WRITE (fd, "E", 1) > 0) {
+ msg->flags |= MSG_FLAG_SYNC_WITH_PIPE;
+ break;
+ } else if (!E_IS_STATUS_INTR ()) {
+ g_warning ("%s: Failed to write to pipe: %s",
+ G_STRFUNC, g_strerror (errno));
+ break;
+ }
}
-
- fd = mp->pipe.fd.write;
-#ifdef HAVE_NSS
- prfd = mp->prpipe.fd.write;
-#endif
- g_mutex_unlock(mp->lock);
#ifdef HAVE_NSS
- if (prfd != NULL) {
- m(printf("put: have pr pipe, writing notification to it\n"));
- do {
- w = PR_Write (prfd, "E", 1);
- } while (w == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
+ prfd = msgport->prpipe[1];
+ while (prfd != NULL) {
+ if (PR_Write (prfd, "E", 1) > 0) {
+ msg->flags |= MSG_FLAG_SYNC_WITH_PR_PIPE;
+ break;
+ } else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) {
+ gchar *text = g_alloca (PR_GetErrorTextLength ());
+ PR_GetErrorText (text);
+ g_warning ("%s: Failed to write to NSPR pipe: %s",
+ G_STRFUNC, text);
+ break;
+ }
}
#endif
- if (fd != -1) {
- m(printf("put: have pipe, writing notification to it\n"));
- do {
- w = E_WRITE (fd, "E", 1);
- } while (w == -1 && E_IS_STATUS_INTR ());
- }
- m(printf("put: done\n"));
+ g_async_queue_push_unlocked (msgport->queue, msg);
+ g_async_queue_unlock (msgport->queue);
}
-static void
-msgport_cleanlock(void *data)
+EMsg *
+e_msgport_wait (EMsgPort *msgport)
{
- EMsgPort *mp = data;
+ EMsg *msg;
- g_mutex_unlock(mp->lock);
-}
+ g_return_val_if_fail (msgport != NULL, NULL);
-EMsg *e_msgport_wait(EMsgPort *mp)
-{
- EMsg *msg;
+ g_async_queue_lock (msgport->queue);
- m(printf("wait:\n"));
- g_mutex_lock(mp->lock);
- while (e_dlist_empty(&mp->queue)) {
- if (mp->pipe.fd.read != -1) {
- fd_set rfds;
- int retry;
-
- m(printf("wait: waiting on pipe\n"));
- g_mutex_unlock(mp->lock);
- do {
- FD_ZERO(&rfds);
- FD_SET(mp->pipe.fd.read, &rfds);
- retry = E_IS_SOCKET_ERROR(select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL)) && E_IS_STATUS_INTR();
- pthread_testcancel();
- } while (retry);
- g_mutex_lock(mp->lock);
- m(printf("wait: got pipe\n"));
-#ifdef HAVE_NSS
- } else if (mp->prpipe.fd.read != NULL) {
- PRPollDesc rfds[1];
- int retry;
-
- m(printf("wait: waitng on pr pipe\n"));
- g_mutex_unlock(mp->lock);
- do {
- rfds[0].fd = mp->prpipe.fd.read;
- rfds[0].in_flags = PR_POLL_READ|PR_POLL_ERR;
- retry = PR_Poll(rfds, 1, PR_INTERVAL_NO_TIMEOUT) == -1 && PR_GetError() == PR_PENDING_INTERRUPT_ERROR;
- pthread_testcancel();
- } while (retry);
- g_mutex_lock(mp->lock);
- m(printf("wait: got pr pipe\n"));
-#endif /* HAVE_NSS */
- } else {
- m(printf("wait: waiting on condition\n"));
- mp->condwait++;
- /* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */
- pthread_cleanup_push(msgport_cleanlock, mp);
- g_cond_wait(mp->cond, mp->lock);
- pthread_cleanup_pop(0);
- m(printf("wait: got condition\n"));
- mp->condwait--;
- }
+ /* check the cache first */
+ if (msgport->cache != NULL) {
+ msg = msgport->cache;
+ /* don't clear the cache */
+ g_async_queue_unlock (msgport->queue);
+ return msg;
}
- msg = (EMsg *)mp->queue.head;
- m(printf("wait: message = %p\n", msg));
- g_mutex_unlock(mp->lock);
- m(printf("wait: done\n"));
+
+ msg = g_async_queue_pop_unlocked (msgport->queue);
+
+ g_assert (msg != NULL);
+
+ /* The message is not actually "removed" from the EMsgPort until
+ * e_msgport_get() is called. So we cache the popped message. */
+ msgport->cache = msg;
+
+ if (msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
+ msgport_sync_with_pipe (msgport->pipe[0]);
+#ifdef HAVE_NSS
+ if (msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
+ msgport_sync_with_prpipe (msgport->prpipe[0]);
+#endif
+
+ g_async_queue_unlock (msgport->queue);
+
return msg;
}
-EMsg *e_msgport_get(EMsgPort *mp)
+EMsg *
+e_msgport_get (EMsgPort *msgport)
{
EMsg *msg;
- char dummy[1];
- ssize_t n;
-
- g_mutex_lock(mp->lock);
- msg = (EMsg *)e_dlist_remhead(&mp->queue);
- if (msg) {
- if (mp->pipe.fd.read != -1) {
- do {
- n = E_READ (mp->pipe.fd.read, dummy, 1);
- } while (n == -1 && E_IS_STATUS_INTR ());
- }
+
+ g_return_val_if_fail (msgport != NULL, NULL);
+
+ g_async_queue_lock (msgport->queue);
+
+ /* check the cache first */
+ if (msgport->cache != NULL) {
+ msg = msgport->cache;
+ msgport->cache = NULL;
+ g_async_queue_unlock (msgport->queue);
+ return msg;
+ }
+
+ msg = g_async_queue_try_pop_unlocked (msgport->queue);
+
+ if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE)
+ msgport_sync_with_pipe (msgport->pipe[0]);
#ifdef HAVE_NSS
- if (mp->prpipe.fd.read != NULL) {
- do {
- n = PR_Read (mp->prpipe.fd.read, dummy, 1);
- } while (n == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
- }
+ if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE)
+ msgport_sync_with_prpipe (msgport->prpipe[0]);
#endif
- }
- m(printf("get: message = %p\n", msg));
- g_mutex_unlock(mp->lock);
+
+ g_async_queue_unlock (msgport->queue);
return msg;
}
-void e_msgport_reply(EMsg *msg)
+void
+e_msgport_reply (EMsg *msg)
{
- if (msg->reply_port) {
- e_msgport_put(msg->reply_port, msg);
- }
+ g_return_if_fail (msg != NULL);
+
+ if (msg->reply_port)
+ e_msgport_put (msg->reply_port, msg);
+
/* else lost? */
}
@@ -1099,7 +1137,7 @@ void e_thread_put(EThread *e, EMsg *msg)
switch(e->type) {
case E_THREAD_QUEUE:
/* if the queue is full, lose this new addition */
- if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
+ if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
e_msgport_put(e->server_port, msg);
} else {
printf("queue limit reached, dropping new message\n");
@@ -1108,7 +1146,7 @@ void e_thread_put(EThread *e, EMsg *msg)
break;
case E_THREAD_DROP:
/* if the queue is full, lose the oldest (unprocessed) message */
- if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
+ if (g_async_queue_length(e->server_port->queue) < e->queue_limit) {
e_msgport_put(e->server_port, msg);
} else {
printf("queue limit reached, dropping old message\n");
Index: libedataserver/e-msgport.h
===================================================================
RCS file: /cvs/gnome/evolution-data-server/libedataserver/e-msgport.h,v
retrieving revision 1.3
diff -u -p -r1.3 e-msgport.h
--- libedataserver/e-msgport.h 3 Dec 2004 03:33:06 -0000 1.3
+++ libedataserver/e-msgport.h 21 Sep 2006 16:49:41 -0000
@@ -56,6 +56,7 @@ typedef struct _EMsgPort EMsgPort;
typedef struct _EMsg {
EDListNode ln;
EMsgPort *reply_port;
+ gint flags;
} EMsg;
EMsgPort *e_msgport_new(void);