Skip to content

Instantly share code, notes, and snippets.

@lyubent
Created May 17, 2014 20:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lyubent/194c54bf4dea5b211b5c to your computer and use it in GitHub Desktop.
Save lyubent/194c54bf4dea5b211b5c to your computer and use it in GitHub Desktop.
QueryRecorder difference pre package change.
@@ -19,130 +19,134 @@ package org.apache.cassandra.cql3.recording;
import java.io.*;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
/**
* Used to create and append to the logfile storing executed queries
*/
public class QueryRecorder
{
- private final String queryLogFileName = "QueryLog";
- private final String queryLogExtension = ".log";
- private final String queryLogDirectory;
+ private static final String QUERYLOG_NAME = "QueryLog";
+ private static final String QUERYLOG_EXT = ".log";
+ private final String QUERYLOG_DIR;
private final int frequency;
private AtomicReference<QueryQueue> queryQueue = new AtomicReference<>();
+ private final OpOrder opOrder = new OpOrder();
public QueryRecorder(int logLimit, int frequency, String queryLogDirectory)
{
this.frequency = frequency;
- this.queryLogDirectory = queryLogDirectory;
+ QUERYLOG_DIR = queryLogDirectory;
queryQueue.set(new QueryQueue(logLimit));
}
- /**
- * Appends nth query to the query log file.
- *
- * @param queryString Query to be recorded to the query log
- */
- public synchronized void append(String queryString)
+ public void allocate(String queryString)
{
- // lazy init. the query buffer
- if (queryQueue.get().queue == null)
+ if (queryQueue.get().getQueue() == null)
queryQueue.get().initQueue();
- // 8: long (timestamp), 4: int (query length), n: query string
- byte [] queryBytes = queryString.getBytes();
- int size = 8 + 4 + queryBytes.length;
- byte [] logSegment = ByteBuffer.allocate(size)
- .putLong(FBUtilities.timestampMicros())
- .putInt(queryBytes.length)
- .put(queryBytes)
- .array();
-
- // check queue has enough room to add current query
- if (size + queryQueue.get().getPosition() < queryQueue.get().queue.length)
+ OpOrder.Group opGroup = opOrder.start();
+ try
{
- System.arraycopy(logSegment, 0, queryQueue.get().queue, queryQueue.get().getPosition(), size);
- queryQueue.get().incPositionBy(size);
+ byte [] queryBytes = queryString.getBytes();
+ int size = calcSegmentSize(queryBytes);
+ int position = allocate(size);
+ byte [] logSegment = ByteBuffer.allocate(size)
+ .putLong(FBUtilities.timestampMicros())
+ .putInt(queryBytes.length)
+ .put(queryBytes)
+ .array();
+
+ // check for room in queue first, if queue is full then close the op, flush the log,
+ // re-initialize the queue, re-start the op and then continue with the append after
+ // updating the position of the newly initialized queue.
+ if (position == -1)
+ {
+ opGroup.close();
+ runFlush();
+ opGroup = opOrder.start();
+ // re-allocate.
+ position = allocate(size);
+ }
+ append(size, position, logSegment, queryQueue.get());
}
- else
+ finally
{
- runFlush();
+ opGroup.close();
}
}
- public Integer getFrequency()
+ private int allocate(int size)
{
- return frequency;
- }
+ QueryQueue queue;
+ int position;
+ int length;
- public void runFlush()
- {
- File logFile = new File(queryLogDirectory, FBUtilities.timestampMicros() + queryLogFileName + queryLogExtension);
- try (FileOutputStream fos = new FileOutputStream(logFile))
- {
- fos.write(queryQueue.get().queue, 0, queryQueue.get().getPosition());
- }
- catch (IOException iox)
+ while (true)
{
- throw new RuntimeException(String.format("Failed to flush query log %s", logFile.getAbsolutePath()), iox);
+ queue = queryQueue.get();
+ position = queue.getPosition();
+ length = queue.getQueue().length;
+
+ if (position + size > length)
+ return -1;
+ else if (position + size > 0 && queue.compareAndSetPos(position, position + size))
+ return position;
}
-
- // TODO create a new queryQueue instead of using this one, using compare and set perhaps.
- queryQueue.get().initQueue();
- queryQueue.get().logPosition.set(0);
}
- protected class QueryQueue
+ /**
+ * Appends nth query to the query log queue.
+ *
+ * @param logSegment Query to be recorded to the query log
+ */
+ private void append(int size, int position, byte [] logSegment, QueryQueue queue)
{
- private byte[] queue;
- private AtomicInteger logPosition;
- private int limit;
-
- public QueryQueue(int logLimit)
- {
- this.limit = logLimit * 1024 * 1024;
- logPosition = new AtomicInteger(0);
- }
-
- public void initQueue()
- {
- queue = new byte[limit];
- }
+ System.arraycopy(logSegment, 0, queue.getQueue(), position, size);
+ }
- public int getPosition()
- {
- return logPosition.get();
- }
+ /**
+ * Calculates size of a query segment
+ * 8: long (timestamp), 4: int (query length), n: query string
+ *
+ * @param queryBytes query for which to calculate size
+ * @return
+ */
+ private int calcSegmentSize(byte[] queryBytes)
+ {
+ return 8 + 4 + queryBytes.length;
+ }
- public void incPositionBy(int delta)
- {
- logPosition.addAndGet(delta);
- }
+ public Integer getFrequency()
+ {
+ return frequency;
}
- public class QuerylogSegment
+ public synchronized void runFlush()
{
- long timestamp;
- String queryString;
+ byte[] queueToFlush = queryQueue.get().getQueue();
- public QuerylogSegment(long timestamp, byte[] queryString)
- {
- this.timestamp = timestamp;
- this.queryString = new String(queryString);
- }
+ int finalPos = queryQueue.get().getPosition();
+ int limit = queryQueue.get().getQueue().length;
+ if (queryQueue.compareAndSet(queryQueue.get(), new QueryQueue(limit)))
+ queryQueue.get().initQueue();
- public long getTimestamp()
+ // todo timing out
+ OpOrder.Barrier barrier = opOrder.newBarrier();
+ barrier.issue();
+ barrier.await();
+
+ File logFile = new File(QUERYLOG_DIR, FBUtilities.timestampMicros() + QUERYLOG_NAME + QUERYLOG_EXT);
+ try (FileOutputStream fos = new FileOutputStream(logFile))
{
- return timestamp;
+ fos.write(queueToFlush, 0, finalPos);
}
-
- public String getQueryString()
+ catch (IOException iox)
{
- return queryString;
+ throw new RuntimeException(String.format("Failed to flush query log %s", logFile.getAbsolutePath()), iox);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment