802
static int nodeid_warned(int nodeid, int num_nodes, int *warned)
806
for (i = 0; i < num_nodes; i++) {
811
if (warned[i] == nodeid)
817
void dlm_scan_waiters(struct dlm_ls *ls)
820
ktime_t zero = ktime_set(0, 0);
823
u32 debug_scanned = 0;
824
u32 debug_expired = 0;
828
if (!dlm_config.ci_waitwarn_us)
831
mutex_lock(&ls->ls_waiters_mutex);
833
list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
834
if (ktime_equal(lkb->lkb_wait_time, zero))
839
us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
841
if (us < dlm_config.ci_waitwarn_us)
844
lkb->lkb_wait_time = zero;
847
if (us > debug_maxus)
851
num_nodes = ls->ls_num_nodes;
852
/* kmalloc() takes the byte count first and the gfp flags second */
warned = kmalloc(num_nodes * sizeof(int), GFP_KERNEL);
854
memset(warned, 0, num_nodes * sizeof(int));
858
if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
861
log_error(ls, "waitwarn %x %lld %d us check connection to "
862
"node %d", lkb->lkb_id, (long long)us,
863
dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
865
mutex_unlock(&ls->ls_waiters_mutex);
871
log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
872
debug_scanned, debug_expired,
873
dlm_config.ci_waitwarn_us, (long long)debug_maxus);
802
876
/* add/remove lkb from global waiters list of lkb's waiting for
803
877
a reply from a remote node */
805
static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
879
static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
807
881
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
843
917
lkb->lkb_wait_count++;
844
918
lkb->lkb_wait_type = mstype;
919
lkb->lkb_wait_time = ktime_get();
920
lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
846
922
list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
961
1037
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
964
if (ms != &ls->ls_stub_ms)
1040
if (ms->m_flags != DLM_IFL_STUB_MS)
965
1041
mutex_lock(&ls->ls_waiters_mutex);
966
1042
error = _remove_from_waiters(lkb, ms->m_type, ms);
967
if (ms != &ls->ls_stub_ms)
1043
if (ms->m_flags != DLM_IFL_STUB_MS)
968
1044
mutex_unlock(&ls->ls_waiters_mutex);
1157
1233
list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1158
1234
lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1159
1235
mutex_unlock(&ls->ls_timeout_mutex);
1237
if (!dlm_config.ci_waitwarn_us)
1240
mutex_lock(&ls->ls_waiters_mutex);
1241
list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1242
if (ktime_to_us(lkb->lkb_wait_time))
1243
lkb->lkb_wait_time = ktime_get();
1245
mutex_unlock(&ls->ls_waiters_mutex);
1162
1248
/* lkb is master or local copy */
1376
1462
ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1377
1463
compatible with other granted locks */
1379
static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1465
static void munge_demoted(struct dlm_lkb *lkb)
1381
if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1382
log_print("munge_demoted %x invalid reply type %d",
1383
lkb->lkb_id, ms->m_type);
1387
1467
if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1388
1468
log_print("munge_demoted %x invalid modes gr %d rq %d",
1389
1469
lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2844
2924
struct dlm_mhandle *mh;
2845
2925
int to_nodeid, error;
2847
error = add_to_waiters(lkb, mstype);
2851
2927
to_nodeid = r->res_nodeid;
2929
error = add_to_waiters(lkb, mstype, to_nodeid);
2853
2933
error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2880
2960
/* down conversions go without a reply from the master */
2881
2961
if (!error && down_conversion(lkb)) {
2882
2962
remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2963
r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
2883
2964
r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2884
2965
r->res_ls->ls_stub_ms.m_result = 0;
2885
r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2886
2966
__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2951
3031
struct dlm_mhandle *mh;
2952
3032
int to_nodeid, error;
2954
error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
3034
to_nodeid = dlm_dir_nodeid(r);
3036
error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
2958
to_nodeid = dlm_dir_nodeid(r);
2960
3040
error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3071
3151
static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3153
if (ms->m_flags == DLM_IFL_STUB_MS)
3073
3156
lkb->lkb_sbflags = ms->m_sbflags;
3074
3157
lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3075
3158
(ms->m_flags & 0x0000FFFF);
3996
4079
dlm_put_lockspace(ls);
3999
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
4082
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4083
struct dlm_message *ms_stub)
4001
4085
if (middle_conversion(lkb)) {
4003
ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
4004
ls->ls_stub_ms.m_result = -EINPROGRESS;
4005
ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4006
ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4007
_receive_convert_reply(lkb, &ls->ls_stub_ms);
4087
memset(ms_stub, 0, sizeof(struct dlm_message));
4088
ms_stub->m_flags = DLM_IFL_STUB_MS;
4089
ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4090
ms_stub->m_result = -EINPROGRESS;
4091
ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4092
_receive_convert_reply(lkb, ms_stub);
4009
4094
/* Same special case as in receive_rcom_lock_args() */
4010
4095
lkb->lkb_grmode = DLM_LOCK_IV;
4045
4130
void dlm_recover_waiters_pre(struct dlm_ls *ls)
4047
4132
struct dlm_lkb *lkb, *safe;
4133
struct dlm_message *ms_stub;
4048
4134
int wait_type, stub_unlock_result, stub_cancel_result;
4136
/* kmalloc() takes the byte count first and the gfp flags second */
ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4138
log_error(ls, "dlm_recover_waiters_pre no mem");
4050
4142
mutex_lock(&ls->ls_waiters_mutex);
4052
4144
list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4053
log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
4054
lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
4146
/* exclude debug messages about unlocks because there can be so
4147
many and they aren't very interesting */
4149
if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4150
log_debug(ls, "recover_waiter %x nodeid %d "
4151
"msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4152
lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4056
4155
/* all outstanding lookups, regardless of destination will be
4057
4156
resent after recovery is done */
4099
4198
case DLM_MSG_CONVERT:
4100
recover_convert_waiter(ls, lkb);
4199
recover_convert_waiter(ls, lkb, ms_stub);
4103
4202
case DLM_MSG_UNLOCK:
4105
ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4106
ls->ls_stub_ms.m_result = stub_unlock_result;
4107
ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4108
ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4109
_receive_unlock_reply(lkb, &ls->ls_stub_ms);
4204
memset(ms_stub, 0, sizeof(struct dlm_message));
4205
ms_stub->m_flags = DLM_IFL_STUB_MS;
4206
ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4207
ms_stub->m_result = stub_unlock_result;
4208
ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4209
_receive_unlock_reply(lkb, ms_stub);
4110
4210
dlm_put_lkb(lkb);
4113
4213
case DLM_MSG_CANCEL:
4115
ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4116
ls->ls_stub_ms.m_result = stub_cancel_result;
4117
ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4118
ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4119
_receive_cancel_reply(lkb, &ls->ls_stub_ms);
4215
memset(ms_stub, 0, sizeof(struct dlm_message));
4216
ms_stub->m_flags = DLM_IFL_STUB_MS;
4217
ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4218
ms_stub->m_result = stub_cancel_result;
4219
ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4220
_receive_cancel_reply(lkb, ms_stub);
4120
4221
dlm_put_lkb(lkb);
4191
4293
ou = is_overlap_unlock(lkb);
4194
log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4195
lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4296
log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4297
lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4197
4299
/* At this point we assume that we won't get a reply to any
4198
4300
previous op or overlap op on this lock. First, do a big