Skip to content

Instantly share code, notes, and snippets.

@dtest11
Last active October 28, 2021 16:15
Show Gist options
  • Save dtest11/6c37857d42a5085a8e9245c3882e7866 to your computer and use it in GitHub Desktop.
Save dtest11/6c37857d42a5085a8e9245c3882e7866 to your computer and use it in GitHub Desktop.
package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"io"
"log"
"os"
"path/filepath"
)
// ConsumeQueue is one fixed-width consume-queue record. loadDisk decodes it
// from 20-byte big-endian chunks with encoding/binary, so the order and width
// of the fields below (8 + 4 + 8 bytes) define the layout being read.
type ConsumeQueue struct {
Offset int64 // presumably the message's commit-log offset (main passes such a value to getMessage) — confirm
Size int32 // presumably the message length in bytes — confirm
TagHashCode int64 // presumably a hash of the message tag — confirm
}
// getHomeDir returns the current user's home directory as reported by
// os.UserHomeDir. It panics if the directory cannot be determined.
func getHomeDir() string {
	home, err := os.UserHomeDir()
	if err == nil {
		return home
	}
	panic(err)
}
// loadDisk opens the consume-queue file at name (relative to the user's home
// directory), decodes up to the first 10 records and logs each one as JSON.
//
// Each record is 20 big-endian bytes laid out as a ConsumeQueue value.
// Reading stops early at a clean end of file. A truncated trailing record, or
// any other open, read, decode or encode failure, terminates the program via
// log.Fatal. The receiver itself is not modified.
func (c *ConsumeQueue) loadDisk(name string) {
	const (
		recordSize = 20 // bytes per record: int64 + int32 + int64
		maxRecords = 10 // only a sample of the file is dumped
	)

	absPath := filepath.Join(getHomeDir(), name)
	file, err := os.Open(absPath)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	var record ConsumeQueue
	raw := make([]byte, recordSize)
	for i := 0; i < maxRecords; i++ {
		// io.ReadFull either fills the whole record or reports why it could
		// not; a bare file.Read may legally return fewer bytes and would
		// leave the rest of the buffer zeroed.
		if _, err := io.ReadFull(file, raw); err != nil {
			if err == io.EOF {
				break // clean end of file: no more records
			}
			log.Fatalf("reading record %d from %s: %v", i, absPath, err)
		}
		if err := binary.Read(bytes.NewReader(raw), binary.BigEndian, &record); err != nil {
			log.Fatalf("decoding record %d from %s: %v", i, absPath, err)
		}
		encoded, err := json.Marshal(&record)
		if err != nil {
			log.Fatalf("encoding record %d as JSON: %v", i, err)
		}
		log.Println(string(encoded))
	}
}
// readNextBytes reads the next number bytes from file, starting at the file's
// current offset.
//
// On success it returns exactly number bytes and a nil error. If the file is
// already at its end it returns an empty slice and io.EOF. If the file ends
// part-way through, it returns the bytes that were read together with
// io.ErrUnexpectedEOF. Any other read error is returned unchanged along with
// the bytes read so far. A negative number is treated as zero.
func readNextBytes(file *os.File, number int) ([]byte, error) {
	if number < 0 {
		number = 0
	}
	result := make([]byte, number)
	// A single file.Read may return fewer bytes than requested; io.ReadFull
	// keeps reading until the buffer is full or the file ends.
	n, err := io.ReadFull(file, result)
	return result[:n], err
}
// main dumps the first records of a sample consume-queue file and then reads
// one message from a commit-log file using a hard-coded offset/size pair.
// Both paths are resolved relative to the current user's home directory by
// the functions they are passed to.
func main() {
	const (
		consumeQueuePath = `store/consumequeue/hello/0/00000000000000000000`
		commitLogPath    = `store/commitlog/00000000000000000000`
	)

	queue := new(ConsumeQueue)
	queue.loadDisk(consumeQueuePath)

	// Index-file dump, currently disabled:
	// var index IndexFile
	// indexPath := `store/index/20211021135708119`
	// index.loadDisk(indexPath)

	// Hard-coded sample entry: "Offset":1042634,"Size":187
	var (
		msgOffset int64 = 1042634
		msgSize         = 187
	)
	getMessage(commitLogPath, msgOffset, msgSize)
}
// IndexFile models an index file as a header followed by a slot table.
// The original note names org/apache/rocketmq/store/index as the reference.
type IndexFile struct {
	Header Header
	// SlotTable has 5,000,000 entries. The original note reads
	// "500W *4 Bytes". NOTE(review): LinkedList declares more than 4 bytes
	// of fields, so confirm what one slot is meant to hold.
	SlotTable [5_000_000]LinkedList
}
// Header is the fixed-size record decoded from the start of an index file.
// Its fields total 40 bytes (four int64 plus two int32), matching the 40-byte
// big-endian read done by IndexFile.loadDisk and IndexFile.SelectIndex.
// NOTE(review): the per-field meanings are implied by the names only; confirm
// them against the format of the program that writes the file.
type Header struct {
BeginTimestamp int64 `json:"begin_timestamp"`
EndTimestamp int64 `json:"end_timestamp"`
BeginPhysicsOffset int64 `json:"begin_physics_offset"`
EndPhysicsOffset int64 `json:"end_physics_offset"`
HashSlotCount int32 `json:"hash_slot_count"`
IndexCount int32 `json:"index_count"`
}
// LinkedList is one index entry that can be chained to another via Next.
// The four integer fields add up to 20 bytes (4 + 8 + 4 + 4).
// NOTE(review): Next is a Go pointer, so this type is presumably not decodable
// directly with encoding/binary — confirm before reading it from disk.
type LinkedList struct { // index item: 20 bytes each; the original note says "20 * 2000W" (presumably 20,000,000 items — confirm)
KeyHash int32
PhysicsOffset int64
TimeDiff int32
PrevIndexNo int32
Next *LinkedList // next entry in the chain, if any
}
// loadDisk opens the index file at name (relative to the user's home
// directory), decodes its 40-byte big-endian header, stores it in
// index.Header and logs it as JSON.
//
// Any failure to open, read, decode or encode terminates the program via
// log.Fatal; this includes a file shorter than the header. A nil receiver is
// tolerated: the header is still logged, just not stored.
func (index *IndexFile) loadDisk(name string) {
	absPath := filepath.Join(getHomeDir(), name)
	file, err := os.Open(absPath)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// binary.Read consumes exactly the encoded size of Header and reports
	// io.EOF / io.ErrUnexpectedEOF on a short file instead of decoding
	// zero padding.
	var header Header
	if err := binary.Read(file, binary.BigEndian, &header); err != nil {
		log.Fatalf("reading index header from %s: %v", absPath, err)
	}
	if index != nil {
		index.Header = header
	}

	encoded, err := json.Marshal(&header)
	if err != nil {
		log.Fatalf("encoding index header as JSON: %v", err)
	}
	log.Println(string(encoded))
}
// SelectIndex loads and logs the header of the index file at name (relative
// to the user's home directory).
//
// It performs exactly the same work as loadDisk, so it delegates to it rather
// than keeping a second copy of the open/read/decode/log sequence.
func (index *IndexFile) SelectIndex(name string) {
	index.loadDisk(name)
}
/**
DefaultMessageStore.this.doDispatch(dispatchRequest);
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
**/
// CommitLog describes the field layout of a single commit-log message record.
// parseMsg decodes a byte slice into it with encoding/binary in big-endian
// order, so field order and width here determine how the bytes are split.
// NOTE(review): FourByte and EightByte are not declared in the visible part of
// this file; presumably they are 4- and 8-byte fixed-size types — confirm.
// NOTE(review): fields marked "dynamic" are variable-length per the original
// notes, so the fixed array sizes below presumably fit only one particular
// sample message — confirm.
type CommitLog struct {
MsgSize FourByte
MagicCode FourByte
Crc FourByte
QueueId FourByte
Flags FourByte
QueueOffset EightByte
PhysicsOffset EightByte
SysFlag FourByte
BornTimeStamp EightByte
BornHost EightByte // dynamic: 8 or 20 bytes per the original note
StoreTimeStamp EightByte
StoreHostAddress EightByte // dynamic: 8 or 20 bytes per the original note
ReconsumeTimes FourByte
PreparedTransactionOffset EightByte
BodyLen FourByte
MessageBody [16]byte // dynamic; presumably BodyLen bytes in general — confirm
TopicLength int8 // 1 byte (original note: [1]byte)
Topic [9]byte // dynamic; presumably TopicLength bytes in general — confirm
PropertiesLength int16 // 2 bytes (original note: [2]byte)
Properties [98]byte // presumably PropertiesLength bytes in general — confirm
}
// getMessage reads the size-byte message stored at logical commit-log offset
// offset from the commit-log file name (relative to the user's home
// directory), passes it to parseMsg for logging and returns the raw bytes.
//
// It mirrors the Java call
//
//	SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
//
// The position inside the file is the logical offset modulo the commit-log
// file size, not modulo the message size.
// NOTE(review): the file size is assumed to be RocketMQ's default of 1 GiB —
// confirm against the broker's mappedFileSizeCommitLog setting.
//
// A negative offset, a non-positive size, or any open/seek/read failure
// terminates the program via log.Fatal; this includes a file too short to
// hold the whole message.
func getMessage(name string, offset int64, size int) []byte {
	const commitLogFileSize int64 = 1 << 30

	if offset < 0 || size <= 0 {
		log.Fatalf("invalid commit log range: offset=%d size=%d", offset, size)
	}
	position := offset % commitLogFileSize
	log.Println("pos:", position)

	absPath := filepath.Join(getHomeDir(), name)
	file, err := os.Open(absPath)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	if _, err := file.Seek(position, io.SeekStart); err != nil {
		log.Fatalf("seeking to %d in %s: %v", position, absPath, err)
	}
	// io.ReadFull fails on a short read instead of returning a partially
	// filled, zero-padded buffer.
	data := make([]byte, size)
	if _, err := io.ReadFull(file, data); err != nil {
		log.Fatalf("reading %d bytes at %d from %s: %v", size, position, absPath, err)
	}
	parseMsg(data)
	return data
}
// parseMsg decodes data as one big-endian CommitLog record and logs it as
// JSON. A decode or encode failure terminates the program via log.Fatal.
func parseMsg(data []byte) {
	var msg CommitLog
	if err := binary.Read(bytes.NewReader(data), binary.BigEndian, &msg); err != nil {
		// log.Fatal formats like fmt.Sprint, which puts no space between a
		// string operand and the error, so format the message explicitly.
		log.Fatalf("binary.Read failed: %v", err)
	}
	encoded, err := json.Marshal(&msg)
	if err != nil {
		log.Fatalf("json.Marshal failed: %v", err)
	}
	log.Println(string(encoded))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment