Last active
October 28, 2021 16:15
-
-
Save dtest11/6c37857d42a5085a8e9245c3882e7866 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"encoding/binary" | |
"encoding/json" | |
"io" | |
"log" | |
"os" | |
"path/filepath" | |
) | |
// ConsumeQueue is a single fixed-size consume-queue record. When decoded with
// encoding/binary the fields are read in declaration order and occupy
// 8 + 4 + 8 = 20 bytes.
type ConsumeQueue struct {
	// Offset is decoded from the first 8 bytes of the record.
	Offset int64
	// Size is decoded from the following 4 bytes.
	Size int32
	// TagHashCode is decoded from the final 8 bytes.
	TagHashCode int64
}
// getHomeDir returns the current user's home directory as reported by
// os.UserHomeDir. It panics if the directory cannot be determined.
func getHomeDir() string {
	home, lookupErr := os.UserHomeDir()
	if lookupErr == nil {
		return home
	}
	panic(lookupErr)
}
func (c *ConsumeQueue) loadDisk(name string) { | |
absPath := filepath.Join(getHomeDir(), name) | |
file, err := os.Open(absPath) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer file.Close() | |
m := ConsumeQueue{} | |
for i := 0; i < 10; i++ { | |
data, err := readNextBytes(file, 20) | |
if err == io.EOF { | |
break | |
} | |
buffer := bytes.NewBuffer(data) | |
err = binary.Read(buffer, binary.BigEndian, &m) | |
if err != nil { | |
log.Fatal("binary.Read failed", err) | |
} | |
bt, _ := json.Marshal(&m) | |
log.Println(string(bt)) | |
} | |
} | |
// readNextBytes reads the next number bytes from file.
//
// The returned slice always has length number. When fewer bytes are
// available, the unread tail is left zeroed and the error reports why:
// io.EOF if nothing could be read, io.ErrUnexpectedEOF if the file ended
// part-way through. Any other read error is returned unchanged.
func readNextBytes(file *os.File, number int) ([]byte, error) {
	result := make([]byte, number)
	// io.ReadFull keeps reading until the buffer is full, so a short Read
	// from the OS does not silently yield a partially filled buffer.
	_, err := io.ReadFull(file, result)
	return result, err
}
func main() { | |
name := `store/consumequeue/hello/0/00000000000000000000` | |
var c ConsumeQueue | |
c.loadDisk(name) | |
// var index IndexFile | |
// indexPath := `store/index/20211021135708119` | |
// index.loadDisk(indexPath) | |
// "Offset":1042634,"Size":187, | |
offset := 1042634 | |
size := 187 | |
commitlogPath := `store/commitlog/00000000000000000000` | |
getMessage(commitlogPath, int64(offset), size) | |
} | |
/** IndexFile | |
org/apache/rocketmq/store/index | |
*/ | |
type IndexFile struct { | |
Header Header | |
SlotTable [5_0000_00]LinkedList // 500W *4 Bytes | |
} | |
// Header is the index-file header. Its fixed-size fields total
// 4*8 + 2*4 = 40 bytes when decoded with encoding/binary, and the JSON tags
// give the snake_case names used when it is logged.
type Header struct {
	// BeginTimestamp occupies bytes 0-7.
	BeginTimestamp int64 `json:"begin_timestamp"`
	// EndTimestamp occupies bytes 8-15.
	EndTimestamp int64 `json:"end_timestamp"`
	// BeginPhysicsOffset occupies bytes 16-23.
	BeginPhysicsOffset int64 `json:"begin_physics_offset"`
	// EndPhysicsOffset occupies bytes 24-31.
	EndPhysicsOffset int64 `json:"end_physics_offset"`
	// HashSlotCount occupies bytes 32-35.
	HashSlotCount int32 `json:"hash_slot_count"`
	// IndexCount occupies bytes 36-39.
	IndexCount int32 `json:"index_count"`
}
// LinkedList is one index item that can be chained to another through Next.
// Original note: "Index item 20 * 2000W" — presumably 20 bytes per on-disk
// item and 20,000,000 items; confirm against the file format.
type LinkedList struct {
	// KeyHash is a 32-bit hash value.
	KeyHash int32
	// PhysicsOffset is a 64-bit offset.
	PhysicsOffset int64
	// TimeDiff is a 32-bit time difference.
	TimeDiff int32
	// PrevIndexNo is a 32-bit index number.
	PrevIndexNo int32
	// Next points at the following item, or is nil when there is none.
	Next *LinkedList
}
func (index *IndexFile) loadDisk(name string) { | |
absPath := filepath.Join(getHomeDir(), name) | |
file, err := os.Open(absPath) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer file.Close() | |
m := Header{} | |
data, err := readNextBytes(file, 40) | |
if err != nil && err != io.EOF { | |
panic(err) | |
} | |
buffer := bytes.NewBuffer(data) | |
err = binary.Read(buffer, binary.BigEndian, &m) | |
if err != nil { | |
log.Fatal("binary.Read failed", err) | |
} | |
bt, _ := json.Marshal(&m) | |
log.Println(string(bt)) | |
} | |
func (inde *IndexFile) SelectIndex(name string) { | |
absPath := filepath.Join(getHomeDir(), name) | |
file, err := os.Open(absPath) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer file.Close() | |
m := Header{} | |
data, err := readNextBytes(file, 40) | |
if err != nil && err != io.EOF { | |
panic(err) | |
} | |
buffer := bytes.NewBuffer(data) | |
err = binary.Read(buffer, binary.BigEndian, &m) | |
if err != nil { | |
log.Fatal("binary.Read failed", err) | |
} | |
bt, _ := json.Marshal(&m) | |
log.Println(string(bt)) | |
} | |
/** | |
DefaultMessageStore.this.doDispatch(dispatchRequest); | |
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { | |
**/ | |
type CommitLog struct { | |
MsgSize FourByte | |
MagicCode FourByte | |
Crc FourByte | |
QueueId FourByte | |
Flags FourByte | |
QueueOffset EightByte | |
PhysicsOffset EightByte | |
SysFlag FourByte | |
BornTimeStamp EightByte | |
BornHost EightByte // dynamic 8 /20 | |
StoreTimeStamp EightByte | |
StoreHostAddress EightByte // 8 / 20 | |
ReconsumeTimes FourByte | |
PreparedTransactionOffset EightByte | |
BodyLen FourByte | |
MessageBody [16]byte // dynamic | |
TopicLength int8 // [1]byte // 1 | |
Topic [9]byte // dynamic | |
PropertiesLength int16 //[2]byte | |
Properties [98]byte | |
} | |
/** | |
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); | |
**/ | |
func getMessage(name string, offset int64, size int) []byte { | |
position := offset % int64(size) | |
log.Println("pos:", position) | |
absPath := filepath.Join(getHomeDir(), name) | |
file, err := os.Open(absPath) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer file.Close() | |
file.Seek(position, 1) | |
data, err := readNextBytes(file, size) | |
if err != nil && err != io.EOF { | |
panic(err) | |
} | |
parseMsg(data) | |
return data | |
} | |
func parseMsg(data []byte) { | |
var m CommitLog | |
buffer := bytes.NewBuffer(data) | |
err := binary.Read(buffer, binary.BigEndian, &m) | |
if err != nil { | |
log.Fatal("binary.Read failed", err) | |
} | |
bt, _ := json.Marshal(&m) | |
log.Println(string(bt)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment