diff options
Diffstat (limited to 'modules/eventsource')
-rw-r--r-- | modules/eventsource/event.go | 118 | ||||
-rw-r--r-- | modules/eventsource/event_test.go | 53 | ||||
-rw-r--r-- | modules/eventsource/manager.go | 79 | ||||
-rw-r--r-- | modules/eventsource/manager_run.go | 115 | ||||
-rw-r--r-- | modules/eventsource/messenger.go | 68 |
5 files changed, 433 insertions, 0 deletions
diff --git a/modules/eventsource/event.go b/modules/eventsource/event.go new file mode 100644 index 00000000..ebcca509 --- /dev/null +++ b/modules/eventsource/event.go @@ -0,0 +1,118 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package eventsource + +import ( + "bytes" + "fmt" + "io" + "strings" + "time" + + "code.gitea.io/gitea/modules/json" +) + +func wrapNewlines(w io.Writer, prefix, value []byte) (sum int64, err error) { + if len(value) == 0 { + return 0, nil + } + var n int + last := 0 + for j := bytes.IndexByte(value, '\n'); j > -1; j = bytes.IndexByte(value[last:], '\n') { + n, err = w.Write(prefix) + sum += int64(n) + if err != nil { + return sum, err + } + n, err = w.Write(value[last : last+j+1]) + sum += int64(n) + if err != nil { + return sum, err + } + last += j + 1 + } + n, err = w.Write(prefix) + sum += int64(n) + if err != nil { + return sum, err + } + n, err = w.Write(value[last:]) + sum += int64(n) + if err != nil { + return sum, err + } + n, err = w.Write([]byte("\n")) + sum += int64(n) + return sum, err +} + +// Event is an eventsource event, not all fields need to be set +type Event struct { + // Name represents the value of the event: tag in the stream + Name string + // Data is either JSONified []byte or any that can be JSONd + Data any + // ID represents the ID of an event + ID string + // Retry tells the receiver only to attempt to reconnect to the source after this time + Retry time.Duration +} + +// WriteTo writes data to w until there's no more data to write or when an error occurs. +// The return value n is the number of bytes written. Any error encountered during the write is also returned. +func (e *Event) WriteTo(w io.Writer) (int64, error) { + sum := int64(0) + var nint int + n, err := wrapNewlines(w, []byte("event: "), []byte(e.Name)) + sum += n + if err != nil { + return sum, err + } + + if e.Data != nil { + var data []byte + switch v := e.Data.(type) { + case []byte: + data = v + case string: + data = []byte(v) + default: + var err error + data, err = json.Marshal(e.Data) + if err != nil { + return sum, err + } + } + n, err := wrapNewlines(w, []byte("data: "), data) + sum += n + if err != nil { + return sum, err + } + } + + n, err = wrapNewlines(w, []byte("id: "), []byte(e.ID)) + sum += n + if err != nil { + return sum, err + } + + if e.Retry != 0 { + nint, err = fmt.Fprintf(w, "retry: %d\n", int64(e.Retry/time.Millisecond)) + sum += int64(nint) + if err != nil { + return sum, err + } + } + + nint, err = w.Write([]byte("\n")) + sum += int64(nint) + + return sum, err +} + +func (e *Event) String() string { + buf := new(strings.Builder) + _, _ = e.WriteTo(buf) + return buf.String() +} diff --git a/modules/eventsource/event_test.go b/modules/eventsource/event_test.go new file mode 100644 index 00000000..4c427288 --- /dev/null +++ b/modules/eventsource/event_test.go @@ -0,0 +1,53 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package eventsource + +import ( + "bytes" + "testing" +) + +func Test_wrapNewlines(t *testing.T) { + tests := []struct { + name string + prefix string + value string + output string + }{ + { + "check no new lines", + "prefix: ", + "value", + "prefix: value\n", + }, + { + "check simple newline", + "prefix: ", + "value1\nvalue2", + "prefix: value1\nprefix: value2\n", + }, + { + "check pathological newlines", + "p: ", + "\n1\n\n2\n3\n", + "p: \np: 1\np: \np: 2\np: 3\np: \n", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &bytes.Buffer{} + gotSum, err := wrapNewlines(w, []byte(tt.prefix), []byte(tt.value)) + if err != nil { + t.Errorf("wrapNewlines() error = %v", err) + return + } + if gotSum != int64(len(tt.output)) { + t.Errorf("wrapNewlines() = %v, want %v", gotSum, int64(len(tt.output))) + } + if gotW := w.String(); gotW != tt.output { + t.Errorf("wrapNewlines() = %v, want %v", gotW, tt.output) + } + }) + } +} diff --git a/modules/eventsource/manager.go b/modules/eventsource/manager.go new file mode 100644 index 00000000..730cacd9 --- /dev/null +++ b/modules/eventsource/manager.go @@ -0,0 +1,79 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package eventsource + +import ( + "sync" +) + +// Manager manages the eventsource Messengers +type Manager struct { + mutex sync.Mutex + + messengers map[int64]*Messenger + connection chan struct{} +} + +var manager *Manager + +func init() { + manager = &Manager{ + messengers: make(map[int64]*Messenger), + connection: make(chan struct{}, 1), + } +} + +// GetManager returns a Manager and initializes one as singleton if there's none yet +func GetManager() *Manager { + return manager +} + +// Register message channel +func (m *Manager) Register(uid int64) <-chan *Event { + m.mutex.Lock() + messenger, ok := m.messengers[uid] + if !ok { + messenger = NewMessenger(uid) + m.messengers[uid] = messenger + } + select { + case m.connection <- struct{}{}: + default: + } + m.mutex.Unlock() + return messenger.Register() +} + +// Unregister message channel +func (m *Manager) Unregister(uid int64, channel <-chan *Event) { + m.mutex.Lock() + defer m.mutex.Unlock() + messenger, ok := m.messengers[uid] + if !ok { + return + } + if messenger.Unregister(channel) { + delete(m.messengers, uid) + } +} + +// UnregisterAll message channels +func (m *Manager) UnregisterAll() { + m.mutex.Lock() + defer m.mutex.Unlock() + for _, messenger := range m.messengers { + messenger.UnregisterAll() + } + m.messengers = map[int64]*Messenger{} +} + +// SendMessage sends a message to a particular user +func (m *Manager) SendMessage(uid int64, message *Event) { + m.mutex.Lock() + messenger, ok := m.messengers[uid] + m.mutex.Unlock() + if ok { + messenger.SendMessage(message) + } +} diff --git a/modules/eventsource/manager_run.go b/modules/eventsource/manager_run.go new file mode 100644 index 00000000..f66dc78c --- /dev/null +++ b/modules/eventsource/manager_run.go @@ -0,0 +1,115 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package eventsource + +import ( + "context" + "time" + + activities_model "code.gitea.io/gitea/models/activities" + issues_model "code.gitea.io/gitea/models/issues" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/process" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/services/convert" +) + +// Init starts this eventsource +func (m *Manager) Init() { + if setting.UI.Notification.EventSourceUpdateTime <= 0 { + return + } + go graceful.GetManager().RunWithShutdownContext(m.Run) +} + +// Run runs the manager within a provided context +func (m *Manager) Run(ctx context.Context) { + ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: EventSource", process.SystemProcessType, true) + defer finished() + + then := timeutil.TimeStampNow().Add(-2) + timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime) +loop: + for { + select { + case <-ctx.Done(): + timer.Stop() + break loop + case <-timer.C: + m.mutex.Lock() + connectionCount := len(m.messengers) + if connectionCount == 0 { + log.Trace("Event source has no listeners") + // empty the connection channel + select { + case <-m.connection: + default: + } + } + m.mutex.Unlock() + if connectionCount == 0 { + // No listeners so the source can be paused + log.Trace("Pausing the eventsource") + select { + case <-ctx.Done(): + break loop + case <-m.connection: + log.Trace("Connection detected - restarting the eventsource") + // OK we're back so lets reset the timer and start again + // We won't change the "then" time because there could be concurrency issues + select { + case <-timer.C: + default: + } + continue + } + } + + now := timeutil.TimeStampNow().Add(-2) + + uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now) + if err != nil { + log.Error("Unable to get UIDcounts: %v", err) + } + for _, uidCount := range uidCounts { + m.SendMessage(uidCount.UserID, &Event{ + Name: "notification-count", + Data: uidCount, + }) + } + then = now + + if setting.Service.EnableTimetracking { + usersStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx) + if err != nil { + log.Error("Unable to get GetUIDsAndStopwatch: %v", err) + return + } + + for _, userStopwatches := range usersStopwatches { + apiSWs, err := convert.ToStopWatches(ctx, userStopwatches.StopWatches) + if err != nil { + if !issues_model.IsErrIssueNotExist(err) { + log.Error("Unable to APIFormat stopwatches: %v", err) + } + continue + } + dataBs, err := json.Marshal(apiSWs) + if err != nil { + log.Error("Unable to marshal stopwatches: %v", err) + continue + } + m.SendMessage(userStopwatches.UserID, &Event{ + Name: "stopwatches", + Data: string(dataBs), + }) + } + } + } + } + m.UnregisterAll() +} diff --git a/modules/eventsource/messenger.go b/modules/eventsource/messenger.go new file mode 100644 index 00000000..378e7171 --- /dev/null +++ b/modules/eventsource/messenger.go @@ -0,0 +1,68 @@ +// Copyright 2020 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package eventsource + +import "sync" + +// Messenger is a per uid message store +type Messenger struct { + mutex sync.Mutex + uid int64 + channels []chan *Event +} + +// NewMessenger creates a messenger for a particular uid +func NewMessenger(uid int64) *Messenger { + return &Messenger{ + uid: uid, + channels: [](chan *Event){}, + } +} + +// Register returns a new chan []byte +func (m *Messenger) Register() <-chan *Event { + m.mutex.Lock() + // TODO: Limit the number of messengers per uid + channel := make(chan *Event, 1) + m.channels = append(m.channels, channel) + m.mutex.Unlock() + return channel +} + +// Unregister removes the provider chan []byte +func (m *Messenger) Unregister(channel <-chan *Event) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + for i, toRemove := range m.channels { + if channel == toRemove { + m.channels = append(m.channels[:i], m.channels[i+1:]...) + close(toRemove) + break + } + } + return len(m.channels) == 0 +} + +// UnregisterAll removes all chan []byte +func (m *Messenger) UnregisterAll() { + m.mutex.Lock() + defer m.mutex.Unlock() + for _, channel := range m.channels { + close(channel) + } + m.channels = nil +} + +// SendMessage sends the message to all registered channels +func (m *Messenger) SendMessage(message *Event) { + m.mutex.Lock() + defer m.mutex.Unlock() + for i := range m.channels { + channel := m.channels[i] + select { + case channel <- message: + default: + } + } +} |