2
* Module: msdtc_enlist.cpp
5
* This module contains routines related to
6
* the enlistment in MSDTC.
11
#ifdef _HANDLE_ENLIST_IN_DTC_
15
#define _WIN32_WINNT 0x0400
16
#endif /* _WIN32_WINNT */
18
#define WIN32_LEAN_AND_MEAN
21
/*#include <Txdtc.h>*/
22
#include "connection.h"
24
/*#define _SLEEP_FOR_TEST_*/
35
#include "dlg_specific.h"
37
#include "pgapifunc.h"
41
HINSTANCE s_hModule; /* Saved module handle. */
43
/* This is where the Driver Manager attaches to this Driver */
45
DllMain(HANDLE hInst, ULONG ul_reason_for_call, LPVOID lpReserved)
47
switch (ul_reason_for_call)
49
case DLL_PROCESS_ATTACH:
50
s_hModule = (HINSTANCE) hInst; /* Save for dialog boxes
57
static class INIT_CRIT
60
CRITICAL_SECTION life_cs;
61
CRITICAL_SECTION map_cs;
63
InitializeCriticalSection(&life_cs);
64
InitializeCriticalSection(&map_cs);
67
DeleteCriticalSection(&life_cs);
68
DeleteCriticalSection(&map_cs);
71
#define LIFELOCK_ACQUIRE EnterCriticalSection(&init_crit.life_cs)
72
#define LIFELOCK_RELEASE LeaveCriticalSection(&init_crit.life_cs)
73
#define MLOCK_ACQUIRE EnterCriticalSection(&init_crit.map_cs)
74
#define MLOCK_RELEASE LeaveCriticalSection(&init_crit.map_cs)
77
static const char *XidToText(const XID &xid, char *rtext)
79
int glen = xid.gtrid_length, blen = xid.bqual_length;
82
for (i = 0, j = 0; i < glen; i++, j += 2)
83
sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]);
84
strcat(rtext, "-"); j++;
85
for (; i < glen + blen; i++, j += 2)
86
sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]);
90
static LONG g_cComponents = 0;
91
static LONG g_cServerLocks = 0;
94
// �ȉ���ITransactionResourceAsync�I�u�W�F�N�g�͔C�ӂ̃X���b�h����
95
// ���R�ɃA�N�Z�X�\�Ȃ悤�Ɏ�������B�eRequest�̌��ʂ�Ԃ����߂�
96
// �g�p����ITransactionEnlistmentAsync�C���^�[�t�F�C�X�����̂悤��
97
// ��������Ă���i�Ǝv����A���L�Q�Ɓj�̂ŌĂяo����COM�̃A�p�[
98
// �g�����g���ӎ�����(CoMarshalInterThreadInterfaceInStream/CoGetIn
99
// terfaceAndReleaseStream���g�p����j�K�v�͂Ȃ��B
100
// ����DLL���Ŏg�p����ITransactionResourceAsync��ITransactionEnlist
101
// mentAsync�̃C���^�[�t�F�C�X�|�C���^�[�͔C�ӂ̃X���b�h���璼�ڎg�p
105
// OLE Transactions Standard
107
// OLE Transactions is the Microsoft interface standard for transaction
108
// management. Applications use OLE Transactions-compliant interfaces to
109
// initiate, commit, abort, and inquire about transactions. Resource
110
// managers use OLE Transactions-compliant interfaces to enlist in
111
// transactions, to propagate transactions to other resource managers,
112
// to propagate transactions from process to process or from system to
113
// system, and to participate in the two-phase commit protocol.
115
// The Microsoft DTC system implements most OLE Transactions-compliant
116
// objects, interfaces, and methods. Resource managers that wish to use
117
// OLE Transactions must implement some OLE Transactions-compliant objects,
118
// interfaces, and methods.
120
// The OLE Transactions specification is based on COM but it differs in the
121
// following respects:
123
// OLE Transactions objects cannot be created using the COM CoCreate APIs.
124
// References to OLE Transactions objects are always direct. Therefore,
125
// no proxies or stubs are created for inter-apartment, inter-process,
126
// or inter-node calls and OLE Transactions references cannot be marshaled
127
// using standard COM marshaling.
128
// All references to OLE Transactions objects and their sinks are completely
129
// free threaded and cannot rely upon COM concurrency control models.
130
// For example, you cannot pass a reference to an IResourceManagerSink
131
// interface on a single-threaded apartment and expect the callback to occur
132
// only on the same single-threaded apartment.
134
/*#define _LOCK_DEBUG_ */
135
class IAsyncPG : public ITransactionResourceAsync
137
friend class AsyncThreads;
139
IDtcToXaHelperSinglePipe *helper;
141
ConnectionClass *conn;
142
ConnectionClass *xaconn;
144
CRITICAL_SECTION as_spin; // to make this object Both
145
CRITICAL_SECTION as_exec; // to make this object Both
149
HRESULT prepare_result;
150
bool requestAccepted;
151
HRESULT commit_result;
155
#endif /* _LOCK_DEBUG_ */
164
ITransactionEnlistmentAsync *enlist;
166
HRESULT STDMETHODCALLTYPE QueryInterface(REFIID iid, void ** ppvObject);
167
ULONG STDMETHODCALLTYPE AddRef(void);
168
ULONG STDMETHODCALLTYPE Release(void);
170
HRESULT STDMETHODCALLTYPE PrepareRequest(BOOL fRetaining,
174
HRESULT STDMETHODCALLTYPE CommitRequest(DWORD grfRM, XACTUOW * pNewUOW);
175
HRESULT STDMETHODCALLTYPE AbortRequest(BOID * pboidReason,
178
HRESULT STDMETHODCALLTYPE TMDown(void);
181
void SetHelper(IDtcToXaHelperSinglePipe *pHelper, DWORD dwRMCookie) {helper = pHelper; RMCookie = dwRMCookie;}
183
HRESULT RequestExec(DWORD type, HRESULT res);
184
HRESULT ReleaseConnection(void);
185
void SetConnection(ConnectionClass *sconn) {SLOCK_ACQUIRE(); conn = sconn; SLOCK_RELEASE();}
186
void SetXid(const XID *ixid) {SLOCK_ACQUIRE(); xid = *ixid; SLOCK_RELEASE();}
190
void SLOCK_ACQUIRE() {forcelog("SLOCK_ACQUIRE %d\n", spin_cnt); EnterCriticalSection(&as_spin); spin_cnt++;}
191
void SLOCK_RELEASE() {forcelog("SLOCK_RELEASE=%d\n", spin_cnt); LeaveCriticalSection(&as_spin); spin_cnt--;}
193
void SLOCK_ACQUIRE() {EnterCriticalSection(&as_spin);}
194
void SLOCK_RELEASE() {LeaveCriticalSection(&as_spin);}
195
#endif /* _LOCK_DEBUG_ */
196
void ELOCK_ACQUIRE() {EnterCriticalSection(&as_exec);}
197
void ELOCK_RELEASE() {LeaveCriticalSection(&as_exec);}
198
ConnectionClass *getLockedXAConn(void);
199
ConnectionClass *generateXAConn(bool spinAcquired);
200
void SetPrepareResult(HRESULT res) {SLOCK_ACQUIRE(); prepared = true; prepare_result = res; SLOCK_RELEASE();}
201
void SetDone(HRESULT);
202
void Reset_eThread(int idx) {SLOCK_ACQUIRE(); eThread[idx] = NULL; SLOCK_RELEASE();}
203
void Wait_pThread(bool slock_hold);
204
void Wait_cThread(bool slock_hold, bool once);
208
// For thread control.
216
AsyncWait(IAsyncPG *async, DWORD itype) : obj(async), type(itype), waiting_count(0) {}
217
AsyncWait(const AsyncWait &a_th) : obj(a_th.obj), type(a_th.type), waiting_count(a_th.waiting_count) {}
219
IAsyncPG *GetObj() const {return obj;}
220
DWORD GetType() const {return type;}
221
int WaitCount() const {return waiting_count;}
222
int StartWaiting() {return ++waiting_count;}
223
int StopWaiting() {return --waiting_count;}
226
// List of threads invoked from IAsyncPG objects.
230
static std::map <HANDLE, AsyncWait> th_list;
232
static void insert(HANDLE, IAsyncPG *, DWORD);
233
static void CleanupThreads(DWORD millisecond);
234
static bool WaitThread(IAsyncPG *, DWORD type, DWORD millisecond);
238
#define SYNC_AUTOCOMMIT(conn) (SQL_AUTOCOMMIT_OFF != conn->connInfo.autocommit_public ? (conn->transact_status |= CONN_IN_AUTOCOMMIT) : (conn->transact_status &= ~CONN_IN_AUTOCOMMIT))
240
IAsyncPG::IAsyncPG(void) : helper(NULL), RMCookie(0), enlist(NULL), conn(NULL), xaconn(NULL), refcnt(1), prepared(false), requestAccepted(false)
242
InterlockedIncrement(&g_cComponents);
243
InitializeCriticalSection(&as_spin);
244
InitializeCriticalSection(&as_exec);
245
eThread[0] = eThread[1] = eThread[2] = NULL;
246
memset(&xid, 0, sizeof(xid));
250
#endif /* _LOCK_DEBUG_ */
254
// invoked from *delete*.
255
// When entered ELOCK -> LIFELOCK -> SLOCK are acquired
256
// and they are released.
258
IAsyncPG::~IAsyncPG(void)
260
ConnectionClass *fconn = NULL;
270
xaconn->asdum = NULL;
276
PGAPI_FreeConnect((HDBC) fconn);
277
DeleteCriticalSection(&as_spin);
279
DeleteCriticalSection(&as_exec);
280
InterlockedDecrement(&g_cComponents);
282
HRESULT STDMETHODCALLTYPE IAsyncPG::QueryInterface(REFIID riid, void ** ppvObject)
284
forcelog("%x QueryInterface called\n", this);
285
if (riid == IID_IUnknown || riid == IID_ITransactionResourceAsync)
292
return E_NOINTERFACE;
295
// acquire/releases SLOCK.
297
ULONG STDMETHODCALLTYPE IAsyncPG::AddRef(void)
299
mylog("%x->AddRef called\n", this);
306
// acquire/releases [ELOCK -> LIFELOCK -> ] SLOCK.
308
ULONG STDMETHODCALLTYPE IAsyncPG::Release(void)
310
mylog("%x->Release called refcnt=%d\n", this, refcnt);
321
mylog("delete %x\n", this);
338
// Acquire/release [MLOCK -> ] SLOCK.
340
void IAsyncPG::Wait_pThread(bool slock_hold)
342
mylog("Wait_pThread %d in\n", slock_hold);
344
int wait_idx = PrepareExec;
348
while (NULL != eThread[wait_idx])
350
wThread = eThread[wait_idx];
352
th_found = AsyncThreads::WaitThread(this, wait_idx, 2000);
359
mylog("Wait_pThread out\n");
363
// Acquire/releases [MLOCK -> ] SLOCK.
365
void IAsyncPG::Wait_cThread(bool slock_hold, bool once)
371
mylog("Wait_cThread %d,%d in\n", slock_hold, once);
374
if (NULL != eThread[CommitExec])
375
wait_idx = CommitExec;
377
wait_idx = AbortExec;
378
while (NULL != eThread[wait_idx])
380
wThread = eThread[wait_idx];
382
th_found = AsyncThreads::WaitThread(this, wait_idx, 2000);
384
if (once || th_found)
389
mylog("Wait_cThread out\n");
392
/* Processing Prepare/Commit Request */
401
// Acquire/releases LIFELOCK -> SLOCK.
402
// may acquire/release ELOCK.
404
void IAsyncPG::SetDone(HRESULT res)
409
requestAccepted = true;
416
SYNC_AUTOCOMMIT(conn);
424
xaconn->asdum = NULL;
425
PGAPI_FreeConnect(xaconn);
438
// Acquire/releases [ELOCK -> LIFELOCK -> ] SLOCK.
440
ConnectionClass *IAsyncPG::generateXAConn(bool spinAcquired)
444
if (prepared && !xaconn)
450
if (prepared && !xaconn)
452
PGAPI_AllocConnect(conn->henv, (HDBC *) &xaconn);
453
memcpy(&xaconn->connInfo, &conn->connInfo, sizeof(ConnInfo));
455
SYNC_AUTOCOMMIT(conn);
459
CC_connect(xaconn, AUTH_REQ_OK, NULL);
475
// ELOCK is acquired.
477
// Acquire/releases SLOCK.
478
// Try to acquire CONNLOCK also.
481
// ELOCK is kept acquired.
482
// If the return connection != NULL
483
// the CONNLOCK for the connection is acquired.
485
ConnectionClass *IAsyncPG::getLockedXAConn()
488
if (!xaconn && conn && !CC_is_in_trans(conn))
490
if (TRY_ENTER_CONN_CS(conn))
492
if (CC_is_in_trans(conn))
503
generateXAConn(true);
505
ENTER_CONN_CS(xaconn);
510
// Acquire/release ELOCK [ -> MLOCK] -> SLOCK.
512
HRESULT IAsyncPG::RequestExec(DWORD type, HRESULT res)
515
bool bReleaseEnlist = false;
516
ConnectionClass *econn;
518
char pgxid[258], cmd[512];
520
mylog("%x->RequestExec type=%d\n", this, type);
521
XidToText(xid, pgxid);
522
#ifdef _SLEEP_FOR_TEST_
524
#endif /* _SLEEP_FOR_TEST_ */
529
if (XACT_S_SINGLEPHASE == res)
531
if (!CC_commit(conn))
533
bReleaseEnlist = true;
535
else if (E_FAIL != res)
537
snprintf(cmd, sizeof(cmd), "PREPARE TRANSACTION '%s'", pgxid);
538
qres = CC_send_query(conn, cmd, NULL, 0, NULL);
539
if (!QR_command_maybe_successful(qres))
543
ret = enlist->PrepareRequestDone(res, NULL, NULL);
544
SetPrepareResult(res);
550
econn = getLockedXAConn();
553
snprintf(cmd, sizeof(cmd), "COMMIT PREPARED '%s'", pgxid);
554
qres = CC_send_query(econn, cmd, NULL, 0, NULL);
555
if (!QR_command_maybe_successful(qres))
558
LEAVE_CONN_CS(econn);
562
ret = enlist->CommitRequestDone(res);
563
bReleaseEnlist = true;
569
econn = getLockedXAConn();
572
snprintf(cmd, sizeof(cmd), "ROLLBACK PREPARED '%s'", pgxid);
573
qres = CC_send_query(econn, cmd, NULL, 0, NULL);
574
if (!QR_command_maybe_successful(qres))
577
LEAVE_CONN_CS(econn);
581
ret = enlist->AbortRequestDone(res);
582
bReleaseEnlist = true;
589
helper->ReleaseRMCookie(RMCookie, TRUE);
593
mylog("%x->Done ret=%d\n", this, ret);
598
// Acquire/releses [MLOCK -> ] SLOCK
599
// or [ELOCK -> LIFELOCK -> ] SLOCK.
601
HRESULT IAsyncPG::ReleaseConnection(void)
603
mylog("%x->ReleaseConnection\n", this);
604
ConnectionClass *iconn;
611
if (NULL != eThread[CommitExec] || NULL != eThread[AbortExec] || requestAccepted)
615
Wait_cThread(true, true);
622
Wait_cThread(true, false);
624
if (conn && CONN_CONNECTED == conn->status && !done)
626
generateXAConn(true);
633
mylog("%x->ReleaseConnection exit\n", this);
638
// Acquire/release [ELOCK -> ] [MLOCK -> ] SLOCK.
640
EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para);
641
HRESULT STDMETHODCALLTYPE IAsyncPG::PrepareRequest(BOOL fRetaining, DWORD grfRM,
642
BOOL fWantMoniker, BOOL fSinglePhase)
647
mylog("%x PrepareRequest called grhRM=%d enl=%x\n", this, grfRM, enlist);
649
if (0 != CC_get_errornumber(conn))
656
res = XACT_S_SINGLEPHASE;
657
mylog("XACT is singlePhase\n");
664
#ifdef _SLEEP_FOR_TEST_
666
#endif /* _SLEEP_FOR_TEST_ */
667
reqp = new RequestPara;
668
reqp->type = PrepareExec;
669
reqp->lpr = (LPVOID) this;
672
HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);
680
AsyncThreads::insert(hThread, this, reqp->type);
687
// Acquire/release [ELOCK -> ] [MLOCK -> ] SLOCK.
689
HRESULT STDMETHODCALLTYPE IAsyncPG::CommitRequest(DWORD grfRM, XACTUOW * pNewUOW)
691
HRESULT res = S_OK, ret = S_OK;
693
mylog("%x CommitRequest called grfRM=%d enl=%x\n", this, grfRM, enlist);
698
else if (S_OK != prepare_result)
705
#ifdef _SLEEP_FOR_TEST_
707
#endif /* _SLEEP_FOR_TEST_ */
708
reqp = new RequestPara;
709
reqp->type = CommitExec;
710
reqp->lpr = (LPVOID) this;
713
HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);
722
AsyncThreads::insert(hThread, this, reqp->type);
724
mylog("CommitRequest ret=%d\n", ret);
725
requestAccepted = true;
731
// Acquire/release [ELOCK -> ] [MLOCK -> ] SLOCK.
733
HRESULT STDMETHODCALLTYPE IAsyncPG::AbortRequest(BOID * pboidReason, BOOL fRetaining,
736
HRESULT res = S_OK, ret = S_OK;
739
mylog("%x AbortRequest called\n", this);
742
if (!prepared && conn)
744
reqp = new RequestPara;
745
reqp->type = AbortExec;
746
reqp->lpr = (LPVOID) this;
749
HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);
758
AsyncThreads::insert(hThread, this, reqp->type);
760
mylog("AbortRequest ret=%d\n", ret);
761
requestAccepted = true;
766
HRESULT STDMETHODCALLTYPE IAsyncPG::TMDown(void)
768
forcelog("%x TMDown called\n", this);
773
// Acquire/releases MLOCK -> SLOCK.
775
std::map<HANDLE, AsyncWait> AsyncThreads::th_list;
776
void AsyncThreads::insert(HANDLE th, IAsyncPG *obj, DWORD type)
780
th_list.insert(std::pair<HANDLE, AsyncWait>(th, AsyncWait(obj, type)));
781
obj->SLOCK_ACQUIRE();
782
obj->eThread[type] = th;
783
obj->SLOCK_RELEASE();
788
// Acquire/releases MLOCK -> SLOCK.
790
bool AsyncThreads::WaitThread(IAsyncPG *obj, DWORD type, DWORD millisecond)
798
std::map<HANDLE, AsyncWait>::iterator p;
799
for (p = th_list.begin(); p != th_list.end(); p++)
801
gtype = p->second.GetType();
802
typematch = (gtype == type);
803
if (p->second.GetObj() == obj && typematch)
812
forcelog("WaitThread thread(%x, %d) not found\n", obj, type);
815
p->second.StartWaiting();
818
DWORD ret = WaitForSingleObject(th, millisecond);
820
wait_count = p->second.StopWaiting();
821
if (WAIT_OBJECT_0 == ret)
823
IAsyncPG *async = p->second.GetObj();
825
if (type >= 0 && type <= IAsyncPG::AbortExec)
826
async->Reset_eThread(type);
832
if (type >= IAsyncPG::CommitExec)
845
void AsyncThreads::CleanupThreads(DWORD millisecond)
851
if (msize = th_list.size(), msize <= 0)
857
mylog("CleanupThreads size=%d\n", msize);
858
HANDLE *hds = new HANDLE[msize];
859
std::map<HANDLE, AsyncWait>::iterator p;
860
for (p = th_list.begin(), nCount = 0; p != th_list.end(); p++)
862
hds[nCount++] = p->first;
863
p->second.StartWaiting();
869
DWORD ret = WaitForMultipleObjects(nCount, hds, 0, millisecond);
872
HANDLE th = hds[ret];
874
p = th_list.find(th);
875
if (p != th_list.end())
877
int wait_count = p->second.StopWaiting();
878
DWORD type = p->second.GetType();
879
IAsyncPG * async = p->second.GetObj();
881
if (type >= IAsyncPG::PrepareExec && type <= IAsyncPG::AbortExec)
882
async->Reset_eThread(type);
888
if (type >= IAsyncPG::CommitExec)
898
for (i = ret; i < (int) nCount - 1; i++)
902
for (i = 0; i < (int) nCount; i++)
904
p = th_list.find(hds[i]);
905
if (p != th_list.end())
906
p->second.StopWaiting();
912
EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para)
914
RequestPara *reqp = (RequestPara *) para;
915
DWORD type = reqp->type;
916
IAsyncPG *async = (IAsyncPG *) reqp->lpr;
917
HRESULT res = reqp->res, ret;
919
mylog("DtcRequestExec type=%d", reqp->type);
921
ret = async->RequestExec(type, res);
922
mylog(" Done ret=%d\n", ret);
926
CSTR regKey = "SOFTWARE\\Microsoft\\MSDTC\\XADLL";
928
RETCODE static EnlistInDtc_1pipe(ConnectionClass *conn, ITransaction *pTra, ITransactionDispenser *pDtc)
930
CSTR func = "EnlistInDtc_1pipe";
931
static IDtcToXaHelperSinglePipe *pHelper = NULL;
932
ITransactionResourceAsync *pRes = NULL;
941
res = pDtc->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void **) &pHelper);
942
if (res != S_OK || !pHelper)
944
forcelog("DtcToXaHelperSingelPipe get error %d\n", res);
949
res = (NULL != (asdum = new IAsyncPG)) ? S_OK : E_FAIL;
952
mylog("CoCreateInstance error %d\n", res);
956
mylog("dllname=%s dsn=%s\n", GetXaLibName(), conn->connInfo.dsn); res = 0;
959
ConnInfo *ci = &(conn->connInfo);
961
snprintf(dtcname, sizeof(dtcname), "DRIVER={%s};SERVER=%s;PORT=%s;DATABASE=%s;UID=%s;PWD=%s;" ABBR_SSLMODE "=%s",
962
ci->drivername, ci->server, ci->port, ci->database, ci->username, ci->password, ci->sslmode);
964
res = pHelper->XARMCreate(dtcname, (char *) GetXaLibName(), &dwRMCookie);
967
mylog("XARMCreate error code=%x\n", res);
968
if (XACT_E_XA_TX_DISABLED == res)
970
CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "XARMcreate error:Please enable XA transaction in MSDTC security configuration", func);
979
ret = ::RegOpenKeyEx(HKEY_LOCAL_MACHINE, regKey, 0, KEY_QUERY_VALUE | KEY_SET_VALUE, &sKey);
980
if (ERROR_SUCCESS != ret)
981
ret = ::RegCreateKeyEx(HKEY_LOCAL_MACHINE, regKey, 0, NULL, REG_OPTION_NON_VOLATILE, KEY_ALL_ACCESS, NULL, &sKey, NULL);
982
if (ERROR_SUCCESS == ret)
984
switch (ret = ::RegQueryValueEx(sKey, "XADLL", NULL, NULL, NULL, &rSize))
990
ret = ::RegSetValueEx(sKey, GetXaLibName(), 0, REG_SZ,
991
(CONST BYTE *) GetXaLibPath(), (DWORD) strlen(GetXaLibPath()) + 1);
992
if (ERROR_SUCCESS == ret)
997
CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "XARMCreate error:Please register HKLM\\SOFTWARE\\Microsoft\\MSDTC\\XADLL", func);
1000
::RegCloseKey(sKey);
1004
CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "MSDTC XARMCreate error", func);
1007
res = pHelper->ConvertTridToXID((DWORD *) pTra, dwRMCookie, &xid);
1010
mylog("ConvertTridToXid error %d\n", res);
1015
XidToText(xid, pgxid);
1016
mylog("ConvertTridToXID -> %s\n", pgxid);
1018
asdum->SetXid(&xid);
1019
/* Create an IAsyncPG instance by myself */
1020
/* DLLGetClassObject(GUID_IAsyncPG, IID_ITransactionResourceAsync, (void **) &asdum); */
1022
asdum->SetHelper(pHelper, dwRMCookie);
1023
res = pHelper->EnlistWithRM(dwRMCookie, pTra, asdum, &asdum->enlist);
1026
mylog("EnlistWithRM error %d\n", res);
1027
pHelper->ReleaseRMCookie(dwRMCookie, TRUE);
1031
mylog("asdum=%p start transaction\n", asdum);
1032
CC_set_autocommit(conn, FALSE);
1033
asdum->SetConnection(conn);
1034
conn->asdum = asdum;
1040
EXTERN_C RETCODE EnlistInDtc(ConnectionClass *conn, void *pTra, int method)
1042
static ITransactionDispenser *pDtc = NULL;
1046
IAsyncPG *asdum = (IAsyncPG *) conn->asdum;
1049
/* asdum->Release(); */
1052
SYNC_AUTOCOMMIT(conn);
1055
if (CC_is_in_trans(conn))
1063
res = DtcGetTransactionManager(NULL, NULL, IID_ITransactionDispenser,
1064
0, 0, NULL, (void **) &pDtc);
1065
if (res != S_OK || !pDtc)
1067
forcelog("TransactionManager get error %d\n", res);
1071
return EnlistInDtc_1pipe(conn, (ITransaction *) pTra, pDtc);
1074
EXTERN_C RETCODE DtcOnDisconnect(ConnectionClass *conn)
1076
mylog("DtcOnDisconnect\n");
1078
IAsyncPG *asdum = (IAsyncPG *) conn->asdum;
1083
asdum->ReleaseConnection();
1091
EXTERN_C RETCODE DtcOnRelease(void)
1093
AsyncThreads::CleanupThreads(2000);
1098
#endif /* _HANDLE_ENLIST_IN_DTC_ */
1
/* Module: msdtc_enlist.cpp
4
* This module contains routines related to
5
* the enlistment in MSDTC.
10
#ifdef _HANDLE_ENLIST_IN_DTC_
14
#define _WIN32_WINNT 0x0400
15
#endif /* _WIN32_WINNT */
17
#define WIN32_LEAN_AND_MEAN
20
/*#include <Txdtc.h>*/
21
#define _PGDTC_FUNCS_IMPORT_
24
/*#define _SLEEP_FOR_TEST_*/
35
#define _MYLOG_FUNCS_IMPORT_
41
#define snprintf _snprintf
45
/* Define a type for defining a constant string expression */
47
#define CSTR static const char * const
51
HINSTANCE s_hModule; /* Saved module handle. */
53
/* This is where the Driver Manager attaches to this Driver */
55
DllMain(HANDLE hInst, ULONG ul_reason_for_call, LPVOID lpReserved)
57
switch (ul_reason_for_call)
59
case DLL_PROCESS_ATTACH:
60
s_hModule = (HINSTANCE) hInst; /* Save for dialog boxes
68
* A comment About locks used in this module
70
* the locks should be acquired with stronger to weaker order.
72
* 1:ELOCK -- the strongest per IAsyncPG object lock
73
* When the *isolated* or *dtcconn* member of an IAsyncPG object
74
* is changed, this lock should be held.
75
* While an IAsyncPG object accesses a psqlodbc connection,
76
* this lock should be held.
78
* 2:[CONN_CS] -- per psqlodbc connection lock
79
* This lock would be held for a pretty long time while accessing
80
* the psqlodbc connection assigned to an IAsyncPG object. You
81
* can use the connecion safely by holding a ELOCK for the
82
* IAsyncPG object because the assignment is ensured to be
83
* fixed while the ELOCK is held.
85
* 3:LIFELOCK -- a global lock to ensure the lives of IAsyncPG objects
86
* While this lock is held, IAsyncPG objects would never die.
88
* 4:SLOCK -- the short term per IAsyncPG object lock
89
* When any member of an IAsyncPG object is changed, this lock
93
// #define _LOCK_DEBUG_
94
static class INIT_CRIT
97
CRITICAL_SECTION life_cs; /* for asdum member of ConnectionClass */
99
InitializeCriticalSection(&life_cs);
102
DeleteCriticalSection(&life_cs);
106
#define LIFELOCK_ACQUIRE (forcelog("LIFELOCK_ACQUIRE\n"), EnterCriticalSection(&init_crit.life_cs), forcelog("LIFELOCK ACQUIRED\n"))
107
#define LIFELOCK_RELEASE (forcelog("LIFELOCK_RELEASE\n"), LeaveCriticalSection(&init_crit.life_cs))
109
#define LIFELOCK_ACQUIRE EnterCriticalSection(&init_crit.life_cs)
110
#define LIFELOCK_RELEASE LeaveCriticalSection(&init_crit.life_cs)
114
* Some helper macros about connection handling.
116
#define CONN_CS_ACQUIRE(conn) PgDtc_lock_cntrl((conn), TRUE, FALSE)
117
#define TRY_CONN_CS_ACQUIRE(conn) PgDtc_lock_cntrl((conn), TRUE, TRUE)
118
#define CONN_CS_RELEASE(conn) PgDtc_lock_cntrl((conn), FALSE, FALSE)
120
#define CONN_IS_IN_TRANS(conn) PgDtc_get_property((conn), inTrans)
123
static const char *XidToText(const XID &xid, char *rtext)
125
int glen = xid.gtrid_length, blen = xid.bqual_length;
128
for (i = 0, j = 0; i < glen; i++, j += 2)
129
sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]);
130
strcat(rtext, "-"); j++;
131
for (; i < glen + blen; i++, j += 2)
132
sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]);
136
static LONG g_cComponents = 0;
137
static LONG g_cServerLocks = 0;
140
// �ȉ���ITransactionResourceAsync�I�u�W�F�N�g�͔C�ӂ̃X���b�h����
141
// ���R�ɃA�N�Z�X�\�Ȃ悤�Ɏ�������B�eRequest�̌��ʂ�Ԃ����߂�
142
// �g�p����ITransactionEnlistmentAsync�C���^�[�t�F�C�X�����̂悤��
143
// ��������Ă���i�Ǝv����A���L�Q�Ɓj�̂ŌĂяo����COM�̃A�p�[
144
// �g�����g���ӎ�����(CoMarshalInterThreadInterfaceInStream/CoGetIn
145
// terfaceAndReleaseStream���g�p����j�K�v�͂Ȃ��B
146
// ����DLL���Ŏg�p����ITransactionResourceAsync��ITransactionEnlist
147
// mentAsync�̃C���^�[�t�F�C�X�|�C���^�[�͔C�ӂ̃X���b�h���璼�ڎg�p
151
// OLE Transactions Standard
153
// OLE Transactions is the Microsoft interface standard for transaction
154
// management. Applications use OLE Transactions-compliant interfaces to
155
// initiate, commit, abort, and inquire about transactions. Resource
156
// managers use OLE Transactions-compliant interfaces to enlist in
157
// transactions, to propagate transactions to other resource managers,
158
// to propagate transactions from process to process or from system to
159
// system, and to participate in the two-phase commit protocol.
161
// The Microsoft DTC system implements most OLE Transactions-compliant
162
// objects, interfaces, and methods. Resource managers that wish to use
163
// OLE Transactions must implement some OLE Transactions-compliant objects,
164
// interfaces, and methods.
166
// The OLE Transactions specification is based on COM but it differs in the
167
// following respects:
169
// OLE Transactions objects cannot be created using the COM CoCreate APIs.
170
// References to OLE Transactions objects are always direct. Therefore,
171
// no proxies or stubs are created for inter-apartment, inter-process,
172
// or inter-node calls and OLE Transactions references cannot be marshaled
173
// using standard COM marshaling.
174
// All references to OLE Transactions objects and their sinks are completely
175
// free threaded and cannot rely upon COM concurrency control models.
176
// For example, you cannot pass a reference to an IResourceManagerSink
177
// interface on a single-threaded apartment and expect the callback to occur
178
// only on the same single-threaded apartment.
180
class IAsyncPG : public ITransactionResourceAsync
183
IDtcToXaHelperSinglePipe *helper;
187
CRITICAL_SECTION as_spin; // to make this object Both
188
CRITICAL_SECTION as_exec; // to make this object Both
196
bool requestAccepted;
197
HRESULT prepare_result;
198
HRESULT commit_result;
202
#endif /* _LOCK_DEBUG_ */
211
ITransactionEnlistmentAsync *enlist;
213
HRESULT STDMETHODCALLTYPE QueryInterface(REFIID iid, void ** ppvObject);
214
ULONG STDMETHODCALLTYPE AddRef(void);
215
ULONG STDMETHODCALLTYPE Release(void);
217
HRESULT STDMETHODCALLTYPE PrepareRequest(BOOL fRetaining,
221
HRESULT STDMETHODCALLTYPE CommitRequest(DWORD grfRM, XACTUOW * pNewUOW);
222
HRESULT STDMETHODCALLTYPE AbortRequest(BOID * pboidReason,
225
HRESULT STDMETHODCALLTYPE TMDown(void);
228
void SetHelper(IDtcToXaHelperSinglePipe *pHelper, DWORD dwRMCookie) {helper = pHelper; RMCookie = dwRMCookie;}
230
HRESULT RequestExec(DWORD type, HRESULT res);
231
HRESULT ReleaseConnection(void);
232
void SetConnection(void *sconn) {SLOCK_ACQUIRE(); dtcconn = sconn; SLOCK_RELEASE();}
233
void SetXid(const XID *ixid) {SLOCK_ACQUIRE(); xid = *ixid; SLOCK_RELEASE();}
234
void *separateXAConn(bool spinAcquired, bool continueConnection);
235
bool CloseThread(DWORD type);
239
void SLOCK_ACQUIRE() {forcelog("SLOCK_ACQUIRE %d\n", spin_cnt); EnterCriticalSection(&as_spin); spin_cnt++;}
240
void SLOCK_RELEASE() {forcelog("SLOCK_RELEASE=%d\n", spin_cnt); LeaveCriticalSection(&as_spin); spin_cnt--;}
241
void ELOCK_ACQUIRE() {forcelog("%p->ELOCK_ACQUIRE\n", this); EnterCriticalSection(&as_exec); forcelog("ELOCK ACQUIRED\n");}
242
void ELOCK_RELEASE() {forcelog("ELOCK_RELEASE\n"); LeaveCriticalSection(&as_exec);}
244
void SLOCK_ACQUIRE() {EnterCriticalSection(&as_spin);}
245
void SLOCK_RELEASE() {LeaveCriticalSection(&as_spin);}
246
void ELOCK_ACQUIRE() {EnterCriticalSection(&as_exec);}
247
void ELOCK_RELEASE() {LeaveCriticalSection(&as_exec);}
248
#endif /* _LOCK_DEBUG_ */
249
void *getLockedXAConn(void);
250
void *generateXAConn(bool spinAcquired);
251
void *isolateXAConn(bool spinAcquired, bool continueConnection);
252
void SetPrepareResult(HRESULT res) {SLOCK_ACQUIRE(); prepared = true; prepare_result = res; SLOCK_RELEASE();}
253
void SetDone(HRESULT);
254
void Wait_pThread(bool slock_hold);
255
void Wait_cThread(bool slock_hold, bool once);
259
IAsyncPG::IAsyncPG(void) : helper(NULL), RMCookie(0), enlist(NULL), dtcconn(NULL), refcnt(1), isolated(false), done(false), abort(false), prepared(false), requestAccepted(false)
261
InterlockedIncrement(&g_cComponents);
262
InitializeCriticalSection(&as_spin);
263
InitializeCriticalSection(&as_exec);
264
eThread[0] = eThread[1] = eThread[2] = NULL;
265
eFin[0] = eFin[1] = eFin[2] = false;
266
memset(&xid, 0, sizeof(xid));
270
#endif /* _LOCK_DEBUG_ */
274
// invoked from *delete*.
275
// When entered ELOCK -> LIFELOCK -> SLOCK are held
276
// and they are released.
278
IAsyncPG::~IAsyncPG(void)
286
PgDtc_set_async(dtcconn, NULL);
293
mylog("IAsyncPG Destructor is freeing the connection\n");
294
PgDtc_free_connect(fconn);
296
DeleteCriticalSection(&as_spin);
298
DeleteCriticalSection(&as_exec);
299
InterlockedDecrement(&g_cComponents);
301
HRESULT STDMETHODCALLTYPE IAsyncPG::QueryInterface(REFIID riid, void ** ppvObject)
303
forcelog("%p QueryInterface called\n", this);
304
if (riid == IID_IUnknown || riid == IID_ITransactionResourceAsync)
311
return E_NOINTERFACE;
314
// acquire/releases SLOCK.
316
ULONG STDMETHODCALLTYPE IAsyncPG::AddRef(void)
318
mylog("%p->AddRef called\n", this);
325
// acquire/releases [ELOCK -> LIFELOCK -> ] SLOCK.
327
ULONG STDMETHODCALLTYPE IAsyncPG::Release(void)
329
mylog("%p->Release called refcnt=%d\n", this, refcnt);
340
mylog("delete %p\n", this);
356
// Acquire/release SLOCK.
358
void IAsyncPG::Wait_pThread(bool slock_hold)
360
mylog("Wait_pThread %d in\n", slock_hold);
362
int wait_idx = PrepareExec;
367
while (NULL != (wThread = eThread[wait_idx]) && !eFin[wait_idx])
370
ret = WaitForSingleObject(wThread, 2000);
372
if (WAIT_TIMEOUT != ret)
373
eFin[wait_idx] = true;
377
mylog("Wait_pThread out\n");
381
// Acquire/releases SLOCK.
383
void IAsyncPG::Wait_cThread(bool slock_hold, bool once)
389
mylog("Wait_cThread %d,%d in\n", slock_hold, once);
392
if (NULL != eThread[CommitExec])
393
wait_idx = CommitExec;
395
wait_idx = AbortExec;
396
while (NULL != (wThread = eThread[wait_idx]) && !eFin[wait_idx])
399
ret = WaitForSingleObject(wThread, 2000);
401
if (WAIT_TIMEOUT != ret)
402
eFin[wait_idx] = true;
408
mylog("Wait_cThread out\n");
411
/* Processing Prepare/Commit Request */
420
// Acquire/releases LIFELOCK -> SLOCK.
421
// may acquire/release ELOCK.
423
void IAsyncPG::SetDone(HRESULT res)
431
requestAccepted = true;
435
PgDtc_set_async(dtcconn, NULL);
443
mylog("Freeing isolated connection=%p\n", dtcconn);
444
PgDtc_free_connect(dtcconn);
464
// Acquire/releases [ELOCK -> LIFELOCK -> ] SLOCK.
466
void *IAsyncPG::generateXAConn(bool spinAcquired)
468
mylog("generateXAConn isolated=%d dtcconn=%p\n", isolated, dtcconn);
471
if (isolated || done)
480
if (dtcconn && !isolated && !done && prepared)
482
void *sconn = dtcconn;
484
dtcconn = PgDtc_isolate(sconn, useAnotherRoom);
488
// PgDtc_connect(dtcconn); may be called in getLockedXAConn
500
// Acquire/releases [ELOCK -> LIFELOCK -> ] SLOCK.
502
void *IAsyncPG::isolateXAConn(bool spinAcquired, bool continueConnection)
506
mylog("isolateXAConn isolated=%d dtcconn=%p\n", isolated, dtcconn);
509
if (isolated || done || NULL == dtcconn)
518
if (isolated || done || NULL == dtcconn)
527
dtcconn = PgDtc_isolate(sconn, continueConnection ? 0 : disposingConnection);
532
if (continueConnection)
534
PgDtc_connect(sconn);
541
// Acquire/releases [ELOCK -> LIFELOCK -> ] SLOCK.
543
void *IAsyncPG::separateXAConn(bool spinAcquired, bool continueConnection)
545
mylog("%s isolated=%d dtcconn=%p\n", __FUNCTION__, isolated, dtcconn);
549
return generateXAConn(true);
551
return isolateXAConn(true, continueConnection);
558
// Acquire/releases SLOCK.
559
// Try to acquire CONN_CS also.
562
// ELOCK is kept held.
563
// If the return connection != NULL
564
// the CONN_CS lock for the connection is held.
566
void *IAsyncPG::getLockedXAConn()
569
while (!done && !isolated && NULL != dtcconn)
572
* Note that COMMIT/ROLLBACK PREPARED command should be
573
* issued outside the transaction.
575
if (!prepared || !CONN_IS_IN_TRANS(dtcconn))
577
if (TRY_CONN_CS_ACQUIRE(dtcconn))
579
if (prepared && CONN_IS_IN_TRANS(dtcconn))
581
CONN_CS_RELEASE(dtcconn);
587
separateXAConn(true, true);
588
SLOCK_ACQUIRE(); // SLOCK was released by separateXAConn()
591
if (isolated && NULL != dtcconn)
593
CONN_CS_ACQUIRE(dtcconn);
594
if (!PgDtc_get_property(dtcconn, connected))
595
PgDtc_connect(dtcconn);
601
// Acquire/release ELOCK -> SLOCK.
603
HRESULT IAsyncPG::RequestExec(DWORD type, HRESULT res)
606
bool bReleaseEnlist = false;
610
mylog("%p->RequestExec type=%d conn=%p\n", this, type, dtcconn);
611
XidToText(xid, pgxid);
612
#ifdef _SLEEP_FOR_TEST_
614
#endif /* _SLEEP_FOR_TEST_ */
619
if (done || NULL == dtcconn)
624
if (econn = getLockedXAConn(), NULL != econn)
626
PgDtc_set_property(econn, inprogress, (void *) 1);
628
PgDtc_one_phase_operation(econn, ABORT_GLOBAL_TRANSACTION);
629
else if (XACT_S_SINGLEPHASE == res)
631
if (!PgDtc_one_phase_operation(econn, ONE_PHASE_COMMIT))
636
if (!PgDtc_two_phase_operation(econn, PREPARE_TRANSACTION, pgxid))
639
PgDtc_set_property(econn, inprogress, (void *) 0);
640
CONN_CS_RELEASE(econn);
645
bReleaseEnlist = true;
647
PgDtc_set_property(dtcconn, prepareRequested, (void *) 0);
648
ret = enlist->PrepareRequestDone(res, NULL, NULL);
649
SetPrepareResult(res);
655
econn = getLockedXAConn();
658
PgDtc_set_property(econn, inprogress, (void *) 1);
659
if (!PgDtc_two_phase_operation(econn, COMMIT_PREPARED, pgxid))
661
PgDtc_set_property(econn, inprogress, (void *) 0);
662
CONN_CS_RELEASE(econn);
666
ret = enlist->CommitRequestDone(res);
667
bReleaseEnlist = true;
671
if (prepared && !done)
673
econn = getLockedXAConn();
676
PgDtc_set_property(econn, inprogress, (void *) 1);
677
if (!PgDtc_two_phase_operation(econn, ROLLBACK_PREPARED, pgxid))
679
PgDtc_set_property(econn, inprogress, (void *) 0);
680
CONN_CS_RELEASE(econn);
684
ret = enlist->AbortRequestDone(res);
685
bReleaseEnlist = true;
692
helper->ReleaseRMCookie(RMCookie, TRUE);
696
mylog("%p->Done ret=%d\n", this, ret);
701
// Acquire/releses SLOCK
702
// or [ELOCK -> LIFELOCK -> ] SLOCK.
704
HRESULT IAsyncPG::ReleaseConnection(void)
706
mylog("%p->ReleaseConnection\n", this);
709
if (isolated || NULL == dtcconn)
715
if (NULL != eThread[CommitExec] || NULL != eThread[AbortExec] || requestAccepted)
718
Wait_cThread(true, true);
720
if (!isolated && !done && dtcconn && PgDtc_get_property(dtcconn, connected))
722
isolateXAConn(true, false);
726
mylog("%p->ReleaseConnection exit\n", this);
730
EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para);
731
EXTERN_C static void __cdecl ClosePrepareThread(LPVOID para);
732
EXTERN_C static void __cdecl CloseCommitThread(LPVOID para);
733
EXTERN_C static void __cdecl CloseAbortThread(LPVOID para);
736
// Acquire/release [ELOCK -> ] SLOCK.
738
HRESULT STDMETHODCALLTYPE IAsyncPG::PrepareRequest(BOOL fRetaining, DWORD grfRM,
739
BOOL fWantMoniker, BOOL fSinglePhase)
743
const DWORD reqtype = PrepareExec;
745
mylog("%p PrepareRequest called grhRM=%d enl=%p\n", this, grfRM, enlist);
747
if (dtcconn && 0 != PgDtc_get_property(dtcconn, errorNumber))
754
res = XACT_S_SINGLEPHASE;
755
mylog("XACT is singlePhase\n");
762
#ifdef _SLEEP_FOR_TEST_
764
#endif /* _SLEEP_FOR_TEST_ */
765
reqp = new RequestPara;
766
reqp->type = reqtype;
767
reqp->lpr = (LPVOID) this;
769
#define DONT_CALL_RETURN_FROM_HERE ???
771
HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);
780
eThread[reqtype] = hThread;
783
* We call here _beginthread not _beginthreadex
784
* so as not to call CloseHandle() to clean up
787
_beginthread(ClosePrepareThread, 0, (void *) this);
795
// Acquire/release [ELOCK -> ] SLOCK.
797
HRESULT STDMETHODCALLTYPE IAsyncPG::CommitRequest(DWORD grfRM, XACTUOW * pNewUOW)
799
HRESULT res = S_OK, ret = S_OK;
801
const DWORD reqtype = CommitExec;
803
mylog("%p CommitRequest called grfRM=%d enl=%p\n", this, grfRM, enlist);
806
if (!prepared || done)
808
else if (S_OK != prepare_result)
813
#define DONT_CALL_RETURN_FROM_HERE ???
816
#ifdef _SLEEP_FOR_TEST_
818
#endif /* _SLEEP_FOR_TEST_ */
819
reqp = new RequestPara;
820
reqp->type = reqtype;
821
reqp->lpr = (LPVOID) this;
824
HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);
834
eThread[reqtype] = hThread;
837
* We call here _beginthread not _beginthreadex
838
* so as not to call CloseHandle() to clean up
841
_beginthread(CloseCommitThread, 0, (void *) this);
843
mylog("CommitRequest ret=%d\n", ret);
844
requestAccepted = true;
851
// Acquire/release [ELOCK -> ] SLOCK.
853
HRESULT STDMETHODCALLTYPE IAsyncPG::AbortRequest(BOID * pboidReason, BOOL fRetaining,
856
HRESULT res = S_OK, ret = S_OK;
858
const DWORD reqtype = AbortExec;
860
mylog("%p AbortRequest called\n", this);
864
else if (prepared && S_OK != prepare_result)
869
#define return DONT_CALL_RETURN_FROM_HERE ???
872
if (!prepared && dtcconn)
874
PgDtc_set_property(dtcconn, inprogress, (void *) 1);
875
PgDtc_one_phase_operation(dtcconn, ONE_PHASE_ROLLBACK);
876
PgDtc_set_property(dtcconn, inprogress, (void *) 0);
878
reqp = new RequestPara;
879
reqp->type = reqtype;
880
reqp->lpr = (LPVOID) this;
883
HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);
893
eThread[reqtype] = hThread;
896
* We call here _beginthread not _beginthreadex
897
* so as not to call CloseHandle() to clean up
900
_beginthread(CloseAbortThread, 0, (void *) this);
902
mylog("AbortRequest ret=%d\n", ret);
903
requestAccepted = true;
909
HRESULT STDMETHODCALLTYPE IAsyncPG::TMDown(void)
911
forcelog("%p TMDown called\n", this);
915
bool IAsyncPG::CloseThread(DWORD type)
917
CSTR func = "CloseThread";
919
DWORD ret, excode = S_OK;
920
bool rls_async = false;
922
mylog("%s for %p thread=%d\n", func, this, eThread[type]);
923
if (th = eThread[type], NULL == th || eFin[type])
925
ret = WaitForSingleObject(th, INFINITE);
926
if (WAIT_OBJECT_0 == ret)
930
case IAsyncPG::AbortExec:
931
case IAsyncPG::CommitExec:
935
GetExitCodeThread(th, &excode);
940
eThread[type] = NULL;
945
mylog("%s ret=%d\n", func, ret);
949
EXTERN_C static void __cdecl ClosePrepareThread(LPVOID para)
951
CSTR func = "ClosePrepareThread";
952
IAsyncPG *async = (IAsyncPG *) para;
955
mylog("%s for %p", func, async);
956
if (release = async->CloseThread(IAsyncPG::PrepareExec), release)
958
mylog("%s release=%d\n", func, release);
961
EXTERN_C static void __cdecl CloseCommitThread(LPVOID para)
963
CSTR func = "CloseCommitThread";
964
IAsyncPG *async = (IAsyncPG *) para;
967
mylog("%s for %p", func, async);
968
if (release = async->CloseThread(IAsyncPG::CommitExec), release)
970
mylog("%s release=%d\n", func, release);
973
EXTERN_C static void __cdecl CloseAbortThread(LPVOID para)
975
CSTR func = "CloseAbortThread";
976
IAsyncPG *async = (IAsyncPG *) para;
979
mylog("%s for %p", func, async);
980
if (release = async->CloseThread(IAsyncPG::AbortExec), release)
982
mylog("%s release=%d\n", func, release);
985
EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para)
987
RequestPara *reqp = (RequestPara *) para;
988
DWORD type = reqp->type;
989
IAsyncPG *async = (IAsyncPG *) reqp->lpr;
990
HRESULT res = reqp->res, ret;
992
mylog("DtcRequestExec type=%d", reqp->type);
994
ret = async->RequestExec(type, res);
995
mylog(" Done ret=%d\n", ret);
999
CSTR regKey = "SOFTWARE\\Microsoft\\MSDTC\\XADLL";
1001
RETCODE static EnlistInDtc_1pipe(void *conn, ITransaction *pTra, ITransactionDispenser *pDtc)
1003
CSTR func = "EnlistInDtc_1pipe";
1004
static IDtcToXaHelperSinglePipe *pHelper = NULL;
1005
ITransactionResourceAsync *pRes = NULL;
1014
res = pDtc->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void **) &pHelper);
1015
if (res != S_OK || !pHelper)
1017
forcelog("DtcToXaHelperSingelPipe get error %d\n", res);
1022
res = (NULL != (asdum = new IAsyncPG)) ? S_OK : E_FAIL;
1025
mylog("CoCreateInstance error %d\n", res);
1029
/*mylog("dllname=%s dsn=%s\n", GetXaLibName(), conn->connInfo.dsn); res = 0;*/
1033
PgDtc_create_connect_string(conn, dtcname, sizeof(dtcname));
1035
res = pHelper->XARMCreate(dtcname, (char *) GetXaLibName(), &dwRMCookie);
1038
mylog("XARMCreate error code=%x\n", res);
1039
if (XACT_E_XA_TX_DISABLED == res)
1041
PgDtc_set_error(conn, "XARMcreate error:Please enable XA transaction in MSDTC security configuration", func);
1050
ret = ::RegOpenKeyEx(HKEY_LOCAL_MACHINE, regKey, 0, KEY_QUERY_VALUE | KEY_SET_VALUE, &sKey);
1051
if (ERROR_SUCCESS != ret)
1052
ret = ::RegCreateKeyEx(HKEY_LOCAL_MACHINE, regKey, 0, NULL, REG_OPTION_NON_VOLATILE, KEY_ALL_ACCESS, NULL, &sKey, NULL);
1053
if (ERROR_SUCCESS == ret)
1055
switch (ret = ::RegQueryValueEx(sKey, "XADLL", NULL, NULL, NULL, &rSize))
1061
ret = ::RegSetValueEx(sKey, GetXaLibName(), 0, REG_SZ,
1062
(CONST BYTE *) GetXaLibPath(), (DWORD) strlen(GetXaLibPath()) + 1);
1063
if (ERROR_SUCCESS == ret)
1068
PgDtc_set_error(conn, "XARMCreate error:Please register HKLM\\SOFTWARE\\Microsoft\\MSDTC\\XADLL", func);
1071
::RegCloseKey(sKey);
1075
PgDtc_set_error(conn, "MSDTC XARMCreate error", func);
1078
res = pHelper->ConvertTridToXID((DWORD *) pTra, dwRMCookie, &xid);
1081
mylog("ConvertTridToXid error %d\n", res);
1086
XidToText(xid, pgxid);
1087
mylog("ConvertTridToXID -> %s\n", pgxid);
1089
asdum->SetXid(&xid);
1090
/* Create an IAsyncPG instance by myself */
1091
/* DLLGetClassObject(GUID_IAsyncPG, IID_ITransactionResourceAsync, (void **) &asdum); */
1093
asdum->SetHelper(pHelper, dwRMCookie);
1094
res = pHelper->EnlistWithRM(dwRMCookie, pTra, asdum, &asdum->enlist);
1097
mylog("EnlistWithRM error %d\n", res);
1098
pHelper->ReleaseRMCookie(dwRMCookie, TRUE);
1102
mylog("asdum=%p start transaction\n", asdum);
1103
asdum->SetConnection(conn);
1105
PgDtc_set_async(conn, asdum);
1113
IsolateDtcConn(void *conn, BOOL continueConnection)
1118
if (async = (IAsyncPG *) PgDtc_get_async(conn), NULL != async)
1120
if (PgDtc_get_property(conn, idleInGlobalTransaction))
1124
async->separateXAConn(false, continueConnection ? true : false);
1136
EXTERN_C RETCODE EnlistInDtc(void *conn, void *pTra, int method)
1138
static ITransactionDispenser *pDtc = NULL;
1143
IAsyncPG *asdum = (IAsyncPG *) PgDtc_get_async(conn);
1144
PgDtc_set_property(conn, enlisted, (void *) 0);
1147
if (CONN_IS_IN_TRANS(conn))
1149
PgDtc_one_phase_operation(conn, SHUTDOWN_LOCAL_TRANSACTION);
1155
res = DtcGetTransactionManager(NULL, NULL, IID_ITransactionDispenser,
1156
0, 0, NULL, (void **) &pDtc);
1157
if (res != S_OK || !pDtc)
1159
forcelog("TransactionManager get error %d\n", res);
1163
ret = EnlistInDtc_1pipe(conn, (ITransaction *) pTra, pDtc);
1164
if (SQL_SUCCEEDED(ret))
1165
PgDtc_set_property(conn, enlisted, (void *) 1);
1169
EXTERN_C RETCODE DtcOnDisconnect(void *conn)
1171
mylog("DtcOnDisconnect\n");
1173
IAsyncPG *asdum = (IAsyncPG *) PgDtc_get_async(conn);
1178
asdum->ReleaseConnection();
1186
#endif /* _HANDLE_ENLIST_IN_DTC_ */