diff options
| author | Linus Torvalds <torvalds@linux-foundation.org> | 2024-07-17 12:16:22 -0700 |
|---|---|---|
| committer | Linus Torvalds <torvalds@linux-foundation.org> | 2024-07-17 12:16:22 -0700 |
| commit | f097ef0e7625f70844ddaba60ca43d421db5b1b0 (patch) | |
| tree | acd10db2d9684ad221eb310802542be934cc2c27 | |
| parent | 586f14a6a182bbdb9404dc66464dcd8d0ac175a3 (diff) | |
| parent | 89b01913dc73d7c4b8440b1396909ccb7ec8c4b4 (diff) | |
| download | linux-f097ef0e7625f70844ddaba60ca43d421db5b1b0.tar.gz linux-f097ef0e7625f70844ddaba60ca43d421db5b1b0.tar.bz2 linux-f097ef0e7625f70844ddaba60ca43d421db5b1b0.zip | |
Merge tag 'dlm-6.11' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
Pull dlm updates from David Teigland:
- New flag DLM_LSFL_SOFTIRQ (referred to as DLM_LSFL_SOFTIRQ_SAFE in some
of the commit titles below) can be set by code using dlm to indicate that
its callbacks can be run from softirq context
- Change md-cluster to set DLM_LSFL_SOFTIRQ
- Clean up for previous changes, e.g. unused code and parameters
- Remove custom pre-allocation of rsb structs which is unnecessary with
kmem caches
- Change idr to xarray for lkb structs in use
- Change idr to xarray for rsb structs being recovered
- Change outdated naming related to internal rsb states
- Fix some incorrect add/remove of rsb on scan list
- Use rcu to free rsb structs
* tag 'dlm-6.11' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
dlm: add rcu_barrier before destroy kmem cache
dlm: remove DLM_LSFL_SOFTIRQ from exflags
fs: dlm: remove unused struct 'dlm_processed_nodes'
md-cluster: use DLM_LSFL_SOFTIRQ for dlm_new_lockspace()
dlm: implement LSFL_SOFTIRQ_SAFE
dlm: introduce DLM_LSFL_SOFTIRQ_SAFE
dlm: use LSFL_FS to check for kernel lockspace
dlm: use rcu to avoid an extra rsb struct lookup
dlm: fix add_scan and del_scan usage
dlm: change list and timer names
dlm: move recover idr to xarray datastructure
dlm: move lkb idr to xarray datastructure
dlm: drop own rsb pre allocation mechanism
dlm: remove ls_local_handle from struct dlm_ls
dlm: remove unused parameter in dlm_midcomms_addr
dlm: don't kref_init rsbs created for toss list
dlm: remove scand leftovers
| -rw-r--r-- | drivers/md/md-cluster.c | 2 | ||||
| -rw-r--r-- | fs/dlm/ast.c | 172 | ||||
| -rw-r--r-- | fs/dlm/ast.h | 11 | ||||
| -rw-r--r-- | fs/dlm/config.c | 2 | ||||
| -rw-r--r-- | fs/dlm/debug_fs.c | 10 | ||||
| -rw-r--r-- | fs/dlm/dlm_internal.h | 60 | ||||
| -rw-r--r-- | fs/dlm/lock.c | 568 | ||||
| -rw-r--r-- | fs/dlm/lock.h | 7 | ||||
| -rw-r--r-- | fs/dlm/lockspace.c | 131 | ||||
| -rw-r--r-- | fs/dlm/lowcomms.c | 8 | ||||
| -rw-r--r-- | fs/dlm/lowcomms.h | 2 | ||||
| -rw-r--r-- | fs/dlm/member.c | 2 | ||||
| -rw-r--r-- | fs/dlm/memory.c | 10 | ||||
| -rw-r--r-- | fs/dlm/midcomms.c | 4 | ||||
| -rw-r--r-- | fs/dlm/midcomms.h | 2 | ||||
| -rw-r--r-- | fs/dlm/recover.c | 78 | ||||
| -rw-r--r-- | fs/dlm/recover.h | 2 | ||||
| -rw-r--r-- | fs/dlm/recoverd.c | 14 | ||||
| -rw-r--r-- | fs/dlm/user.c | 42 | ||||
| -rw-r--r-- | include/linux/dlm.h | 17 | ||||
| -rw-r--r-- | include/uapi/linux/dlm.h | 2 |
21 files changed, 584 insertions, 562 deletions
diff --git a/drivers/md/md-cluster.c b/drivers/md/md-cluster.c index 139fe2019c1d..c1ea214bfc91 100644 --- a/drivers/md/md-cluster.c +++ b/drivers/md/md-cluster.c @@ -887,7 +887,7 @@ static int join(struct mddev *mddev, int nodes) memset(str, 0, 64); sprintf(str, "%pU", mddev->uuid); ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, - 0, LVB_SIZE, &md_ls_ops, mddev, + DLM_LSFL_SOFTIRQ, LVB_SIZE, &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); if (ret) goto err; diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index 59711486d801..742b30b61c19 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -18,35 +18,52 @@ #include "user.h" #include "ast.h" -static void dlm_callback_work(struct work_struct *work) +static void dlm_run_callback(uint32_t ls_id, uint32_t lkb_id, int8_t mode, + uint32_t flags, uint8_t sb_flags, int sb_status, + struct dlm_lksb *lksb, + void (*astfn)(void *astparam), + void (*bastfn)(void *astparam, int mode), + void *astparam, const char *res_name, + size_t res_length) { - struct dlm_callback *cb = container_of(work, struct dlm_callback, work); - - if (cb->flags & DLM_CB_BAST) { - trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name, - cb->res_length); - cb->bastfn(cb->astparam, cb->mode); - } else if (cb->flags & DLM_CB_CAST) { - trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status, - cb->sb_flags, cb->res_name, cb->res_length); - cb->lkb_lksb->sb_status = cb->sb_status; - cb->lkb_lksb->sb_flags = cb->sb_flags; - cb->astfn(cb->astparam); + if (flags & DLM_CB_BAST) { + trace_dlm_bast(ls_id, lkb_id, mode, res_name, res_length); + bastfn(astparam, mode); + } else if (flags & DLM_CB_CAST) { + trace_dlm_ast(ls_id, lkb_id, sb_status, sb_flags, res_name, + res_length); + lksb->sb_status = sb_status; + lksb->sb_flags = sb_flags; + astfn(astparam); } +} +static void dlm_do_callback(struct dlm_callback *cb) +{ + dlm_run_callback(cb->ls_id, cb->lkb_id, cb->mode, cb->flags, + cb->sb_flags, cb->sb_status, cb->lkb_lksb, + cb->astfn, cb->bastfn, 
cb->astparam, + cb->res_name, cb->res_length); dlm_free_cb(cb); } -int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, - int status, uint32_t sbflags, - struct dlm_callback **cb) +static void dlm_callback_work(struct work_struct *work) +{ + struct dlm_callback *cb = container_of(work, struct dlm_callback, work); + + dlm_do_callback(cb); +} + +bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, int *copy_lvb) { struct dlm_rsb *rsb = lkb->lkb_resource; - int rv = DLM_ENQUEUE_CALLBACK_SUCCESS; struct dlm_ls *ls = rsb->res_ls; - int copy_lvb = 0; int prev_mode; + if (copy_lvb) + *copy_lvb = 0; + if (flags & DLM_CB_BAST) { /* if cb is a bast, it should be skipped if the blocking mode is * compatible with the last granted mode @@ -56,7 +73,7 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, log_debug(ls, "skip %x bast mode %d for cast mode %d", lkb->lkb_id, mode, lkb->lkb_last_cast_cb_mode); - goto out; + return true; } } @@ -74,7 +91,7 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, (prev_mode > mode && prev_mode > DLM_LOCK_PR)) { log_debug(ls, "skip %x add bast mode %d for bast mode %d", lkb->lkb_id, mode, prev_mode); - goto out; + return true; } } @@ -85,8 +102,10 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, prev_mode = lkb->lkb_last_cast_cb_mode; if (!status && lkb->lkb_lksb->sb_lvbptr && - dlm_lvb_operations[prev_mode + 1][mode + 1]) - copy_lvb = 1; + dlm_lvb_operations[prev_mode + 1][mode + 1]) { + if (copy_lvb) + *copy_lvb = 1; + } } lkb->lkb_last_cast_cb_mode = mode; @@ -96,11 +115,19 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, lkb->lkb_last_cb_mode = mode; lkb->lkb_last_cb_flags = flags; + return false; +} + +int dlm_get_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, + struct dlm_callback **cb) +{ + struct dlm_rsb *rsb = 
lkb->lkb_resource; + struct dlm_ls *ls = rsb->res_ls; + *cb = dlm_allocate_cb(); - if (!*cb) { - rv = DLM_ENQUEUE_CALLBACK_FAILURE; - goto out; - } + if (WARN_ON_ONCE(!*cb)) + return -ENOMEM; /* for tracing */ (*cb)->lkb_id = lkb->lkb_id; @@ -112,19 +139,34 @@ int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, (*cb)->mode = mode; (*cb)->sb_status = status; (*cb)->sb_flags = (sbflags & 0x000000FF); - (*cb)->copy_lvb = copy_lvb; (*cb)->lkb_lksb = lkb->lkb_lksb; - rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED; + return 0; +} + +static int dlm_get_queue_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, + struct dlm_callback **cb) +{ + int rv; + + rv = dlm_get_cb(lkb, flags, mode, status, sbflags, cb); + if (rv) + return rv; -out: - return rv; + (*cb)->astfn = lkb->lkb_astfn; + (*cb)->bastfn = lkb->lkb_bastfn; + (*cb)->astparam = lkb->lkb_astparam; + INIT_WORK(&(*cb)->work, dlm_callback_work); + + return 0; } void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, - uint32_t sbflags) + uint32_t sbflags) { - struct dlm_ls *ls = lkb->lkb_resource->res_ls; + struct dlm_rsb *rsb = lkb->lkb_resource; + struct dlm_ls *ls = rsb->res_ls; struct dlm_callback *cb; int rv; @@ -133,34 +175,36 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, return; } - rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, - &cb); - switch (rv) { - case DLM_ENQUEUE_CALLBACK_NEED_SCHED: - cb->astfn = lkb->lkb_astfn; - cb->bastfn = lkb->lkb_bastfn; - cb->astparam = lkb->lkb_astparam; - INIT_WORK(&cb->work, dlm_callback_work); - - spin_lock_bh(&ls->ls_cb_lock); - if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) + if (dlm_may_skip_callback(lkb, flags, mode, status, sbflags, NULL)) + return; + + spin_lock_bh(&ls->ls_cb_lock); + if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) { + rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb); + if (!rv) list_add(&cb->list, &ls->ls_cb_delay); - else - 
queue_work(ls->ls_callback_wq, &cb->work); - spin_unlock_bh(&ls->ls_cb_lock); - break; - case DLM_ENQUEUE_CALLBACK_SUCCESS: - break; - case DLM_ENQUEUE_CALLBACK_FAILURE: - fallthrough; - default: - WARN_ON_ONCE(1); - break; + } else { + if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) { + dlm_run_callback(ls->ls_global_id, lkb->lkb_id, mode, flags, + sbflags, status, lkb->lkb_lksb, + lkb->lkb_astfn, lkb->lkb_bastfn, + lkb->lkb_astparam, rsb->res_name, + rsb->res_length); + } else { + rv = dlm_get_queue_cb(lkb, flags, mode, status, sbflags, &cb); + if (!rv) + queue_work(ls->ls_callback_wq, &cb->work); + } } + spin_unlock_bh(&ls->ls_cb_lock); } int dlm_callback_start(struct dlm_ls *ls) { + if (!test_bit(LSFL_FS, &ls->ls_flags) || + test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) + return 0; + ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback", WQ_HIGHPRI | WQ_MEM_RECLAIM); if (!ls->ls_callback_wq) { @@ -178,13 +222,15 @@ void dlm_callback_stop(struct dlm_ls *ls) void dlm_callback_suspend(struct dlm_ls *ls) { - if (ls->ls_callback_wq) { - spin_lock_bh(&ls->ls_cb_lock); - set_bit(LSFL_CB_DELAY, &ls->ls_flags); - spin_unlock_bh(&ls->ls_cb_lock); + if (!test_bit(LSFL_FS, &ls->ls_flags)) + return; + + spin_lock_bh(&ls->ls_cb_lock); + set_bit(LSFL_CB_DELAY, &ls->ls_flags); + spin_unlock_bh(&ls->ls_cb_lock); + if (ls->ls_callback_wq) flush_workqueue(ls->ls_callback_wq); - } } #define MAX_CB_QUEUE 25 @@ -195,14 +241,18 @@ void dlm_callback_resume(struct dlm_ls *ls) int count = 0, sum = 0; bool empty; - if (!ls->ls_callback_wq) + if (!test_bit(LSFL_FS, &ls->ls_flags)) return; more: spin_lock_bh(&ls->ls_cb_lock); list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) { list_del(&cb->list); - queue_work(ls->ls_callback_wq, &cb->work); + if (test_bit(LSFL_SOFTIRQ, &ls->ls_flags)) + dlm_do_callback(cb); + else + queue_work(ls->ls_callback_wq, &cb->work); + count++; if (count == MAX_CB_QUEUE) break; diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h index 9093ff043bee..e2b86845d331 100644 --- 
a/fs/dlm/ast.h +++ b/fs/dlm/ast.h @@ -11,12 +11,11 @@ #ifndef __ASTD_DOT_H__ #define __ASTD_DOT_H__ -#define DLM_ENQUEUE_CALLBACK_NEED_SCHED 1 -#define DLM_ENQUEUE_CALLBACK_SUCCESS 0 -#define DLM_ENQUEUE_CALLBACK_FAILURE -1 -int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, - int status, uint32_t sbflags, - struct dlm_callback **cb); +bool dlm_may_skip_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, int *copy_lvb); +int dlm_get_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, + int status, uint32_t sbflags, + struct dlm_callback **cb); void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, uint32_t sbflags); diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 517fa975dc5a..99952234799e 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -672,7 +672,7 @@ static ssize_t comm_addr_store(struct config_item *item, const char *buf, memcpy(addr, buf, len); - rv = dlm_midcomms_addr(cm->nodeid, addr, len); + rv = dlm_midcomms_addr(cm->nodeid, addr); if (rv) { kfree(addr); return rv; diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c index 6ab3ed4074c6..7112958c2e5b 100644 --- a/fs/dlm/debug_fs.c +++ b/fs/dlm/debug_fs.c @@ -380,7 +380,7 @@ static const struct seq_operations format4_seq_ops; static int table_seq_show(struct seq_file *seq, void *iter_ptr) { - struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_rsbs_list); + struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_slow_list); if (seq->op == &format1_seq_ops) print_format1(rsb, seq); @@ -409,9 +409,9 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) } if (seq->op == &format4_seq_ops) - list = &ls->ls_toss; + list = &ls->ls_slow_inactive; else - list = &ls->ls_keep; + list = &ls->ls_slow_active; read_lock_bh(&ls->ls_rsbtbl_lock); return seq_list_start(list, *pos); @@ -423,9 +423,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) struct list_head *list; if 
(seq->op == &format4_seq_ops) - list = &ls->ls_toss; + list = &ls->ls_slow_inactive; else - list = &ls->ls_keep; + list = &ls->ls_slow_active; return seq_list_next(iter_ptr, list, pos); } diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 9085ba3b2f20..32d98e63d25e 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -36,7 +36,7 @@ #include <linux/miscdevice.h> #include <linux/rhashtable.h> #include <linux/mutex.h> -#include <linux/idr.h> +#include <linux/xarray.h> #include <linux/ratelimit.h> #include <linux/uaccess.h> @@ -316,26 +316,24 @@ struct dlm_rsb { int res_nodeid; int res_master_nodeid; int res_dir_nodeid; - int res_id; /* for ls_recover_idr */ + unsigned long res_id; /* for ls_recover_xa */ uint32_t res_lvbseq; uint32_t res_hash; unsigned long res_toss_time; uint32_t res_first_lkid; struct list_head res_lookup; /* lkbs waiting on first */ - union { - struct list_head res_hashchain; - struct rhash_head res_node; /* rsbtbl */ - }; + struct rhash_head res_node; /* rsbtbl */ struct list_head res_grantqueue; struct list_head res_convertqueue; struct list_head res_waitqueue; - struct list_head res_rsbs_list; + struct list_head res_slow_list; /* ls_slow_* */ + struct list_head res_scan_list; struct list_head res_root_list; /* used for recovery */ struct list_head res_masters_list; /* used for recovery */ struct list_head res_recover_list; /* used for recovery */ - struct list_head res_toss_q_list; int res_recover_locks_count; + struct rcu_head rcu; char *res_lvbptr; char res_name[DLM_RESNAME_MAXLEN+1]; @@ -368,7 +366,8 @@ enum rsb_flags { RSB_RECOVER_CONVERT, RSB_RECOVER_GRANT, RSB_RECOVER_LVB_INVAL, - RSB_TOSS, + RSB_INACTIVE, + RSB_HASHED, /* set while rsb is on ls_rsbtbl */ }; static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) @@ -559,16 +558,8 @@ struct rcom_lock { char rl_lvb[]; }; -/* - * The max number of resources per rsbtbl bucket that shrink will attempt - * to remove in each iteration. 
- */ - -#define DLM_REMOVE_NAMES_MAX 8 - struct dlm_ls { struct list_head ls_list; /* list of lockspaces */ - dlm_lockspace_t *ls_local_handle; uint32_t ls_global_id; /* global unique lockspace ID */ uint32_t ls_generation; uint32_t ls_exflags; @@ -578,26 +569,21 @@ struct dlm_ls { wait_queue_head_t ls_count_wait; int ls_create_count; /* create/release refcount */ unsigned long ls_flags; /* LSFL_ */ - unsigned long ls_scan_time; struct kobject ls_kobj; - struct idr ls_lkbidr; - rwlock_t ls_lkbidr_lock; + struct xarray ls_lkbxa; + rwlock_t ls_lkbxa_lock; + /* an rsb is on rsbtl for primary locking functions, + and on a slow list for recovery/dump iteration */ struct rhashtable ls_rsbtbl; - rwlock_t ls_rsbtbl_lock; + rwlock_t ls_rsbtbl_lock; /* for ls_rsbtbl and ls_slow */ + struct list_head ls_slow_inactive; /* to iterate rsbtbl */ + struct list_head ls_slow_active; /* to iterate rsbtbl */ - struct list_head ls_toss; - struct list_head ls_keep; - - struct timer_list ls_timer; - /* this queue is ordered according the - * absolute res_toss_time jiffies time - * to mod_timer() with the first element - * if necessary. 
- */ - struct list_head ls_toss_q; - spinlock_t ls_toss_q_lock; + struct timer_list ls_scan_timer; /* based on first scan_list rsb toss_time */ + struct list_head ls_scan_list; /* rsbs ordered by res_toss_time */ + spinlock_t ls_scan_lock; spinlock_t ls_waiters_lock; struct list_head ls_waiters; /* lkbs needing a reply */ @@ -605,10 +591,6 @@ struct dlm_ls { spinlock_t ls_orphans_lock; struct list_head ls_orphans; - spinlock_t ls_new_rsb_spin; - int ls_new_rsb_count; - struct list_head ls_new_rsb; /* new rsb structs */ - struct list_head ls_nodes; /* current nodes in ls */ struct list_head ls_nodes_gone; /* dead node list, recovery */ int ls_num_nodes; /* number of nodes in ls */ @@ -664,8 +646,8 @@ struct dlm_ls { struct list_head ls_recover_list; spinlock_t ls_recover_list_lock; int ls_recover_list_count; - struct idr ls_recover_idr; - spinlock_t ls_recover_idr_lock; + struct xarray ls_recover_xa; + spinlock_t ls_recover_xa_lock; wait_queue_head_t ls_wait_general; wait_queue_head_t ls_recover_lock_wait; spinlock_t ls_clear_proc_locks; @@ -716,6 +698,8 @@ struct dlm_ls { #define LSFL_CB_DELAY 9 #define LSFL_NODIR 10 #define LSFL_RECV_MSG_BLOCKED 11 +#define LSFL_FS 12 +#define LSFL_SOFTIRQ 13 #define DLM_PROC_FLAGS_CLOSING 1 #define DLM_PROC_FLAGS_COMPAT 2 diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index f103b8c30592..8bee4f444afd 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -89,7 +89,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, const struct dlm_message *ms, bool local); static int receive_extralen(const struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); -static void toss_rsb(struct kref *kref); +static void deactivate_rsb(struct kref *kref); /* * Lock compatibilty matrix - thanks Steve @@ -330,8 +330,8 @@ static inline unsigned long rsb_toss_jiffies(void) static inline void hold_rsb(struct dlm_rsb *r) { - /* rsbs in toss state never get referenced */ - WARN_ON(rsb_flag(r, RSB_TOSS)); + 
/* inactive rsbs are not ref counted */ + WARN_ON(rsb_flag(r, RSB_INACTIVE)); kref_get(&r->res_ref); } @@ -370,15 +370,12 @@ static inline int dlm_kref_put_write_lock_bh(struct kref *kref, return 0; } -/* When all references to the rsb are gone it's transferred to - the tossed list for later disposal. */ - static void put_rsb(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; int rv; - rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb, + rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb, &ls->ls_rsbtbl_lock); if (rv) write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -389,82 +386,54 @@ void dlm_put_rsb(struct dlm_rsb *r) put_rsb(r); } -static int pre_rsb_struct(struct dlm_ls *ls) -{ - struct dlm_rsb *r1, *r2; - int count = 0; - - spin_lock_bh(&ls->ls_new_rsb_spin); - if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { - spin_unlock_bh(&ls->ls_new_rsb_spin); - return 0; - } - spin_unlock_bh(&ls->ls_new_rsb_spin); - - r1 = dlm_allocate_rsb(ls); - r2 = dlm_allocate_rsb(ls); - - spin_lock_bh(&ls->ls_new_rsb_spin); - if (r1) { - list_add(&r1->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; - } - if (r2) { - list_add(&r2->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; - } - count = ls->ls_new_rsb_count; - spin_unlock_bh(&ls->ls_new_rsb_spin); - - if (!count) - return -ENOMEM; - return 0; -} - /* connected with timer_delete_sync() in dlm_ls_stop() to stop * new timers when recovery is triggered and don't run them - * again until a dlm_timer_resume() tries it again. + * again until a resume_scan_timer() tries it again. */ -static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies) +static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies) { if (!dlm_locking_stopped(ls)) - mod_timer(&ls->ls_timer, jiffies); + mod_timer(&ls->ls_scan_timer, jiffies); } /* This function tries to resume the timer callback if a rsb - * is on the toss list and no timer is pending. 
It might that + * is on the scan list and no timer is pending. It might that * the first entry is on currently executed as timer callback * but we don't care if a timer queued up again and does * nothing. Should be a rare case. */ -void dlm_timer_resume(struct dlm_ls *ls) +void resume_scan_timer(struct dlm_ls *ls) { struct dlm_rsb *r; - spin_lock_bh(&ls->ls_toss_q_lock); - r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - if (r && !timer_pending(&ls->ls_timer)) - __rsb_mod_timer(ls, r->res_toss_time); - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_lock_bh(&ls->ls_scan_lock); + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (r && !timer_pending(&ls->ls_scan_timer)) + enable_scan_timer(ls, r->res_toss_time); + spin_unlock_bh(&ls->ls_scan_lock); } -/* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */ -static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r) +/* ls_rsbtbl_lock must be held */ + +static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r) { struct dlm_rsb *first; - spin_lock_bh(&ls->ls_toss_q_lock); + /* active rsbs should never be on the scan list */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); + + spin_lock_bh(&ls->ls_scan_lock); r->res_toss_time = 0; /* if the rsb is not queued do nothing */ - if (list_empty(&r->res_toss_q_list)) + if (list_empty(&r->res_scan_list)) goto out; /* get the first element before delete */ - first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - list_del_init(&r->res_toss_q_list); + first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_del_init(&r->res_scan_list); /* check if the first element was the rsb we deleted */ if (first == r) { /* try to get the new first element, if the list @@ -474,70 +443,59 @@ static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r) * if the list isn't empty and a new first element got * in place, set the new timer expire 
time. */ - first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); if (!first) - timer_delete(&ls->ls_timer); + timer_delete(&ls->ls_scan_timer); else - __rsb_mod_timer(ls, first->res_toss_time); + enable_scan_timer(ls, first->res_toss_time); } out: - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_unlock_bh(&ls->ls_scan_lock); } -/* Caller must held ls_rsbtbl_lock and need to be called every time - * when either the rsb enters toss state or the toss state changes - * the dir/master nodeid. - */ -static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) +static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r) { int our_nodeid = dlm_our_nodeid(); struct dlm_rsb *first; - /* If we're the directory record for this rsb, and - * we're not the master of it, then we need to wait - * for the master node to send us a dir remove for - * before removing the dir record. - */ - if (!dlm_no_directory(ls) && - (r->res_master_nodeid != our_nodeid) && - (dlm_dir_nodeid(r) == our_nodeid)) { - rsb_delete_toss_timer(ls, r); - return; - } + /* A dir record for a remote master rsb should never be on the scan list. */ + WARN_ON(!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)); + + /* An active rsb should never be on the scan list. */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); - spin_lock_bh(&ls->ls_toss_q_lock); + /* An rsb should not already be on the scan list. 
*/ + WARN_ON(!list_empty(&r->res_scan_list)); + + spin_lock_bh(&ls->ls_scan_lock); /* set the new rsb absolute expire time in the rsb */ r->res_toss_time = rsb_toss_jiffies(); - if (list_empty(&ls->ls_toss_q)) { + if (list_empty(&ls->ls_scan_list)) { /* if the queue is empty add the element and it's * our new expire time */ - list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); - __rsb_mod_timer(ls, r->res_toss_time); + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); + enable_scan_timer(ls, r->res_toss_time); } else { - /* check if the rsb was already queued, if so delete - * it from the toss queue - */ - if (!list_empty(&r->res_toss_q_list)) - list_del(&r->res_toss_q_list); - /* try to get the maybe new first element and then add * to this rsb with the oldest expire time to the end * of the queue. If the list was empty before this * rsb expire time is our next expiration if it wasn't * the now new first elemet is our new expiration time */ - first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); if (!first) - __rsb_mod_timer(ls, r->res_toss_time); + enable_scan_timer(ls, r->res_toss_time); else - __rsb_mod_timer(ls, first->res_toss_time); + enable_scan_timer(ls, first->res_toss_time); } - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_unlock_bh(&ls->ls_scan_lock); } /* if we hit contention we do in 250 ms a retry to trylock. @@ -547,9 +505,11 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) */ #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250)) -void dlm_rsb_toss_timer(struct timer_list *timer) +/* Called by lockspace scan_timer to free unused rsb's. 
*/ + +void dlm_rsb_scan(struct timer_list *timer) { - struct dlm_ls *ls = from_timer(ls, timer, ls_timer); + struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer); int our_nodeid = dlm_our_nodeid(); struct dlm_rsb *r; int rv; @@ -557,76 +517,63 @@ void dlm_rsb_toss_timer(struct timer_list *timer) while (1) { /* interrupting point to leave iteration when * recovery waits for timer_delete_sync(), recovery - * will take care to delete everything in toss queue. + * will take care to delete everything in scan list. */ if (dlm_locking_stopped(ls)) break; - rv = spin_trylock(&ls->ls_toss_q_lock); + rv = spin_trylock(&ls->ls_scan_lock); if (!rv) { /* rearm again try timer */ - __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); break; } - r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); if (!r) { - /* nothing to do anymore next rsb queue will - * set next mod_timer() expire. - */ - spin_unlock(&ls->ls_toss_q_lock); + /* the next add_scan will enable the timer again */ + spin_unlock(&ls->ls_scan_lock); break; } - /* test if the first rsb isn't expired yet, if - * so we stop freeing rsb from toss queue as - * the order in queue is ascending to the - * absolute res_toss_time jiffies + /* + * If the first rsb is not yet expired, then stop because the + * list is sorted with nearest expiration first. */ if (time_before(jiffies, r->res_toss_time)) { /* rearm with the next rsb to expire in the future */ - __rsb_mod_timer(ls, r->res_toss_time); - spin_unlock(&ls->ls_toss_q_lock); + enable_scan_timer(ls, r->res_toss_time); + spin_unlock(&ls->ls_scan_lock); break; } /* in find_rsb_dir/nodir there is a reverse order of this * lock, however this is only a trylock if we hit some * possible contention we try it again. 
- * - * This lock synchronized while holding ls_toss_q_lock - * synchronize everything that rsb_delete_toss_timer() - * or rsb_mod_timer() can't run after this timer callback - * deletes the rsb from the ls_toss_q. Whereas the other - * holders have always a priority to run as this is only - * a caching handling and the other holders might to put - * this rsb out of the toss state. */ rv = write_trylock(&ls->ls_rsbtbl_lock); if (!rv) { - spin_unlock(&ls->ls_toss_q_lock); + spin_unlock(&ls->ls_scan_lock); /* rearm again try timer */ - __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); break; } - list_del(&r->res_rsbs_list); + list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); - /* not necessary to held the ls_rsbtbl_lock when - * calling send_remove() - */ + /* ls_rsbtbl_lock is not needed when calling send_remove() */ write_unlock(&ls->ls_rsbtbl_lock); - /* remove the rsb out of the toss queue its gone - * drom DLM now - */ - list_del_init(&r->res_toss_q_list); - spin_unlock(&ls->ls_toss_q_lock); + list_del_init(&r->res_scan_list); + spin_unlock(&ls->ls_scan_lock); - /* no rsb in this state should ever run a timer */ + /* An rsb that is a dir record for a remote master rsb + * cannot be removed, and should not have a timer enabled. + */ WARN_ON(!dlm_no_directory(ls) && (r->res_master_nodeid != our_nodeid) && (dlm_dir_nodeid(r) == our_nodeid)); @@ -640,7 +587,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer) |
