source: trunk/packages/xen-3.1/xen-3.1/tools/xenstore/xs.c @ 34

Last change on this file since 34 was 34, checked in by hartmans, 18 years ago

Add xen and xen-common

File size: 19.1 KB
Line 
1/*
2    Xen Store Daemon interface providing simple tree-like database.
3    Copyright (C) 2005 Rusty Russell IBM Corporation
4
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 2.1 of the License, or (at your option) any later version.
9
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18*/
19
20#include <sys/types.h>
21#include <sys/stat.h>
22#include <fcntl.h>
23#include <sys/socket.h>
24#include <sys/un.h>
25#include <string.h>
26#include <unistd.h>
27#include <stdbool.h>
28#include <stdlib.h>
29#include <assert.h>
30#include <stdio.h>
31#include <signal.h>
32#include <stdint.h>
33#include <errno.h>
34#include <pthread.h>
35#include "xs.h"
36#include "list.h"
37#include "utils.h"
38
39struct xs_stored_msg {
40        struct list_head list;
41        struct xsd_sockmsg hdr;
42        char *body;
43};
44
45struct xs_handle {
46        /* Communications channel to xenstore daemon. */
47        int fd;
48
49        /*
50         * A read thread which pulls messages off the comms channel and
51         * signals waiters.
52         */
53        pthread_t read_thr;
54        int read_thr_exists;
55
56        /*
57         * A list of fired watch messages, protected by a mutex. Users can
58         * wait on the conditional variable until a watch is pending.
59         */
60        struct list_head watch_list;
61        pthread_mutex_t watch_mutex;
62        pthread_cond_t watch_condvar;
63
64        /* Clients can select() on this pipe to wait for a watch to fire. */
65        int watch_pipe[2];
66
67        /*
68         * A list of replies. Currently only one will ever be outstanding
69         * because we serialise requests. The requester can wait on the
70         * conditional variable for its response.
71         */
72        struct list_head reply_list;
73        pthread_mutex_t reply_mutex;
74        pthread_cond_t reply_condvar;
75
76        /* One request at a time. */
77        pthread_mutex_t request_mutex;
78};
79
80static int read_message(struct xs_handle *h);
81static void *read_thread(void *arg);
82
83int xs_fileno(struct xs_handle *h)
84{
85        char c = 0;
86
87        pthread_mutex_lock(&h->watch_mutex);
88
89        if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) {
90                /* Kick things off if the watch list is already non-empty. */
91                if (!list_empty(&h->watch_list))
92                        while (write(h->watch_pipe[1], &c, 1) != 1)
93                                continue;
94        }
95
96        pthread_mutex_unlock(&h->watch_mutex);
97
98        return h->watch_pipe[0];
99}
100
101static int get_socket(const char *connect_to)
102{
103        struct sockaddr_un addr;
104        int sock, saved_errno, flags;
105
106        sock = socket(PF_UNIX, SOCK_STREAM, 0);
107        if (sock < 0)
108                return -1;
109
110        if ((flags = fcntl(sock, F_GETFD)) < 0)
111                goto error;
112        flags |= FD_CLOEXEC;
113        if (fcntl(sock, F_SETFD, flags) < 0)
114                goto error;
115
116        addr.sun_family = AF_UNIX;
117        strcpy(addr.sun_path, connect_to);
118
119        if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0)
120                goto error;
121
122        return sock;
123
124error:
125        saved_errno = errno;
126        close(sock);
127        errno = saved_errno;
128        return -1;
129}
130
131static int get_dev(const char *connect_to)
132{
133        return open(connect_to, O_RDWR);
134}
135
136static struct xs_handle *get_handle(const char *connect_to)
137{
138        struct stat buf;
139        struct xs_handle *h = NULL;
140        int fd = -1, saved_errno;
141
142        if (stat(connect_to, &buf) != 0)
143                return NULL;
144
145        if (S_ISSOCK(buf.st_mode))
146                fd = get_socket(connect_to);
147        else
148                fd = get_dev(connect_to);
149
150        if (fd == -1)
151                return NULL;
152
153        h = malloc(sizeof(*h));
154        if (h == NULL) {
155                saved_errno = errno;
156                close(fd);
157                errno = saved_errno;
158                return NULL;
159        }
160
161        memset(h, 0, sizeof(*h));
162
163        h->fd = fd;
164
165        /* Watch pipe is allocated on demand in xs_fileno(). */
166        h->watch_pipe[0] = h->watch_pipe[1] = -1;
167
168        INIT_LIST_HEAD(&h->watch_list);
169        pthread_mutex_init(&h->watch_mutex, NULL);
170        pthread_cond_init(&h->watch_condvar, NULL);
171
172        INIT_LIST_HEAD(&h->reply_list);
173        pthread_mutex_init(&h->reply_mutex, NULL);
174        pthread_cond_init(&h->reply_condvar, NULL);
175
176        pthread_mutex_init(&h->request_mutex, NULL);
177
178        return h;
179}
180
181struct xs_handle *xs_daemon_open(void)
182{
183        return get_handle(xs_daemon_socket());
184}
185
186struct xs_handle *xs_daemon_open_readonly(void)
187{
188        return get_handle(xs_daemon_socket_ro());
189}
190
191struct xs_handle *xs_domain_open(void)
192{
193        return get_handle(xs_domain_dev());
194}
195
196void xs_daemon_close(struct xs_handle *h)
197{
198        struct xs_stored_msg *msg, *tmsg;
199
200        pthread_mutex_lock(&h->request_mutex);
201        pthread_mutex_lock(&h->reply_mutex);
202        pthread_mutex_lock(&h->watch_mutex);
203
204        if (h->read_thr_exists) {
205                /* XXX FIXME: May leak an unpublished message buffer. */
206                pthread_cancel(h->read_thr);
207                pthread_join(h->read_thr, NULL);
208        }
209
210        list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
211                free(msg->body);
212                free(msg);
213        }
214
215        list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) {
216                free(msg->body);
217                free(msg);
218        }
219
220        pthread_mutex_unlock(&h->request_mutex);
221        pthread_mutex_unlock(&h->reply_mutex);
222        pthread_mutex_unlock(&h->watch_mutex);
223
224        if (h->watch_pipe[0] != -1) {
225                close(h->watch_pipe[0]);
226                close(h->watch_pipe[1]);
227        }
228
229        close(h->fd);
230
231        free(h);
232}
233
234static bool read_all(int fd, void *data, unsigned int len)
235{
236        while (len) {
237                int done;
238
239                done = read(fd, data, len);
240                if (done < 0) {
241                        if (errno == EINTR)
242                                continue;
243                        return false;
244                }
245                if (done == 0) {
246                        /* It closed fd on us?  EBADF is appropriate. */
247                        errno = EBADF;
248                        return false;
249                }
250                data += done;
251                len -= done;
252        }
253
254        return true;
255}
256
257#ifdef XSTEST
258#define read_all read_all_choice
259#define xs_write_all write_all_choice
260#endif
261
262static int get_error(const char *errorstring)
263{
264        unsigned int i;
265
266        for (i = 0; !streq(errorstring, xsd_errors[i].errstring); i++)
267                if (i == ARRAY_SIZE(xsd_errors) - 1)
268                        return EINVAL;
269        return xsd_errors[i].errnum;
270}
271
272/* Adds extra nul terminator, because we generally (always?) hold strings. */
273static void *read_reply(
274        struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
275{
276        struct xs_stored_msg *msg;
277        char *body;
278
279        /* Read from comms channel ourselves if there is no reader thread. */
280        if (!h->read_thr_exists && (read_message(h) == -1))
281                return NULL;
282
283        pthread_mutex_lock(&h->reply_mutex);
284        while (list_empty(&h->reply_list))
285                pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
286        msg = list_top(&h->reply_list, struct xs_stored_msg, list);
287        list_del(&msg->list);
288        assert(list_empty(&h->reply_list));
289        pthread_mutex_unlock(&h->reply_mutex);
290
291        *type = msg->hdr.type;
292        if (len)
293                *len = msg->hdr.len;
294        body = msg->body;
295
296        free(msg);
297
298        return body;
299}
300
301/* Send message to xs, get malloc'ed reply.  NULL and set errno on error. */
302static void *xs_talkv(struct xs_handle *h, xs_transaction_t t,
303                      enum xsd_sockmsg_type type,
304                      const struct iovec *iovec,
305                      unsigned int num_vecs,
306                      unsigned int *len)
307{
308        struct xsd_sockmsg msg;
309        void *ret = NULL;
310        int saved_errno;
311        unsigned int i;
312        struct sigaction ignorepipe, oldact;
313
314        msg.tx_id = t;
315        msg.req_id = 0;
316        msg.type = type;
317        msg.len = 0;
318        for (i = 0; i < num_vecs; i++)
319                msg.len += iovec[i].iov_len;
320
321        ignorepipe.sa_handler = SIG_IGN;
322        sigemptyset(&ignorepipe.sa_mask);
323        ignorepipe.sa_flags = 0;
324        sigaction(SIGPIPE, &ignorepipe, &oldact);
325
326        pthread_mutex_lock(&h->request_mutex);
327
328        if (!xs_write_all(h->fd, &msg, sizeof(msg)))
329                goto fail;
330
331        for (i = 0; i < num_vecs; i++)
332                if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len))
333                        goto fail;
334
335        ret = read_reply(h, &msg.type, len);
336        if (!ret)
337                goto fail;
338
339        pthread_mutex_unlock(&h->request_mutex);
340
341        sigaction(SIGPIPE, &oldact, NULL);
342        if (msg.type == XS_ERROR) {
343                saved_errno = get_error(ret);
344                free(ret);
345                errno = saved_errno;
346                return NULL;
347        }
348
349        if (msg.type != type) {
350                free(ret);
351                saved_errno = EBADF;
352                goto close_fd;
353        }
354        return ret;
355
356fail:
357        /* We're in a bad state, so close fd. */
358        saved_errno = errno;
359        pthread_mutex_unlock(&h->request_mutex);
360        sigaction(SIGPIPE, &oldact, NULL);
361close_fd:
362        close(h->fd);
363        h->fd = -1;
364        errno = saved_errno;
365        return NULL;
366}
367
368/* free(), but don't change errno. */
369static void free_no_errno(void *p)
370{
371        int saved_errno = errno;
372        free(p);
373        errno = saved_errno;
374}
375
376/* Simplified version of xs_talkv: single message. */
377static void *xs_single(struct xs_handle *h, xs_transaction_t t,
378                       enum xsd_sockmsg_type type,
379                       const char *string,
380                       unsigned int *len)
381{
382        struct iovec iovec;
383
384        iovec.iov_base = (void *)string;
385        iovec.iov_len = strlen(string) + 1;
386        return xs_talkv(h, t, type, &iovec, 1, len);
387}
388
389static bool xs_bool(char *reply)
390{
391        if (!reply)
392                return false;
393        free(reply);
394        return true;
395}
396
397char **xs_directory(struct xs_handle *h, xs_transaction_t t,
398                    const char *path, unsigned int *num)
399{
400        char *strings, *p, **ret;
401        unsigned int len;
402
403        strings = xs_single(h, t, XS_DIRECTORY, path, &len);
404        if (!strings)
405                return NULL;
406
407        /* Count the strings. */
408        *num = xs_count_strings(strings, len);
409
410        /* Transfer to one big alloc for easy freeing. */
411        ret = malloc(*num * sizeof(char *) + len);
412        if (!ret) {
413                free_no_errno(strings);
414                return NULL;
415        }
416        memcpy(&ret[*num], strings, len);
417        free_no_errno(strings);
418
419        strings = (char *)&ret[*num];
420        for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
421                ret[(*num)++] = p;
422        return ret;
423}
424
425/* Get the value of a single file, nul terminated.
426 * Returns a malloced value: call free() on it after use.
427 * len indicates length in bytes, not including the nul.
428 */
429void *xs_read(struct xs_handle *h, xs_transaction_t t,
430              const char *path, unsigned int *len)
431{
432        return xs_single(h, t, XS_READ, path, len);
433}
434
435/* Write the value of a single file.
436 * Returns false on failure.
437 */
438bool xs_write(struct xs_handle *h, xs_transaction_t t,
439              const char *path, const void *data, unsigned int len)
440{
441        struct iovec iovec[2];
442
443        iovec[0].iov_base = (void *)path;
444        iovec[0].iov_len = strlen(path) + 1;
445        iovec[1].iov_base = (void *)data;
446        iovec[1].iov_len = len;
447
448        return xs_bool(xs_talkv(h, t, XS_WRITE, iovec,
449                                ARRAY_SIZE(iovec), NULL));
450}
451
452/* Create a new directory.
453 * Returns false on failure, or success if it already exists.
454 */
455bool xs_mkdir(struct xs_handle *h, xs_transaction_t t,
456              const char *path)
457{
458        return xs_bool(xs_single(h, t, XS_MKDIR, path, NULL));
459}
460
461/* Destroy a file or directory (directories must be empty).
462 * Returns false on failure, or success if it doesn't exist.
463 */
464bool xs_rm(struct xs_handle *h, xs_transaction_t t,
465           const char *path)
466{
467        return xs_bool(xs_single(h, t, XS_RM, path, NULL));
468}
469
470/* Get permissions of node (first element is owner).
471 * Returns malloced array, or NULL: call free() after use.
472 */
473struct xs_permissions *xs_get_permissions(struct xs_handle *h,
474                                          xs_transaction_t t,
475                                          const char *path, unsigned int *num)
476{
477        char *strings;
478        unsigned int len;
479        struct xs_permissions *ret;
480
481        strings = xs_single(h, t, XS_GET_PERMS, path, &len);
482        if (!strings)
483                return NULL;
484
485        /* Count the strings: each one perms then domid. */
486        *num = xs_count_strings(strings, len);
487
488        /* Transfer to one big alloc for easy freeing. */
489        ret = malloc(*num * sizeof(struct xs_permissions));
490        if (!ret) {
491                free_no_errno(strings);
492                return NULL;
493        }
494
495        if (!xs_strings_to_perms(ret, *num, strings)) {
496                free_no_errno(ret);
497                ret = NULL;
498        }
499
500        free(strings);
501        return ret;
502}
503
504/* Set permissions of node (must be owner).
505 * Returns false on failure.
506 */
507bool xs_set_permissions(struct xs_handle *h,
508                        xs_transaction_t t,
509                        const char *path,
510                        struct xs_permissions *perms,
511                        unsigned int num_perms)
512{
513        unsigned int i;
514        struct iovec iov[1+num_perms];
515
516        iov[0].iov_base = (void *)path;
517        iov[0].iov_len = strlen(path) + 1;
518       
519        for (i = 0; i < num_perms; i++) {
520                char buffer[MAX_STRLEN(unsigned int)+1];
521
522                if (!xs_perm_to_string(&perms[i], buffer))
523                        goto unwind;
524
525                iov[i+1].iov_base = strdup(buffer);
526                iov[i+1].iov_len = strlen(buffer) + 1;
527                if (!iov[i+1].iov_base)
528                        goto unwind;
529        }
530
531        if (!xs_bool(xs_talkv(h, t, XS_SET_PERMS, iov, 1+num_perms, NULL)))
532                goto unwind;
533        for (i = 0; i < num_perms; i++)
534                free(iov[i+1].iov_base);
535        return true;
536
537unwind:
538        num_perms = i;
539        for (i = 0; i < num_perms; i++)
540                free_no_errno(iov[i+1].iov_base);
541        return false;
542}
543
544/* Watch a node for changes (poll on fd to detect, or call read_watch()).
545 * When the node (or any child) changes, fd will become readable.
546 * Token is returned when watch is read, to allow matching.
547 * Returns false on failure.
548 */
549bool xs_watch(struct xs_handle *h, const char *path, const char *token)
550{
551        struct iovec iov[2];
552
553        /* We dynamically create a reader thread on demand. */
554        pthread_mutex_lock(&h->request_mutex);
555        if (!h->read_thr_exists) {
556                if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) {
557                        pthread_mutex_unlock(&h->request_mutex);
558                        return false;
559                }
560                h->read_thr_exists = 1;
561        }
562        pthread_mutex_unlock(&h->request_mutex);
563
564        iov[0].iov_base = (void *)path;
565        iov[0].iov_len = strlen(path) + 1;
566        iov[1].iov_base = (void *)token;
567        iov[1].iov_len = strlen(token) + 1;
568
569        return xs_bool(xs_talkv(h, XBT_NULL, XS_WATCH, iov,
570                                ARRAY_SIZE(iov), NULL));
571}
572
573/* Find out what node change was on (will block if nothing pending).
574 * Returns array of two pointers: path and token, or NULL.
575 * Call free() after use.
576 */
577char **xs_read_watch(struct xs_handle *h, unsigned int *num)
578{
579        struct xs_stored_msg *msg;
580        char **ret, *strings, c = 0;
581        unsigned int num_strings, i;
582
583        pthread_mutex_lock(&h->watch_mutex);
584
585        /* Wait on the condition variable for a watch to fire. */
586        while (list_empty(&h->watch_list))
587                pthread_cond_wait(&h->watch_condvar, &h->watch_mutex);
588        msg = list_top(&h->watch_list, struct xs_stored_msg, list);
589        list_del(&msg->list);
590
591        /* Clear the pipe token if there are no more pending watches. */
592        if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1))
593                while (read(h->watch_pipe[0], &c, 1) != 1)
594                        continue;
595
596        pthread_mutex_unlock(&h->watch_mutex);
597
598        assert(msg->hdr.type == XS_WATCH_EVENT);
599
600        strings     = msg->body;
601        num_strings = xs_count_strings(strings, msg->hdr.len);
602
603        ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
604        if (!ret) {
605                free_no_errno(strings);
606                free_no_errno(msg);
607                return NULL;
608        }
609
610        ret[0] = (char *)(ret + num_strings);
611        memcpy(ret[0], strings, msg->hdr.len);
612
613        free(strings);
614        free(msg);
615
616        for (i = 1; i < num_strings; i++)
617                ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;
618
619        *num = num_strings;
620
621        return ret;
622}
623
624/* Remove a watch on a node.
625 * Returns false on failure (no watch on that node).
626 */
627bool xs_unwatch(struct xs_handle *h, const char *path, const char *token)
628{
629        struct iovec iov[2];
630
631        iov[0].iov_base = (char *)path;
632        iov[0].iov_len = strlen(path) + 1;
633        iov[1].iov_base = (char *)token;
634        iov[1].iov_len = strlen(token) + 1;
635
636        return xs_bool(xs_talkv(h, XBT_NULL, XS_UNWATCH, iov,
637                                ARRAY_SIZE(iov), NULL));
638}
639
640/* Start a transaction: changes by others will not be seen during this
641 * transaction, and changes will not be visible to others until end.
642 * You can only have one transaction at any time.
643 * Returns XBT_NULL on failure.
644 */
645xs_transaction_t xs_transaction_start(struct xs_handle *h)
646{
647        char *id_str;
648        xs_transaction_t id;
649
650        id_str = xs_single(h, XBT_NULL, XS_TRANSACTION_START, "", NULL);
651        if (id_str == NULL)
652                return XBT_NULL;
653
654        id = strtoul(id_str, NULL, 0);
655        free(id_str);
656
657        return id;
658}
659
660/* End a transaction.
661 * If abandon is true, transaction is discarded instead of committed.
662 * Returns false on failure, which indicates an error: transactions will
663 * not fail spuriously.
664 */
665bool xs_transaction_end(struct xs_handle *h, xs_transaction_t t,
666                        bool abort)
667{
668        char abortstr[2];
669
670        if (abort)
671                strcpy(abortstr, "F");
672        else
673                strcpy(abortstr, "T");
674       
675        return xs_bool(xs_single(h, t, XS_TRANSACTION_END, abortstr, NULL));
676}
677
678/* Introduce a new domain.
679 * This tells the store daemon about a shared memory page and event channel
680 * associated with a domain: the domain uses these to communicate.
681 */
682bool xs_introduce_domain(struct xs_handle *h,
683                         unsigned int domid, unsigned long mfn,
684                         unsigned int eventchn)
685{
686        char domid_str[MAX_STRLEN(domid)];
687        char mfn_str[MAX_STRLEN(mfn)];
688        char eventchn_str[MAX_STRLEN(eventchn)];
689        struct iovec iov[3];
690
691        sprintf(domid_str, "%u", domid);
692        sprintf(mfn_str, "%lu", mfn);
693        sprintf(eventchn_str, "%u", eventchn);
694
695        iov[0].iov_base = domid_str;
696        iov[0].iov_len = strlen(domid_str) + 1;
697        iov[1].iov_base = mfn_str;
698        iov[1].iov_len = strlen(mfn_str) + 1;
699        iov[2].iov_base = eventchn_str;
700        iov[2].iov_len = strlen(eventchn_str) + 1;
701
702        return xs_bool(xs_talkv(h, XBT_NULL, XS_INTRODUCE, iov,
703                                ARRAY_SIZE(iov), NULL));
704}
705
706static void * single_with_domid(struct xs_handle *h,
707                                enum xsd_sockmsg_type type,
708                                unsigned int domid)
709{
710        char domid_str[MAX_STRLEN(domid)];
711
712        sprintf(domid_str, "%u", domid);
713
714        return xs_single(h, XBT_NULL, type, domid_str, NULL);
715}
716
717bool xs_release_domain(struct xs_handle *h, unsigned int domid)
718{
719        return xs_bool(single_with_domid(h, XS_RELEASE, domid));
720}
721
722/* clear the shutdown bit for the given domain */
723bool xs_resume_domain(struct xs_handle *h, unsigned int domid)
724{
725        return xs_bool(single_with_domid(h, XS_RESUME, domid));
726}
727
728char *xs_get_domain_path(struct xs_handle *h, unsigned int domid)
729{
730        char domid_str[MAX_STRLEN(domid)];
731
732        sprintf(domid_str, "%u", domid);
733
734        return xs_single(h, XBT_NULL, XS_GET_DOMAIN_PATH, domid_str, NULL);
735}
736
737bool xs_is_domain_introduced(struct xs_handle *h, unsigned int domid)
738{
739        return strcmp("F",
740                      single_with_domid(h, XS_IS_DOMAIN_INTRODUCED, domid));
741}
742
743/* Only useful for DEBUG versions */
744char *xs_debug_command(struct xs_handle *h, const char *cmd,
745                       void *data, unsigned int len)
746{
747        struct iovec iov[2];
748
749        iov[0].iov_base = (void *)cmd;
750        iov[0].iov_len = strlen(cmd) + 1;
751        iov[1].iov_base = data;
752        iov[1].iov_len = len;
753
754        return xs_talkv(h, XBT_NULL, XS_DEBUG, iov,
755                        ARRAY_SIZE(iov), NULL);
756}
757
758static int read_message(struct xs_handle *h)
759{
760        struct xs_stored_msg *msg = NULL;
761        char *body = NULL;
762        int saved_errno;
763
764        /* Allocate message structure and read the message header. */
765        msg = malloc(sizeof(*msg));
766        if (msg == NULL)
767                goto error;
768        if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
769                goto error;
770
771        /* Allocate and read the message body. */
772        body = msg->body = malloc(msg->hdr.len + 1);
773        if (body == NULL)
774                goto error;
775        if (!read_all(h->fd, body, msg->hdr.len))
776                goto error;
777        body[msg->hdr.len] = '\0';
778
779        if (msg->hdr.type == XS_WATCH_EVENT) {
780                pthread_mutex_lock(&h->watch_mutex);
781
782                /* Kick users out of their select() loop. */
783                if (list_empty(&h->watch_list) &&
784                    (h->watch_pipe[1] != -1))
785                        while (write(h->watch_pipe[1], body, 1) != 1)
786                                continue;
787
788                list_add_tail(&msg->list, &h->watch_list);
789                pthread_cond_signal(&h->watch_condvar);
790
791                pthread_mutex_unlock(&h->watch_mutex);
792        } else {
793                pthread_mutex_lock(&h->reply_mutex);
794
795                /* There should only ever be one response pending! */
796                if (!list_empty(&h->reply_list)) {
797                        pthread_mutex_unlock(&h->reply_mutex);
798                        goto error;
799                }
800
801                list_add_tail(&msg->list, &h->reply_list);
802                pthread_cond_signal(&h->reply_condvar);
803
804                pthread_mutex_unlock(&h->reply_mutex);
805        }
806
807        return 0;
808
809 error:
810        saved_errno = errno;
811        free(msg);
812        free(body);
813        errno = saved_errno;
814        return -1;
815}
816
817static void *read_thread(void *arg)
818{
819        struct xs_handle *h = arg;
820
821        while (read_message(h) != -1)
822                continue;
823
824        return NULL;
825}
826
827/*
828 * Local variables:
829 *  c-file-style: "linux"
830 *  indent-tabs-mode: t
831 *  c-indent-level: 8
832 *  c-basic-offset: 8
833 *  tab-width: 8
834 * End:
835 */
Note: See TracBrowser for help on using the repository browser.