1
2
3
4
5 package trace
6
7 import (
8 "fmt"
9 "internal/trace/tracev2"
10 )
11
12
13 type timestamp uint64
14
15 type batch struct {
16 time timestamp
17 gen uint64
18 data []byte
19 }
20
21
22
23 func readBatch(b []byte) (batch, uint64, error) {
24 if len(b) == 0 {
25 return batch{}, 0, fmt.Errorf("batch is empty")
26 }
27 data := make([]byte, len(b))
28 copy(data, b)
29
30
31 if typ := tracev2.EventType(b[0]); typ == tracev2.EvEndOfGeneration {
32 if len(b) != 1 {
33 return batch{}, 1, fmt.Errorf("unexpected end of generation in batch of size >1")
34 }
35 return batch{data: data}, 1, nil
36 }
37 if typ := tracev2.EventType(b[0]); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
38 return batch{}, 1, fmt.Errorf("expected batch event, got event %d", typ)
39 }
40 total := 1
41 b = b[1:]
42
43
44 gen, n, err := readUvarint(b)
45 if err != nil {
46 return batch{}, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err)
47 }
48 total += n
49 b = b[n:]
50
51
52 _, n, err = readUvarint(b)
53 if err != nil {
54 return batch{}, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err)
55 }
56 total += n
57 b = b[n:]
58
59
60 ts, n, err := readUvarint(b)
61 if err != nil {
62 return batch{}, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err)
63 }
64 total += n
65 b = b[n:]
66
67
68 size, n, err := readUvarint(b)
69 if err != nil {
70 return batch{}, uint64(total + n), fmt.Errorf("error reading batch size: %w", err)
71 }
72 if size > tracev2.MaxBatchSize {
73 return batch{}, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
74 }
75 total += n
76 total += int(size)
77 if total != len(data) {
78 return batch{}, uint64(total), fmt.Errorf("expected complete batch")
79 }
80 data = data[:total]
81
82
83 return batch{
84 gen: gen,
85 time: timestamp(ts),
86 data: data,
87 }, uint64(total), nil
88 }
89
View as plain text