feature: Refresh dashboards when entity files change. (#319)

* feature: Refresh dashboards when entities change

* feature: Refresh dashboards when entities change

* bugfix: Concurrency, lock around websocket write
This commit is contained in:
James Read
2024-05-24 23:10:38 +01:00
committed by GitHub
parent 18423a9888
commit 3904f8563d
7 changed files with 74 additions and 12 deletions

View File

@@ -184,6 +184,7 @@ message GetReadyzResponse {
string status = 1; string status = 1;
} }
message EventEntityChanged {}
message EventConfigChanged {} message EventConfigChanged {}
message EventExecutionFinished { message EventExecutionFinished {
LogEntry log_entry = 1; LogEntry log_entry = 1;

View File

@@ -144,6 +144,7 @@ func main() {
go onfileindir.WatchFilesInDirectory(cfg, executor) go onfileindir.WatchFilesInDirectory(cfg, executor)
go oncalendarfile.Schedule(cfg, executor) go oncalendarfile.Schedule(cfg, executor)
entityfiles.AddListener(websocket.OnEntityChanged)
go entityfiles.SetupEntityFileWatchers(cfg) go entityfiles.SetupEntityFileWatchers(cfg)
go updatecheck.StartUpdateChecker(version, commit, cfg, cfg.GetDir()) go updatecheck.StartUpdateChecker(version, commit, cfg, cfg.GetDir())

View File

@@ -14,6 +14,15 @@ import (
"strings" "strings"
) )
var (
EntityChangedSender chan bool
listeners []func()
)
func AddListener(l func()) {
listeners = append(listeners, l)
}
func SetupEntityFileWatchers(cfg *config.Config) { func SetupEntityFileWatchers(cfg *config.Config) {
configDir := cfg.GetDir() configDir := cfg.GetDir()
@@ -121,4 +130,8 @@ func updateEvmFromFile(entityname string, data []map[string]string) {
sv.Set(prefix+"."+k, v) sv.Set(prefix+"."+k, v)
} }
} }
for _, l := range listeners {
l()
}
} }

View File

@@ -4,6 +4,19 @@ import (
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"path/filepath" "path/filepath"
"time"
)
var (
debounceWriteLog map[string]time.Time
)
func init() {
debounceWriteLog = make(map[string]time.Time)
}
const (
debounceDelay = 300
) )
type watchContext struct { type watchContext struct {
@@ -11,6 +24,7 @@ type watchContext struct {
filedir string filedir string
callback func(filename string) callback func(filename string)
interestedEvent fsnotify.Op interestedEvent fsnotify.Op
event *fsnotify.Event
} }
func WatchDirectoryCreate(fullpath string, callback func(filename string)) { func WatchDirectoryCreate(fullpath string, callback func(filename string)) {
@@ -73,7 +87,9 @@ func watchPath(ctx *watchContext) {
func processEvent(ctx *watchContext, watcher *fsnotify.Watcher) { func processEvent(ctx *watchContext, watcher *fsnotify.Watcher) {
select { select {
case event, ok := <-watcher.Events: case event, ok := <-watcher.Events:
if !consumeEvent(ok, ctx, &event) { ctx.event = &event
if !consumeEvent(ok, ctx) {
return return
} }
@@ -84,26 +100,39 @@ func processEvent(ctx *watchContext, watcher *fsnotify.Watcher) {
} }
} }
func consumeEvent(ok bool, ctx *watchContext, event *fsnotify.Event) bool { func consumeEvent(ok bool, ctx *watchContext) bool {
if !ok { if !ok {
return false return false
} }
if ctx.filename != "" && filepath.Base(event.Name) != ctx.filename { if ctx.filename != "" && filepath.Base(ctx.event.Name) != ctx.filename {
log.Tracef("fsnotify irreleventa event different file %+v", event) log.Tracef("fsnotify irreleventa event different file %+v", ctx.event)
return true return true
} }
consumeRelevantEvents(ctx, event) consumeRelevantEvents(ctx)
return true return true
} }
func consumeRelevantEvents(ctx *watchContext, event *fsnotify.Event) { func consumeRelevantEvents(ctx *watchContext) {
if event.Has(ctx.interestedEvent) { if ctx.event.Has(ctx.interestedEvent) {
log.Debugf("fsnotify write event: %v", event) log.Debugf("fsnotify write event: %v", ctx.event)
ctx.callback(event.Name)
processDebounce(ctx)
} else { } else {
log.Debugf("fsnotify irrelevant event on file %v", event) log.Debugf("fsnotify irrelevant event on file %v", ctx.event)
}
}
func processDebounce(ctx *watchContext) {
entry, found := debounceWriteLog[ctx.filename]
if !found || time.Since(entry) < debounceDelay {
debounceWriteLog[ctx.filename] = time.Now()
ctx.callback(ctx.event.Name)
} else {
log.Debugf("Supressing write event because it's within the debounce delay: %v", ctx.filename)
} }
} }

View File

@@ -8,12 +8,17 @@ import (
"google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
"net/http" "net/http"
"sync"
) )
var upgrader = ws.Upgrader{ var upgrader = ws.Upgrader{
CheckOrigin: checkOriginPermissive, CheckOrigin: checkOriginPermissive,
} }
var (
sendmutex = sync.Mutex{}
)
type WebsocketClient struct { type WebsocketClient struct {
conn *ws.Conn conn *ws.Conn
} }
@@ -44,6 +49,10 @@ func (WebsocketExecutionListener) OnExecutionStarted(title string) {
*/ */
} }
func OnEntityChanged() {
broadcast(&pb.EventEntityChanged{})
}
/* /*
The default checkOrigin function checks that the origin (browser) matches the The default checkOrigin function checks that the origin (browser) matches the
request origin. However in OliveTin we expect many users to deliberately proxy request origin. However in OliveTin we expect many users to deliberately proxy
@@ -113,9 +122,11 @@ func broadcast(pbmsg protoreflect.ProtoMessage) {
hackyMessage = append(hackyMessage, []byte("}")...) hackyMessage = append(hackyMessage, []byte("}")...)
// </EVIL> // </EVIL>
sendmutex.Lock()
for _, client := range clients { for _, client := range clients {
client.conn.WriteMessage(ws.TextMessage, hackyMessage) client.conn.WriteMessage(ws.TextMessage, hackyMessage)
} }
sendmutex.Unlock()
} }
func (c *WebsocketClient) messageLoop() { func (c *WebsocketClient) messageLoop() {

View File

@@ -14,6 +14,8 @@ export function initMarshaller () {
window.initialHash = window.location.hash window.initialHash = window.location.hash
window.currentSection = ''
window.addEventListener('EventExecutionFinished', onExecutionFinished) window.addEventListener('EventExecutionFinished', onExecutionFinished)
} }
@@ -99,6 +101,8 @@ function onExecutionFinished (evt) {
function showSection (title) { function showSection (title) {
title = title.replaceAll(' ', '') title = title.replaceAll(' ', '')
window.currentSection = title
for (const section of document.querySelectorAll('section')) { for (const section of document.querySelectorAll('section')) {
if (section.title === title) { if (section.title === title) {
section.style.display = 'block' section.style.display = 'block'
@@ -213,7 +217,9 @@ function marshalDashboardStructureToHtml (json) {
} }
} }
if (window.initialHash !== '' && document.body.getAttribute('initial-marshal-complete') === null) { if (window.currentSection !== '') {
showSection(window.currentSection)
} else if (window.initialHash !== '' && document.body.getAttribute('initial-marshal-complete') === null) {
showSection(window.initialHash.replace('#', '')) showSection(window.initialHash.replace('#', ''))
} else { } else {
if (rootGroup.querySelectorAll('action-button').length === 0 && json.dashboards.length > 0) { if (rootGroup.querySelectorAll('action-button').length === 0 && json.dashboards.length > 0) {

View File

@@ -42,10 +42,11 @@ function websocketOnMessage (msg) {
switch (j.type) { switch (j.type) {
case 'EventConfigChanged': case 'EventConfigChanged':
case 'EventExecutionFinished': case 'EventExecutionFinished':
case 'EventEntityChanged':
window.dispatchEvent(e) window.dispatchEvent(e)
break break
default: default:
window.showBigError('Unknown message type from server: ' + j.type) window.showBigError('ws-unhandled-message', 'handling websocket message', 'Unhandled websocket message type from server: ' + j.type, true)
} }
} }