subsyt/internal/server/queue.go
Viktor Varland 6cd9860681
Some checks are pending
build / build (push) Waiting to run
fix: avoid bind mount read errors
2025-09-30 12:39:49 +02:00

208 lines
3.5 KiB
Go

package server
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"syscall"
"time"
)
// Package-level aliases for the os functions used by writeFileAtomic.
// writeFileAtomic calls these variables rather than os.WriteFile/os.Rename
// directly, so reassigning them changes what it invokes.
// NOTE(review): presumably a seam for tests to inject failures — confirm.
var (
osWriteFile = os.WriteFile
osRename = os.Rename
)
// VideoRequest is the payload carried by each queue entry. It is serialised
// to JSON under the keys given in the field tags.
type VideoRequest struct {
// URL is encoded as "url". NOTE(review): presumably the video location — confirm against the consumer.
URL string `json:"url"`
// OutDir is encoded as "out_dir". NOTE(review): presumably the output directory — confirm against the consumer.
OutDir string `json:"out_dir"`
// MetadataOnly is encoded as "metadata_only"; its effect is decided by the consumer, not by this file.
MetadataOnly bool `json:"metadata_only"`
}
// queueItem pairs a request with the identifier assigned to it by Enqueue.
// This is the element type of the persisted JSON array.
type queueItem struct {
// ID is the value returned by Enqueue and matched by MarkDone.
ID string `json:"id"`
// Request is the caller-supplied payload, stored unchanged.
Request VideoRequest `json:"request"`
}
// Queue provides a simple FIFO with optional on-disk persistence.
// Create instances with NewQueue so that notifyChan is initialised.
type Queue struct {
// mu guards items; the exported methods take it before reading or mutating the slice.
mu sync.Mutex
// items holds pending entries in insertion order; index 0 is the head returned by Next.
items []queueItem
// persist is the path of the JSON file mirroring items; an empty string disables persistence.
persist string
// notifyChan wakes a blocked Next call after Enqueue adds an item.
// NewQueue creates it with a buffer of one, and signal never blocks on it.
notifyChan chan struct{}
}
// NewQueue builds a Queue. When persistPath is non-empty the queue is
// pre-populated from that file (a missing file is not an error) and every
// later mutation is written back to it. An empty persistPath yields a purely
// in-memory queue.
//
// It returns a nil queue and the load error if the persisted file cannot be
// read or decoded.
func NewQueue(persistPath string) (*Queue, error) {
	queue := &Queue{
		notifyChan: make(chan struct{}, 1),
		persist:    persistPath,
	}

	// Nothing to restore for an in-memory queue.
	if persistPath == "" {
		return queue, nil
	}

	if loadErr := queue.load(); loadErr != nil {
		return nil, loadErr
	}
	return queue, nil
}
// Enqueue appends req to the tail of the queue under a freshly generated ID,
// persists the new state, and wakes a waiting Next call.
//
// If persisting fails the item is removed again, so the in-memory queue is
// left as it was, and the error is returned with an empty ID.
func (q *Queue) Enqueue(req VideoRequest) (string, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	entry := queueItem{ID: generateID(), Request: req}

	// Remember the old length so a failed persist can be rolled back.
	before := len(q.items)
	q.items = append(q.items, entry)

	if err := q.persistLocked(); err != nil {
		q.items = q.items[:before]
		return "", err
	}

	q.signal()
	return entry.ID, nil
}
// Next blocks until the queue has at least one item and returns a copy of the
// head item. The item is NOT removed; it stays at the head until MarkDone is
// called with its ID.
//
// The queue is checked before ctx, so an available item is returned even if
// ctx is already cancelled. Otherwise, once ctx is done, Next returns a zero
// queueItem and ctx.Err().
func (q *Queue) Next(ctx context.Context) (queueItem, error) {
	// head returns a copy of the first item, if any, while holding the lock.
	head := func() (queueItem, bool) {
		q.mu.Lock()
		defer q.mu.Unlock()
		if len(q.items) == 0 {
			return queueItem{}, false
		}
		return q.items[0], true
	}

	for {
		if item, ok := head(); ok {
			return item, nil
		}

		select {
		case <-q.notifyChan:
			// Woken by Enqueue; loop around and re-check under the lock.
		case <-ctx.Done():
			return queueItem{}, ctx.Err()
		}
	}
}
// MarkDone removes the item with the given id and persists the result.
//
// The item is normally the head, but it is looked up anywhere in the queue so
// that an entry is not stranded if processing order was disrupted. It returns
// an error if the queue is empty, if no item has that id, or if persisting
// fails. In the last case the item has already been removed from memory.
func (q *Queue) MarkDone(id string) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	if len(q.items) == 0 {
		return errors.New("queue is empty")
	}

	// Locate the first entry carrying this ID.
	pos := -1
	for i := range q.items {
		if q.items[i].ID == id {
			pos = i
			break
		}
	}

	switch {
	case pos < 0:
		return errors.New("queue item not found")
	case pos == 0:
		// Common case: drop the head by reslicing.
		q.items = q.items[1:]
	default:
		q.items = append(q.items[:pos], q.items[pos+1:]...)
	}

	return q.persistLocked()
}
// signal performs a non-blocking send on notifyChan. If the channel cannot
// accept the value right now (for example because a wake-up is already
// buffered), the send is skipped.
func (q *Queue) signal() {
	select {
	default:
		// Channel not ready to receive; nothing to do.
	case q.notifyChan <- struct{}{}:
	}
}
// Snapshot returns a copy of the current queue contents in order. The
// returned slice is never nil and does not share storage with the queue, so
// callers may modify it freely.
func (q *Queue) Snapshot() []queueItem {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Allocate explicitly so an empty queue yields an empty, non-nil slice.
	out := make([]queueItem, 0, len(q.items))
	out = append(out, q.items...)
	return out
}
// load reads the persisted queue file and appends its entries to q.items.
//
// A missing file is treated as an empty queue. A zero-length file resets
// q.items to nil. Any other read error, or a JSON decoding error, is returned
// unchanged. It does not take q.mu.
func (q *Queue) load() error {
	raw, err := os.ReadFile(q.persist)
	switch {
	case err == nil:
		// File read; continue below.
	case os.IsNotExist(err):
		return nil
	default:
		return err
	}

	if len(raw) == 0 {
		q.items = nil
		return nil
	}

	var restored []queueItem
	if decodeErr := json.Unmarshal(raw, &restored); decodeErr != nil {
		return decodeErr
	}

	q.items = append(q.items, restored...)
	return nil
}
// persistLocked writes the current items to the persistence file. It does not
// lock q.mu itself; Enqueue and MarkDone call it while holding the lock.
//
// It is a no-op when persistence is disabled. The parent directory is created
// if needed. An empty queue is represented by deleting the file (a file that
// is already absent is fine); otherwise the items are written as indented
// JSON via writeFileAtomic.
func (q *Queue) persistLocked() error {
	target := q.persist
	if target == "" {
		return nil
	}

	if err := os.MkdirAll(filepath.Dir(target), 0o755); err != nil {
		return err
	}

	if len(q.items) > 0 {
		encoded, err := json.MarshalIndent(q.items, "", " ")
		if err != nil {
			return err
		}
		return writeFileAtomic(target, encoded)
	}

	// Empty queue: remove the file, tolerating one that is already gone.
	err := os.Remove(target)
	if err == nil || os.IsNotExist(err) {
		return nil
	}
	return err
}
// idCounter is a process-wide sequence number, incremented atomically for
// every generated ID.
var idCounter uint64

// generateID returns an identifier of the form "<unix-nanos>-<sequence>",
// where the sequence is the post-increment value of idCounter.
func generateID() string {
	seq := atomic.AddUint64(&idCounter, 1)
	stamp := time.Now().UTC().UnixNano()
	return fmt.Sprintf("%d-%d", stamp, seq)
}
// writeFileAtomic replaces the contents of path with data.
//
// The data is first written to "<path>.tmp" and then renamed over path, so
// readers normally see either the old or the new file. If the rename fails
// with EBUSY, the data is instead written directly to path (not atomic) and
// the temporary file is removed.
// NOTE(review): EBUSY presumably occurs when path is itself a bind-mount
// target — confirm against the deployment.
//
// The temporary file is removed on every failure path, including a failed
// initial write, so a partially written "<path>.tmp" is not left behind.
// The returned error is always the one from the failing write/rename, or
// from removing the temporary file after a successful fallback write.
func writeFileAtomic(path string, data []byte) error {
	tmp := path + ".tmp"

	if err := osWriteFile(tmp, data, 0o644); err != nil {
		// The write may have created or truncated tmp before failing.
		// Cleanup is best-effort: the write error is the one worth reporting.
		_ = os.Remove(tmp)
		return err
	}

	err := osRename(tmp, path)
	if err == nil {
		return nil
	}

	if !errors.Is(err, syscall.EBUSY) {
		// Best-effort cleanup; the rename error takes precedence.
		_ = os.Remove(tmp)
		return err
	}

	// The rename was refused with EBUSY: fall back to overwriting in place.
	if writeErr := osWriteFile(path, data, 0o644); writeErr != nil {
		// Best-effort cleanup; the write error takes precedence.
		_ = os.Remove(tmp)
		return writeErr
	}
	return os.Remove(tmp)
}