欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Postgres-XL GTM(全局事务管理器 Global Transaction Manager)快照管理

程序员文章站 2022-07-01 08:22:11
...

src/gtm/main/gtm_snap.c

快照请求通信处理函数

ProcessGetSnapshotCommand --> MSG_SNAPSHOT_GET

ProcessGetSnapshotCommand和ProcessGetSnapshotCommandMulti是处理客户端或GTM Proxy请求快照通信的函数。用于解析网络消息,然后调用GTM_GetTransactionSnapshot构建快照。

/*
 * ProcessGetSnapshotCommand
 *
 * Handle a snapshot request for a single transaction and send the reply
 * back on myport.
 *
 *	myport   - connection the reply is written to; its remote_type decides
 *	           whether a proxy header is added and whether the reply is flushed
 *	message  - request body, consumed with the pq_getmsg* helpers
 *	get_gxid - when true, the GXID is looked up again from the transaction
 *	           handle and the reply is tagged SNAPSHOT_GXID_GET_RESULT
 *	           instead of SNAPSHOT_GET_RESULT
 */
void ProcessGetSnapshotCommand(Port *myport, StringInfo message, bool get_gxid) {	
	GTM_Snapshot snapshot;		/* result of GTM_GetTransactionSnapshot() */
	MemoryContext oldContext;	/* context to restore after the snapshot call */
	int status;					/* per-transaction status filled in by the snapshot call */
	int txn_count;				/* transaction count read from the request */
	int sn_xcnt;				/* local copy of snapshot->sn_xcnt used while replying */

先从message消息体中获取事务数量,确保为1个事务,再从消息中获取该事务的gxid。通过GTM_GXIDToHandle(GXID -> handle) 函数将gxid转换为事务句柄。可参考上一篇博客。

    const char *data = NULL;
    GTM_TransactionHandle txn;
	GlobalTransactionId gxid;

	/*
	 * The count comes from the peer. This function keeps a single status
	 * slot and a single handle, yet the reply sends sizeof(int) * txn_count
	 * bytes of status, so any value other than 1 must be rejected at run
	 * time (an Assert alone disappears in non-assert builds).
	 */
	txn_count = pq_getmsgint(message, sizeof (int));
	if (txn_count != 1)
		ereport(ERROR,(EPROTO,errmsg("Message must contain exactly one transaction")));

	/* Copy the GXID out of the message buffer rather than aliasing it. */
	data = pq_getmsgbytes(message, sizeof (gxid));
	if (data == NULL)
		ereport(ERROR,(EPROTO,errmsg("Message does not contain valid GXID")));
	memcpy(&gxid, data, sizeof(gxid));

	/* GXIDs are printed as unsigned, matching the other log lines in this file. */
	elog(INFO, "Received transaction ID %u for snapshot obtention", gxid);

	/* Map the GXID to its transaction handle, then verify the message is fully consumed. */
	txn = GTM_GXIDToHandle(gxid);
	pq_getmsgend(message);

如果ProcessGetSnapshotCommand指定了需要获取gxid,则不再需要消息中传输过来的gxid,通过事务句柄,GTM_GetGlobalTransactionId函数返回目前事务管理器该事务使用的gxid。

	/*
	 * The caller wants the GXID returned as well: overwrite the value parsed
	 * from the message with whatever GTM_GetGlobalTransactionId() reports for
	 * this transaction handle, and raise an error if that is invalid.
	 */
	if (get_gxid) {
		gxid = GTM_GetGlobalTransactionId(txn);
		if (gxid == InvalidGlobalTransactionId)
			ereport(ERROR, (EINVAL, errmsg("Failed to get a new transaction id")));
	}

切换到TopMostMemoryContext内存上下文,通过调用GTM_GetTransactionSnapshot函数获取最新快照。

	/*
	 * Compute the snapshot with TopMostMemoryContext as the current memory
	 * context, then switch back to the previous one. A single handle and a
	 * single status slot are passed (count 1).
	 * NOTE(review): if ereport(ERROR) does not return, the switch back below
	 * is skipped - confirm the error path restores the context.
	 */
	oldContext = MemoryContextSwitchTo(TopMostMemoryContext);
	if ((snapshot = GTM_GetTransactionSnapshot(&txn, 1, &status)) == NULL)
		ereport(ERROR,(EINVAL,errmsg("Failed to get a snapshot")));
	MemoryContextSwitchTo(oldContext);

向客户端回送消息,包含的消息内容如下所示。

    /* Build the reply as an 'S' message; fields are appended in the order below. */
    StringInfoData buf;	
	pq_beginmessage(&buf, 'S');
	/* Result type reflects whether the GXID was requested together with the snapshot. */
	pq_sendint(&buf, get_gxid ? SNAPSHOT_GXID_GET_RESULT : SNAPSHOT_GET_RESULT, 4);
	/* Replies to a GTM proxy carry a proxy header holding this port's connection id. */
	if (myport->remote_type == GTM_NODE_GTM_PROXY){
		GTM_ProxyMsgHeader proxyhdr;
		proxyhdr.ph_conid = myport->conn_id;
		pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
	}
	/* GXID, transaction count and status, followed by the snapshot fields. */
	pq_sendbytes(&buf, (char *)&gxid, sizeof (GlobalTransactionId));
	pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count));
	/* NOTE(review): status is a single int, so this length is only valid when
	 * txn_count == 1 (asserted where the request is parsed). */
	pq_sendbytes(&buf, (char *)&status, sizeof(int) * txn_count);
	pq_sendbytes(&buf, (char *)&snapshot->sn_snapid, sizeof (uint64));
	pq_sendbytes(&buf, (char *)&snapshot->sn_xmin, sizeof (GlobalTransactionId));
	pq_sendbytes(&buf, (char *)&snapshot->sn_xmax, sizeof (GlobalTransactionId));

	/* Read sn_xcnt once so the count sent and the number of sn_xip entries
	 * sent after it are the same value. */
	sn_xcnt = snapshot->sn_xcnt;
	pq_sendint(&buf, sn_xcnt, sizeof (int));
	pq_sendbytes(&buf, (char *)snapshot->sn_xip, sizeof(GlobalTransactionId) * sn_xcnt);
	pq_endmessage(myport, &buf);
	/* Only peers that are not a GTM proxy get an explicit flush here. */
	if (myport->remote_type != GTM_NODE_GTM_PROXY)
		pq_flush(myport);
	return;
}

ProcessGetSnapshotCommandMulti --> MSG_SNAPSHOT_GET_MULTI

和上一函数相比,事务句柄和全局事务xid局部变量都换成了数组,对请求的处理也是基于请求数据包中的事务数来进行循环处理。

/*
 * ProcessGetSnapshotCommandMulti
 *
 * Handle a snapshot request that names several transactions at once and
 * send a single SNAPSHOT_GET_MULTI_RESULT reply back on myport.
 *
 *	myport  - connection the reply is written to; its remote_type decides
 *	          whether a proxy header is added and whether the reply is flushed
 *	message - request body: a transaction count followed by that many GXIDs
 */
void ProcessGetSnapshotCommandMulti(Port *myport, StringInfo message) {
	StringInfoData buf;			/* reply message under construction */
	/* Per-transaction arrays; each holds GTM_MAX_GLOBAL_TRANSACTIONS entries. */
	GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS];
	GlobalTransactionId gxid[GTM_MAX_GLOBAL_TRANSACTIONS];
	GTM_Snapshot snapshot;		/* result of GTM_GetTransactionSnapshot() */
	MemoryContext oldContext;	/* context to restore after the snapshot call */
	int txn_count;				/* number of transactions named in the request */
	int ii;						/* index into the per-transaction arrays */
	int status[GTM_MAX_GLOBAL_TRANSACTIONS];	/* one status per requested transaction */
	int sn_xcnt;				/* local copy of snapshot->sn_xcnt used while replying */

	txn_count = pq_getmsgint(message, sizeof (int));
	for (ii = 0; ii < txn_count; ii++){
		const char *data = pq_getmsgbytes(message, sizeof (gxid[ii]));
		if (data == NULL) ereport(ERROR,(EPROTO,errmsg("Message does not contain valid GXID")));
		memcpy(&gxid[ii], data, sizeof (gxid[ii]));
		txn[ii] = GTM_GXIDToHandle(gxid[ii]);
	}
	pq_getmsgend(message);

切换到TopMostMemoryContext内存上下文,通过调用GTM_GetTransactionSnapshot函数获取最新快照。

	/*
	 * Compute one snapshot for all requested handles with
	 * TopMostMemoryContext as the current memory context, then switch back.
	 * status[] receives one entry per handle.
	 * NOTE(review): if ereport(ERROR) does not return, the switch back below
	 * is skipped - confirm the error path restores the context.
	 */
	oldContext = MemoryContextSwitchTo(TopMostMemoryContext);
	if ((snapshot = GTM_GetTransactionSnapshot(txn, txn_count, status)) == NULL)
		ereport(ERROR,(EINVAL,errmsg("Failed to get a snapshot")));
	MemoryContextSwitchTo(oldContext);

回送消息填充

	/* Build the reply as an 'S' message tagged SNAPSHOT_GET_MULTI_RESULT. */
	pq_beginmessage(&buf, 'S');
	pq_sendint(&buf, SNAPSHOT_GET_MULTI_RESULT, 4);
	/* Replies to a GTM proxy carry a proxy header holding this port's connection id. */
	if (myport->remote_type == GTM_NODE_GTM_PROXY){
		GTM_ProxyMsgHeader proxyhdr;
		proxyhdr.ph_conid = myport->conn_id;
		pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
	}
	/* Transaction count and one status per transaction, then the shared snapshot fields. */
	pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count));
	pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count);
	pq_sendbytes(&buf, (char *)&snapshot->sn_snapid, sizeof (uint64));
	pq_sendbytes(&buf, (char *)&snapshot->sn_xmin, sizeof (GlobalTransactionId));
	pq_sendbytes(&buf, (char *)&snapshot->sn_xmax, sizeof (GlobalTransactionId));
	/* Read sn_xcnt once so the count sent and the number of sn_xip entries
	 * sent after it are the same value. */
	sn_xcnt = snapshot->sn_xcnt;
	pq_sendint(&buf, sn_xcnt, sizeof (int));
	pq_sendbytes(&buf, (char *)snapshot->sn_xip,sizeof(GlobalTransactionId) * sn_xcnt);
	pq_endmessage(myport, &buf);
	/* Only peers that are not a GTM proxy get an explicit flush here. */
	if (myport->remote_type != GTM_NODE_GTM_PROXY)
		pq_flush(myport);
	return;
}

GTM_GetTransactionSnapshot函数构建快照并将其存储在GTMTransactions array中以跟踪所有正在处理的事务。如果GTM Proxy请求多个快照,快照管理器只计算一次,将该快照用于请求中的所有事务

/*
 * GTM_GetTransactionSnapshot
 *
 * Compute one snapshot from the list of open transactions and record it in
 * the transaction info of every valid handle in handle[].
 *
 *	handle    - transaction handles the snapshot is requested for
 *	txn_count - number of entries in handle[] and in status[]
 *	status    - output array; zeroed first, then entries whose handle is
 *	            InvalidTransactionHandle are set to STATUS_NOT_FOUND
 *
 * Returns the snapshot that was filled in: the gti_current_snapshot of the
 * first valid handle, or the calling thread's thr_snapshot when no handle
 * is valid.
 *
 * Raises ERROR on allocation failure, and when a grouped request
 * (txn_count > 1) contains a serializable transaction whose snapshot is
 * already set. gt_TransArrayLock is released before every error raised
 * while it is held.
 */
static GTM_Snapshot GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *status) {
	GlobalTransactionId xmin;
	GlobalTransactionId xmax;
	GlobalTransactionId globalxmin;
	int			count = 0;
	gtm_ListCell *elem = NULL;
	int ii;
	/* Instead of allocating memory for a snapshot, we use the snapshot of the
	 * first transaction in the given array. The same snapshot will later be
	 * copied to other transaction info structures. */
	GTM_TransactionInfo *mygtm_txninfo = NULL;
	GTM_Snapshot snapshot = NULL;

	memset(status, 0, sizeof (int) * txn_count);
	for (ii = 0; ii < txn_count; ii++){
		/* Even if the request does not contain a valid GXID, we still send
		 * down a snapshot, but mark the status field accordingly. */
		if (handle[ii] != InvalidTransactionHandle)
			mygtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]);
		else
			status[ii] = STATUS_NOT_FOUND;
		/* The first transaction info we manage to resolve lends its
		 * gti_current_snapshot as the working snapshot. */
		if ((mygtm_txninfo != NULL) && (snapshot == NULL))
			snapshot = &mygtm_txninfo->gti_current_snapshot;
	}

	/* If no valid transaction exists in the array, we record the snapshot in a
	 * thread-specific structure. This allows us to avoid repeated
	 * allocation/freeing of the structure.
	 * Note that we must use a thread-specific variable and not a global
	 * variable because a concurrent thread might compute a new snapshot and
	 * overwrite the snapshot information while we are still sending this copy
	 * to the client. Using a thread-specific storage avoids that problem. */
	if (snapshot == NULL)
		snapshot = &GetMyThreadInfo->thr_snapshot;
	Assert(snapshot != NULL);

	/* This can only happen when using a snapshot from GTMTransactions, as the
	 * thread-specific sn_xip array is allocated statically as part of
	 * GTM_ThreadInfo. The lock is not held yet, so nothing to release on error. */
	if (snapshot->sn_xip == NULL){
		/* First call for this snapshot */
		snapshot->sn_xip = (GlobalTransactionId *)palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof(GlobalTransactionId));
		if (snapshot->sn_xip == NULL)
			ereport(ERROR,(ENOMEM,errmsg("out of memory")));
	}

	/* A read lock on the transaction array is taken for the scan and for
	 * setting the per-transaction xmin below. */
	GTM_RWLockAcquire(&GTMTransactions.gt_TransArrayLock, GTM_LOCKMODE_READ);

	/* xmax is always latestCompletedXid + 1 */
	xmax = GTMTransactions.gt_latestCompletedXid;
	Assert(GlobalTransactionIdIsNormal(xmax));
	GlobalTransactionIdAdvance(xmax);

	/* Get the snapshot id */
	snapshot->sn_snapid = GTMTransactions.gt_snapid;

	/* initialize xmin calculation with xmax */
	globalxmin = xmin = xmax;

	/* Walk the open-transaction list, gathering every active xid below xmax
	 * and tracking the lowest xid and the lowest xmin seen. */
	gtm_foreach(elem, GTMTransactions.gt_open_transactions){
		volatile GTM_TransactionInfo *gtm_txninfo = (GTM_TransactionInfo *)gtm_lfirst(elem);
		GlobalTransactionId xid;

		/* Don't take into account LAZY VACUUMs */
		if (gtm_txninfo->gti_vacuum)
			continue;

		/* Update globalxmin to be the smallest valid xmin */
		xid = gtm_txninfo->gti_xmin;		/* fetch just once */
		if (GlobalTransactionIdIsNormal(xid) && GlobalTransactionIdPrecedes(xid, globalxmin))
			globalxmin = xid;

		/* Fetch xid just once */
		xid = gtm_txninfo->gti_gxid;

		/*
		 * If the transaction has been assigned an xid < xmax we add it to the
		 * snapshot, and update xmin if necessary. There's no need to store
		 * XIDs >= xmax, since we'll treat them as running anyway.
		 */
		if (GlobalTransactionIdIsNormal(xid)){
			/*
			 * Unlike Postgres, we include the GXID of the current transaction
			 * as well in the snapshot. This is necessary because the same
			 * snapshot is shared by multiple backends through GTM proxy and
			 * the GXID will vary for each backend.
			 *
			 * XXX We should confirm that this does not have any adverse effect
			 * on the MVCC visibility and check if any changes are related to
			 * the MVCC checks because of the change
			 */
			if (GlobalTransactionIdFollowsOrEquals(xid, xmax))
				continue;
			if (GlobalTransactionIdPrecedes(xid, xmin))
				xmin = xid;
			snapshot->sn_xip[count++] = xid;
		}
	}

	/* Fold the lowest active xid into globalxmin as well. */
	if (GlobalTransactionIdPrecedes(xmin, globalxmin))
		globalxmin = xmin;

	snapshot->sn_xmin = xmin;
	snapshot->sn_xmax = xmax;
	snapshot->sn_xcnt = count;

	/* Now, before the transaction array lock is released, set the xmin in
	 * the txninfo structures of all the transactions. */
	for (ii = 0; ii < txn_count; ii++){
		GTM_Snapshot mysnap = NULL;

		/* Handles flagged in the first pass have no transaction info to update. */
		if ((status[ii] == STATUS_ERROR) || (status[ii] == STATUS_NOT_FOUND))
			continue;

		mygtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]);
		mysnap = &mygtm_txninfo->gti_current_snapshot;

		if (GTM_IsTransSerializable(mygtm_txninfo)){
			/* A serializable transaction that already has its snapshot
			 * cannot take part in a grouped request. */
			if ((mygtm_txninfo->gti_snapshot_set) && (txn_count > 1)){
				GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
				elog(ERROR, "Grouped snapshot can only include first snapshot in Serializable transaction");
			}
			if (!mygtm_txninfo->gti_snapshot_set) {
				/* The transaction that lent us its snapshot already holds the result. */
				if (snapshot != mysnap){
					if (mysnap->sn_xip == NULL){
						/* First call for this snapshot */
						mysnap->sn_xip = (GlobalTransactionId *)palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof(GlobalTransactionId));
						if (mysnap->sn_xip == NULL) {
							/* Release the lock before raising the error, as
							 * every other error path under the lock does. */
							GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
							ereport(ERROR, (ENOMEM, errmsg("out of memory")));
						}
					}
					mysnap->sn_xmin = snapshot->sn_xmin;
					mysnap->sn_xmax = snapshot->sn_xmax;
					mysnap->sn_xcnt = snapshot->sn_xcnt;
					memcpy(mysnap->sn_xip, snapshot->sn_xip,sizeof (GlobalTransactionId) * snapshot->sn_xcnt);
				}
				mygtm_txninfo->gti_snapshot_set = true;
			}
		} else if (snapshot != mysnap){
			/* Non-serializable transactions get a fresh copy every time. */
			if (mysnap->sn_xip == NULL){
				/* First call for this snapshot */
				mysnap->sn_xip = (GlobalTransactionId *)palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof(GlobalTransactionId));
				if (mysnap->sn_xip == NULL) {
					GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
					ereport(ERROR, (ENOMEM, errmsg("out of memory")));
				}
			}
			mysnap->sn_xmin = snapshot->sn_xmin;
			mysnap->sn_xmax = snapshot->sn_xmax;
			mysnap->sn_xcnt = snapshot->sn_xcnt;
			memcpy(mysnap->sn_xip, snapshot->sn_xip, sizeof (GlobalTransactionId) * snapshot->sn_xcnt);
		}

		/* Only set the transaction's xmin if it does not have a valid one yet. */
		if ((mygtm_txninfo != NULL) && (!GlobalTransactionIdIsValid(mygtm_txninfo->gti_xmin)))
			mygtm_txninfo->gti_xmin = xmin;
	}

	GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
	elog(DEBUG1, "GTM_GetTransactionSnapshot: (%u:%u:%u)",snapshot->sn_xmin, snapshot->sn_xmax,snapshot->sn_xcnt);
	return snapshot;
}

快照在TopMostMemoryContext中分配,不在进程内存上下文中分配的原因是,事务可能跨客户端连接存在,比如prepared transactions。