ceph: generalize mon requests, add pool op support
authorYehuda Sadeh <yehuda@hq.newdream.net>
Mon, 17 May 2010 19:40:28 +0000 (12:40 -0700)
committerSage Weil <sage@newdream.net>
Tue, 10 Aug 2010 21:41:25 +0000 (14:41 -0700)
Generalize the current statfs synchronous requests, and support pool_ops.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/mon_client.c
fs/ceph/mon_client.h

index 54fe01c50706a935da2a9611677320bebae8b967..b2a5a3e4a671c336fc7495d473792ddc25f71110 100644 (file)
@@ -349,7 +349,7 @@ out:
 }
 
 /*
- * statfs
+ * generic requests (e.g., statfs, poolop)
  */
 static struct ceph_mon_generic_request *__lookup_generic_req(
        struct ceph_mon_client *monc, u64 tid)
@@ -442,6 +442,35 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
        return m;
 }
 
+static int do_generic_request(struct ceph_mon_client *monc,
+                             struct ceph_mon_generic_request *req)
+{
+       int err;
+
+       /* register request */
+       mutex_lock(&monc->mutex);
+       req->tid = ++monc->last_tid;
+       req->request->hdr.tid = cpu_to_le64(req->tid);
+       __insert_generic_request(monc, req);
+       monc->num_generic_requests++;
+       ceph_con_send(monc->con, ceph_msg_get(req->request));
+       mutex_unlock(&monc->mutex);
+
+       err = wait_for_completion_interruptible(&req->completion);
+
+       mutex_lock(&monc->mutex);
+       rb_erase(&req->node, &monc->generic_request_tree);
+       monc->num_generic_requests--;
+       mutex_unlock(&monc->mutex);
+
+       if (!err)
+               err = req->result;
+       return err;
+}
+
+/*
+ * statfs
+ */
 static void handle_statfs_reply(struct ceph_mon_client *monc,
                                struct ceph_msg *msg)
 {
@@ -468,7 +497,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
        return;
 
 bad:
-       pr_err("corrupt generic reply, no tid\n");
+       pr_err("corrupt generic reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
 }
 
@@ -487,6 +516,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
 
        kref_init(&req->kref);
        req->buf = buf;
+       req->buf_len = sizeof(*buf);
        init_completion(&req->completion);
 
        err = -ENOMEM;
@@ -504,33 +534,134 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;
 
-       /* register request */
-       mutex_lock(&monc->mutex);
-       req->tid = ++monc->last_tid;
-       req->request->hdr.tid = cpu_to_le64(req->tid);
-       __insert_generic_request(monc, req);
-       monc->num_generic_requests++;
-       mutex_unlock(&monc->mutex);
+       err = do_generic_request(monc, req);
 
-       /* send request and wait */
-       ceph_con_send(monc->con, ceph_msg_get(req->request));
-       err = wait_for_completion_interruptible(&req->completion);
+out:
+       kref_put(&req->kref, release_generic_request);
+       return err;
+}
+
+/*
+ * pool ops
+ */
+static int get_poolop_reply_buf(const char *src, size_t src_len,
+                               char *dst, size_t dst_len)
+{
+       u32 buf_len;
+
+       if (src_len != sizeof(u32) + dst_len)
+               return -EINVAL;
+
+       buf_len = le32_to_cpu(*(u32 *)src);
+       if (buf_len != dst_len)
+               return -EINVAL;
+
+       memcpy(dst, src + sizeof(u32), dst_len);
+       return 0;
+}
+
+static void handle_poolop_reply(struct ceph_mon_client *monc,
+                               struct ceph_msg *msg)
+{
+       struct ceph_mon_generic_request *req;
+       struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
+       u64 tid = le64_to_cpu(msg->hdr.tid);
+
+       if (msg->front.iov_len < sizeof(*reply))
+               goto bad;
+       dout("handle_poolop_reply %p tid %llu\n", msg, tid);
 
        mutex_lock(&monc->mutex);
-       rb_erase(&req->node, &monc->generic_request_tree);
-       monc->num_generic_requests--;
+       req = __lookup_generic_req(monc, tid);
+       if (req) {
+               if (req->buf_len &&
+                   get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
+                                    msg->front.iov_len - sizeof(*reply),
+                                    req->buf, req->buf_len) < 0) {
+                       mutex_unlock(&monc->mutex);
+                       goto bad;
+               }
+               req->result = le32_to_cpu(reply->reply_code);
+               get_generic_request(req);
+       }
        mutex_unlock(&monc->mutex);
+       if (req) {
+               complete(&req->completion);
+               put_generic_request(req);
+       }
+       return;
 
-       if (!err)
-               err = req->result;
+bad:
+       pr_err("corrupt generic reply, tid %llu\n", tid);
+       ceph_msg_dump(msg);
+}
+
+/*
+ * Do a synchronous pool op.
+ */
+int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
+                       u32 pool, u64 snapid,
+                       char *buf, int len)
+{
+       struct ceph_mon_generic_request *req;
+       struct ceph_mon_poolop *h;
+       int err;
+
+       req = kzalloc(sizeof(*req), GFP_NOFS);
+       if (!req)
+               return -ENOMEM;
+
+       kref_init(&req->kref);
+       req->buf = buf;
+       req->buf_len = len;
+       init_completion(&req->completion);
+
+       err = -ENOMEM;
+       req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
+       if (!req->request)
+               goto out;
+       req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
+       if (!req->reply)
+               goto out;
+
+       /* fill out request */
+       req->request->hdr.version = cpu_to_le16(2);
+       h = req->request->front.iov_base;
+       h->monhdr.have_version = 0;
+       h->monhdr.session_mon = cpu_to_le16(-1);
+       h->monhdr.session_mon_tid = 0;
+       h->fsid = monc->monmap->fsid;
+       h->pool = cpu_to_le32(pool);
+       h->op = cpu_to_le32(op);
+       h->auid = 0;
+       h->snapid = cpu_to_le64(snapid);
+       h->name_len = 0;
+
+       err = do_generic_request(monc, req);
 
 out:
        kref_put(&req->kref, release_generic_request);
        return err;
 }
 
+int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+                           u32 pool, u64 *snapid)
+{
+       return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+                                  pool, 0, (char *)snapid, sizeof(*snapid));
+
+}
+
+int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+                           u32 pool, u64 snapid)
+{
+       return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+                                  pool, snapid, 0, 0);
+
+}
+
 /*
- * Resend pending statfs requests.
+ * Resend pending generic requests.
  */
 static void __resend_generic_request(struct ceph_mon_client *monc)
 {
@@ -783,6 +914,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
                handle_statfs_reply(monc, msg);
                break;
 
+       case CEPH_MSG_POOLOP_REPLY:
+               handle_poolop_reply(monc, msg);
+               break;
+
        case CEPH_MSG_MON_MAP:
                ceph_monc_handle_map(monc, msg);
                break;
@@ -820,6 +955,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                m = ceph_msg_get(monc->m_subscribe_ack);
                break;
+       case CEPH_MSG_POOLOP_REPLY:
        case CEPH_MSG_STATFS_REPLY:
                return get_generic_reply(con, hdr, skip);
        case CEPH_MSG_AUTH_REPLY:
index 174d794321d0af7cf709e4ff7395fedc381aba81..8e396f2c09637f29d0a82311244d036441a51c1c 100644 (file)
@@ -50,6 +50,7 @@ struct ceph_mon_generic_request {
        struct rb_node node;
        int result;
        void *buf;
+       int buf_len;
        struct completion completion;
        struct ceph_msg *request;  /* original request */
        struct ceph_msg *reply;    /* and reply */
@@ -111,6 +112,10 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);
 
 extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
 
+extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+                                  u32 pool, u64 *snapid);
 
+extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+                                  u32 pool, u64 snapid);
 
 #endif