summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2010-09-09 14:58:11 -0700
committerDavid S. Miller <davem@davemloft.net>2010-09-09 14:58:11 -0700
commitcf0ac2b8a759fecbefd80f890c6dbe80ba65fd95 (patch)
treed5feaeef197dc681d1cdab2e8070ac31f0c43141
parentf27e21a813e2c4ca74b30a5443602e75b146db9b (diff)
parent905d64c89e2a9d71d0606904b7c3908633db6072 (diff)
downloadlinux-cf0ac2b8a759fecbefd80f890c6dbe80ba65fd95.tar.gz
linux-cf0ac2b8a759fecbefd80f890c6dbe80ba65fd95.tar.bz2
linux-cf0ac2b8a759fecbefd80f890c6dbe80ba65fd95.zip
Merge branch 'for-davem' of git://oss.oracle.com/git/agrover/linux-2.6
-rw-r--r--include/linux/Kbuild1
-rw-r--r--include/linux/rds.h106
-rw-r--r--net/rds/af_rds.c26
-rw-r--r--net/rds/bind.c82
-rw-r--r--net/rds/cong.c8
-rw-r--r--net/rds/connection.c157
-rw-r--r--net/rds/ib.c194
-rw-r--r--net/rds/ib.h100
-rw-r--r--net/rds/ib_cm.c183
-rw-r--r--net/rds/ib_rdma.c314
-rw-r--r--net/rds/ib_recv.c549
-rw-r--r--net/rds/ib_send.c682
-rw-r--r--net/rds/ib_stats.c2
-rw-r--r--net/rds/ib_sysctl.c17
-rw-r--r--net/rds/info.c12
-rw-r--r--net/rds/iw.c4
-rw-r--r--net/rds/iw.h11
-rw-r--r--net/rds/iw_cm.c14
-rw-r--r--net/rds/iw_rdma.c1
-rw-r--r--net/rds/iw_recv.c24
-rw-r--r--net/rds/iw_send.c93
-rw-r--r--net/rds/iw_sysctl.c4
-rw-r--r--net/rds/loop.c31
-rw-r--r--net/rds/message.c118
-rw-r--r--net/rds/page.c5
-rw-r--r--net/rds/rdma.c339
-rw-r--r--net/rds/rdma.h85
-rw-r--r--net/rds/rdma_transport.c42
-rw-r--r--net/rds/rds.h187
-rw-r--r--net/rds/recv.c9
-rw-r--r--net/rds/send.c544
-rw-r--r--net/rds/stats.c6
-rw-r--r--net/rds/sysctl.c4
-rw-r--r--net/rds/tcp.c8
-rw-r--r--net/rds/tcp.h9
-rw-r--r--net/rds/tcp_connect.c2
-rw-r--r--net/rds/tcp_listen.c6
-rw-r--r--net/rds/tcp_recv.c14
-rw-r--r--net/rds/tcp_send.c66
-rw-r--r--net/rds/threads.c69
-rw-r--r--net/rds/transport.c19
-rw-r--r--net/rds/xlist.h80
42 files changed, 2614 insertions, 1613 deletions
diff --git a/include/linux/Kbuild b/include/linux/Kbuild
index 626b629429ff..c7fbf298ad68 100644
--- a/include/linux/Kbuild
+++ b/include/linux/Kbuild
@@ -302,6 +302,7 @@ header-y += quota.h
header-y += radeonfb.h
header-y += random.h
header-y += raw.h
+header-y += rds.h
header-y += reboot.h
header-y += reiserfs_fs.h
header-y += reiserfs_xattr.h
diff --git a/include/linux/rds.h b/include/linux/rds.h
index 7f3971d9fc5c..91950950aa59 100644
--- a/include/linux/rds.h
+++ b/include/linux/rds.h
@@ -73,6 +73,10 @@
#define RDS_CMSG_RDMA_MAP 3
#define RDS_CMSG_RDMA_STATUS 4
#define RDS_CMSG_CONG_UPDATE 5
+#define RDS_CMSG_ATOMIC_FADD 6
+#define RDS_CMSG_ATOMIC_CSWP 7
+#define RDS_CMSG_MASKED_ATOMIC_FADD 8
+#define RDS_CMSG_MASKED_ATOMIC_CSWP 9
#define RDS_INFO_FIRST 10000
#define RDS_INFO_COUNTERS 10000
@@ -89,9 +93,9 @@
#define RDS_INFO_LAST 10010
struct rds_info_counter {
- u_int8_t name[32];
- u_int64_t value;
-} __packed;
+ uint8_t name[32];
+ uint64_t value;
+} __attribute__((packed));
#define RDS_INFO_CONNECTION_FLAG_SENDING 0x01
#define RDS_INFO_CONNECTION_FLAG_CONNECTING 0x02
@@ -100,56 +104,48 @@ struct rds_info_counter {
#define TRANSNAMSIZ 16
struct rds_info_connection {
- u_int64_t next_tx_seq;
- u_int64_t next_rx_seq;
+ uint64_t next_tx_seq;
+ uint64_t next_rx_seq;
__be32 laddr;
__be32 faddr;
- u_int8_t transport[TRANSNAMSIZ]; /* null term ascii */
- u_int8_t flags;
-} __packed;
-
-struct rds_info_flow {
- __be32 laddr;
- __be32 faddr;
- u_int32_t bytes;
- __be16 lport;
- __be16 fport;
-} __packed;
+ uint8_t transport[TRANSNAMSIZ]; /* null term ascii */
+ uint8_t flags;
+} __attribute__((packed));
#define RDS_INFO_MESSAGE_FLAG_ACK 0x01
#define RDS_INFO_MESSAGE_FLAG_FAST_ACK 0x02
struct rds_info_message {
- u_int64_t seq;
- u_int32_t len;
+ uint64_t seq;
+ uint32_t len;
__be32 laddr;
__be32 faddr;
__be16 lport;
__be16 fport;
- u_int8_t flags;
-} __packed;
+ uint8_t flags;
+} __attribute__((packed));
struct rds_info_socket {
- u_int32_t sndbuf;
+ uint32_t sndbuf;
__be32 bound_addr;
__be32 connected_addr;
__be16 bound_port;
__be16 connected_port;
- u_int32_t rcvbuf;
- u_int64_t inum;
-} __packed;
+ uint32_t rcvbuf;
+ uint64_t inum;
+} __attribute__((packed));
struct rds_info_tcp_socket {
__be32 local_addr;
__be16 local_port;
__be32 peer_addr;
__be16 peer_port;
- u_int64_t hdr_rem;
- u_int64_t data_rem;
- u_int32_t last_sent_nxt;
- u_int32_t last_expected_una;
- u_int32_t last_seen_una;
-} __packed;
+ uint64_t hdr_rem;
+ uint64_t data_rem;
+ uint32_t last_sent_nxt;
+ uint32_t last_expected_una;
+ uint32_t last_seen_una;
+} __attribute__((packed));
#define RDS_IB_GID_LEN 16
struct rds_info_rdma_connection {
@@ -203,42 +199,69 @@ struct rds_info_rdma_connection {
* (so that the application does not have to worry about
* alignment).
*/
-typedef u_int64_t rds_rdma_cookie_t;
+typedef uint64_t rds_rdma_cookie_t;
struct rds_iovec {
- u_int64_t addr;
- u_int64_t bytes;
+ uint64_t addr;
+ uint64_t bytes;
};
struct rds_get_mr_args {
struct rds_iovec vec;
- u_int64_t cookie_addr;
+ uint64_t cookie_addr;
uint64_t flags;
};
struct rds_get_mr_for_dest_args {
struct sockaddr_storage dest_addr;
struct rds_iovec vec;
- u_int64_t cookie_addr;
+ uint64_t cookie_addr;
uint64_t flags;
};
struct rds_free_mr_args {
rds_rdma_cookie_t cookie;
- u_int64_t flags;
+ uint64_t flags;
};
struct rds_rdma_args {
rds_rdma_cookie_t cookie;
struct rds_iovec remote_vec;
- u_int64_t local_vec_addr;
- u_int64_t nr_local;
- u_int64_t flags;
- u_int64_t user_token;
+ uint64_t local_vec_addr;
+ uint64_t nr_local;
+ uint64_t flags;
+ uint64_t user_token;
+};
+
+struct rds_atomic_args {
+ rds_rdma_cookie_t cookie;
+ uint64_t local_addr;
+ uint64_t remote_addr;
+ union {
+ struct {
+ uint64_t compare;
+ uint64_t swap;
+ } cswp;
+ struct {
+ uint64_t add;
+ } fadd;
+ struct {
+ uint64_t compare;
+ uint64_t swap;
+ uint64_t compare_mask;
+ uint64_t swap_mask;
+ } m_cswp;
+ struct {
+ uint64_t add;
+ uint64_t nocarry_mask;
+ } m_fadd;
+ };
+ uint64_t flags;
+ uint64_t user_token;
};
struct rds_rdma_notify {
- u_int64_t user_token;
+ uint64_t user_token;
int32_t status;
};
@@ -257,5 +280,6 @@ struct rds_rdma_notify {
#define RDS_RDMA_USE_ONCE 0x0008 /* free MR after use */
#define RDS_RDMA_DONTWAIT 0x0010 /* Don't wait in SET_BARRIER */
#define RDS_RDMA_NOTIFY_ME 0x0020 /* Notify when operation completes */
+#define RDS_RDMA_SILENT 0x0040 /* Do not interrupt remote */
#endif /* IB_RDS_H */
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index aebfecbdb841..bb6ad81b671d 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -39,7 +39,15 @@
#include <net/sock.h>
#include "rds.h"
-#include "rdma.h"
+
+char *rds_str_array(char **array, size_t elements, size_t index)
+{
+ if ((index < elements) && array[index])
+ return array[index];
+ else
+ return "unknown";
+}
+EXPORT_SYMBOL(rds_str_array);
/* this is just used for stats gathering :/ */
static DEFINE_SPINLOCK(rds_sock_lock);
@@ -62,7 +70,7 @@ static int rds_release(struct socket *sock)
struct rds_sock *rs;
unsigned long flags;
- if (sk == NULL)
+ if (!sk)
goto out;
rs = rds_sk_to_rs(sk);
@@ -73,7 +81,15 @@ static int rds_release(struct socket *sock)
* with the socket. */
rds_clear_recv_queue(rs);
rds_cong_remove_socket(rs);
+
+ /*
+ * the binding lookup hash uses rcu, we need to
+ * make sure we sychronize_rcu before we free our
+ * entry
+ */
rds_remove_bound(rs);
+ synchronize_rcu();
+
rds_send_drop_to(rs, NULL);
rds_rdma_drop_keys(rs);
rds_notify_queue_get(rs, NULL);
@@ -83,6 +99,8 @@ static int rds_release(struct socket *sock)
rds_sock_count--;
spin_unlock_irqrestore(&rds_sock_lock, flags);
+ rds_trans_put(rs->rs_transport);
+
sock->sk = NULL;
sock_put(sk);
out:
@@ -514,7 +532,7 @@ out:
spin_unlock_irqrestore(&rds_sock_lock, flags);
}
-static void __exit rds_exit(void)
+static void rds_exit(void)
{
sock_unregister(rds_family_ops.family);
proto_unregister(&rds_proto);
@@ -529,7 +547,7 @@ static void __exit rds_exit(void)
}
module_exit(rds_exit);
-static int __init rds_init(void)
+static int rds_init(void)
{
int ret;
diff --git a/net/rds/bind.c b/net/rds/bind.c
index 5d95fc007f1a..2f6b3fcc79f8 100644
--- a/net/rds/bind.c
+++ b/net/rds/bind.c
@@ -34,45 +34,52 @@
#include <net/sock.h>
#include <linux/in.h>
#include <linux/if_arp.h>
+#include <linux/jhash.h>
#include "rds.h"
-/*
- * XXX this probably still needs more work.. no INADDR_ANY, and rbtrees aren't
- * particularly zippy.
- *
- * This is now called for every incoming frame so we arguably care much more
- * about it than we used to.
- */
+#define BIND_HASH_SIZE 1024
+static struct hlist_head bind_hash_table[BIND_HASH_SIZE];
static DEFINE_SPINLOCK(rds_bind_lock);
-static struct rb_root rds_bind_tree = RB_ROOT;
-static struct rds_sock *rds_bind_tree_walk(__be32 addr, __be16 port,
- struct rds_sock *insert)
+static struct hlist_head *hash_to_bucket(__be32 addr, __be16 port)
+{
+ return bind_hash_table + (jhash_2words((u32)addr, (u32)port, 0) &
+ (BIND_HASH_SIZE - 1));
+}
+
+static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port,
+ struct rds_sock *insert)
{
- struct rb_node **p = &rds_bind_tree.rb_node;
- struct rb_node *parent = NULL;
struct rds_sock *rs;
+ struct hlist_node *node;
+ struct hlist_head *head = hash_to_bucket(addr, port);
u64 cmp;
u64 needle = ((u64)be32_to_cpu(addr) << 32) | be16_to_cpu(port);
- while (*p) {
- parent = *p;
- rs = rb_entry(parent, struct rds_sock, rs_bound_node);
-
+ rcu_read_lock();
+ hlist_for_each_entry_rcu(rs, node, head, rs_bound_node) {
cmp = ((u64)be32_to_cpu(rs->rs_bound_addr) << 32) |
be16_to_cpu(rs->rs_bound_port);
- if (needle < cmp)
- p = &(*p)->rb_left;
- else if (needle > cmp)
- p = &(*p)->rb_right;
- else
+ if (cmp == needle) {
+ rcu_read_unlock();
return rs;
+ }
}
+ rcu_read_unlock();
if (insert) {
- rb_link_node(&insert->rs_bound_node, parent, p);
- rb_insert_color(&insert->rs_bound_node, &rds_bind_tree);
+ /*
+ * make sure our addr and port are set before
+ * we are added to the list, other people
+ * in rcu will find us as soon as the
+ * hlist_add_head_rcu is done
+ */
+ insert->rs_bound_addr = addr;
+ insert->rs_bound_port = port;
+ rds_sock_addref(insert);
+
+ hlist_add_head_rcu(&insert->rs_bound_node, head);
}
return NULL;
}
@@ -86,15 +93,13 @@ static struct rds_sock *rds_bind_tree_walk(__be32 addr, __be16 port,
struct rds_sock *rds_find_bound(__be32 addr, __be16 port)
{
struct rds_sock *rs;
- unsigned long flags;
- spin_lock_irqsave(&rds_bind_lock, flags);
- rs = rds_bind_tree_walk(addr, port, NULL);
+ rs = rds_bind_lookup(addr, port, NULL);
+
if (rs && !sock_flag(rds_rs_to_sk(rs), SOCK_DEAD))
rds_sock_addref(rs);
else
rs = NULL;
- spin_unlock_irqrestore(&rds_bind_lock, flags);
rdsdebug("returning rs %p for %pI4:%u\n", rs, &addr,
ntohs(port));
@@ -121,22 +126,15 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
do {
if (rover == 0)
rover++;
- if (rds_bind_tree_walk(addr, cpu_to_be16(rover), rs) == NULL) {
- *port = cpu_to_be16(rover);
+ if (!rds_bind_lookup(addr, cpu_to_be16(rover), rs)) {
+ *port = rs->rs_bound_port;
ret = 0;
+ rdsdebug("rs %p binding to %pI4:%d\n",
+ rs, &addr, (int)ntohs(*port));
break;
}
} while (rover++ != last);
- if (ret == 0) {
- rs->rs_bound_addr = addr;
- rs->rs_bound_port = *port;
- rds_sock_addref(rs);
-
- rdsdebug("rs %p binding to %pI4:%d\n",
- rs, &addr, (int)ntohs(*port));
- }
-
spin_unlock_irqrestore(&rds_bind_lock, flags);
return ret;
@@ -153,7 +151,7 @@ void rds_remove_bound(struct rds_sock *rs)
rs, &rs->rs_bound_addr,
ntohs(rs->rs_bound_port));
- rb_erase(&rs->rs_bound_node, &rds_bind_tree);
+ hlist_del_init_rcu(&rs->rs_bound_node);
rds_sock_put(rs);
rs->rs_bound_addr = 0;
}
@@ -184,7 +182,7 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
goto out;
trans = rds_trans_get_preferred(sin->sin_addr.s_addr);
- if (trans == NULL) {
+ if (!trans) {
ret = -EADDRNOTAVAIL;
rds_remove_bound(rs);
if (printk_ratelimit())
@@ -198,5 +196,9 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
out:
release_sock(sk);
+
+ /* we might have called rds_remove_bound on error */
+ if (ret)
+ synchronize_rcu();
return ret;
}
diff --git a/net/rds/cong.c b/net/rds/cong.c
index 0871a29f0780..75ea686f27d5 100644
--- a/net/rds/cong.c
+++ b/net/rds/cong.c
@@ -141,7 +141,7 @@ static struct rds_cong_map *rds_cong_from_addr(__be32 addr)
unsigned long flags;
map = kzalloc(sizeof(struct rds_cong_map), GFP_KERNEL);
- if (map == NULL)
+ if (!map)
return NULL;
map->m_addr = addr;
@@ -159,7 +159,7 @@ static struct rds_cong_map *rds_cong_from_addr(__be32 addr)
ret = rds_cong_tree_walk(addr, map);
spin_unlock_irqrestore(&rds_cong_lock, flags);
- if (ret == NULL) {
+ if (!ret) {
ret = map;
map = NULL;
}
@@ -205,7 +205,7 @@ int rds_cong_get_maps(struct rds_connection *conn)
conn->c_lcong = rds_cong_from_addr(conn->c_laddr);
conn->c_fcong = rds_cong_from_addr(conn->c_faddr);
- if (conn->c_lcong == NULL || conn->c_fcong == NULL)
+ if (!(conn->c_lcong && conn->c_fcong))
return -ENOMEM;
return 0;
@@ -221,7 +221,7 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
list_for_each_entry(conn, &map->m_conn_list, c_map_item) {
if (!test_and_set_bit(0, &conn->c_map_queued)) {
rds_stats_inc(s_cong_update_queued);
- queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+ rds_send_xmit(conn);
}
}
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 7619b671ca28..870992e08cae 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -37,7 +37,6 @@
#include "rds.h"
#include "loop.h"
-#include "rdma.h"
#define RDS_CONNECTION_HASH_BITS 12
#define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS)
@@ -63,18 +62,7 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
var |= RDS_INFO_CONNECTION_FLAG_##suffix; \
} while (0)
-static inline int rds_conn_is_sending(struct rds_connection *conn)
-{
- int ret = 0;
-
- if (!mutex_trylock(&conn->c_send_lock))
- ret = 1;
- else
- mutex_unlock(&conn->c_send_lock);
-
- return ret;
-}
-
+/* rcu read lock must be held or the connection spinlock */
static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
__be32 laddr, __be32 faddr,
struct rds_transport *trans)
@@ -82,7 +70,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
struct rds_connection *conn, *ret = NULL;
struct hlist_node *pos;
- hlist_for_each_entry(conn, pos, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
if (conn->c_faddr == faddr && conn->c_laddr == laddr &&
conn->c_trans == trans) {
ret = conn;
@@ -129,10 +117,11 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
{
struct rds_connection *conn, *parent = NULL;
struct hlist_head *head = rds_conn_bucket(laddr, faddr);
+ struct rds_transport *loop_trans;
unsigned long flags;
int ret;
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
conn = rds_conn_lookup(head, laddr, faddr, trans);
if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
!is_outgoing) {
@@ -143,12 +132,12 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
parent = conn;
conn = parent->c_passive;
}
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
if (conn)
goto out;
conn = kmem_cache_zalloc(rds_conn_slab, gfp);
- if (conn == NULL) {
+ if (!conn) {
conn = ERR_PTR(-ENOMEM);
goto out;
}
@@ -159,7 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
spin_lock_init(&conn->c_lock);
conn->c_next_tx_seq = 1;
- mutex_init(&conn->c_send_lock);
+ init_waitqueue_head(&conn->c_waitq);
INIT_LIST_HEAD(&conn->c_send_queue);
INIT_LIST_HEAD(&conn->c_retrans);
@@ -175,7 +164,9 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
* can bind to the destination address then we'd rather the messages
* flow through loopback rather than either transport.
*/
- if (rds_trans_get_preferred(faddr)) {
+ loop_trans = rds_trans_get_preferred(faddr);
+ if (loop_trans) {
+ rds_trans_put(loop_trans);
conn->c_loopback = 1;
if (is_outgoing && trans->t_prefer_loopback) {
/* "outgoing" connection - and the transport
@@ -238,7 +229,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
kmem_cache_free(rds_conn_slab, conn);
conn = found;
} else {
- hlist_add_head(&conn->c_hash_node, head);
+ hlist_add_head_rcu(&conn->c_hash_node, head);
rds_cong_add_conn(conn);
rds_conn_count++;
}
@@ -263,21 +254,91 @@ struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr,
}
EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
+void rds_conn_shutdown(struct rds_connection *conn)
+{
+ /* shut it down unless it's down already */
+ if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
+ /*
+ * Quiesce the connection mgmt handlers before we start tearing
+ * things down. We don't hold the mutex for the entire
+ * duration of the shutdown operation, else we may be
+ * deadlocking with the CM handler. Instead, the CM event
+ * handler is supposed to check for state DISCONNECTING
+ */
+ mutex_lock(&conn->c_cm_lock);
+ if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING)
+ && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) {
+ rds_conn_error(conn, "shutdown called in state %d\n",
+ atomic_read(&conn->c_state));
+ mutex_unlock(&conn->c_cm_lock);
+ return;
+ }
+ mutex_unlock(&conn->c_cm_lock);
+
+ wait_event(conn->c_waitq,
+ !test_bit(RDS_IN_XMIT, &conn->c_flags));
+
+ conn->c_trans->conn_shutdown(conn);
+ rds_conn_reset(conn);
+
+ if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
+ /* This can happen - eg when we're in the middle of tearing
+ * down the connection, and someone unloads the rds module.
+ * Quite reproduceable with loopback connections.
+ * Mostly harmless.
+ */
+ rds_conn_error(conn,
+ "%s: failed to transition to state DOWN, "
+ "current state is %d\n",
+ __func__,
+ atomic_read(&conn->c_state));
+ return;
+ }
+ }
+
+ /* Then reconnect if it's still live.
+ * The passive side of an IB loopback connection is never added
+ * to the conn hash, so we never trigger a reconnect on this
+ * conn - the reconnect is always triggered by the active peer. */
+ cancel_delayed_work_sync(&conn->c_conn_w);
+ rcu_read_lock();
+ if (!hlist_unhashed(&conn->c_hash_node)) {
+ rcu_read_unlock();
+ rds_queue_reconnect(conn);
+ } else {
+ rcu_read_unlock();
+ }
+}
+
+/*
+ * Stop and free a connection.
+ *
+ * This can only be used in very limited circumstances. It assumes that once
+ * the conn has been shutdown that no one else is referencing the connection.
+ * We can only ensure this in the rmmod path in the current code.
+ */
void rds_conn_destroy(struct rds_connection *conn)
{
struct rds_message *rm, *rtmp;
+ unsigned long flags;
rdsdebug("freeing conn %p for %pI4 -> "
"%pI4\n", conn, &conn->c_laddr,
&conn->c_faddr);
- hlist_del_init(&conn->c_hash_node);
+ /* Ensure conn will not be scheduled for reconnect */
+ spin_lock_irq(&rds_conn_lock);
+ hlist_del_init_rcu(&conn->c_hash_node);
+ spin_unlock_irq(&rds_conn_lock);
+ synchronize_rcu();
- /* wait for the rds thread to shut it down */
- atomic_set(&conn->c_state, RDS_CONN_ERROR);
- cancel_delayed_work(&conn->c_conn_w);
- queue_work(rds_wq, &conn->c_down_w);
- flush_workqueue(rds_wq);
+ /* shut the connection down */
+ rds_conn_drop(conn);
+ flush_work(&conn->c_down_w);
+
+ /* make sure lingering queued work won't try to ref the conn */
+ cancel_delayed_work_sync(&conn->c_send_w);
+ cancel_delayed_work_sync(&conn->c_recv_w);
/* tear down queued messages */
list_for_each_entry_safe(rm, rtmp,
@@ -302,7 +363,9 @@ void rds_conn_destroy(struct rds_connection *conn)
BUG_ON(!list_empty(&conn->c_retrans));
kmem_cache_free(rds_conn_slab, conn);
+ spin_lock_irqsave(&rds_conn_lock, flags);
rds_conn_count--;
+ spin_unlock_irqrestore(&rds_conn_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_conn_destroy);
@@ -316,23 +379,23 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
struct list_head *list;
struct rds_connection *conn;
struct rds_message *rm;
- unsigned long flags;
unsigned int total = 0;
+ unsigned long flags;
size_t i;
len /= sizeof(struct rds_info_message);
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
- hlist_for_each_entry(conn, pos, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
if (want_send)
list = &conn->c_send_queue;
else
list = &conn->c_retrans;
- spin_lock(&conn->c_lock);
+ spin_lock_irqsave(&conn->c_lock, flags);
/* XXX too lazy to maintain counts.. */
list_for_each_entry(rm, list, m_conn_item) {
@@ -343,11 +406,10 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
conn->c_faddr, 0);
}
- spin_unlock(&conn->c_lock);
+ spin_unlock_irqrestore(&conn->c_lock, flags);
}
}
-
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
lens->nr = total;
lens->each = sizeof(struct rds_info_message);
@@ -377,19 +439,17 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
uint64_t buffer[(item_len + 7) / 8];
struct hlist_head *head;
struct hlist_node *pos;
- struct hlist_node *tmp;
struct rds_connection *conn;
- unsigned long flags;
size_t i;
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
lens->nr = 0;
lens->each = item_len;
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
- hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
/* XXX no c_lock usage.. */
if (!visitor(conn, buffer))
@@ -405,8 +465,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
lens->nr++;
}
}
-
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
@@ -423,8 +482,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
sizeof(cinfo->transport));
cinfo->flags = 0;
- rds_conn_info_set(cinfo->flags,
- rds_conn_is_sending(conn), SENDING);
+ rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+ SENDING);
/* XXX Future: return the state rather than these funky bits */
rds_conn_info_set(cinfo->flags,
atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
@@ -444,12 +503,12 @@ static void rds_conn_info(struct socket *sock, unsigned int len,
sizeof(struct rds_info_connection));
}
-int __init rds_conn_init(void)
+int rds_conn_init(void)
{
rds_conn_slab = kmem_cache_create("rds_connection",
sizeof(struct rds_connection),
0, 0, NULL);
- if (rds_conn_slab == NULL)
+ if (!rds_conn_slab)
return -ENOMEM;
rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info);
@@ -487,6 +546,18 @@ void rds_conn_drop(struct rds_connection *conn)
EXPORT_SYMBOL_GPL(rds_conn_drop);
/*
+ * If the connection is down, trigger a connect. We may have scheduled a
+ * delayed reconnect however - in this case we should not interfere.
+ */
+void rds_conn_connect_if_down(struct rds_connection *conn)
+{
+ if (rds_conn_state(conn) == RDS_CONN_DOWN &&
+ !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
+ queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+}
+EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);
+
+/*
*