From af2e372091e7a2f189240e1867857a51d4ab424a Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Fri, 25 Nov 2011 14:15:40 +0800 Subject: [PATCH] Use FileChannel to write log in batch,instead of RandomAccessFile --- .../bitronix/tm/gui/TransactionLogHeaderPanel.java | 2 +- .../main/java/bitronix/tm/journal/DiskJournal.java | 5 +- .../tm/journal/TransactionLogAppender.java | 87 +++++++++++++------ .../bitronix/tm/journal/TransactionLogHeader.java | 90 +++++++++++++------- 4 files changed, 120 insertions(+), 64 deletions(-) diff --git a/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java b/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java index fdab521..f302fb6 100644 --- a/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java +++ b/btm/src/main/java/bitronix/tm/gui/TransactionLogHeaderPanel.java @@ -83,7 +83,7 @@ public class TransactionLogHeaderPanel extends JPanel { public void read(File logFile, boolean active) throws IOException { RandomAccessFile raf = new RandomAccessFile(logFile, "r"); - TransactionLogHeader header = new TransactionLogHeader(raf, 0L); + TransactionLogHeader header = new TransactionLogHeader(raf.getChannel(), 0L); raf.close(); if (log.isDebugEnabled()) { log.debug("read header: " + header); } setLogFile(logFile); diff --git a/btm/src/main/java/bitronix/tm/journal/DiskJournal.java b/btm/src/main/java/bitronix/tm/journal/DiskJournal.java index ae4f5f2..c26567b 100644 --- a/btm/src/main/java/bitronix/tm/journal/DiskJournal.java +++ b/btm/src/main/java/bitronix/tm/journal/DiskJournal.java @@ -92,9 +92,8 @@ public class DiskJournal implements Journal { return; } } - - synchronized (this) { - TransactionLogRecord tlog = new TransactionLogRecord(status, gtrid, uniqueNames); + TransactionLogRecord tlog = new TransactionLogRecord(status, gtrid, uniqueNames); + synchronized (this) { boolean written = activeTla.writeLog(tlog); if (!written) { // time to swap log files diff --git a/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java b/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java index 93e637f..b1e0b44 100644 --- a/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java +++ b/btm/src/main/java/bitronix/tm/journal/TransactionLogAppender.java @@ -28,6 +28,9 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.util.Iterator; +import java.util.Set; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.channels.FileLock; /** @@ -47,7 +50,7 @@ public class TransactionLogAppender { public static final int END_RECORD = 0x786e7442; private final File file; - private final RandomAccessFile randomAccessFile; + private final FileChannel fc; private final FileLock lock; private final TransactionLogHeader header; @@ -65,9 +68,9 @@ public class TransactionLogAppender { public TransactionLogAppender(File file, long maxFileLength) throws IOException { this.maxFileLength = maxFileLength; this.file = file; - this.randomAccessFile = new RandomAccessFile(file, "rw"); - this.header = new TransactionLogHeader(randomAccessFile, maxFileLength); - this.lock = randomAccessFile.getChannel().tryLock(0, TransactionLogHeader.TIMESTAMP_HEADER, false); + this.fc = new RandomAccessFile(file, "rw").getChannel(); + this.header = new TransactionLogHeader(fc, maxFileLength); + this.lock = fc.tryLock(0, TransactionLogHeader.TIMESTAMP_HEADER, false); if (this.lock == null) throw new IOException("transaction log file " + file.getName() + " is locked. Is another instance already running?"); @@ -82,6 +85,14 @@ public class TransactionLogAppender { return header; } + private int length(Set set){ + int sum=0; + for(String s:set){ + sum+=s.length(); + } + return sum; + } + /** * Write a {@link TransactionLogRecord} to disk. * @param tlog the record to write to disk. @@ -89,7 +100,7 @@ public class TransactionLogAppender { * @throws IOException if an I/O error occurs. */ public boolean writeLog(TransactionLogRecord tlog) throws IOException { - synchronized (randomAccessFile) { + long futureFilePosition = getHeader().getPosition() + tlog.calculateTotalRecordSize(); if (futureFilePosition >= maxFileLength) { // see TransactionLogHeader.setPosition() as it double-checks this if (log.isDebugEnabled()) @@ -98,27 +109,45 @@ public class TransactionLogAppender { } if (log.isDebugEnabled()) { log.debug("between " + getHeader().getPosition() + " and " + futureFilePosition + ", writing " + tlog); } - randomAccessFile.writeInt(tlog.getStatus()); - randomAccessFile.writeInt(tlog.getRecordLength()); - randomAccessFile.writeInt(tlog.getHeaderLength()); - randomAccessFile.writeLong(tlog.getTime()); - randomAccessFile.writeInt(tlog.getSequenceNumber()); - randomAccessFile.writeInt(tlog.getCrc32()); - randomAccessFile.writeByte((byte) tlog.getGtrid().getArray().length); - randomAccessFile.write(tlog.getGtrid().getArray()); - randomAccessFile.writeInt(tlog.getUniqueNames().size()); - Iterator it = tlog.getUniqueNames().iterator(); - while (it.hasNext()) { - String uniqueName = (String) it.next(); - randomAccessFile.writeShort(uniqueName.length()); - randomAccessFile.writeBytes(uniqueName); // this writes each character discarding the 8th bit. Isn't that US-ASCII ? - } - randomAccessFile.writeInt(tlog.getEndRecord()); + Set uniqueNames = tlog.getUniqueNames(); + + ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 4 + 8 + 4 + 4 + 1 + + tlog.getGtrid().getArray().length + 4 + length(uniqueNames) + + 2 * uniqueNames.size() + 4); + + // buf.putInt(value) + + buf.putInt(tlog.getStatus()); + buf.putInt(tlog.getRecordLength()); + buf.putInt(tlog.getHeaderLength()); + buf.putLong(tlog.getTime()); + buf.putInt(tlog.getSequenceNumber()); + buf.putInt(tlog.getCrc32()); + buf.put((byte) tlog.getGtrid().getArray().length); + buf.put(tlog.getGtrid().getArray()); + + buf.putInt(uniqueNames.size()); + Iterator it = uniqueNames.iterator(); + while (it.hasNext()) { + String uniqueName = (String) it.next(); + buf.putShort((short) uniqueName.length()); + buf.put(uniqueName.getBytes()); // this writes each + // character discarding + // the 8th bit. Isn't + // that US-ASCII ? + } + buf.putInt(tlog.getEndRecord()); + + buf.flip(); + synchronized (fc) { + while (buf.hasRemaining()) + this.fc.write(buf); + } getHeader().goAhead(tlog.calculateTotalRecordSize()); if (log.isDebugEnabled()) { log.debug("disk journal appender now at position " + getHeader().getPosition()); } return true; - } + } /** @@ -126,13 +155,13 @@ public class TransactionLogAppender { * @throws IOException if an I/O error occurs. */ public void close() throws IOException { - synchronized (randomAccessFile) { + synchronized (fc) { shutdownBatcherThread(); getHeader().setState(TransactionLogHeader.CLEAN_LOG_STATE); - randomAccessFile.getFD().sync(); + fc.force(false); lock.release(); - randomAccessFile.close(); + fc.close(); } } @@ -172,11 +201,11 @@ public class TransactionLogAppender { protected void doForce() throws IOException { - synchronized (randomAccessFile) { - if (log.isDebugEnabled()) { log.debug("forcing log writing"); } - randomAccessFile.getFD().sync(); - if (log.isDebugEnabled()) { log.debug("done forcing log"); } + if (log.isDebugEnabled()) { log.debug("forcing log writing"); } + synchronized (fc) { + fc.force(false); } + if (log.isDebugEnabled()) { log.debug("done forcing log"); } } private void spawnBatcherThread() { diff --git a/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java b/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java index cfacae6..83aeb45 100644 --- a/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java +++ b/btm/src/main/java/bitronix/tm/journal/TransactionLogHeader.java @@ -26,6 +26,8 @@ import org.slf4j.Logger; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; /** * Used to control a log file's header. @@ -74,7 +76,7 @@ public class TransactionLogHeader { public final static byte UNCLEAN_LOG_STATE = -1; - private final RandomAccessFile randomAccessFile; + private final FileChannel fc; private int formatId; private long timestamp; private byte state; @@ -88,16 +90,22 @@ public class TransactionLogHeader { * @param maxFileLength the max file length. * @throws IOException if an I/O error occurs. */ - public TransactionLogHeader(RandomAccessFile randomAccessFile, long maxFileLength) throws IOException { - this.randomAccessFile = randomAccessFile; + public TransactionLogHeader(FileChannel fc, long maxFileLength) throws IOException { + this.fc = fc; this.maxFileLength = maxFileLength; - randomAccessFile.seek(FORMAT_ID_HEADER); - formatId = randomAccessFile.readInt(); - timestamp = randomAccessFile.readLong(); - state = randomAccessFile.readByte(); - position = randomAccessFile.readLong(); - randomAccessFile.seek(position); + fc.position(FORMAT_ID_HEADER); + ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 1 + 8); + while (buf.hasRemaining()) { + this.fc.read(buf); + } + buf.flip(); + formatId = buf.getInt(); + timestamp = buf.getLong(); + state = buf.get(); + position = buf.getLong(); + fc.position(position); + if (log.isDebugEnabled()) { log.debug("read header " + this); } } @@ -145,12 +153,17 @@ public class TransactionLogHeader { * @throws IOException if an I/O error occurs. */ public void setFormatId(int formatId) throws IOException { - synchronized (randomAccessFile) { - long currentPos = randomAccessFile.getFilePointer(); - randomAccessFile.seek(FORMAT_ID_HEADER); - randomAccessFile.writeInt(formatId); - randomAccessFile.seek(currentPos); - } + ByteBuffer buf = ByteBuffer.allocate(8); + buf.putInt(formatId); + buf.flip(); + synchronized (fc) { + long currentPos = fc.position(); + fc.position(FORMAT_ID_HEADER); + while (buf.hasRemaining()) { + this.fc.write(buf); + } + fc.position(currentPos); + } this.formatId = formatId; } @@ -162,12 +175,17 @@ public class TransactionLogHeader { * @throws IOException if an I/O error occurs. */ public void setTimestamp(long timestamp) throws IOException { - synchronized (randomAccessFile) { - long currentPos = randomAccessFile.getFilePointer(); - randomAccessFile.seek(TIMESTAMP_HEADER); - randomAccessFile.writeLong(timestamp); - randomAccessFile.seek(currentPos); - } + ByteBuffer buf = ByteBuffer.allocate(8); + buf.putLong(position); + buf.flip(); + synchronized (fc) { + long currentPos = fc.position(); + fc.position(TIMESTAMP_HEADER); + while (buf.hasRemaining()) { + this.fc.write(buf); + } + fc.position(currentPos); + } this.timestamp = timestamp; } @@ -179,12 +197,17 @@ public class TransactionLogHeader { * @throws IOException if an I/O error occurs. */ public void setState(byte state) throws IOException { - synchronized (randomAccessFile) { - long currentPos = randomAccessFile.getFilePointer(); - randomAccessFile.seek(STATE_HEADER); - randomAccessFile.writeByte(state); - randomAccessFile.seek(currentPos); - } + ByteBuffer buf = ByteBuffer.allocate(1); + buf.put(state); + buf.flip(); + synchronized (fc) { + long currentPos = fc.position(); + fc.position(STATE_HEADER); + while (buf.hasRemaining()) { + this.fc.write(buf); + } + fc.position(currentPos); + } this.state = state; } @@ -201,10 +224,15 @@ public class TransactionLogHeader { if (position >= maxFileLength) throw new IOException("invalid position " + position + " (too high)"); - synchronized (randomAccessFile) { - randomAccessFile.seek(CURRENT_POSITION_HEADER); - randomAccessFile.writeLong(position); - randomAccessFile.seek(position); + ByteBuffer buf = ByteBuffer.allocate(8); + buf.putLong(position); + buf.flip(); + synchronized (fc) { + fc.position(CURRENT_POSITION_HEADER); + while (buf.hasRemaining()) { + this.fc.write(buf); + } + fc.position(position); } this.position = position; -- 1.7.5.4