#include <linux/ceph/ceph_debug.h>
#include <linux/spinlock.h>
#include <linux/fs_struct.h>
#include <linux/namei.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/xattr.h>
#include "super.h"
#include "mds_client.h"
/*
* Directory operations: readdir, lookup, create, link, unlink,
* rename, etc.
*/
/*
* Ceph MDS operations are specified in terms of a base ino and
* relative path. Thus, the client can specify an operation on a
* specific inode (e.g., a getattr due to fstat(2)), or as a path
* relative to, say, the root directory.
*
* Normally, we limit ourselves to strict inode ops (no path component)
* or dentry operations (a single path component relative to an ino). The
* exception to this is open_root_dentry(), which will open the mount
* point by name.
*/
/*
 * Forward declaration so that ceph_init_dentry() below can take the
 * address of this ops table.
 * NOTE(review): the initialized definition is presumably further down in
 * this file; it is not visible from here -- confirm.
 */
const struct dentry_operations ceph_dentry_ops;
/*
 * Attach ceph-private state (struct ceph_dentry_info) to a dentry.
 *
 * Does nothing if the dentry already carries d_fsdata.  Otherwise a
 * zeroed ceph_dentry_info is allocated, the dentry operations matching
 * the parent's snapshot id are installed, and the new info is published
 * through dentry->d_fsdata and added to the dentry LRU.
 *
 * Returns 0 on success (including when another task initialized the
 * dentry first), or -ENOMEM if the allocation fails.
 */
int ceph_init_dentry(struct dentry *dentry)
{
	const struct dentry_operations *ops;
	struct ceph_dentry_info *di;
	struct inode *parent_inode;

	/* unlocked fast path: state is already attached */
	if (dentry->d_fsdata)
		return 0;

	/* allocate before taking d_lock; GFP_KERNEL may sleep */
	di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
	if (!di)
		return -ENOMEM;

	spin_lock(&dentry->d_lock);
	if (dentry->d_fsdata) {
		/* somebody else attached state while we were allocating */
		kmem_cache_free(ceph_dentry_cachep, di);
	} else {
		/* choose the ops table from the parent's snapshot id */
		parent_inode = d_inode(dentry->d_parent);
		if (ceph_snap(parent_inode) == CEPH_NOSNAP)
			ops = &ceph_dentry_ops;
		else if (ceph_snap(parent_inode) == CEPH_SNAPDIR)
			ops = &ceph_snapdir_dentry_ops;
		else
			ops = &ceph_snap_dentry_ops;
		d_set_d_op(dentry, ops);

		di->dentry = dentry;
		di->lease_session = NULL;
		di->time = jiffies;
		/*
		 * Make sure the fields above are visible before d_fsdata
		 * is, so the unlocked d_fsdata check at the top is safe.
		 */
		smp_mb();
		dentry->d_fsdata = di;
		ceph_dentry_lru_add(dentry);
	}
	spin_unlock(&dentry->d_lock);
	return 0;
}
/*
 * Layout of f_pos for readdir:
 * - hash order:
 *	(0xff << 52) | ((24 bits hash) << 28) |
 *	(the nth entry among those sharing that hash);
 * - frag+name order:
 *	((frag value) << 28) | (the nth entry in frag);
 */
/* Number of low-order f_pos bits that hold the entry offset. */
#define OFFSET_BITS 28
/* Mask selecting the low OFFSET_BITS bits (the entry offset) of an f_pos. */
#define OFFSET_MASK ((1 << OFFSET_BITS) - 1)
/*
 * Marker (0xff placed just above the 24 hash bits, i.e. starting at bit
 * OFFSET_BITS + 24) that ceph_make_fpos() sets and is_hash_order() tests
 * for positions expressed in hash order.
 */
#define HASH_ORDER (0xffull << (OFFSET_BITS + 24))
/*
 * Build a readdir file position.
 *
 * @high:       value stored above the offset bits (a frag value, or a
 *              hash when @hash_order is set)
 * @off:        entry offset stored in the low OFFSET_BITS bits
 * @hash_order: when true, the HASH_ORDER marker bits are set as well
 *
 * The shift uses OFFSET_BITS rather than a literal so the encoding stays
 * in step with fpos_frag()/fpos_off() and HASH_ORDER, which are all
 * derived from the same macro.
 */
loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
{
	loff_t fpos = ((loff_t)high << OFFSET_BITS) | (loff_t)off;

	if (hash_order)
		fpos |= HASH_ORDER;
	return fpos;
}
/* True when every HASH_ORDER marker bit is set in position @p. */
static bool is_hash_order(loff_t p)
{
	unsigned long long marker_bits = p & HASH_ORDER;

	return marker_bits == HASH_ORDER;
}
/* Return the part of position @p above the low OFFSET_BITS bits. */
static unsigned fpos_frag(loff_t p)
{
	unsigned upper = p >> OFFSET_BITS;

	return upper;
}
/* Return ceph_frag_value() applied to the frag part of position @p. */
static unsigned fpos_hash(loff_t p)
{
	unsigned frag = fpos_frag(p);

	return ceph_frag_value(frag);
}
/* Return the entry offset: the low OFFSET_BITS bits of position @p. */
static unsigned fpos_off(loff_t p)
{
	unsigned low = p & OFFSET_MASK;

	return low;
}
/*
 * Order two readdir positions: first by ceph_frag_compare() on their
 * frag parts, then, when those compare equal, by the difference of
 * their entry offsets.  Returns <0, 0 or >0.
 */
static int fpos_cmp(loff_t l, loff_t r)
{
	int frag_order = ceph_frag_compare(fpos_frag(l), fpos_frag(r));

	if (frag_order != 0)
		return frag_order;

	return (int)(fpos_off(l) - fpos_off(r));
}
/*
 * Remember the name of the last dentry we read, together with the
 * offset to continue from, so that readdir can resume at the same
 * lexicographical point regardless of what changes happen to the
 * directory on the server.
 *
 * The name is copied into a freshly allocated, NUL-terminated buffer
 * that replaces fi->last_name (the old buffer is freed).
 *
 * Returns 0 on success or -ENOMEM if the copy cannot be allocated, in
 * which case @fi is left untouched.
 */
static int note_last_dentry(struct ceph_file_info *fi, const char *name,
			    int len, unsigned next_offset)
{
	char *copy;

	copy = kmalloc(len + 1, GFP_KERNEL);
	if (!copy)
		return -ENOMEM;

	/* drop the previous name only once the new buffer exists */
	kfree(fi->last_name);
	fi->last_name = copy;

	memcpy(copy, name, len);
	copy[len] = 0;

	fi->next_offset = next_offset;
	dout("note_last_dentry '%s'\n", fi->last_name);
	return 0;
}
/*
 * Fetch the dentry pointer stored at slot @idx of the readdir cache kept
 * in the pages of @parent's inode (dir->i_data), and take a reference on
 * it.
 *
 * @cache_ctl remembers the page that is currently mapped; it is reused
 * when @idx falls in the same page and replaced otherwise.
 *
 * Returns:
 *   NULL              - @idx lies at or beyond the directory's i_size;
 *   ERR_PTR(-EAGAIN)  - the cache page is missing, the directory is no
 *                       longer complete and ordered, i_size shrank below
 *                       the slot, the slot is empty, or the dentry is
 *                       already dead;
 *   a dentry          - with a reference obtained through
 *                       lockref_get_not_dead().
 */
static struct dentry *
__dcache_find_get_entry(struct dentry *parent, u64 idx,
struct ceph_readdir_cache_control *cache_ctl)
{
struct inode *dir = d_inode(parent);
struct dentry *dentry;
/* mask giving the slot within one page; relies on the number of
 * pointers per page being a power of two */
unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
/* byte position of the slot within the cache, and the page holding it */
loff_t ptr_pos = idx * sizeof(struct dentry *);
pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;
/* past the end of the cached pointer array */
if (ptr_pos >= i_size_read(dir))
return NULL;
/* switch pages only when the slot is not in the one already mapped */
if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
ceph_readdir_cache_release(cache_ctl);
cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
if (!cache_ctl->page) {
dout(" page %lu not found\n", ptr_pgoff);
return ERR_PTR(-EAGAIN);
}
/* Reading and filling the cache are serialized by i_mutex, so
 * there is no need to keep holding the page lock. */
unlock_page(cache_ctl->page);
cache_ctl->dentries = kmap(cache_ctl->page);
}
cache_ctl->index = idx & idx_mask;
/* The RCU read section spans both reading the pointer and the attempt
 * to take a reference; presumably this keeps the dentry memory valid
 * until lockref_get_not_dead() has run -- confirm. */
rcu_read_lock();
spin_lock(&parent->d_lock);
/* check i_size again here, because empty directory can be
* marked as complete while not holding the i_mutex. */
if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
dentry = cache_ctl->dentries[cache_ctl->index];
else
dentry = NULL;
spin_unlock(&parent->d_lock);
/* a dentry that is already dead is treated like a missing one */
if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
dentry = NULL;
rcu_read_unlock();
/* GNU "?:" shorthand: return dentry if non-NULL, else -EAGAIN */
return dentry ? : ERR_PTR(-EAGAIN);
}
/*
* When possible, we try to satisfy a readdir by peeking at the
* dcache. We make this work by carefully ordering dentries on
* d_child when we initially get results back from the MDS, and
* falling back to a "normal" sync readdir if any dentries in the dir
* are dropped.
*
* Complete dir indicates that we have all dentries in the dir. It is
* defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
* the MDS if/when the directory is modified).
*/
static int __dcache_readdir(struct file *file, struct dir_context *ctx,
u32 shared_gen)
{
struct ceph_file_info *fi = file->private_data;
struct dentry *parent = file->f_path.dentry;
struct inode *dir = d_inode(parent);
struct dentry *dentry, *last = NULL;
struct ceph_dentry_info *di;
struct ceph_readdir_cache_control cache_ctl = {};
u64 idx = 0;
int err = 0;
dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
/* search start position */
if (ctx->pos > 2) {
u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
while (count > 0) {
u64 step = count >> 1;
dentry = __dcache_find_get_entry(parent, idx + step,
&cache_ctl);
if (!dentry) {
/* use linear search */
idx = 0;
break;
}
if (IS_ERR(dentry)) {
err = PTR_ERR(dentry);
goto out;
}
di = ceph_dentry(dentry);
spin_lock(&dentry->d_lock);
if (fpos_cmp(di->offset, ctx->pos) < 0) {
idx += step + 1;
count -= step + 1;
} else {
count = step;
}
spin_unlock(&dentry->d_lock);
dput(dentry);
}
dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
}
for (;;) {
bool emit_dentry = false;
dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
if (!dentry) {
fi->flags |= CEPH_F_ATEND;
err = 0;
break;
}
if (IS_ERR(dentry)) {
err = PTR_ERR(dentry);
goto out;
}
di = ceph_dentry(dentry);
spin_lock(&dentry->d_lock);
if (di->lease_shared_gen == shared_gen &&
d_really_is_positive(dentry) &&
fpos_cmp(ctx->pos, di->offset) <= 0) {
emit_dentry = true;
}
spin_unlock(&dentry->d_lock);
if (emit_dentry) {
dout(" %llx dentry %p %pd %p\n", di->offset,
|