186 lines
3.2 KiB
Go
186 lines
3.2 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
type VideoRequest struct {
|
|
URL string `json:"url"`
|
|
OutDir string `json:"out_dir"`
|
|
MetadataOnly bool `json:"metadata_only"`
|
|
}
|
|
|
|
type queueItem struct {
|
|
ID string `json:"id"`
|
|
Request VideoRequest `json:"request"`
|
|
}
|
|
|
|
// Queue provides a simple FIFO with optional on-disk persistence.
|
|
type Queue struct {
|
|
mu sync.Mutex
|
|
items []queueItem
|
|
persist string
|
|
notifyChan chan struct{}
|
|
}
|
|
|
|
func NewQueue(persistPath string) (*Queue, error) {
|
|
q := &Queue{
|
|
persist: persistPath,
|
|
notifyChan: make(chan struct{}, 1),
|
|
}
|
|
|
|
if persistPath != "" {
|
|
if err := q.load(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return q, nil
|
|
}
|
|
|
|
func (q *Queue) Enqueue(req VideoRequest) (string, error) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
id := generateID()
|
|
item := queueItem{ID: id, Request: req}
|
|
q.items = append(q.items, item)
|
|
|
|
if err := q.persistLocked(); err != nil {
|
|
q.items = q.items[:len(q.items)-1]
|
|
return "", err
|
|
}
|
|
|
|
q.signal()
|
|
return id, nil
|
|
}
|
|
|
|
func (q *Queue) Next(ctx context.Context) (queueItem, error) {
|
|
for {
|
|
q.mu.Lock()
|
|
if len(q.items) > 0 {
|
|
item := q.items[0]
|
|
q.mu.Unlock()
|
|
return item, nil
|
|
}
|
|
q.mu.Unlock()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return queueItem{}, ctx.Err()
|
|
case <-q.notifyChan:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *Queue) MarkDone(id string) error {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
if len(q.items) == 0 {
|
|
return errors.New("queue is empty")
|
|
}
|
|
|
|
if q.items[0].ID != id {
|
|
// Find the item anywhere in the queue to avoid leaks if processing order is disrupted.
|
|
index := -1
|
|
for i, item := range q.items {
|
|
if item.ID == id {
|
|
index = i
|
|
break
|
|
}
|
|
}
|
|
if index == -1 {
|
|
return errors.New("queue item not found")
|
|
}
|
|
q.items = append(q.items[:index], q.items[index+1:]...)
|
|
} else {
|
|
q.items = q.items[1:]
|
|
}
|
|
|
|
return q.persistLocked()
|
|
}
|
|
|
|
func (q *Queue) signal() {
|
|
select {
|
|
case q.notifyChan <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (q *Queue) Snapshot() []queueItem {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
items := make([]queueItem, len(q.items))
|
|
copy(items, q.items)
|
|
return items
|
|
}
|
|
|
|
func (q *Queue) load() error {
|
|
data, err := os.ReadFile(q.persist)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
var stored []queueItem
|
|
if len(data) == 0 {
|
|
q.items = nil
|
|
return nil
|
|
}
|
|
|
|
if err := json.Unmarshal(data, &stored); err != nil {
|
|
return err
|
|
}
|
|
|
|
q.items = append(q.items, stored...)
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) persistLocked() error {
|
|
if q.persist == "" {
|
|
return nil
|
|
}
|
|
|
|
if err := os.MkdirAll(filepath.Dir(q.persist), 0o755); err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(q.items) == 0 {
|
|
if err := os.Remove(q.persist); err != nil && !os.IsNotExist(err) {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
data, err := json.MarshalIndent(q.items, "", " ")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tmp := q.persist + ".tmp"
|
|
if err := os.WriteFile(tmp, data, 0o644); err != nil {
|
|
return err
|
|
}
|
|
|
|
return os.Rename(tmp, q.persist)
|
|
}
|
|
|
|
var idCounter uint64
|
|
|
|
func generateID() string {
|
|
count := atomic.AddUint64(&idCounter, 1)
|
|
return fmt.Sprintf("%d-%d", time.Now().UTC().UnixNano(), count)
|
|
}
|