Created
May 17, 2014 20:26
-
-
Save lyubent/194c54bf4dea5b211b5c to your computer and use it in GitHub Desktop.
QueryRecorder difference pre package change.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@@ -19,130 +19,134 @@ package org.apache.cassandra.cql3.recording; | |
import java.io.*; | |
import java.nio.ByteBuffer; | |
-import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicReference; | |
import org.apache.cassandra.utils.FBUtilities; | |
+import org.apache.cassandra.utils.concurrent.OpOrder; | |
/** | |
* Used to create and append to the logfile storing executed queries | |
*/ | |
public class QueryRecorder | |
{ | |
- private final String queryLogFileName = "QueryLog"; | |
- private final String queryLogExtension = ".log"; | |
- private final String queryLogDirectory; | |
+ private static final String QUERYLOG_NAME = "QueryLog"; | |
+ private static final String QUERYLOG_EXT = ".log"; | |
+ private final String QUERYLOG_DIR; | |
private final int frequency; | |
private AtomicReference<QueryQueue> queryQueue = new AtomicReference<>(); | |
+ private final OpOrder opOrder = new OpOrder(); | |
public QueryRecorder(int logLimit, int frequency, String queryLogDirectory) | |
{ | |
this.frequency = frequency; | |
- this.queryLogDirectory = queryLogDirectory; | |
+ QUERYLOG_DIR = queryLogDirectory; | |
queryQueue.set(new QueryQueue(logLimit)); | |
} | |
- /** | |
- * Appends nth query to the query log file. | |
- * | |
- * @param queryString Query to be recorded to the query log | |
- */ | |
- public synchronized void append(String queryString) | |
+ public void allocate(String queryString) | |
{ | |
- // lazy init. the query buffer | |
- if (queryQueue.get().queue == null) | |
+ if (queryQueue.get().getQueue() == null) | |
queryQueue.get().initQueue(); | |
- // 8: long (timestamp), 4: int (query length), n: query string | |
- byte [] queryBytes = queryString.getBytes(); | |
- int size = 8 + 4 + queryBytes.length; | |
- byte [] logSegment = ByteBuffer.allocate(size) | |
- .putLong(FBUtilities.timestampMicros()) | |
- .putInt(queryBytes.length) | |
- .put(queryBytes) | |
- .array(); | |
- | |
- // check queue has enough room to add current query | |
- if (size + queryQueue.get().getPosition() < queryQueue.get().queue.length) | |
+ OpOrder.Group opGroup = opOrder.start(); | |
+ try | |
{ | |
- System.arraycopy(logSegment, 0, queryQueue.get().queue, queryQueue.get().getPosition(), size); | |
- queryQueue.get().incPositionBy(size); | |
+ byte [] queryBytes = queryString.getBytes(); | |
+ int size = calcSegmentSize(queryBytes); | |
+ int position = allocate(size); | |
+ byte [] logSegment = ByteBuffer.allocate(size) | |
+ .putLong(FBUtilities.timestampMicros()) | |
+ .putInt(queryBytes.length) | |
+ .put(queryBytes) | |
+ .array(); | |
+ | |
+ // check for room in queue first, if queue is full then close the op, flush the log, | |
+ // re-initialize the queue, re-start the op and then continue with the append after | |
+ // updating the position of the newly initialized queue. | |
+ if (position == -1) | |
+ { | |
+ opGroup.close(); | |
+ runFlush(); | |
+ opGroup = opOrder.start(); | |
+ // re-allocate. | |
+ position = allocate(size); | |
+ } | |
+ append(size, position, logSegment, queryQueue.get()); | |
} | |
- else | |
+ finally | |
{ | |
- runFlush(); | |
+ opGroup.close(); | |
} | |
} | |
- public Integer getFrequency() | |
+ private int allocate(int size) | |
{ | |
- return frequency; | |
- } | |
+ QueryQueue queue; | |
+ int position; | |
+ int length; | |
- public void runFlush() | |
- { | |
- File logFile = new File(queryLogDirectory, FBUtilities.timestampMicros() + queryLogFileName + queryLogExtension); | |
- try (FileOutputStream fos = new FileOutputStream(logFile)) | |
- { | |
- fos.write(queryQueue.get().queue, 0, queryQueue.get().getPosition()); | |
- } | |
- catch (IOException iox) | |
+ while (true) | |
{ | |
- throw new RuntimeException(String.format("Failed to flush query log %s", logFile.getAbsolutePath()), iox); | |
+ queue = queryQueue.get(); | |
+ position = queue.getPosition(); | |
+ length = queue.getQueue().length; | |
+ | |
+ if (position + size > length) | |
+ return -1; | |
+ else if (position + size > 0 && queue.compareAndSetPos(position, position + size)) | |
+ return position; | |
} | |
- | |
- // TODO create a new queryQueue instead of using this one, using compare and set perhaps. | |
- queryQueue.get().initQueue(); | |
- queryQueue.get().logPosition.set(0); | |
} | |
- protected class QueryQueue | |
+ /** | |
+ * Appends nth query to the query log queue. | |
+ * | |
+ * @param logSegment Query to be recorded to the query log | |
+ */ | |
+ private void append(int size, int position, byte [] logSegment, QueryQueue queue) | |
{ | |
- private byte[] queue; | |
- private AtomicInteger logPosition; | |
- private int limit; | |
- | |
- public QueryQueue(int logLimit) | |
- { | |
- this.limit = logLimit * 1024 * 1024; | |
- logPosition = new AtomicInteger(0); | |
- } | |
- | |
- public void initQueue() | |
- { | |
- queue = new byte[limit]; | |
- } | |
+ System.arraycopy(logSegment, 0, queue.getQueue(), position, size); | |
+ } | |
- public int getPosition() | |
- { | |
- return logPosition.get(); | |
- } | |
+ /** | |
+ * Calculates size of a query segment | |
+ * 8: long (timestamp), 4: int (query length), n: query string | |
+ * | |
+ * @param queryBytes query for which to calculate size | |
+ * @return | |
+ */ | |
+ private int calcSegmentSize(byte[] queryBytes) | |
+ { | |
+ return 8 + 4 + queryBytes.length; | |
+ } | |
- public void incPositionBy(int delta) | |
- { | |
- logPosition.addAndGet(delta); | |
- } | |
+ public Integer getFrequency() | |
+ { | |
+ return frequency; | |
} | |
- public class QuerylogSegment | |
+ public synchronized void runFlush() | |
{ | |
- long timestamp; | |
- String queryString; | |
+ byte[] queueToFlush = queryQueue.get().getQueue(); | |
- public QuerylogSegment(long timestamp, byte[] queryString) | |
- { | |
- this.timestamp = timestamp; | |
- this.queryString = new String(queryString); | |
- } | |
+ int finalPos = queryQueue.get().getPosition(); | |
+ int limit = queryQueue.get().getQueue().length; | |
+ if (queryQueue.compareAndSet(queryQueue.get(), new QueryQueue(limit))) | |
+ queryQueue.get().initQueue(); | |
- public long getTimestamp() | |
+ // todo timing out | |
+ OpOrder.Barrier barrier = opOrder.newBarrier(); | |
+ barrier.issue(); | |
+ barrier.await(); | |
+ | |
+ File logFile = new File(QUERYLOG_DIR, FBUtilities.timestampMicros() + QUERYLOG_NAME + QUERYLOG_EXT); | |
+ try (FileOutputStream fos = new FileOutputStream(logFile)) | |
{ | |
- return timestamp; | |
+ fos.write(queueToFlush, 0, finalPos); | |
} | |
- | |
- public String getQueryString() | |
+ catch (IOException iox) | |
{ | |
- return queryString; | |
+ throw new RuntimeException(String.format("Failed to flush query log %s", logFile.getAbsolutePath()), iox); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment