Postgres-xl GTM(全局事务管理器 Global Transaction Manager)快照管理
src/gtm/main/gtm_snap.c
快照请求通信处理函数
ProcessGetSnapshotCommand --> MSG_SNAPSHOT_GET
ProcessGetSnapshotCommand和ProcessGetSnapshotCommandMulti是处理客户端或GTM Proxy请求快照通信的函数。用于解析网络消息,然后调用GTM_GetTransactionSnapshot构建快照。
/*
 * ProcessGetSnapshotCommand -- handler for MSG_SNAPSHOT_GET.
 *
 * Parses a single-transaction snapshot request out of `message`, obtains a
 * snapshot through GTM_GetTransactionSnapshot() and sends the reply back on
 * `myport`.
 *
 * myport   - connection the request arrived on; its remote_type decides
 *            whether a proxy header is added to the reply and whether the
 *            reply is flushed immediately.
 * message  - request body: an int transaction count (must be 1) followed by
 *            one GlobalTransactionId.
 * get_gxid - when true, the GXID echoed in the reply is re-read from the
 *            transaction handle via GTM_GetGlobalTransactionId() and the
 *            reply is tagged SNAPSHOT_GXID_GET_RESULT instead of
 *            SNAPSHOT_GET_RESULT.
 *
 * All failures are reported with ereport(ERROR, ...).
 */
void ProcessGetSnapshotCommand(Port *myport, StringInfo message, bool get_gxid) {
    StringInfoData buf;
    GTM_Snapshot snapshot;
    MemoryContext oldContext;
    int status;
    int txn_count;
    int sn_xcnt;
    const char *data = NULL;
    GTM_TransactionHandle txn;
    GlobalTransactionId gxid;

    /*
     * Read the transaction count from the message body and make sure it is
     * exactly one, then read that transaction's GXID and map it to a
     * transaction handle with GTM_GXIDToHandle (GXID -> handle; see the
     * previous post in this series).
     *
     * The count comes from the network, so it is checked at run time and not
     * only with Assert(): the reply below sends `status` as a single int, and
     * any other count would make the reply length disagree with the data.
     */
    txn_count = pq_getmsgint(message, sizeof (int));
    if (txn_count != 1)
        ereport(ERROR, (EPROTO, errmsg("Snapshot request must contain exactly one transaction")));

    data = pq_getmsgbytes(message, sizeof (gxid));
    if (data == NULL)
        ereport(ERROR, (EPROTO, errmsg("Message does not contain valid GXID")));
    memcpy(&gxid, data, sizeof (gxid));
    elog(INFO, "Received transaction ID %u for snapshot obtention", gxid);
    txn = GTM_GXIDToHandle(gxid);
    pq_getmsgend(message);

    /*
     * If the caller asked for the GXID as well, the GXID carried in the
     * message is no longer needed: GTM_GetGlobalTransactionId() returns the
     * GXID the transaction manager currently associates with this handle.
     */
    if (get_gxid) {
        gxid = GTM_GetGlobalTransactionId(txn);
        if (gxid == InvalidGlobalTransactionId)
            ereport(ERROR, (EINVAL, errmsg("Failed to get a new transaction id")));
    }

    /*
     * Switch to TopMostMemoryContext and fetch the latest snapshot through
     * GTM_GetTransactionSnapshot().
     */
    oldContext = MemoryContextSwitchTo(TopMostMemoryContext);
    snapshot = GTM_GetTransactionSnapshot(&txn, 1, &status);
    if (snapshot == NULL)
        ereport(ERROR, (EINVAL, errmsg("Failed to get a snapshot")));
    MemoryContextSwitchTo(oldContext);

    /*
     * Build the reply for the client. The fields are appended in exactly the
     * order shown below.
     */
    pq_beginmessage(&buf, 'S');
    pq_sendint(&buf, get_gxid ? SNAPSHOT_GXID_GET_RESULT : SNAPSHOT_GET_RESULT, 4);
    if (myport->remote_type == GTM_NODE_GTM_PROXY) {
        GTM_ProxyMsgHeader proxyhdr;

        proxyhdr.ph_conid = myport->conn_id;
        pq_sendbytes(&buf, (char *) &proxyhdr, sizeof (GTM_ProxyMsgHeader));
    }
    pq_sendbytes(&buf, (char *) &gxid, sizeof (GlobalTransactionId));
    pq_sendbytes(&buf, (char *) &txn_count, sizeof (txn_count));
    /* txn_count is known to be 1 here, so exactly one status int is sent. */
    pq_sendbytes(&buf, (char *) &status, sizeof (status));
    pq_sendbytes(&buf, (char *) &snapshot->sn_snapid, sizeof (uint64));
    pq_sendbytes(&buf, (char *) &snapshot->sn_xmin, sizeof (GlobalTransactionId));
    pq_sendbytes(&buf, (char *) &snapshot->sn_xmax, sizeof (GlobalTransactionId));

    /* Read sn_xcnt once so the count sent and the array length sent agree. */
    sn_xcnt = snapshot->sn_xcnt;
    pq_sendint(&buf, sn_xcnt, sizeof (int));
    pq_sendbytes(&buf, (char *) snapshot->sn_xip, sizeof (GlobalTransactionId) * sn_xcnt);
    pq_endmessage(myport, &buf);

    /* Replies to a GTM proxy are not flushed here; all other peers are. */
    if (myport->remote_type != GTM_NODE_GTM_PROXY)
        pq_flush(myport);
    return;
}
ProcessGetSnapshotCommandMulti --> MSG_SNAPSHOT_GET_MULTI
和上一函数相比,事务句柄和全局事务xid局部变量都换成了数组,对请求的处理也是基于请求数据包中的事务数来进行循环处理。
/*
 * ProcessGetSnapshotCommandMulti -- handler for MSG_SNAPSHOT_GET_MULTI.
 *
 * Same job as ProcessGetSnapshotCommand, but for a request that carries
 * several transactions: the handle, GXID and status locals are arrays and the
 * request is processed in a loop driven by the transaction count found in the
 * message. One snapshot is obtained for the whole group and sent back on
 * `myport`.
 *
 * myport  - connection the request arrived on; its remote_type decides
 *           whether a proxy header is added and whether the reply is flushed.
 * message - request body: an int transaction count followed by that many
 *           GlobalTransactionId values.
 *
 * All failures are reported with ereport(ERROR, ...).
 */
void ProcessGetSnapshotCommandMulti(Port *myport, StringInfo message) {
    StringInfoData buf;
    GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS];
    GlobalTransactionId gxid[GTM_MAX_GLOBAL_TRANSACTIONS];
    GTM_Snapshot snapshot;
    MemoryContext oldContext;
    int txn_count;
    int ii;
    int status[GTM_MAX_GLOBAL_TRANSACTIONS];
    int sn_xcnt;

    /*
     * The count comes straight from the network and is used to index the
     * fixed-size arrays above and to size the memset/pq_sendbytes calls
     * further down, so reject anything outside [0, GTM_MAX_GLOBAL_TRANSACTIONS]
     * before touching them.
     */
    txn_count = pq_getmsgint(message, sizeof (int));
    if (txn_count < 0 || txn_count > GTM_MAX_GLOBAL_TRANSACTIONS)
        ereport(ERROR, (EPROTO, errmsg("Invalid transaction count %d in snapshot request", txn_count)));

    for (ii = 0; ii < txn_count; ii++) {
        const char *data = pq_getmsgbytes(message, sizeof (gxid[ii]));

        if (data == NULL)
            ereport(ERROR, (EPROTO, errmsg("Message does not contain valid GXID")));
        memcpy(&gxid[ii], data, sizeof (gxid[ii]));
        txn[ii] = GTM_GXIDToHandle(gxid[ii]);
    }
    pq_getmsgend(message);

    /*
     * Switch to TopMostMemoryContext and fetch the latest snapshot through
     * GTM_GetTransactionSnapshot().
     */
    oldContext = MemoryContextSwitchTo(TopMostMemoryContext);
    snapshot = GTM_GetTransactionSnapshot(txn, txn_count, status);
    if (snapshot == NULL)
        ereport(ERROR, (EINVAL, errmsg("Failed to get a snapshot")));
    MemoryContextSwitchTo(oldContext);

    /* Fill in the reply message; fields are appended in the order below. */
    pq_beginmessage(&buf, 'S');
    pq_sendint(&buf, SNAPSHOT_GET_MULTI_RESULT, 4);
    if (myport->remote_type == GTM_NODE_GTM_PROXY) {
        GTM_ProxyMsgHeader proxyhdr;

        proxyhdr.ph_conid = myport->conn_id;
        pq_sendbytes(&buf, (char *) &proxyhdr, sizeof (GTM_ProxyMsgHeader));
    }
    pq_sendbytes(&buf, (char *) &txn_count, sizeof (txn_count));
    pq_sendbytes(&buf, (char *) status, sizeof (int) * txn_count);
    pq_sendbytes(&buf, (char *) &snapshot->sn_snapid, sizeof (uint64));
    pq_sendbytes(&buf, (char *) &snapshot->sn_xmin, sizeof (GlobalTransactionId));
    pq_sendbytes(&buf, (char *) &snapshot->sn_xmax, sizeof (GlobalTransactionId));

    /* Read sn_xcnt once so the count sent and the array length sent agree. */
    sn_xcnt = snapshot->sn_xcnt;
    pq_sendint(&buf, sn_xcnt, sizeof (int));
    pq_sendbytes(&buf, (char *) snapshot->sn_xip, sizeof (GlobalTransactionId) * sn_xcnt);
    pq_endmessage(myport, &buf);

    /* Replies to a GTM proxy are not flushed here; all other peers are. */
    if (myport->remote_type != GTM_NODE_GTM_PROXY)
        pq_flush(myport);
    return;
}
GTM_GetTransactionSnapshot函数构建快照并将其存储在GTMTransactions array中以跟踪所有正在处理的事务。如果GTM Proxy请求多个快照,快照管理器只计算一次,将该快照用于请求中的所有事务。
/*
 * Copy the freshly computed snapshot `src` into the per-transaction snapshot
 * `dst`, allocating dst->sn_xip the first time `dst` is used.
 *
 * Only sn_xmin, sn_xmax, sn_xcnt and the sn_xip contents are copied.
 * Returns false (leaving the scalar fields of `dst` untouched) when the
 * sn_xip allocation yields NULL, true otherwise. The caller is responsible
 * for releasing any lock it holds before raising an error.
 */
static bool GTM_CopySnapshotInto(GTM_Snapshot dst, GTM_Snapshot src) {
    if (dst->sn_xip == NULL) {
        /* First call for this snapshot */
        dst->sn_xip = (GlobalTransactionId *) palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof (GlobalTransactionId));
        if (dst->sn_xip == NULL)
            return false;
    }
    dst->sn_xmin = src->sn_xmin;
    dst->sn_xmax = src->sn_xmax;
    dst->sn_xcnt = src->sn_xcnt;
    memcpy(dst->sn_xip, src->sn_xip, sizeof (GlobalTransactionId) * src->sn_xcnt);
    return true;
}

/*
 * GTM_GetTransactionSnapshot
 *
 * Compute one snapshot over GTMTransactions.gt_open_transactions and record
 * it in the transaction info of every valid handle in `handle[]`. When
 * several transactions are passed in, the snapshot is computed once and then
 * copied to the others.
 *
 * handle    - array of transaction handles; entries equal to
 *             InvalidTransactionHandle are tolerated.
 * txn_count - number of entries in `handle` and `status`.
 * status    - output array; zeroed on entry, then set to STATUS_NOT_FOUND for
 *             every invalid handle.
 *
 * Returns the snapshot that was filled in: the gti_current_snapshot of the
 * first valid transaction, or the calling thread's thr_snapshot when no
 * handle is valid. Errors are raised with ereport/elog(ERROR) after
 * gt_TransArrayLock has been released.
 */
static GTM_Snapshot GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *status) {
    GlobalTransactionId xmin;
    GlobalTransactionId xmax;
    GlobalTransactionId globalxmin;
    int count = 0;
    gtm_ListCell *elem = NULL;
    int ii;
    /*
     * Instead of allocating memory for a snapshot, we use the snapshot of the
     * first transaction in the given array. The same snapshot will later be
     * copied to other transaction info structures.
     */
    GTM_TransactionInfo *mygtm_txninfo = NULL;
    GTM_Snapshot snapshot = NULL;

    memset(status, 0, sizeof (int) * txn_count);

    for (ii = 0; ii < txn_count; ii++) {
        /*
         * Even if the request does not contain a valid GXID, we still send
         * down a snapshot, but mark the status field accordingly.
         */
        if (handle[ii] != InvalidTransactionHandle)
            mygtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]);
        else
            status[ii] = STATUS_NOT_FOUND;

        /* The first valid transaction found donates its snapshot storage. */
        if ((mygtm_txninfo != NULL) && (snapshot == NULL))
            snapshot = &mygtm_txninfo->gti_current_snapshot;
    }

    /*
     * If no valid transaction exists in the array, we record the snapshot in a
     * thread-specific structure. This allows us to avoid repeated
     * allocation/freeing of the structure.
     *
     * Note that we must use a thread-specific variable and not a global
     * variable because a concurrent thread might compute a new snapshot and
     * overwrite the snapshot information while we are still sending this copy
     * to the client. Using a thread-specific storage avoids that problem.
     */
    if (snapshot == NULL)
        snapshot = &GetMyThreadInfo->thr_snapshot;
    Assert(snapshot != NULL);

    /*
     * This can only happen when using a snapshot from GTMTransactions, as the
     * thread-specific sn_xip array is allocated statically as part of
     * GTM_ThreadInfo. The lock is not held yet, so erroring out directly is
     * fine here.
     */
    if (snapshot->sn_xip == NULL) {
        /* First call for this snapshot */
        snapshot->sn_xip = (GlobalTransactionId *) palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof (GlobalTransactionId));
        if (snapshot->sn_xip == NULL)
            ereport(ERROR, (ENOMEM, errmsg("out of memory")));
    }

    /*
     * A read-mode lock on the transaction array is taken for the scan and for
     * setting gti_xmin on the requesting transactions below.
     */
    GTM_RWLockAcquire(&GTMTransactions.gt_TransArrayLock, GTM_LOCKMODE_READ);

    /* xmax is always latestCompletedXid + 1 */
    xmax = GTMTransactions.gt_latestCompletedXid;
    Assert(GlobalTransactionIdIsNormal(xmax));
    GlobalTransactionIdAdvance(xmax);

    /* Get the snapshot id */
    snapshot->sn_snapid = GTMTransactions.gt_snapid;

    /* initialize xmin calculation with xmax */
    globalxmin = xmin = xmax;

    /*
     * Walk the open-transaction list, gathering every active xid below xmax
     * and tracking the lowest xmin.
     */
    gtm_foreach(elem, GTMTransactions.gt_open_transactions) {
        volatile GTM_TransactionInfo *gtm_txninfo = (GTM_TransactionInfo *) gtm_lfirst(elem);
        GlobalTransactionId xid;

        /* Don't take into account LAZY VACUUMs */
        if (gtm_txninfo->gti_vacuum)
            continue;

        /* Update globalxmin to be the smallest valid xmin */
        xid = gtm_txninfo->gti_xmin;    /* fetch just once */
        if (GlobalTransactionIdIsNormal(xid) && GlobalTransactionIdPrecedes(xid, globalxmin))
            globalxmin = xid;

        /* Fetch xid just once */
        xid = gtm_txninfo->gti_gxid;

        /*
         * If the transaction has been assigned an xid < xmax we add it to the
         * snapshot, and update xmin if necessary. There's no need to store
         * XIDs >= xmax, since we'll treat them as running anyway.
         *
         * Unlike Postgres, we include the GXID of the current transaction
         * as well in the snapshot. This is necessary because the same
         * snapshot is shared by multiple backends through GTM proxy and
         * the GXID will vary for each backend.
         *
         * XXX We should confirm that this does not have any adverse effect
         * on the MVCC visibility and check if any changes are related to
         * the MVCC checks because of the change
         */
        if (GlobalTransactionIdIsNormal(xid)) {
            if (GlobalTransactionIdFollowsOrEquals(xid, xmax))
                continue;
            if (GlobalTransactionIdPrecedes(xid, xmin))
                xmin = xid;
            snapshot->sn_xip[count++] = xid;
        }
    }

    /* Fold the xids collected above into globalxmin as well. */
    if (GlobalTransactionIdPrecedes(xmin, globalxmin))
        globalxmin = xmin;

    snapshot->sn_xmin = xmin;
    snapshot->sn_xmax = xmax;
    snapshot->sn_xcnt = count;

    /*
     * Now, before the transaction array lock is released, set the xmin in the
     * txninfo structures of all the transactions.
     */
    for (ii = 0; ii < txn_count; ii++) {
        GTM_Snapshot mysnap = NULL;

        /*
         * Invalid handles were marked STATUS_NOT_FOUND in the first loop;
         * skip them (STATUS_ERROR is skipped too, should it ever be set).
         */
        if ((status[ii] == STATUS_ERROR) || (status[ii] == STATUS_NOT_FOUND))
            continue;

        mygtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]);
        mysnap = &mygtm_txninfo->gti_current_snapshot;

        if (GTM_IsTransSerializable(mygtm_txninfo)) {
            /*
             * A serializable transaction that already has its snapshot may
             * not take part in a grouped request.
             */
            if ((mygtm_txninfo->gti_snapshot_set) && (txn_count > 1)) {
                GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
                elog(ERROR, "Grouped snapshot can only include first snapshot in Serializable transaction");
            }
            if (!mygtm_txninfo->gti_snapshot_set) {
                /*
                 * For the first transaction in the array the snapshot is
                 * already in place (snapshot == mysnap); others get a copy.
                 */
                if ((snapshot != mysnap) && !GTM_CopySnapshotInto(mysnap, snapshot)) {
                    /* Never raise an error while still holding the lock. */
                    GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
                    ereport(ERROR, (ENOMEM, errmsg("out of memory")));
                }
                mygtm_txninfo->gti_snapshot_set = true;
            }
        } else if ((snapshot != mysnap) && !GTM_CopySnapshotInto(mysnap, snapshot)) {
            /* Never raise an error while still holding the lock. */
            GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
            ereport(ERROR, (ENOMEM, errmsg("out of memory")));
        }

        /* Only transactions that have no valid xmin yet receive this one. */
        if ((mygtm_txninfo != NULL) && (!GlobalTransactionIdIsValid(mygtm_txninfo->gti_xmin)))
            mygtm_txninfo->gti_xmin = xmin;
    }

    GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
    elog(DEBUG1, "GTM_GetTransactionSnapshot: (%u:%u:%u)", snapshot->sn_xmin, snapshot->sn_xmax, snapshot->sn_xcnt);
    return snapshot;
}
快照在TopMostMemoryContext中分配,不在进程内存上下文中分配的原因是,事务可能跨越多个客户端连接,比如prepared transactions。
上一篇: ORM简介