在java中有如下三种事务,
- 简单的jdbc级的事务
- jta – 在ejb环境下,用户得到事务并进行控制
- cmp – 完全由容器控制事务,用户通过bean配置文件来定义事务行为
二三种都支持分布式事务,但只支持java环境下的分布式事务。
下面讨论如何在java程序里实现分布式事务,即在同一个事务里访问多个数据源。实际上就是如何使用jta.
这里假设使用oracle数据库,使用weblogic部署应用,所要做的是如下几步:
1. 配置
1.1 确认数据库支持分布式事务 – oracle是支持分布式事务的,jdbc驱动也支持分布式事务
1.2 在weblogic里配置datasource
1.2.1. 配置连接池,注意这里应该选择驱动是thin xa而不是thin
1.2.2. 配置数据源,使用前面配好的xa的连接池
2. 程序实现
2.1. 实现自己的xid
import javax.transaction.xa.*;
public class myxid implements xid
{
protected int formatid;
protected byte gtrid[];
protected byte bqual[];
public myxid()
{
}
public myxid(int formatid, byte gtrid[], byte bqual[])
{
this.formatid = formatid;
this.gtrid = gtrid;
this.bqual = bqual;
}
public class myxid implements xid
{
protected int formatid;
protected byte gtrid[];
protected byte bqual[];
public myxid()
{
}
public myxid(int formatid, byte gtrid[], byte bqual[])
{
this.formatid = formatid;
this.gtrid = gtrid;
this.bqual = bqual;
}
public int getformatid()
{
return formatid;
}
public byte[] getbranchqualifier()
{
return bqual;
}
public byte[] getglobaltransactionid()
{
return gtrid;
}
}
2.2. 通过jndi找到weblogic中配置好的数据源
public xadatasource getxadatasource()
throws exception
{
initialcontext ctx = new initialcontext( mgr.getprops());
xadatasource ds = (xadatasource)ctx.lookup("jdbc/xads");
return ds;
}
throws exception
{
initialcontext ctx = new initialcontext( mgr.getprops());
xadatasource ds = (xadatasource)ctx.lookup("jdbc/xads");
return ds;
}
2.3. 使用xadatasource得到xaconnection,使用xaconnection得到xaresource,基于xaresource进行具体数据访问。如果我们这里lookup多个xadatasource,然后得到多个xaresource,就可以实现多数据源的事务控制。
xadatasource xads;
xaconnection xacon;
xaresource xares;
xid xid;
connection con;
statement stmt;
int ret;
xads = getxadatasource();
xacon = xads.getxaconnection();
xares = xacon.getxaresource();
con = xacon.getconnection();
stmt = con.createstatement();
xid = new myxid(100, new byte[]{0x01}, new byte[]{0x02});
try {
xares.start(xid, xaresource.tmnoflags);
stmt.executeupdate(“insert into test_table values (100)”);
xares.end(xid, xaresource.tmsuccess);
ret = xares.prepare(xid);
if (ret == xaresource.xa_ok) {
xares.commit(xid, false);
}
}
catch (xaexception e) {
e.printstacktrace();
}
finally {
stmt.close();
con.close();
xacon.close();
}
xaconnection xacon;
xaresource xares;
xid xid;
connection con;
statement stmt;
int ret;
xads = getxadatasource();
xacon = xads.getxaconnection();
xares = xacon.getxaresource();
con = xacon.getconnection();
stmt = con.createstatement();
xid = new myxid(100, new byte[]{0x01}, new byte[]{0x02});
try {
xares.start(xid, xaresource.tmnoflags);
stmt.executeupdate(“insert into test_table values (100)”);
xares.end(xid, xaresource.tmsuccess);
ret = xares.prepare(xid);
if (ret == xaresource.xa_ok) {
xares.commit(xid, false);
}
}
catch (xaexception e) {
e.printstacktrace();
}
finally {
stmt.close();
con.close();
xacon.close();
}
