diff options
Diffstat (limited to 'fs/dlm/lowcomms.c')
| -rw-r--r-- | fs/dlm/lowcomms.c | 1538 |
1 files changed, 740 insertions, 798 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 59f64c596233..8b80ca0cd65f 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -63,41 +63,49 @@ #define NEEDED_RMEM (4*1024*1024) -/* Number of messages to send before rescheduling */ -#define MAX_SEND_MSG_COUNT 25 -#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000) - struct connection { struct socket *sock; /* NULL if not connected */ uint32_t nodeid; /* So we know who we are in the list */ - struct mutex sock_mutex; + /* this semaphore is used to allow parallel recv/send in read + * lock mode. When we release a sock we need to held the write lock. + * + * However this is locking code and not nice. When we remove the + * othercon handling we can look into other mechanism to synchronize + * io handling to call sock_release() at the right time. + */ + struct rw_semaphore sock_lock; unsigned long flags; -#define CF_READ_PENDING 1 -#define CF_WRITE_PENDING 2 -#define CF_INIT_PENDING 4 +#define CF_APP_LIMITED 0 +#define CF_RECV_PENDING 1 +#define CF_SEND_PENDING 2 +#define CF_RECV_INTR 3 +#define CF_IO_STOP 4 #define CF_IS_OTHERCON 5 -#define CF_CLOSE 6 -#define CF_APP_LIMITED 7 -#define CF_CLOSING 8 -#define CF_SHUTDOWN 9 -#define CF_CONNECTED 10 -#define CF_RECONNECT 11 -#define CF_DELAY_CONNECT 12 -#define CF_EOF 13 struct list_head writequeue; /* List of outgoing writequeue_entries */ spinlock_t writequeue_lock; - atomic_t writequeue_cnt; int retries; -#define MAX_CONNECT_RETRIES 3 struct hlist_node list; + /* due some connect()/accept() races we currently have this cross over + * connection attempt second connection for one node. + * + * There is a solution to avoid the race by introducing a connect + * rule as e.g. our_nodeid > nodeid_to_connect who is allowed to + * connect. Otherside can connect but will only be considered that + * the other side wants to have a reconnect. + * + * However changing to this behaviour will break backwards compatible. + * In a DLM protocol major version upgrade we should remove this! + */ struct connection *othercon; - struct connection *sendcon; - struct work_struct rwork; /* Receive workqueue */ - struct work_struct swork; /* Send workqueue */ - wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ - unsigned char *rx_buf; - int rx_buflen; + struct work_struct rwork; /* receive worker */ + struct work_struct swork; /* send worker */ + unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE]; int rx_leftover; + int mark; + int addr_count; + int curr_addr_index; + struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT]; + spinlock_t addrs_lock; struct rcu_head rcu; }; #define sock2con(x) ((struct connection *)(x)->sk_user_data) @@ -136,13 +144,12 @@ struct dlm_msg { struct kref ref; }; -struct dlm_node_addr { - struct list_head list; +struct processqueue_entry { + unsigned char *buf; int nodeid; - int mark; - int addr_count; - int curr_addr_index; - struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; + int buflen; + + struct list_head list; }; struct dlm_proto_ops { @@ -157,10 +164,6 @@ struct dlm_proto_ops { int (*listen_validate)(void); void (*listen_sockopts)(struct socket *sock); int (*listen_bind)(struct socket *sock); - /* What to do to shutdown */ - void (*shutdown_action)(struct connection *con); - /* What to do to eof check */ - bool (*eof_condition)(struct connection *con); }; static struct listen_sock_callbacks { @@ -170,17 +173,13 @@ static struct listen_sock_callbacks { void (*sk_write_space)(struct sock *); } listen_sock; -static LIST_HEAD(dlm_node_addrs); -static DEFINE_SPINLOCK(dlm_node_addrs_spin); - static struct listen_connection listen_con; -static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; +static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT]; static int dlm_local_count; -int dlm_allow_conn; /* Work queues */ -static struct workqueue_struct *recv_workqueue; -static struct workqueue_struct *send_workqueue; +static struct workqueue_struct *io_workqueue; +static struct workqueue_struct *process_workqueue; static struct hlist_head connection_hash[CONN_HASH_SIZE]; static DEFINE_SPINLOCK(connections_lock); @@ -188,8 +187,45 @@ DEFINE_STATIC_SRCU(connections_srcu); static const struct dlm_proto_ops *dlm_proto_ops; +#define DLM_IO_SUCCESS 0 +#define DLM_IO_END 1 +#define DLM_IO_EOF 2 +#define DLM_IO_RESCHED 3 + static void process_recv_sockets(struct work_struct *work); static void process_send_sockets(struct work_struct *work); +static void process_dlm_messages(struct work_struct *work); + +static DECLARE_WORK(process_work, process_dlm_messages); +static DEFINE_SPINLOCK(processqueue_lock); +static bool process_dlm_messages_pending; +static LIST_HEAD(processqueue); + +bool dlm_lowcomms_is_running(void) +{ + return !!listen_con.sock; +} + +static void lowcomms_queue_swork(struct connection *con) +{ + assert_spin_locked(&con->writequeue_lock); + + if (!test_bit(CF_IO_STOP, &con->flags) && + !test_bit(CF_APP_LIMITED, &con->flags) && + !test_and_set_bit(CF_SEND_PENDING, &con->flags)) + queue_work(io_workqueue, &con->swork); +} + +static void lowcomms_queue_rwork(struct connection *con) +{ +#ifdef CONFIG_LOCKDEP + WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk)); +#endif + + if (!test_bit(CF_IO_STOP, &con->flags) && + !test_and_set_bit(CF_RECV_PENDING, &con->flags)) + queue_work(io_workqueue, &con->rwork); +} static void writequeue_entry_ctor(void *data) { @@ -214,15 +250,12 @@ static struct writequeue_entry *con_next_wq(struct connection *con) { struct writequeue_entry *e; - if (list_empty(&con->writequeue)) - return NULL; - - e = list_first_entry(&con->writequeue, struct writequeue_entry, - list); + e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, + list); /* if len is zero nothing is to send, if there are users filling * buffers we wait until the users are done so we can send more. */ - if (e->users || e->len == 0) + if (!e || e->users || e->len == 0) return NULL; return e; @@ -240,28 +273,15 @@ static struct connection *__find_con(int nodeid, int r) return NULL; } -static bool tcp_eof_condition(struct connection *con) -{ - return atomic_read(&con->writequeue_cnt); -} - -static int dlm_con_init(struct connection *con, int nodeid) +static void dlm_con_init(struct connection *con, int nodeid) { - con->rx_buflen = dlm_config.ci_buffer_size; - con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); - if (!con->rx_buf) - return -ENOMEM; - con->nodeid = nodeid; - mutex_init(&con->sock_mutex); + init_rwsem(&con->sock_lock); INIT_LIST_HEAD(&con->writequeue); spin_lock_init(&con->writequeue_lock); - atomic_set(&con->writequeue_cnt, 0); INIT_WORK(&con->swork, process_send_sockets); INIT_WORK(&con->rwork, process_recv_sockets); - init_waitqueue_head(&con->shutdown_wait); - - return 0; + spin_lock_init(&con->addrs_lock); } /* @@ -271,7 +291,7 @@ static int dlm_con_init(struct connection *con, int nodeid) static struct connection *nodeid2con(int nodeid, gfp_t alloc) { struct connection *con, *tmp; - int r, ret; + int r; r = nodeid_hash(nodeid); con = __find_con(nodeid, r); @@ -282,11 +302,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) if (!con) return NULL; - ret = dlm_con_init(con, nodeid); - if (ret) { - kfree(con); - return NULL; - } + dlm_con_init(con, nodeid); spin_lock(&connections_lock); /* Because multiple workqueues/threads calls this function it can @@ -298,7 +314,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) tmp = __find_con(nodeid, r); if (tmp) { spin_unlock(&connections_lock); - kfree(con->rx_buf); kfree(con); return tmp; } @@ -309,29 +324,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) return con; } -/* Loop round all connections */ -static void foreach_conn(void (*conn_func)(struct connection *c)) -{ - int i; - struct connection *con; - - for (i = 0; i < CONN_HASH_SIZE; i++) { - hlist_for_each_entry_rcu(con, &connection_hash[i], list) - conn_func(con); - } -} - -static struct dlm_node_addr *find_node_addr(int nodeid) -{ - struct dlm_node_addr *na; - - list_for_each_entry(na, &dlm_node_addrs, list) { - if (na->nodeid == nodeid) - return na; - } - return NULL; -} - static int addr_compare(const struct sockaddr_storage *x, const struct sockaddr_storage *y) { @@ -365,40 +357,47 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, unsigned int *mark) { struct sockaddr_storage sas; - struct dlm_node_addr *na; + struct connection *con; + int idx; if (!dlm_local_count) return -1; - spin_lock(&dlm_node_addrs_spin); - na = find_node_addr(nodeid); - if (na && na->addr_count) { - memcpy(&sas, na->addr[na->curr_addr_index], - sizeof(struct sockaddr_storage)); + idx = srcu_read_lock(&connections_srcu); + con = nodeid2con(nodeid, 0); + if (!con) { + srcu_read_unlock(&connections_srcu, idx); + return -ENOENT; + } - if (try_new_addr) { - na->curr_addr_index++; - if (na->curr_addr_index == na->addr_count) - na->curr_addr_index = 0; - } + spin_lock(&con->addrs_lock); + if (!con->addr_count) { + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); + return -ENOENT; } - spin_unlock(&dlm_node_addrs_spin); - if (!na) - return -EEXIST; + memcpy(&sas, &con->addr[con->curr_addr_index], + sizeof(struct sockaddr_storage)); - if (!na->addr_count) - return -ENOENT; + if (try_new_addr) { + con->curr_addr_index++; + if (con->curr_addr_index == con->addr_count) + con->curr_addr_index = 0; + } - *mark = na->mark; + *mark = con->mark; + spin_unlock(&con->addrs_lock); if (sas_out) memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); - if (!sa_out) + if (!sa_out) { + srcu_read_unlock(&connections_srcu, idx); return 0; + } - if (dlm_local_addr[0]->ss_family == AF_INET) { + if (dlm_local_addr[0].ss_family == AF_INET) { struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; ret4->sin_addr.s_addr = in4->sin_addr.s_addr; @@ -408,43 +407,46 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, ret6->sin6_addr = in6->sin6_addr; } + srcu_read_unlock(&connections_srcu, idx); return 0; } static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid, unsigned int *mark) { - struct dlm_node_addr *na; - int rv = -EEXIST; - int addr_i; - - spin_lock(&dlm_node_addrs_spin); - list_for_each_entry(na, &dlm_node_addrs, list) { - if (!na->addr_count) - continue; - - for (addr_i = 0; addr_i < na->addr_count; addr_i++) { - if (addr_compare(na->addr[addr_i], addr)) { - *nodeid = na->nodeid; - *mark = na->mark; - rv = 0; - goto unlock; + struct connection *con; + int i, idx, addr_i; + + idx = srcu_read_lock(&connections_srcu); + for (i = 0; i < CONN_HASH_SIZE; i++) { + hlist_for_each_entry_rcu(con, &connection_hash[i], list) { + WARN_ON_ONCE(!con->addr_count); + + spin_lock(&con->addrs_lock); + for (addr_i = 0; addr_i < con->addr_count; addr_i++) { + if (addr_compare(&con->addr[addr_i], addr)) { + *nodeid = con->nodeid; + *mark = con->mark; + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); + return 0; + } } + spin_unlock(&con->addrs_lock); } } -unlock: - spin_unlock(&dlm_node_addrs_spin); - return rv; + srcu_read_unlock(&connections_srcu, idx); + + return -ENOENT; } -/* caller need to held dlm_node_addrs_spin lock */ -static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na, - const struct sockaddr_storage *addr) +static bool dlm_lowcomms_con_has_addr(const struct connection *con, + const struct sockaddr_storage *addr) { int i; - for (i = 0; i < na->addr_count; i++) { - if (addr_compare(na->addr[i], addr)) + for (i = 0; i < con->addr_count; i++) { + if (addr_compare(&con->addr[i], addr)) return true; } @@ -453,118 +455,82 @@ static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na, int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) { - struct sockaddr_storage *new_addr; - struct dlm_node_addr *new_node, *na; - bool ret; - - new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS); - if (!new_node) - return -ENOMEM; + struct connection *con; + bool ret, idx; - new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS); - if (!new_addr) { - kfree(new_node); + idx = srcu_read_lock(&connections_srcu); + con = nodeid2con(nodeid, GFP_NOFS); + if (!con) { + srcu_read_unlock(&connections_srcu, idx); return -ENOMEM; } - memcpy(new_addr, addr, len); - - spin_lock(&dlm_node_addrs_spin); - na = find_node_addr(nodeid); - if (!na) { - new_node->nodeid = nodeid; - new_node->addr[0] = new_addr; - new_node->addr_count = 1; - new_node->mark = dlm_config.ci_mark; - list_add(&new_node->list, &dlm_node_addrs); - spin_unlock(&dlm_node_addrs_spin); + spin_lock(&con->addrs_lock); + if (!con->addr_count) { + memcpy(&con->addr[0], addr, sizeof(*addr)); + con->addr_count = 1; + con->mark = dlm_config.ci_mark; + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); return 0; } - ret = dlm_lowcomms_na_has_addr(na, addr); + ret = dlm_lowcomms_con_has_addr(con, addr); if (ret) { - spin_unlock(&dlm_node_addrs_spin); - kfree(new_addr); - kfree(new_node); + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); return -EEXIST; } - if (na->addr_count >= DLM_MAX_ADDR_COUNT) { - spin_unlock(&dlm_node_addrs_spin); - kfree(new_addr); - kfree(new_node); + if (con->addr_count >= DLM_MAX_ADDR_COUNT) { + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); return -ENOSPC; } - na->addr[na->addr_count++] = new_addr; - spin_unlock(&dlm_node_addrs_spin); - kfree(new_node); + memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); + srcu_read_unlock(&connections_srcu, idx); + spin_unlock(&con->addrs_lock); return 0; } /* Data available on socket or listen socket received a connect */ static void lowcomms_data_ready(struct sock *sk) { - struct connection *con; - - con = sock2con(sk); - if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) - queue_work(recv_workqueue, &con->rwork); -} - -static void lowcomms_listen_data_ready(struct sock *sk) -{ - if (!dlm_allow_conn) - return; + struct connection *con = sock2con(sk); - queue_work(recv_workqueue, &listen_con.rwork); + set_bit(CF_RECV_INTR, &con->flags); + lowcomms_queue_rwork(con); } static void lowcomms_write_space(struct sock *sk) { - struct connection *con; - - con = sock2con(sk); - if (!con) - return; - - if (!test_and_set_bit(CF_CONNECTED, &con->flags)) { - log_print("connected to node %d", con->nodeid); - queue_work(send_workqueue, &con->swork); - return; - } + struct connection *con = sock2con(sk); clear_bit(SOCK_NOSPACE, &con->sock->flags); + spin_lock_bh(&con->writequeue_lock); if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { con->sock->sk->sk_write_pending--; clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); } - queue_work(send_workqueue, &con->swork); -} - -static inline void lowcomms_connect_sock(struct connection *con) -{ - if (test_bit(CF_CLOSE, &con->flags)) - return; - queue_work(send_workqueue, &con->swork); - cond_resched(); + lowcomms_queue_swork(con); + spin_unlock_bh(&con->writequeue_lock); } static void lowcomms_state_change(struct sock *sk) { /* SCTP layer is not calling sk_data_ready when the connection - * is done, so we catch the signal through here. Also, it - * doesn't switch socket state when entering shutdown, so we - * skip the write in that case. + * is done, so we catch the signal through here. */ - if (sk->sk_shutdown) { - if (sk->sk_shutdown == RCV_SHUTDOWN) - lowcomms_data_ready(sk); - } else if (sk->sk_state == TCP_ESTABLISHED) { - lowcomms_write_space(sk); - } + if (sk->sk_shutdown == RCV_SHUTDOWN) + lowcomms_data_ready(sk); +} + +static void lowcomms_listen_data_ready(struct sock *sk) +{ + queue_work(io_workqueue, &listen_con.rwork); } int dlm_lowcomms_connect_node(int nodeid) @@ -576,47 +542,49 @@ int dlm_lowcomms_connect_node(int nodeid) return 0; idx = srcu_read_lock(&connections_srcu); - con = nodeid2con(nodeid, GFP_NOFS); - if (!con) { + con = nodeid2con(nodeid, 0); + if (WARN_ON_ONCE(!con)) { srcu_read_unlock(&connections_srcu, idx); - return -ENOMEM; + return -ENOENT; } - lowcomms_connect_sock(con); + down_read(&con->sock_lock); + if (!con->sock) { + spin_lock_bh(&con->writequeue_lock); + lowcomms_queue_swork(con); + spin_unlock_bh(&con->writequeue_lock); + } + up_read(&con->sock_lock); srcu_read_unlock(&connections_srcu, idx); + cond_resched(); return 0; } int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark) { - struct dlm_node_addr *na; + struct connection *con; + int idx; - spin_lock(&dlm_node_addrs_spin); - na = find_node_addr(nodeid); - if (!na) { - spin_unlock(&dlm_node_addrs_spin); + idx = srcu_read_lock(&connections_srcu); + con = nodeid2con(nodeid, 0); + if (!con) { + srcu_read_unlock(&connections_srcu, idx); return -ENOENT; } - na->mark = mark; - spin_unlock(&dlm_node_addrs_spin); - + spin_lock(&con->addrs_lock); + con->mark = mark; + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); return 0; } static void lowcomms_error_report(struct sock *sk) { - struct connection *con; - void (*orig_report)(struct sock *) = NULL; + struct connection *con = sock2con(sk); struct inet_sock *inet; - con = sock2con(sk); - if (con == NULL) - goto out; - - orig_report = listen_sock.sk_error_report; - inet = inet_sk(sk); switch (sk->sk_family) { case AF_INET: @@ -642,66 +610,25 @@ static void lowcomms_error_report(struct sock *sk) "invalid socket family %d set, " "sk_err=%d/%d\n", dlm_our_nodeid(), sk->sk_family, sk->sk_err, sk->sk_err_soft); - goto out; - } - - /* below sendcon only handling */ - if (test_bit(CF_IS_OTHERCON, &con->flags)) - con = con->sendcon; - - switch (sk->sk_err) { - case ECONNREFUSED: - set_bit(CF_DELAY_CONNECT, &con->flags); - break; - default: break; } - if (!test_and_set_bit(CF_RECONNECT, &con->flags)) - queue_work(send_workqueue, &con->swork); + dlm_midcomms_unack_msg_resend(con->nodeid); -out: - if (orig_report) - orig_report(sk); + listen_sock.sk_error_report(sk); } -/* Note: sk_callback_lock must be locked before calling this function. */ -static void save_listen_callbacks(struct socket *sock) +static void restore_callbacks(struct sock *sk) { - struct sock *sk = sock->sk; - - listen_sock.sk_data_ready = sk->sk_data_ready; - listen_sock.sk_state_change = sk->sk_state_change; - listen_sock.sk_write_space = sk->sk_write_space; - listen_sock.sk_error_report = sk->sk_error_report; -} - -static void restore_callbacks(struct socket *sock) -{ - struct sock *sk = sock->sk; +#ifdef CONFIG_LOCKDEP + WARN_ON_ONCE(!lockdep_sock_is_held(sk)); +#endif - lock_sock(sk); sk->sk_user_data = NULL; sk->sk_data_ready = listen_sock.sk_data_ready; sk->sk_state_change = listen_sock.sk_state_change; sk->sk_write_space = listen_sock.sk_write_space; sk->sk_error_report = listen_sock.sk_error_report; - release_sock(sk); -} - -static void add_listen_sock(struct socket *sock, struct listen_connection *con) -{ - struct sock *sk = sock->sk; - - lock_sock(sk); - save_listen_callbacks(sock); - con->sock = sock; - - sk->sk_user_data = con; - sk->sk_allocation = GFP_NOFS; - /* Install a data_ready callback */ - sk->sk_data_ready = lowcomms_listen_data_ready; - release_sock(sk); } /* Make a socket active */ @@ -713,10 +640,10 @@ static void add_sock(struct socket *sock, struct connection *con) con->sock = sock; sk->sk_user_data = con; - /* Install a data_ready callback */ sk->sk_data_ready = lowcomms_data_ready; sk->sk_write_space = lowcomms_write_space; - sk->sk_state_change = lowcomms_state_change; + if (dlm_config.ci_protocol == DLM_PROTO_SCTP) + sk->sk_state_change = lowcomms_state_change; sk->sk_allocation = GFP_NOFS; sk->sk_error_report = lowcomms_error_report; release_sock(sk); @@ -727,7 +654,7 @@ static void add_sock(struct socket *sock, struct connection *con) static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, int *addr_len) { - saddr->ss_family = dlm_local_addr[0]->ss_family; + saddr->ss_family = dlm_local_addr[0].ss_family; if (saddr->ss_family == AF_INET) { struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; in4_addr->sin_port = cpu_to_be16(port); @@ -773,43 +700,67 @@ static void free_entry(struct writequeue_entry *e) } list_del(&e->list); - atomic_dec(&e->con->writequeue_cnt); kref_put(&e->ref, dlm_page_release); } static void dlm_close_sock(struct socket **sock) { - if (*sock) { - restore_callbacks(*sock); - sock_release(*sock); - *sock = NULL; + lock_sock((*sock)->sk); + restore_callbacks((*sock)->sk); + release_sock((*sock)->sk); + + sock_release(*sock); + *sock = NULL; +} + +static void allow_connection_io(struct connection *con) +{ + if (con->othercon) + clear_bit(CF_IO_STOP, &con->othercon->flags); + clear_bit(CF_IO_STOP, &con->flags); +} + +static void stop_connection_io(struct connection *con) +{ + if (con->othercon) + stop_connection_io(con->othercon); + + down_write(&con->sock_lock); + if (con->sock) { + lock_sock(con->sock->sk); + restore_callbacks(con->sock->sk); + + spin_lock_bh(&con->writequeue_lock); + set_bit(CF_IO_STOP, &con->flags); + spin_unlock_bh(&con->writequeue_lock); + release_sock(con->sock->sk); + } else { + spin_lock_bh(&con->writequeue_lock); + set_bit(CF_IO_STOP, &con->flags); + spin_unlock_bh(&con->writequeue_lock); } + up_write(&con->sock_lock); + + cancel_work_sync(&con->swork); + cancel_work_sync(&con->rwork); } /* Close a remote connection and tidy up */ -static void close_connection(struct connection *con, bool and_other, - bool tx, bool rx) +static void close_connection(struct connection *con, bool and_other) { - bool closing = test_and_set_bit(CF_CLOSING, &con->flags); struct writequeue_entry *e; - if (tx && !closing && cancel_work_sync(&con->swork)) { - log_print("canceled swork for node %d", con->nodeid); - clear_bit(CF_WRITE_PENDING, &con->flags); - } - if (rx && !closing && cancel_work_sync(&con->rwork)) { - log_print("canceled rwork for node %d", con->nodeid); - clear_bit(CF_READ_PENDING, &con->flags); + if (con->othercon && and_other) + close_connection(con->othercon, false); + + down_write(&con->sock_lock); + if (!con->sock) { + up_write(&con->sock_lock); + return; } - mutex_lock(&con->sock_mutex); dlm_close_sock(&con->sock); - if (con->othercon && and_other) { - /* Will only re-enter once. */ - close_connection(con->othercon, false, tx, rx); - } - /* if we send a writequeue entry only a half way, we drop the * whole entry because reconnection and that we not start of the * middle of a msg which will confuse the other end. @@ -821,200 +772,209 @@ static void close_connection(struct connection *con, bool and_other, * our policy is to start on a clean state when disconnects, we don't * know what's send/received on transport layer in this case. */ - spin_lock(&con->writequeue_lock); + spin_lock_bh(&con->writequeue_lock); if (!list_empty(&con->writequeue)) { e = list_first_entry(&con->writequeue, struct writequeue_entry, list); if (e->dirty) free_entry(e); } - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); con->rx_leftover = 0; con->retries = 0; clear_bit(CF_APP_LIMITED, &con->flags); - clear_bit(CF_CONNECTED, &con->flags); - clear_bit(CF_DELAY_CONNECT, &con->flags); - clear_bit(CF_RECONNECT, &con->flags); - clear_bit(CF_EOF, &con->flags); - mutex_unlock(&con->sock_mutex); - clear_bit(CF_CLOSING, &con->flags); + clear_bit(CF_RECV_PENDING, &con->flags); + clear_bit(CF_SEND_PENDING, &con->flags); + up_write(&con->sock_lock); } -static void shutdown_connection(struct connection *con) +static struct processqueue_entry *new_processqueue_entry(int nodeid, + int buflen) { - int ret; - - flush_work(&con->swork); + struct processqueue_entry *pentry; - mutex_lock(&con->sock_mutex); - /* nothing to shutdown */ - if (!con->sock) { - mutex_unlock(&con->sock_mutex); - return; - } + pentry = kmalloc(sizeof(*pentry), GFP_NOFS); + if (!pentry) + return NULL; - set_bit(CF_SHUTDOWN, &con->flags); - ret = kernel_sock_shutdown(con->sock, SHUT_WR); - mutex_unlock(&con->sock_mutex); - if (ret) { - log_print("Connection %p failed to shutdown: %d will force close", - con, ret); - goto force_close; - } else { - ret = wait_event_timeout(con->shutdown_wait, - !test_bit(CF_SHUTDOWN, &con->flags), - DLM_SHUTDOWN_WAIT_TIMEOUT); - if (ret == 0) { - log_print("Connection %p shutdown timed out, will force close", - con); - goto force_close; - } + pentry->buf = kmalloc(buflen, GFP_NOFS); + if (!pentry->buf) { + kfree(pentry); + return NULL; } - return; + pentry->nodeid = nodeid; + return pentry; +} -force_close: - clear_bit(CF_SHUTDOWN, &con->flags); - close_connection(con, false, true, true); +static void free_processqueue_entry(struct processqueue_entry *pentry) +{ + kfree(pentry->buf); + kfree(pentry); } -static void dlm_tcp_shutdown(struct connection *con) +struct dlm_processed_nodes { + int nodeid; + + struct list_head list; +}; + +static void add_processed_node(int nodeid, struct list_head *processed_nodes) { - if (con->othercon) - shutdown_connection(con->othercon); - shutdown_connection(con); + struct dlm_processed_nodes *n; + + list_for_each_entry(n, processed_nodes, list) { + /* we already remembered this node */ + if (n->nodeid == nodeid) + return; + } + + /* if it's fails in worst case we simple don't send an ack back. + * We try it next time. + */ + n = kmalloc(sizeof(*n), GFP_NOFS); + if (!n) + return; + + n->nodeid = nodeid; + list_add(&n->list, processed_nodes); } -static int con_realloc_receive_buf(struct connection *con, int newlen) +static void process_dlm_messages(struct work_struct *work) { - unsigned char *newbuf; + struct dlm_processed_nodes *n, *n_tmp; + struct processqueue_entry *pentry; + LIST_HEAD(processed_nodes); - newbuf = kmalloc(newlen, GFP_NOFS); - if (!newbuf) - return -ENOMEM; + spin_lock(&processqueue_lock); + pentry = list_first_entry_or_null(&processqueue, + struct processqueue_entry, list); + if (WARN_ON_ONCE(!pentry)) { + spin_unlock(&processqueue_lock); + return; + } - /* copy any leftover from last receive */ - if (con->rx_leftover) - memmove(newbuf, con->rx_buf, con->rx_leftover); + list_del(&pentry->list); + spin_unlock(&processqueue_lock); - /* swap to new buffer space */ - kfree(con->rx_buf); - con->rx_buflen = newlen; - con->rx_buf = newbuf; + for (;;) { + dlm_process_incoming_buffer(pentry->nodeid, pentry->buf, + pentry->buflen); + add_processed_node(pentry->nodeid, &processed_nodes); + free_processqueue_entry(pentry); + + spin_lock(&processqueue_lock); + pentry = list_first_entry_or_null(&processqueue, + struct processqueue_entry, list); + if (!pentry) { + process_dlm_messages_pending = false; + spin_unlock(&processqueue_lock); + break; + } - return 0; + list_del(&pentry->list); + spin_unlock(&processqueue_lock); + } + + /* send ack back after we processed couple of messages */ + list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) { + list_del(&n->list); + dlm_midcomms_receive_done(n->nodeid); + kfree(n); + } } /* Data received from remote end */ -static int receive_from_sock(struct connection *con) +static int receive_from_sock(struct connection *con, int buflen) { + struct processqueue_entry *pentry; + int ret, buflen_real; struct msghdr msg; struct kvec iov; - int ret, buflen; - mutex_lock(&con->sock_mutex); + pentry = new_processqueue_entry(con->nodeid, buflen); + if (!pentry) + return DLM_IO_RESCHED; - if (con->sock == NULL) { - ret = -EAGAIN; - goto out_close; - } - - /* realloc if we get new buffer size to read out */ - buflen = dlm_config.ci_buffer_size; - if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { - ret = con_realloc_receive_buf(con, buflen); - if (ret < 0) - goto out_resched; - } + memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); - for (;;) { - /* calculate new buffer parameter regarding last receive and - * possible leftover bytes - */ - iov.iov_base = con->rx_buf + con->rx_leftover; - iov.iov_len = con->rx_buflen - con->rx_leftover; - - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; - ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, - msg.msg_flags); - trace_dlm_recv(con->nodeid, ret); - if (ret == -EAGAIN) - break; - else if (ret <= 0) - goto out_close; - - /* new buflen according readed bytes and leftover from last receive */ - buflen = ret + con->rx_leftover; - ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); - if (ret < 0) - goto out_close; - - /* calculate leftover bytes from process and put it into begin of - * the receive buffer, so next receive we have the full message - * at the start address of the receive buffer. - */ - con->rx_leftover = buflen - ret; - if (con->rx_leftover) { - memmove(con->rx_buf, con->rx_buf + ret, - con->rx_leftover); + /* calculate new buffer parameter regarding last receive and + * possible leftover bytes + */ + iov.iov_base = pentry->buf + con->rx_leftover; + iov.iov_len = buflen - con->rx_leftover; + + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; + clear_bit(CF_RECV_INTR, &con->flags); +again: + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, + msg.msg_flags); + trace_dlm_recv(con->nodeid, ret); + if (ret == -EAGAIN) { + lock_sock(con->sock->sk); + if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) { + release_sock(con->sock->sk); + goto again; } + + clear_bit(CF_RECV_PENDING, &con->flags); + release_sock(con->sock->sk); + free_processqueue_entry(pentry); + return DLM_IO_END; + } else if (ret == 0) { + /* close will clear CF_RECV_PENDING */ + free_processqueue_entry(pentry); + return DLM_IO_EOF; + } else if (ret < 0) { + free_processqueue_entry(pentry); + return ret; } - dlm_midcomms_receive_done(con->nodeid); - mutex_unlock(&con->sock_mutex); - return 0; + /* new buflen according readed bytes and leftover from last receive */ + buflen_real = ret + con->rx_leftover; + ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf, + buflen_real); + if (ret < 0) { + free_processqueue_entry(pentry); + return ret; + } -out_resched: - if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) - queue_work(recv_workqueue, &con->rwork); - mutex_unlock(&con->sock_mutex); - return -EAGAIN; - -out_close: - if (ret == 0) { - log_print("connection %p got EOF from %d", - con, con->nodeid); - - if (dlm_proto_ops->eof_condition && - dlm_proto_ops->eof_condition(con)) { - set_bit(CF_EOF, &con->flags); - mutex_unlock(&con->sock_mutex); - } else { - mutex_unlock(&con->sock_mutex); - close_connection(con, false, true, false); + pentry->buflen = ret; - /* handling for tcp shutdown */ - clear_bit(CF_SHUTDOWN, &con->flags); - wake_up(&con->shutdown_wait); - } + /* calculate leftover bytes from process and put it into begin of + * the receive buffer, so next receive we have the full message + * at the start address of the receive buffer. + */ + con->rx_leftover = buflen_real - ret; + memmove(con->rx_leftover_buf, pentry->buf + ret, + con->rx_leftover); - /* signal to breaking receive worker */ - ret = -1; - } else { |
