序列化:
func (s *Service) MarshalQuery(qr io.Reader, uid string, opts query.ExecutionOptions) ([]byte, error) {
b := make([]byte, 8)
index := 0
{
s.Logger.Info(fmt.Sprintf('MarshalQuery RT index : %d', index))
binary.BigEndian.PutUint64(b, (uint64)(RFTYPE_QUERY))
index = index + 8
}
{
b_n_qr := make([]byte, 4096)
qrLen, err := qr.Read(b_n_qr)
s.Logger.Info(fmt.Sprintf('MarshalQuery qr len index : %d, qrLen: %d', index, qrLen))
if 0 < qrLen {
if nil != err {
s.Logger.Error(fmt.Sprintf(`haraft MarshalQuery fail, qr.Read err = %v`, err))
return nil, err
}
}
{
b_n := make([]byte, 8)
binary.BigEndian.PutUint64(b_n, (uint64)(qrLen))
b = append(b, b_n...)
index = index + 8
}
if 0 < qrLen {
b = append(b, b_n_qr[0:qrLen]...)
index = index + qrLen
}
}
{
uidLen := uint64(len(uid))
s.Logger.Info(fmt.Sprintf('MarshalQuery uid len index : %d, uidLen: %d', index, uidLen))
b_n := make([]byte, 8)
binary.BigEndian.PutUint64(b_n, uidLen)
b = append(b, b_n...)
b = append(b, []byte(uid)...)
index = index + 8 + int(uidLen)
}
{
database := opts.Database
retentionPolicy := opts.RetentionPolicy
chunkSize := (uint64)(opts.ChunkSize)
nodeID := opts.NodeID
readOnly := 'false'
if opts.ReadOnly {
readOnly = 'true'
}
{
databaseLen := uint64(len(database))
s.Logger.Info(fmt.Sprintf('MarshalQuery database len index : %d, databaseLen: %d', index, databaseLen))
b_n := make([]byte, 8)
binary.BigEndian.PutUint64(b_n, databaseLen)
b = append(b, b_n...)
if 0 < databaseLen {
b = append(b, []byte(database)...)
}
index = index + 8 + int(databaseLen)
}
{
retentionPolicyLen := uint64(len(retentionPolicy))
s.Logger.Info(fmt.Sprintf('MarshalQuery retentionPolicy len index : %d, retentionPolicyLen: %d', index, retentionPolicyLen))
b_n := make([]byte, 8)
binary.BigEndian.PutUint64(b_n, retentionPolicyLen)
b = append(b, b_n...)
if 0 < retentionPolicyLen {
b = append(b, []byte(retentionPolicy)...)
}
index = index + 8 + int(retentionPolicyLen)
}
{
s.Logger.Info(fmt.Sprintf('MarshalQuery chunkSize len index : %d', index))
b_n := make([]byte, 8)
binary.BigEndian.PutUint64(b_n, chunkSize)
b = append(b, b_n...)
index = index + 8
}
{
s.Logger.Info(fmt.Sprintf('MarshalQuery nodeID len index : %d', index))
b_n := make([]byte, 8)
binary.BigEndian.PutUint64(b_n, nodeID)
b = append(b, b_n...)
index = index + 8
}
{
readOnlyLen := uint64(len(readOnly))
s.Logger.Info(fmt.Sprintf('MarshalQuery readOnly len index : %d, readOnlyLen: %d', index, readOnlyLen))
b_n := make([]byte, 8)
binary.BigEndian.PutUint64(b_n, readOnlyLen)
b = append(b, b_n...)
b = append(b, []byte(readOnly)...)
index = index + 8
}
}
s.Logger.Info(fmt.Sprintf('MarshalQuery over index : %d', index))
return b, nil
}
核心函数:
反序列化:
func (s *Service) UnmarshalQuery(b []byte) (io.Reader, string, query.ExecutionOptions, error) {
index := (uint64)(0)
over := index + 8
s.Logger.Info(fmt.Sprintf('UnmarshalQuery rt len index : %d', index))
uid := ''
var opts query.ExecutionOptions
var qr io.Reader
index = over
over = index + 8
s.Logger.Info(fmt.Sprintf('UnmarshalQuery qr len index : %d', index))
qrLen := binary.BigEndian.Uint64(b[index:over])
if 0 < qrLen {
index = over
over = index + qrLen
s.Logger.Info(fmt.Sprintf('UnmarshalQuery qr value index : %d, qrLen: %d', index, qrLen))
str := string(b[index:over])
qr = strings.NewReader(str)
}
index = over
over = index + 8
s.Logger.Info(fmt.Sprintf('UnmarshalQuery uid len index : %d', index))
uidLen := binary.BigEndian.Uint64(b[index:over])
index = over
over = over + uidLen
s.Logger.Info(fmt.Sprintf('UnmarshalQuery uid value index : %d, uidLen: %d', index, uidLen))
uid = (string)(b[index:over])
index = over
over = index + 8
s.Logger.Info(fmt.Sprintf('UnmarshalQuery database len index : %d', index))
databaseLen := binary.BigEndian.Uint64(b[index:over])
if 0 < databaseLen {
index = over
over = over + databaseLen
s.Logger.Info(fmt.Sprintf('UnmarshalQuery database value index : %d, databaseLen: %d', index, databaseLen))
database := (string)(b[index:over])
opts.Database = database
}
index = over
over = index + 8
s.Logger.Info(fmt.Sprintf('UnmarshalQuery retentionPolicy len index : %d', index))
retentionPolicyLen := binary.BigEndian.Uint64(b[index:over])
if 0 < retentionPolicyLen {
index = over
over = over + retentionPolicyLen
s.Logger.Info(fmt.Sprintf('UnmarshalQuery retentionPolicy value index : %d, databaseLen: %d', index, retentionPolicyLen))
retentionPolicy := (string)(b[index:over])
opts.RetentionPolicy = retentionPolicy
}
index = over
over = index + 8
s.Logger.Info(fmt.Sprintf('UnmarshalQuery chunkSize len index : %d', index))
chunkSize := binary.BigEndian.Uint64(b[index:over])
opts.ChunkSize = int(chunkSize)
index = over
over = index + 8
s.Logger.Info(fmt.Sprintf('UnmarshalQuery nodeID len index : %d', index))
nodeID := binary.BigEndian.Uint64(b[index:over])
opts.NodeID = nodeID
index = over
over = index + 8
s.Logger.Info(fmt.Sprintf('UnmarshalQuery readOnly len index : %d', index))
readOnlyLen := binary.BigEndian.Uint64(b[index:over])
index = over
over = over + readOnlyLen
s.Logger.Info(fmt.Sprintf('UnmarshalQuery readOnly value index : %d, readOnlyLen: %d', index, readOnlyLen))
readonly := (string)(b[index:over])
opts.ReadOnly = readonly == 'true'
index = over
s.Logger.Info(fmt.Sprintf('UnmarshalQuery over index : %d', index))
return qr, uid, opts, nil
}
InfluxDB 将创建库、measurement、user、策略等相关接口的实现，放在了对外的 query 接口中。
为保证能将 query 接口相关的数据写入 raft 的日志，本文记录如何将其序列化与反序列化。
文章为作者独立观点,不代表股票交易接口观点