After pg initializes shmem and attaches the "ShmemIndex" hash index to it, it goes on to initialize xlog in shmem, and then, in turn, clog, subtrans, twophase, and multixact. This series covers them in the order clog, subtrans, multixact, twophase; twophase is placed after multixact because the first three use the same algorithm and data structures, so writing about them back to back makes them easier to remember and group together. This article discusses the initialization of twophase.
In pg, distributed transactions are supported by two-phase commit (twophase). The initiator assigns a global transaction ID (GID) to identify the transaction, while each participating database runs a local transaction, which in pg is a prepared transaction.
Each global transaction (globaltransaction, abbreviated gxact) is associated with a global transaction ID (GID). A client uses the PREPARE TRANSACTION command to assign a GID to a postgres transaction and prepare it for two-phase commit.
pg keeps all active global transactions in a shared-memory array. When a PREPARE TRANSACTION command is issued, the GID of that global transaction is stored in the array. This happens before the WAL record is written, so that if a global transaction with the same GID is already in the prepared state, the duplicate GID can be detected and the transaction aborted.
A global transaction (gxact) also has an auxiliary (dummy) PGPROC placed in the ProcArray. This makes it convenient to hang the gxact's locks off that PGPROC.
So that everything still works after a restart following a crash or shutdown, all prepared transactions must be stored on permanent storage. This includes lock information, pending notifications, and so on. All of this state is written to a per-transaction state file in the data/pg_twophase directory.
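To make the duplicate-GID check concrete, here is a condensed sketch modeled on the check near the start of MarkAsPreparing() in twophase.c. The wrapper name check_gid_not_in_use() is invented for this sketch, the TwoPhaseState structures it reads are defined later in this article, and the real function additionally reserves a free gxact slot while holding TwoPhaseStateLock.
/* Hypothetical wrapper around the duplicate-GID check done under
 * TwoPhaseStateLock, before any PREPARE WAL record is written. */
static void
check_gid_not_in_use(const char *gid)
{
    int     i;

    for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
    {
        GlobalTransaction gxact = TwoPhaseState->prepXacts[i];

        if (strcmp(gxact->gid, gid) == 0)
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_OBJECT),
                     errmsg("transaction identifier \"%s\" is already in use",
                            gid)));
    }
}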
2 Initializing the twophase-related structures
Back to the call chain: main()->…->PostmasterMain()->…->reset_shared()->CreateSharedMemoryAndSemaphores()->…->TwoPhaseShmemInit() initializes the data structures that support two-phase commit for distributed transactions, such as TwoPhaseState. They are used to manage and cache two-phase commit / TwoPhase state in memory (corresponding to the files stored in the "data/pg_twophase" directory).
TwoPhaseShmemInit() calls ShmemInitStruct(), which calls hash_search() to look up "Prepared Transaction Table" in the hash-table index "ShmemIndex". If it is not found, a HashElement and a ShmemIndexEnt (entry) are allocated for "Prepared Transaction Table" in ShmemIndex, and "Prepared Transaction Table" is written into that entry. Back in ShmemInitStruct(), ShmemAlloc() is then called to allocate space in shared memory for the "Prepared Transaction Table"-related structures (see the "twophase-related structures" figure below); the entry's (here, a ShmemIndexEnt variable) location member is set to point to that space and its size member records the size of the space. Finally ShmemInitStruct() returns, the static global variable TwoPhaseState of type TwoPhaseStateData * is pointed at the TwoPhaseStateData structure, whose start address is the start of the memory just allocated in shmem for the "Prepared Transaction Table"-related structures, and the members of that TwoPhaseStateData structure are set.
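For reference, the body of TwoPhaseShmemInit() in this era of PostgreSQL looks roughly like the sketch below. It is abridged, and the assumption that the free gxact list is threaded through the dummy proc's links field is inferred from the structures shown next, so consult twophase.c of your version for the exact code.
/* Condensed sketch of TwoPhaseShmemInit(); abridged, free-list linking
 * via proc.links is an assumption based on the structs shown below. */
void
TwoPhaseShmemInit(void)
{
    bool        found;

    TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
                                    TwoPhaseShmemSize(),
                                    &found);
    if (!IsUnderPostmaster)     /* postmaster sets everything up from scratch */
    {
        GlobalTransaction gxacts;
        int         i;

        Assert(!found);
        TwoPhaseState->freeGXacts = NULL;
        TwoPhaseState->numPrepXacts = 0;

        /* The GlobalTransactionData structs live right after the
         * variable-length prepXacts[] pointer array. */
        gxacts = (GlobalTransaction)
            ((char *) TwoPhaseState +
             MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
                      sizeof(GlobalTransaction) * max_prepared_xacts));

        /* Chain every slot onto the freeGXacts list. */
        for (i = 0; i < max_prepared_xacts; i++)
        {
            gxacts[i].proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
            TwoPhaseState->freeGXacts = &gxacts[i];
        }
    }
    else
        Assert(found);          /* backends just attach to the existing table */
}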
The relevant variables, structure definitions, and the data-structure diagram after initialization are shown below.
typedef struct TwoPhaseStateData
{
    /* Head of linked list of free GlobalTransactionData structs */
    GlobalTransaction freeGXacts;
    /* Number of valid prepXacts entries. */
    int         numPrepXacts;
    /*
     * There are max_prepared_xacts items in this array, but C wants a
     * fixed-size array.
     */
    GlobalTransaction prepXacts[1];     /* VARIABLE LENGTH ARRAY */
} TwoPhaseStateData;                    /* VARIABLE LENGTH STRUCT */
static TwoPhaseStateData *TwoPhaseState;
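The trailing prepXacts[1] member is the usual variable-length-array idiom: the actual shmem allocation covers max_prepared_xacts pointers plus the GlobalTransactionData structs they point to. The size computation looks roughly like the following sketch, modeled on TwoPhaseShmemSize() (details may differ in your PostgreSQL version):
/* Sketch of how the "Prepared Transaction Table" allocation is sized. */
Size
TwoPhaseShmemSize(void)
{
    Size        size;

    /* fixed header + the variable-length array of pointers ... */
    size = offsetof(TwoPhaseStateData, prepXacts);
    size = add_size(size, mul_size(max_prepared_xacts,
                                   sizeof(GlobalTransaction)));
    size = MAXALIGN(size);
    /* ... followed by the GlobalTransactionData structs they point to */
    size = add_size(size, mul_size(max_prepared_xacts,
                                   sizeof(GlobalTransactionData)));

    return size;
}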
/*
 * GlobalTransactionData is defined in twophase.c; other places have no
 * business knowing the internal definition.
 */
typedef struct GlobalTransactionData *GlobalTransaction;
typedef struct GlobalTransactionData
{
    PGPROC      proc;           /* dummy proc */
    TimestampTz prepared_at;    /* time of preparation */
    XLogRecPtr  prepare_lsn;    /* XLOG offset of prepare record */
    Oid         owner;          /* ID of user that executed the xact */
    TransactionId locking_xid;  /* top-level XID of backend working on xact */
    bool        valid;          /* TRUE if fully prepared */
    char        gid[GIDSIZE];   /* The GID assigned to the prepared xact */
} GlobalTransactionData;
struct PGPROC
{
    /* proc->links MUST BE FIRST IN STRUCT (see ProcSleep, ProcWakeup, etc) */
    SHM_QUEUE   links;          /* list link if process is in a list */
    PGSemaphoreData sem;        /* ONE semaphore to sleep on */
    int         waitStatus;     /* STATUS_WAITING, STATUS_OK or STATUS_ERROR */
    LocalTransactionId lxid;    /* local id of top-level transaction currently
                                 * being executed by this proc, if running;
                                 * else InvalidLocalTransactionId */
    TransactionId xid;          /* id of top-level transaction currently being
                                 * executed by this proc, if running and XID
                                 * is assigned; else InvalidTransactionId */
    TransactionId xmin;         /* minimal running XID as it was when we were
                                 * starting our xact, excluding LAZY VACUUM:
                                 * vacuum must not remove tuples deleted by
                                 * xid >= xmin ! */
    int         pid;            /* Backend's process ID; 0 if prepared xact */
    /* These fields are zero while a backend is still starting up: */
    BackendId   backendId;      /* This backend's backend ID (if assigned) */
    Oid         databaseId;     /* OID of database this backend is using */
    Oid         roleId;         /* OID of role using this backend */
    bool        inCommit;       /* true if within commit critical section */
    uint8       vacuumFlags;    /* vacuum-related flags, see above */
    /*
     * While in hot standby mode, shows that a conflict signal has been sent
     * for the current transaction. Set/cleared while holding ProcArrayLock,
     * though not required. Accessed without lock, if needed.
     */
    bool        recoveryConflictPending;
    /* Info about LWLock the process is currently waiting for, if any. */
    bool        lwWaiting;      /* true if waiting for an LW lock */
    bool        lwExclusive;    /* true if waiting for exclusive access */
    struct PGPROC *lwWaitLink;  /* next waiter for same LW lock */
    /* Info about lock the process is currently waiting for, if any. */
    /* waitLock and waitProcLock are NULL if not currently waiting. */
    LOCK       *waitLock;       /* Lock object we're sleeping on ... */
    PROCLOCK   *waitProcLock;   /* Per-holder info for awaited lock */
    LOCKMODE    waitLockMode;   /* type of lock we're waiting for */
    LOCKMASK    heldLocks;      /* bitmask for lock types already held on this
                                 * lock object by this backend */
    Latch       procLatch;      /* generic latch for process */
    /*
     * Info to allow us to wait for synchronous replication, if needed.
     * waitLSN is InvalidXLogRecPtr if not waiting; set only by user backend.
     * syncRepState must not be touched except by owning process or WALSender.
     * syncRepLinks used only while holding SyncRepLock.
     */
    XLogRecPtr  waitLSN;        /* waiting for this LSN or higher */
    int         syncRepState;   /* wait state for sync rep */
    SHM_QUEUE   syncRepLinks;   /* list link if process is in syncrep queue */
    /*
     * All PROCLOCK objects for locks held or awaited by this backend are
     * linked into one of these lists, according to the partition number of
     * their lock.
     */
    SHM_QUEUE   myProcLocks[NUM_LOCK_PARTITIONS];
    struct XidCache subxids;    /* cache for subtransaction XIDs */
};
/* shmqueue.c */
typedef struct SHM_QUEUE
{
    struct SHM_QUEUE *prev;
    struct SHM_QUEUE *next;
} SHM_QUEUE;
A few more words about the PGPROC structure above.
Each backend process has one PGPROC structure in shared memory. There is also a list of currently unused PGPROC structures from which structures are allocated to new backends.
links: the list link when the PGPROC is on some list. While waiting for a lock, the PGPROC is linked into that lock's queue of waiting processes (the lock's waitProcs queue). A recycled PGPROC is linked into ProcGlobal's freeProcs list.
pg's two-phase commit transaction manager also creates a dummy PGPROC structure for each currently prepared transaction. These PGPROCs appear in the ProcArray so that prepared transactions look as if they were still running and their held locks are shown correctly. Where necessary, a prepared-transaction PGPROC can be told apart from a real process by the fact that its pid is 0. The semaphore and lock-activity fields in a prepared-transaction PGPROC are unused, but its myProcLocks[] lists are valid.
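How the dummy PGPROC gets into the ProcArray can be seen from the following sketch, modeled on MarkAsPrepared() in twophase.c (abridged; see the actual source of your version for details):
/* Sketch: mark a gxact fully prepared and publish its dummy proc. */
static void
MarkAsPrepared(GlobalTransaction gxact)
{
    LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
    Assert(!gxact->valid);
    gxact->valid = true;
    LWLockRelease(TwoPhaseStateLock);

    /*
     * Put the dummy proc into the global ProcArray so the prepared XID is
     * still considered "in progress" and its locks remain visible.
     */
    ProcArrayAdd(&gxact->proc);
}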
Below is the in-memory structure diagram after the "Prepared Transaction Table"-related structures have been initialized.
Figure: memory layout after the twophase-related structures have been initialized
To keep the figure above compact, the HCTL structure created when building the shmem hash-table index "ShmemIndex" has been omitted; that structure records the information needed to create the expandable hash table. The gray-shaded part on the left has been added to give an overview of the physical layout of the variables in shared memory/shmem, from bottom to top, i.e. from low addresses to high addresses. The "twophase"-related structures are shown in a separate figure below; otherwise the figure above would be too large and complicated.
Figure: twophase-related structures