libubus: add support for using channels master
authorFelix Fietkau <nbd@nbd.name>
Tue, 31 Dec 2024 11:54:06 +0000 (12:54 +0100)
committerFelix Fietkau <nbd@nbd.name>
Thu, 2 Jan 2025 12:40:21 +0000 (13:40 +0100)
A channel is a context that is directly connected to a peer instead of going
through ubusd. The use of this context is limited to calling ubus_invoke and
receiving requests not bound to any registered object.

The main use case for this is having a more stateful interaction between
processes. A service using channels can attach metadata to each individual
channel and keep track of its lifetime, which is not possible through
the regular subscribe/notify mechanism.
Using channels also improves request latency, since messages are passed
directly between processes.

Signed-off-by: Felix Fietkau <nbd@nbd.name>
libubus-io.c
libubus-obj.c
libubus-req.c
libubus-sub.c
libubus.c
libubus.h
ubusmsg.h

index d190b67795eef5ebf96dc58df5366cb2e88f5352..16c1c14e786eff73fa9dc4f4afd4f52b92de343a 100644 (file)
@@ -401,6 +401,9 @@ int ubus_reconnect(struct ubus_context *ctx, const char *path)
        struct blob_attr *buf;
        int ret = UBUS_STATUS_UNKNOWN_ERROR;
 
+       if (ubus_context_is_channel(ctx))
+               return -1;
+
        if (!path)
                path = UBUS_UNIX_SOCKET;
 
@@ -435,7 +438,7 @@ int ubus_reconnect(struct ubus_context *ctx, const char *path)
                goto out_free;
 
        ctx->local_id = hdr.hdr.peer;
-       if (!ctx->local_id)
+       if (ctx->local_id <= UBUS_CLIENT_ID_CHANNEL)
                goto out_free;
 
        ret = UBUS_STATUS_OK;
index 29cbb2b98e6eab48d8cd280e6ca2a596da1ff729..4a56110979d89e45becbc417771d2bd0b7f7fd48 100644 (file)
@@ -57,12 +57,13 @@ ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr,
                .fd = -1,
                .req_fd = fd,
        };
-
+       ubus_handler_t handler;
        int method;
        int ret;
        bool no_reply = false;
 
-       if (!obj) {
+       if ((!obj && !ubus_context_is_channel(ctx)) ||
+           (!ctx->request_handler && ubus_context_is_channel(ctx))) {
                ret = UBUS_STATUS_NOT_FOUND;
                goto send;
        }
@@ -77,6 +78,12 @@ ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr,
 
        req.peer = hdr->peer;
        req.seq = hdr->seq;
+
+       if (ubus_context_is_channel(ctx)) {
+               handler = ctx->request_handler;
+               goto found;
+       }
+
        req.object = obj->id;
        if (attrbuf[UBUS_ATTR_USER] && attrbuf[UBUS_ATTR_GROUP]) {
                req.acl.user = blobmsg_get_string(attrbuf[UBUS_ATTR_USER]);
@@ -86,8 +93,10 @@ ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr,
        for (method = 0; method < obj->n_methods; method++)
                if (!obj->methods[method].name ||
                    !strcmp(obj->methods[method].name,
-                           blob_data(attrbuf[UBUS_ATTR_METHOD])))
+                           blob_data(attrbuf[UBUS_ATTR_METHOD]))) {
+                       handler = obj->methods[method].handler;
                        goto found;
+               }
 
        /* not found */
        ret = UBUS_STATUS_METHOD_NOT_FOUND;
@@ -99,9 +108,8 @@ found:
                goto send;
        }
 
-       ret = obj->methods[method].handler(ctx, obj, &req,
-                                          blob_data(attrbuf[UBUS_ATTR_METHOD]),
-                                          attrbuf[UBUS_ATTR_DATA]);
+       ret = handler(ctx, obj, &req, blob_data(attrbuf[UBUS_ATTR_METHOD]),
+                     attrbuf[UBUS_ATTR_DATA]);
        if (req.req_fd >= 0)
                close(req.req_fd);
        if (req.deferred || no_reply)
@@ -211,6 +219,9 @@ int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj)
        struct ubus_request req;
        int ret;
 
+       if (ubus_context_is_channel(ctx))
+               return UBUS_STATUS_INVALID_ARGUMENT;
+
        blob_buf_init(&b, 0);
 
        if (obj->name && obj->type) {
@@ -258,6 +269,9 @@ int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj)
        struct ubus_request req;
        int ret;
 
+       if (ubus_context_is_channel(ctx))
+               return UBUS_STATUS_INVALID_ARGUMENT;
+
        blob_buf_init(&b, 0);
        blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id);
 
index 474aac28c69adff4195875aa0b4eeafa59ff2f3d..3e8d55c93f7d9adda3ae58a221103a0658b87244 100644 (file)
@@ -280,6 +280,9 @@ __ubus_notify_async(struct ubus_context *ctx, struct ubus_object *obj,
                    const char *type, struct blob_attr *msg,
                    struct ubus_notify_request *req, bool reply)
 {
+       if (ubus_context_is_channel(ctx))
+               return UBUS_STATUS_INVALID_ARGUMENT;
+
        memset(req, 0, sizeof(*req));
 
        blob_buf_init(&b, 0);
@@ -496,6 +499,9 @@ void __hidden ubus_process_req_msg(struct ubus_context *ctx, struct ubus_msghdr_
 
 int __ubus_monitor(struct ubus_context *ctx, const char *type)
 {
+       if (ubus_context_is_channel(ctx))
+               return UBUS_STATUS_INVALID_ARGUMENT;
+
        blob_buf_init(&b, 0);
        return ubus_invoke(ctx, UBUS_SYSTEM_OBJECT_MONITOR, type, b.head, NULL, NULL, 1000);
 }
index 80d1f1ac542469aa782d35f690c5b5d68b3d10e0..127a2de0f8d863338892136452431c7877282f98 100644 (file)
@@ -79,6 +79,9 @@ int ubus_register_subscriber(struct ubus_context *ctx, struct ubus_subscriber *s
        struct ubus_object *obj = &s->obj;
        int ret;
 
+       if (ubus_context_is_channel(ctx))
+               return UBUS_STATUS_INVALID_ARGUMENT;
+
        INIT_LIST_HEAD(&s->list);
        obj->methods = &watch_method;
        obj->n_methods = 1;
@@ -104,6 +107,9 @@ __ubus_subscribe_request(struct ubus_context *ctx, struct ubus_object *obj, uint
 {
        struct ubus_request req;
 
+       if (ubus_context_is_channel(ctx))
+               return UBUS_STATUS_INVALID_ARGUMENT;
+
        blob_buf_init(&b, 0);
        blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id);
        blob_put_int32(&b, UBUS_ATTR_TARGET, id);
index c9c3c6e5c2484b23b06ffc2407f2cbc476b6eb62..5d226600b38e6112535116a4586dd1bce0c7d9e9 100644 (file)
--- a/libubus.c
+++ b/libubus.c
@@ -14,6 +14,7 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <unistd.h>
+#include <fcntl.h>
 
 #include <libubox/blob.h>
 #include <libubox/blobmsg.h>
@@ -98,9 +99,12 @@ ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr_buf *buf, int fd)
                ubus_process_req_msg(ctx, buf, fd);
                break;
 
-       case UBUS_MSG_INVOKE:
        case UBUS_MSG_UNSUBSCRIBE:
        case UBUS_MSG_NOTIFY:
+               if (ubus_context_is_channel(ctx))
+                       break;
+               /* fallthrough */
+       case UBUS_MSG_INVOKE:
                if (ctx->stack_depth) {
                        ubus_queue_msg(ctx, buf);
                        break;
@@ -111,6 +115,9 @@ ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr_buf *buf, int fd)
                ctx->stack_depth--;
                break;
        case UBUS_MSG_MONITOR:
+               if (ubus_context_is_channel(ctx))
+                       break;
+
                if (ctx->monitor_cb)
                        ctx->monitor_cb(ctx, buf->hdr.seq, buf->data);
                break;
@@ -163,6 +170,9 @@ int ubus_lookup(struct ubus_context *ctx, const char *path,
 {
        struct ubus_lookup_request lookup;
 
+       if (ubus_context_is_channel(ctx))
+               return UBUS_STATUS_INVALID_ARGUMENT;
+
        blob_buf_init(&b, 0);
        if (path)
                blob_put_string(&b, UBUS_ATTR_OBJPATH, path);
@@ -193,6 +203,9 @@ int ubus_lookup_id(struct ubus_context *ctx, const char *path, uint32_t *id)
 {
        struct ubus_request req;
 
+       if (ubus_context_is_channel(ctx))
+               return UBUS_STATUS_INVALID_ARGUMENT;
+
        blob_buf_init(&b, 0);
        if (path)
                blob_put_string(&b, UBUS_ATTR_OBJPATH, path);
@@ -230,6 +243,9 @@ int ubus_register_event_handler(struct ubus_context *ctx,
        struct blob_buf b2 = {};
        int ret;
 
+       if (ubus_context_is_channel(ctx))
+               return UBUS_STATUS_INVALID_ARGUMENT;
+
        if (!obj->id) {
                obj->methods = &event_method;
                obj->n_methods = 1;
@@ -281,7 +297,8 @@ static void ubus_default_connection_lost(struct ubus_context *ctx)
                uloop_end();
 }
 
-int ubus_connect_ctx(struct ubus_context *ctx, const char *path)
+static int
+__ubus_ctx_init(struct ubus_context *ctx)
 {
        uloop_init();
        memset(ctx, 0, sizeof(*ctx));
@@ -298,8 +315,16 @@ int ubus_connect_ctx(struct ubus_context *ctx, const char *path)
 
        INIT_LIST_HEAD(&ctx->requests);
        INIT_LIST_HEAD(&ctx->pending);
-       INIT_LIST_HEAD(&ctx->auto_subscribers);
        avl_init(&ctx->objects, ubus_cmp_id, false, NULL);
+       return 0;
+}
+
+int ubus_connect_ctx(struct ubus_context *ctx, const char *path)
+{
+       if (__ubus_ctx_init(ctx))
+           return -1;
+
+       INIT_LIST_HEAD(&ctx->auto_subscribers);
        if (ubus_reconnect(ctx, path)) {
                free(ctx->msgbuf.data);
                ctx->msgbuf.data = NULL;
@@ -309,6 +334,49 @@ int ubus_connect_ctx(struct ubus_context *ctx, const char *path)
        return 0;
 }
 
+int ubus_channel_connect(struct ubus_context *ctx, int fd,
+                        ubus_handler_t handler)
+{
+       if (__ubus_ctx_init(ctx))
+           return -1;
+
+       if (ctx->sock.fd >= 0) {
+               if (ctx->sock.registered)
+                       uloop_fd_delete(&ctx->sock);
+
+               close(ctx->sock.fd);
+       }
+
+       ctx->sock.eof = false;
+       ctx->sock.error = false;
+       ctx->sock.fd = fd;
+       ctx->local_id = UBUS_CLIENT_ID_CHANNEL;
+       ctx->request_handler = handler;
+
+       fcntl(ctx->sock.fd, F_SETFL, fcntl(ctx->sock.fd, F_GETFL) | O_NONBLOCK | O_CLOEXEC);
+
+       return 0;
+}
+
+int ubus_channel_create(struct ubus_context *ctx, int *remote_fd,
+                       ubus_handler_t handler)
+{
+       int sfd[2];
+
+       if (socketpair(AF_UNIX, SOCK_STREAM, 0, sfd))
+               return -1;
+
+       if (ubus_channel_connect(ctx, sfd[0], handler) < 0) {
+               close(sfd[0]);
+               close(sfd[1]);
+               return -1;
+       }
+
+       *remote_fd = sfd[1];
+
+       return 0;
+}
+
 static void ubus_auto_reconnect_cb(struct uloop_timeout *timeout)
 {
        struct ubus_auto_conn *conn = container_of(timeout, struct ubus_auto_conn, timer);
index fcf62c86fc7918fb2c5869a05b9e17687ba52922..aa9263c8084d54d934f6aaf56379ca35b2e9edef 100644 (file)
--- a/libubus.h
+++ b/libubus.h
@@ -177,8 +177,15 @@ struct ubus_context {
        uint32_t msgbuf_data_len;
        int msgbuf_reduction_counter;
 
-       struct list_head auto_subscribers;
-       struct ubus_event_handler auto_subscribe_event_handler;
+       union {
+               struct {
+                       struct list_head auto_subscribers;
+                       struct ubus_event_handler auto_subscribe_event_handler;
+               };
+               struct {
+                       ubus_handler_t request_handler;
+               };
+       };
 };
 
 struct ubus_object_data {
@@ -253,6 +260,16 @@ struct ubus_context *ubus_connect(const char *path);
 int ubus_connect_ctx(struct ubus_context *ctx, const char *path);
 void ubus_auto_connect(struct ubus_auto_conn *conn);
 int ubus_reconnect(struct ubus_context *ctx, const char *path);
+int ubus_channel_connect(struct ubus_context *ctx, int fd,
+                        ubus_handler_t handler);
+int ubus_channel_create(struct ubus_context *ctx, int *remote_fd,
+                       ubus_handler_t handler);
+
+static inline bool
+ubus_context_is_channel(struct ubus_context *ctx)
+{
+    return ctx->local_id == UBUS_CLIENT_ID_CHANNEL;
+}
 
 /* call this only for struct ubus_context pointers returned by ubus_connect() */
 void ubus_free(struct ubus_context *ctx);
index b2df8dc295f84886dac864ee37cd46e24083eab3..8858511232299996c35e71022d359128f3d4cae3 100644 (file)
--- a/ubusmsg.h
+++ b/ubusmsg.h
@@ -21,6 +21,8 @@
 
 #define UBUS_MSG_CHUNK_SIZE    65536
 
+#define UBUS_CLIENT_ID_CHANNEL         1
+
 #define UBUS_SYSTEM_OBJECT_EVENT       1
 #define UBUS_SYSTEM_OBJECT_ACL         2
 #define UBUS_SYSTEM_OBJECT_MONITOR     3