summaryrefslogtreecommitdiffstats
path: root/modules/eventsource
diff options
context:
space:
mode:
Diffstat (limited to 'modules/eventsource')
-rw-r--r--modules/eventsource/event.go118
-rw-r--r--modules/eventsource/event_test.go53
-rw-r--r--modules/eventsource/manager.go79
-rw-r--r--modules/eventsource/manager_run.go115
-rw-r--r--modules/eventsource/messenger.go68
5 files changed, 433 insertions, 0 deletions
diff --git a/modules/eventsource/event.go b/modules/eventsource/event.go
new file mode 100644
index 00000000..ebcca509
--- /dev/null
+++ b/modules/eventsource/event.go
@@ -0,0 +1,118 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package eventsource
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "strings"
+ "time"
+
+ "code.gitea.io/gitea/modules/json"
+)
+
+func wrapNewlines(w io.Writer, prefix, value []byte) (sum int64, err error) {
+ if len(value) == 0 {
+ return 0, nil
+ }
+ var n int
+ last := 0
+ for j := bytes.IndexByte(value, '\n'); j > -1; j = bytes.IndexByte(value[last:], '\n') {
+ n, err = w.Write(prefix)
+ sum += int64(n)
+ if err != nil {
+ return sum, err
+ }
+ n, err = w.Write(value[last : last+j+1])
+ sum += int64(n)
+ if err != nil {
+ return sum, err
+ }
+ last += j + 1
+ }
+ n, err = w.Write(prefix)
+ sum += int64(n)
+ if err != nil {
+ return sum, err
+ }
+ n, err = w.Write(value[last:])
+ sum += int64(n)
+ if err != nil {
+ return sum, err
+ }
+ n, err = w.Write([]byte("\n"))
+ sum += int64(n)
+ return sum, err
+}
+
+// Event is an eventsource event, not all fields need to be set
+type Event struct {
+ // Name represents the value of the event: tag in the stream
+ Name string
+ // Data is either JSONified []byte or any that can be JSONd
+ Data any
+ // ID represents the ID of an event
+ ID string
+ // Retry tells the receiver only to attempt to reconnect to the source after this time
+ Retry time.Duration
+}
+
+// WriteTo writes data to w until there's no more data to write or when an error occurs.
+// The return value n is the number of bytes written. Any error encountered during the write is also returned.
+func (e *Event) WriteTo(w io.Writer) (int64, error) {
+ sum := int64(0)
+ var nint int
+ n, err := wrapNewlines(w, []byte("event: "), []byte(e.Name))
+ sum += n
+ if err != nil {
+ return sum, err
+ }
+
+ if e.Data != nil {
+ var data []byte
+ switch v := e.Data.(type) {
+ case []byte:
+ data = v
+ case string:
+ data = []byte(v)
+ default:
+ var err error
+ data, err = json.Marshal(e.Data)
+ if err != nil {
+ return sum, err
+ }
+ }
+ n, err := wrapNewlines(w, []byte("data: "), data)
+ sum += n
+ if err != nil {
+ return sum, err
+ }
+ }
+
+ n, err = wrapNewlines(w, []byte("id: "), []byte(e.ID))
+ sum += n
+ if err != nil {
+ return sum, err
+ }
+
+ if e.Retry != 0 {
+ nint, err = fmt.Fprintf(w, "retry: %d\n", int64(e.Retry/time.Millisecond))
+ sum += int64(nint)
+ if err != nil {
+ return sum, err
+ }
+ }
+
+ nint, err = w.Write([]byte("\n"))
+ sum += int64(nint)
+
+ return sum, err
+}
+
+func (e *Event) String() string {
+ buf := new(strings.Builder)
+ _, _ = e.WriteTo(buf)
+ return buf.String()
+}
diff --git a/modules/eventsource/event_test.go b/modules/eventsource/event_test.go
new file mode 100644
index 00000000..4c427288
--- /dev/null
+++ b/modules/eventsource/event_test.go
@@ -0,0 +1,53 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package eventsource
+
+import (
+ "bytes"
+ "testing"
+)
+
+func Test_wrapNewlines(t *testing.T) {
+ tests := []struct {
+ name string
+ prefix string
+ value string
+ output string
+ }{
+ {
+ "check no new lines",
+ "prefix: ",
+ "value",
+ "prefix: value\n",
+ },
+ {
+ "check simple newline",
+ "prefix: ",
+ "value1\nvalue2",
+ "prefix: value1\nprefix: value2\n",
+ },
+ {
+ "check pathological newlines",
+ "p: ",
+ "\n1\n\n2\n3\n",
+ "p: \np: 1\np: \np: 2\np: 3\np: \n",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ w := &bytes.Buffer{}
+ gotSum, err := wrapNewlines(w, []byte(tt.prefix), []byte(tt.value))
+ if err != nil {
+ t.Errorf("wrapNewlines() error = %v", err)
+ return
+ }
+ if gotSum != int64(len(tt.output)) {
+ t.Errorf("wrapNewlines() = %v, want %v", gotSum, int64(len(tt.output)))
+ }
+ if gotW := w.String(); gotW != tt.output {
+ t.Errorf("wrapNewlines() = %v, want %v", gotW, tt.output)
+ }
+ })
+ }
+}
diff --git a/modules/eventsource/manager.go b/modules/eventsource/manager.go
new file mode 100644
index 00000000..730cacd9
--- /dev/null
+++ b/modules/eventsource/manager.go
@@ -0,0 +1,79 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package eventsource
+
+import (
+ "sync"
+)
+
+// Manager manages the eventsource Messengers
+type Manager struct {
+ mutex sync.Mutex
+
+ messengers map[int64]*Messenger
+ connection chan struct{}
+}
+
+var manager *Manager
+
+func init() {
+ manager = &Manager{
+ messengers: make(map[int64]*Messenger),
+ connection: make(chan struct{}, 1),
+ }
+}
+
+// GetManager returns a Manager and initializes one as singleton if there's none yet
+func GetManager() *Manager {
+ return manager
+}
+
+// Register message channel
+func (m *Manager) Register(uid int64) <-chan *Event {
+ m.mutex.Lock()
+ messenger, ok := m.messengers[uid]
+ if !ok {
+ messenger = NewMessenger(uid)
+ m.messengers[uid] = messenger
+ }
+ select {
+ case m.connection <- struct{}{}:
+ default:
+ }
+ m.mutex.Unlock()
+ return messenger.Register()
+}
+
+// Unregister message channel
+func (m *Manager) Unregister(uid int64, channel <-chan *Event) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ messenger, ok := m.messengers[uid]
+ if !ok {
+ return
+ }
+ if messenger.Unregister(channel) {
+ delete(m.messengers, uid)
+ }
+}
+
+// UnregisterAll message channels
+func (m *Manager) UnregisterAll() {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for _, messenger := range m.messengers {
+ messenger.UnregisterAll()
+ }
+ m.messengers = map[int64]*Messenger{}
+}
+
+// SendMessage sends a message to a particular user
+func (m *Manager) SendMessage(uid int64, message *Event) {
+ m.mutex.Lock()
+ messenger, ok := m.messengers[uid]
+ m.mutex.Unlock()
+ if ok {
+ messenger.SendMessage(message)
+ }
+}
diff --git a/modules/eventsource/manager_run.go b/modules/eventsource/manager_run.go
new file mode 100644
index 00000000..f66dc78c
--- /dev/null
+++ b/modules/eventsource/manager_run.go
@@ -0,0 +1,115 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package eventsource
+
+import (
+ "context"
+ "time"
+
+ activities_model "code.gitea.io/gitea/models/activities"
+ issues_model "code.gitea.io/gitea/models/issues"
+ "code.gitea.io/gitea/modules/graceful"
+ "code.gitea.io/gitea/modules/json"
+ "code.gitea.io/gitea/modules/log"
+ "code.gitea.io/gitea/modules/process"
+ "code.gitea.io/gitea/modules/setting"
+ "code.gitea.io/gitea/modules/timeutil"
+ "code.gitea.io/gitea/services/convert"
+)
+
+// Init starts this eventsource
+func (m *Manager) Init() {
+ if setting.UI.Notification.EventSourceUpdateTime <= 0 {
+ return
+ }
+ go graceful.GetManager().RunWithShutdownContext(m.Run)
+}
+
+// Run runs the manager within a provided context
+func (m *Manager) Run(ctx context.Context) {
+ ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: EventSource", process.SystemProcessType, true)
+ defer finished()
+
+ then := timeutil.TimeStampNow().Add(-2)
+ timer := time.NewTicker(setting.UI.Notification.EventSourceUpdateTime)
+loop:
+ for {
+ select {
+ case <-ctx.Done():
+ timer.Stop()
+ break loop
+ case <-timer.C:
+ m.mutex.Lock()
+ connectionCount := len(m.messengers)
+ if connectionCount == 0 {
+ log.Trace("Event source has no listeners")
+ // empty the connection channel
+ select {
+ case <-m.connection:
+ default:
+ }
+ }
+ m.mutex.Unlock()
+ if connectionCount == 0 {
+ // No listeners so the source can be paused
+ log.Trace("Pausing the eventsource")
+ select {
+ case <-ctx.Done():
+ break loop
+ case <-m.connection:
+ log.Trace("Connection detected - restarting the eventsource")
+ // OK we're back so lets reset the timer and start again
+ // We won't change the "then" time because there could be concurrency issues
+ select {
+ case <-timer.C:
+ default:
+ }
+ continue
+ }
+ }
+
+ now := timeutil.TimeStampNow().Add(-2)
+
+ uidCounts, err := activities_model.GetUIDsAndNotificationCounts(ctx, then, now)
+ if err != nil {
+ log.Error("Unable to get UIDcounts: %v", err)
+ }
+ for _, uidCount := range uidCounts {
+ m.SendMessage(uidCount.UserID, &Event{
+ Name: "notification-count",
+ Data: uidCount,
+ })
+ }
+ then = now
+
+ if setting.Service.EnableTimetracking {
+ usersStopwatches, err := issues_model.GetUIDsAndStopwatch(ctx)
+ if err != nil {
+ log.Error("Unable to get GetUIDsAndStopwatch: %v", err)
+ return
+ }
+
+ for _, userStopwatches := range usersStopwatches {
+ apiSWs, err := convert.ToStopWatches(ctx, userStopwatches.StopWatches)
+ if err != nil {
+ if !issues_model.IsErrIssueNotExist(err) {
+ log.Error("Unable to APIFormat stopwatches: %v", err)
+ }
+ continue
+ }
+ dataBs, err := json.Marshal(apiSWs)
+ if err != nil {
+ log.Error("Unable to marshal stopwatches: %v", err)
+ continue
+ }
+ m.SendMessage(userStopwatches.UserID, &Event{
+ Name: "stopwatches",
+ Data: string(dataBs),
+ })
+ }
+ }
+ }
+ }
+ m.UnregisterAll()
+}
diff --git a/modules/eventsource/messenger.go b/modules/eventsource/messenger.go
new file mode 100644
index 00000000..378e7171
--- /dev/null
+++ b/modules/eventsource/messenger.go
@@ -0,0 +1,68 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package eventsource
+
+import "sync"
+
+// Messenger is a per uid message store
+type Messenger struct {
+ mutex sync.Mutex
+ uid int64
+ channels []chan *Event
+}
+
+// NewMessenger creates a messenger for a particular uid
+func NewMessenger(uid int64) *Messenger {
+ return &Messenger{
+ uid: uid,
+ channels: [](chan *Event){},
+ }
+}
+
+// Register returns a new chan []byte
+func (m *Messenger) Register() <-chan *Event {
+ m.mutex.Lock()
+ // TODO: Limit the number of messengers per uid
+ channel := make(chan *Event, 1)
+ m.channels = append(m.channels, channel)
+ m.mutex.Unlock()
+ return channel
+}
+
+// Unregister removes the provider chan []byte
+func (m *Messenger) Unregister(channel <-chan *Event) bool {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for i, toRemove := range m.channels {
+ if channel == toRemove {
+ m.channels = append(m.channels[:i], m.channels[i+1:]...)
+ close(toRemove)
+ break
+ }
+ }
+ return len(m.channels) == 0
+}
+
+// UnregisterAll removes all chan []byte
+func (m *Messenger) UnregisterAll() {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for _, channel := range m.channels {
+ close(channel)
+ }
+ m.channels = nil
+}
+
+// SendMessage sends the message to all registered channels
+func (m *Messenger) SendMessage(message *Event) {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for i := range m.channels {
+ channel := m.channels[i]
+ select {
+ case channel <- message:
+ default:
+ }
+ }
+}