From afa57cce0aff82f4a7a0e509d4387ebc23dd3be7 Mon Sep 17 00:00:00 2001 From: Felix Fietkau Date: Tue, 31 Dec 2024 12:54:06 +0100 Subject: [PATCH] libubus: add support for using channels A channel is a context that is directly connected to a peer instead of going through ubusd. The use of this context is limited to calling ubus_invoke and receiving requests not bound to any registered object. The main use case for this is having a more stateful interaction between processes. A service using channels can attach metadata to each individual channel and keep track of its lifetime, which is not possible through the regular subscribe/notify mechanism. Using channels also improves request latency, since messages are passed directly between processes. Signed-off-by: Felix Fietkau --- libubus-io.c | 5 +++- libubus-obj.c | 26 +++++++++++++----- libubus-req.c | 6 +++++ libubus-sub.c | 6 +++++ libubus.c | 74 ++++++++++++++++++++++++++++++++++++++++++++++++--- libubus.h | 21 +++++++++++++-- ubusmsg.h | 2 ++ 7 files changed, 128 insertions(+), 12 deletions(-) diff --git a/libubus-io.c b/libubus-io.c index d190b67..16c1c14 100644 --- a/libubus-io.c +++ b/libubus-io.c @@ -401,6 +401,9 @@ int ubus_reconnect(struct ubus_context *ctx, const char *path) struct blob_attr *buf; int ret = UBUS_STATUS_UNKNOWN_ERROR; + if (ubus_context_is_channel(ctx)) + return -1; + if (!path) path = UBUS_UNIX_SOCKET; @@ -435,7 +438,7 @@ int ubus_reconnect(struct ubus_context *ctx, const char *path) goto out_free; ctx->local_id = hdr.hdr.peer; - if (!ctx->local_id) + if (ctx->local_id <= UBUS_CLIENT_ID_CHANNEL) goto out_free; ret = UBUS_STATUS_OK; diff --git a/libubus-obj.c b/libubus-obj.c index 29cbb2b..4a56110 100644 --- a/libubus-obj.c +++ b/libubus-obj.c @@ -57,12 +57,13 @@ ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr, .fd = -1, .req_fd = fd, }; - + ubus_handler_t handler; int method; int ret; bool no_reply = false; - if (!obj) { + if ((!obj && !ubus_context_is_channel(ctx)) || + (!ctx->request_handler && ubus_context_is_channel(ctx))) { ret = UBUS_STATUS_NOT_FOUND; goto send; } @@ -77,6 +78,12 @@ ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr, req.peer = hdr->peer; req.seq = hdr->seq; + + if (ubus_context_is_channel(ctx)) { + handler = ctx->request_handler; + goto found; + } + req.object = obj->id; if (attrbuf[UBUS_ATTR_USER] && attrbuf[UBUS_ATTR_GROUP]) { req.acl.user = blobmsg_get_string(attrbuf[UBUS_ATTR_USER]); @@ -86,8 +93,10 @@ ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr, for (method = 0; method < obj->n_methods; method++) if (!obj->methods[method].name || !strcmp(obj->methods[method].name, - blob_data(attrbuf[UBUS_ATTR_METHOD]))) + blob_data(attrbuf[UBUS_ATTR_METHOD]))) { + handler = obj->methods[method].handler; goto found; + } /* not found */ ret = UBUS_STATUS_METHOD_NOT_FOUND; @@ -99,9 +108,8 @@ found: goto send; } - ret = obj->methods[method].handler(ctx, obj, &req, - blob_data(attrbuf[UBUS_ATTR_METHOD]), - attrbuf[UBUS_ATTR_DATA]); + ret = handler(ctx, obj, &req, blob_data(attrbuf[UBUS_ATTR_METHOD]), + attrbuf[UBUS_ATTR_DATA]); if (req.req_fd >= 0) close(req.req_fd); if (req.deferred || no_reply) @@ -211,6 +219,9 @@ int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj) struct ubus_request req; int ret; + if (ubus_context_is_channel(ctx)) + return UBUS_STATUS_INVALID_ARGUMENT; + blob_buf_init(&b, 0); if (obj->name && obj->type) { @@ -258,6 +269,9 @@ int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj) struct ubus_request req; int ret; + if (ubus_context_is_channel(ctx)) + return UBUS_STATUS_INVALID_ARGUMENT; + blob_buf_init(&b, 0); blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id); diff --git a/libubus-req.c b/libubus-req.c index 474aac2..3e8d55c 100644 --- a/libubus-req.c +++ b/libubus-req.c @@ -280,6 +280,9 @@ __ubus_notify_async(struct ubus_context *ctx, struct ubus_object *obj, const char *type, struct blob_attr *msg, struct ubus_notify_request *req, bool reply) { + if (ubus_context_is_channel(ctx)) + return UBUS_STATUS_INVALID_ARGUMENT; + memset(req, 0, sizeof(*req)); blob_buf_init(&b, 0); @@ -496,6 +499,9 @@ void __hidden ubus_process_req_msg(struct ubus_context *ctx, struct ubus_msghdr_ int __ubus_monitor(struct ubus_context *ctx, const char *type) { + if (ubus_context_is_channel(ctx)) + return UBUS_STATUS_INVALID_ARGUMENT; + blob_buf_init(&b, 0); return ubus_invoke(ctx, UBUS_SYSTEM_OBJECT_MONITOR, type, b.head, NULL, NULL, 1000); } diff --git a/libubus-sub.c b/libubus-sub.c index 80d1f1a..127a2de 100644 --- a/libubus-sub.c +++ b/libubus-sub.c @@ -79,6 +79,9 @@ int ubus_register_subscriber(struct ubus_context *ctx, struct ubus_subscriber *s struct ubus_object *obj = &s->obj; int ret; + if (ubus_context_is_channel(ctx)) + return UBUS_STATUS_INVALID_ARGUMENT; + INIT_LIST_HEAD(&s->list); obj->methods = &watch_method; obj->n_methods = 1; @@ -104,6 +107,9 @@ __ubus_subscribe_request(struct ubus_context *ctx, struct ubus_object *obj, uint { struct ubus_request req; + if (ubus_context_is_channel(ctx)) + return UBUS_STATUS_INVALID_ARGUMENT; + blob_buf_init(&b, 0); blob_put_int32(&b, UBUS_ATTR_OBJID, obj->id); blob_put_int32(&b, UBUS_ATTR_TARGET, id); diff --git a/libubus.c b/libubus.c index c9c3c6e..5d22660 100644 --- a/libubus.c +++ b/libubus.c @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -98,9 +99,12 @@ ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr_buf *buf, int fd) ubus_process_req_msg(ctx, buf, fd); break; - case UBUS_MSG_INVOKE: case UBUS_MSG_UNSUBSCRIBE: case UBUS_MSG_NOTIFY: + if (ubus_context_is_channel(ctx)) + break; + /* fallthrough */ + case UBUS_MSG_INVOKE: if (ctx->stack_depth) { ubus_queue_msg(ctx, buf); break; @@ -111,6 +115,9 @@ ubus_process_msg(struct ubus_context *ctx, struct ubus_msghdr_buf *buf, int fd) ctx->stack_depth--; break; case UBUS_MSG_MONITOR: + if (ubus_context_is_channel(ctx)) + break; + if (ctx->monitor_cb) ctx->monitor_cb(ctx, buf->hdr.seq, buf->data); break; @@ -163,6 +170,9 @@ int ubus_lookup(struct ubus_context *ctx, const char *path, { struct ubus_lookup_request lookup; + if (ubus_context_is_channel(ctx)) + return UBUS_STATUS_INVALID_ARGUMENT; + blob_buf_init(&b, 0); if (path) blob_put_string(&b, UBUS_ATTR_OBJPATH, path); @@ -193,6 +203,9 @@ int ubus_lookup_id(struct ubus_context *ctx, const char *path, uint32_t *id) { struct ubus_request req; + if (ubus_context_is_channel(ctx)) + return UBUS_STATUS_INVALID_ARGUMENT; + blob_buf_init(&b, 0); if (path) blob_put_string(&b, UBUS_ATTR_OBJPATH, path); @@ -230,6 +243,9 @@ int ubus_register_event_handler(struct ubus_context *ctx, struct blob_buf b2 = {}; int ret; + if (ubus_context_is_channel(ctx)) + return UBUS_STATUS_INVALID_ARGUMENT; + if (!obj->id) { obj->methods = &event_method; obj->n_methods = 1; @@ -281,7 +297,8 @@ static void ubus_default_connection_lost(struct ubus_context *ctx) uloop_end(); } -int ubus_connect_ctx(struct ubus_context *ctx, const char *path) +static int +__ubus_ctx_init(struct ubus_context *ctx) { uloop_init(); memset(ctx, 0, sizeof(*ctx)); @@ -298,8 +315,16 @@ int ubus_connect_ctx(struct ubus_context *ctx, const char *path) INIT_LIST_HEAD(&ctx->requests); INIT_LIST_HEAD(&ctx->pending); - INIT_LIST_HEAD(&ctx->auto_subscribers); avl_init(&ctx->objects, ubus_cmp_id, false, NULL); + return 0; +} + +int ubus_connect_ctx(struct ubus_context *ctx, const char *path) +{ + if (__ubus_ctx_init(ctx)) + return -1; + + INIT_LIST_HEAD(&ctx->auto_subscribers); if (ubus_reconnect(ctx, path)) { free(ctx->msgbuf.data); ctx->msgbuf.data = NULL; @@ -309,6 +334,49 @@ int ubus_connect_ctx(struct ubus_context *ctx, const char *path) return 0; } +int ubus_channel_connect(struct ubus_context *ctx, int fd, + ubus_handler_t handler) +{ + if (__ubus_ctx_init(ctx)) + return -1; + + if (ctx->sock.fd >= 0) { + if (ctx->sock.registered) + uloop_fd_delete(&ctx->sock); + + close(ctx->sock.fd); + } + + ctx->sock.eof = false; + ctx->sock.error = false; + ctx->sock.fd = fd; + ctx->local_id = UBUS_CLIENT_ID_CHANNEL; + ctx->request_handler = handler; + + fcntl(ctx->sock.fd, F_SETFL, fcntl(ctx->sock.fd, F_GETFL) | O_NONBLOCK | O_CLOEXEC); + + return 0; +} + +int ubus_channel_create(struct ubus_context *ctx, int *remote_fd, + ubus_handler_t handler) +{ + int sfd[2]; + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sfd)) + return -1; + + if (ubus_channel_connect(ctx, sfd[0], handler) < 0) { + close(sfd[0]); + close(sfd[1]); + return -1; + } + + *remote_fd = sfd[1]; + + return 0; +} + static void ubus_auto_reconnect_cb(struct uloop_timeout *timeout) { struct ubus_auto_conn *conn = container_of(timeout, struct ubus_auto_conn, timer); diff --git a/libubus.h b/libubus.h index fcf62c8..aa9263c 100644 --- a/libubus.h +++ b/libubus.h @@ -177,8 +177,15 @@ struct ubus_context { uint32_t msgbuf_data_len; int msgbuf_reduction_counter; - struct list_head auto_subscribers; - struct ubus_event_handler auto_subscribe_event_handler; + union { + struct { + struct list_head auto_subscribers; + struct ubus_event_handler auto_subscribe_event_handler; + }; + struct { + ubus_handler_t request_handler; + }; + }; }; struct ubus_object_data { @@ -253,6 +260,16 @@ struct ubus_context *ubus_connect(const char *path); int ubus_connect_ctx(struct ubus_context *ctx, const char *path); void ubus_auto_connect(struct ubus_auto_conn *conn); int ubus_reconnect(struct ubus_context *ctx, const char *path); +int ubus_channel_connect(struct ubus_context *ctx, int fd, + ubus_handler_t handler); +int ubus_channel_create(struct ubus_context *ctx, int *remote_fd, + ubus_handler_t handler); + +static inline bool +ubus_context_is_channel(struct ubus_context *ctx) +{ + return ctx->local_id == UBUS_CLIENT_ID_CHANNEL; +} /* call this only for struct ubus_context pointers returned by ubus_connect() */ void ubus_free(struct ubus_context *ctx); diff --git a/ubusmsg.h b/ubusmsg.h index b2df8dc..8858511 100644 --- a/ubusmsg.h +++ b/ubusmsg.h @@ -21,6 +21,8 @@ #define UBUS_MSG_CHUNK_SIZE 65536 +#define UBUS_CLIENT_ID_CHANNEL 1 + #define UBUS_SYSTEM_OBJECT_EVENT 1 #define UBUS_SYSTEM_OBJECT_ACL 2 #define UBUS_SYSTEM_OBJECT_MONITOR 3 -- 2.30.2