diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..baa05bb --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,25 @@ +# Repository Guidelines + +## Project Structure & Module Organization +The Go entrypoint lives in `main.go`, delegating to packages inside `internal/`. `internal/config` loads JSON config, `internal/dl` wraps yt-dlp invocation, `internal/model` defines show metadata, and `internal/format`, `metadata`, `nfo`, and `scheduler` handle parsing, artwork, and task orchestration. Sample configuration, OPML exports, and binary artifacts are kept at the repo root (`config.json`, `youtube_subs.opml`, `subsyt`, `yt-dlp`). Treat `vids/` and `tmp/` as working directories; avoid checking large media into git. + +## Build, Test, and Development Commands +- `CGO_ENABLED=0 go build -o subsyt`: produce the statically linked binary expected by downstream deployments. +- `go run . --config=/path/to/config.json`: execute against a config file; `CONFIG=/abs/config.json` also works. +- `go test ./...`: run package tests; add `-run TestName` when iterating locally. +- `go fmt ./... && go vet ./...`: format the tree and catch common correctness issues before review. + +## Coding Style & Naming Conventions +Follow idiomatic Go style enforced by `gofmt`. Packages stay lowercase (`internal/dl`), exported types use `PascalCase`, and helpers remain unexported unless they serve multiple packages. Keep configuration fields aligned with the JSON schema in `internal/config`; when introducing new options, document both the struct tags and sample configs. + +## Testing Guidelines +Write table-driven tests in `_test.go` files alongside the code under test, using Go’s standard `testing` package. Cover OPML parsing, identifier generation, and scheduler edge cases with realistic fixtures placed under `testdata/` (create it if missing). Target branch coverage for new logic, and ensure `go test ./...` passes without external network calls by mocking downloader interfaces. + +## Commit & Pull Request Guidelines +Commits follow a Conventional Commits prefix (`fix:`, `refactor:`) in imperative mood, mirroring existing history (`git log --oneline`). Group related changes, and keep generated files out of the diff. Pull requests should summarize behavioural changes, note config schema updates, link to any tracking issues, and include before/after snippets or logs when touching downloader flows. Confirm the static binary still builds before requesting review. + +## Configuration & External Tools +Document expected locations for `yt-dlp`, cookies, and OPML files when altering defaults. If a change requires bgutil or other external services, provide startup commands and update the sample `config.json`. Ensure new options degrade gracefully for existing configs and call out migration steps in the PR description. + +## HTTP API +Enable the intake server with `http_api.enable` (binds to `127.0.0.1:4416`). Set `auth_token` for bearer auth and `queue_file` to persist pending jobs. Example: `curl -H "Authorization: Bearer $TOKEN" -d '{"url":"https://youtu.be/ID","out_dir":"Channel"}' http://127.0.0.1:4416/v1/videos`. Inspect the queue with `curl -H "Authorization: Bearer $TOKEN" http://127.0.0.1:4416/status`. diff --git a/README.md b/README.md index f5b9039..c707aec 100644 --- a/README.md +++ b/README.md @@ -263,6 +263,41 @@ services: - 4416:4416 ``` +## http api + +Enable the built-in intake server to queue ad-hoc videos without editing the +OPML export. + +``` +{ + "http_api": { + "enable": true, + "listen": "127.0.0.1:4416", + "auth_token": "super-secret", + "queue_file": "./tmp/api-queue.json" + } +} +``` + +Submit new downloads with bearer authentication: + +``` +curl \ + -H "Authorization: Bearer super-secret" \ + -H "Content-Type: application/json" \ + --data '{"url":"https://youtu.be/VIDEO","out_dir":"Channel"}' \ + http://127.0.0.1:4416/v1/videos +``` + +Requests reuse the configured yt-dlp binary, honor existing throttling, and +persist through restarts when `queue_file` is provided. + +Check the pending queue for debugging: + +``` +curl -H "Authorization: Bearer super-secret" http://127.0.0.1:4416/status +``` + ## result ``` diff --git a/internal/config/config.go b/internal/config/config.go index 53945df..8ad4436 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -21,11 +21,19 @@ type Provider struct { Player_client string } +type Http_api struct { + Enable bool + Listen string + Auth_token string + Queue_file string +} + type Config struct { Out_dir string Provider map[string]Provider Dry_run bool Daemon bool + Http_api Http_api } func Load(filepath string) (Config, error) { diff --git a/internal/dl/dl.go b/internal/dl/dl.go index a51a9af..36ceb0e 100644 --- a/internal/dl/dl.go +++ b/internal/dl/dl.go @@ -25,7 +25,12 @@ type Download struct { Metadata bool } +var upgradeMu sync.Mutex + func UpgradeYtDlp(cmd string) error { + upgradeMu.Lock() + defer upgradeMu.Unlock() + resolved, err := resolveYtDlpPath(cmd) if err != nil { return err diff --git a/internal/server/queue.go b/internal/server/queue.go new file mode 100644 index 0000000..14766dd --- /dev/null +++ b/internal/server/queue.go @@ -0,0 +1,185 @@ +package server + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" +) + +type VideoRequest struct { + URL string `json:"url"` + OutDir string `json:"out_dir"` + MetadataOnly bool `json:"metadata_only"` +} + +type queueItem struct { + ID string `json:"id"` + Request VideoRequest `json:"request"` +} + +// Queue provides a simple FIFO with optional on-disk persistence. +type Queue struct { + mu sync.Mutex + items []queueItem + persist string + notifyChan chan struct{} +} + +func NewQueue(persistPath string) (*Queue, error) { + q := &Queue{ + persist: persistPath, + notifyChan: make(chan struct{}, 1), + } + + if persistPath != "" { + if err := q.load(); err != nil { + return nil, err + } + } + + return q, nil +} + +func (q *Queue) Enqueue(req VideoRequest) (string, error) { + q.mu.Lock() + defer q.mu.Unlock() + + id := generateID() + item := queueItem{ID: id, Request: req} + q.items = append(q.items, item) + + if err := q.persistLocked(); err != nil { + q.items = q.items[:len(q.items)-1] + return "", err + } + + q.signal() + return id, nil +} + +func (q *Queue) Next(ctx context.Context) (queueItem, error) { + for { + q.mu.Lock() + if len(q.items) > 0 { + item := q.items[0] + q.mu.Unlock() + return item, nil + } + q.mu.Unlock() + + select { + case <-ctx.Done(): + return queueItem{}, ctx.Err() + case <-q.notifyChan: + } + } +} + +func (q *Queue) MarkDone(id string) error { + q.mu.Lock() + defer q.mu.Unlock() + + if len(q.items) == 0 { + return errors.New("queue is empty") + } + + if q.items[0].ID != id { + // Find the item anywhere in the queue to avoid leaks if processing order is disrupted. + index := -1 + for i, item := range q.items { + if item.ID == id { + index = i + break + } + } + if index == -1 { + return errors.New("queue item not found") + } + q.items = append(q.items[:index], q.items[index+1:]...) + } else { + q.items = q.items[1:] + } + + return q.persistLocked() +} + +func (q *Queue) signal() { + select { + case q.notifyChan <- struct{}{}: + default: + } +} + +func (q *Queue) Snapshot() []queueItem { + q.mu.Lock() + defer q.mu.Unlock() + + items := make([]queueItem, len(q.items)) + copy(items, q.items) + return items +} + +func (q *Queue) load() error { + data, err := os.ReadFile(q.persist) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + var stored []queueItem + if len(data) == 0 { + q.items = nil + return nil + } + + if err := json.Unmarshal(data, &stored); err != nil { + return err + } + + q.items = append(q.items, stored...) + return nil +} + +func (q *Queue) persistLocked() error { + if q.persist == "" { + return nil + } + + if err := os.MkdirAll(filepath.Dir(q.persist), 0o755); err != nil { + return err + } + + if len(q.items) == 0 { + if err := os.Remove(q.persist); err != nil && !os.IsNotExist(err) { + return err + } + return nil + } + + data, err := json.MarshalIndent(q.items, "", " ") + if err != nil { + return err + } + + tmp := q.persist + ".tmp" + if err := os.WriteFile(tmp, data, 0o644); err != nil { + return err + } + + return os.Rename(tmp, q.persist) +} + +var idCounter uint64 + +func generateID() string { + count := atomic.AddUint64(&idCounter, 1) + return fmt.Sprintf("%d-%d", time.Now().UTC().UnixNano(), count) +} diff --git a/internal/server/queue_test.go b/internal/server/queue_test.go new file mode 100644 index 0000000..7c7533b --- /dev/null +++ b/internal/server/queue_test.go @@ -0,0 +1,92 @@ +package server + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" +) + +func TestQueueEnqueueNextMarkDone(t *testing.T) { + q, err := NewQueue("") + if err != nil { + t.Fatalf("new queue: %v", err) + } + + req := VideoRequest{URL: "https://example.com/watch?v=1", OutDir: "/tmp/videos", MetadataOnly: false} + id, err := q.Enqueue(req) + if err != nil { + t.Fatalf("enqueue: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + item, err := q.Next(ctx) + if err != nil { + t.Fatalf("next: %v", err) + } + + if item.ID != id { + t.Fatalf("expected id %s, got %s", id, item.ID) + } + + if item.Request.URL != req.URL { + t.Fatalf("unexpected url: %s", item.Request.URL) + } + + if err := q.MarkDone(item.ID); err != nil { + t.Fatalf("mark done: %v", err) + } + + select { + case <-time.After(20 * time.Millisecond): + default: + } +} + +func TestQueuePersistence(t *testing.T) { + tmpDir := t.TempDir() + queuePath := filepath.Join(tmpDir, "queue.json") + + q, err := NewQueue(queuePath) + if err != nil { + t.Fatalf("new queue: %v", err) + } + + req := VideoRequest{URL: "https://example.com/watch?v=2", OutDir: "/data", MetadataOnly: true} + id, err := q.Enqueue(req) + if err != nil { + t.Fatalf("enqueue: %v", err) + } + + if _, err := os.Stat(queuePath); err != nil { + t.Fatalf("queue file missing: %v", err) + } + + q2, err := NewQueue(queuePath) + if err != nil { + t.Fatalf("reload queue: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + item, err := q2.Next(ctx) + if err != nil { + t.Fatalf("next: %v", err) + } + + if item.ID != id { + t.Fatalf("expected persisted id %s, got %s", id, item.ID) + } + + if err := q2.MarkDone(item.ID); err != nil { + t.Fatalf("mark done: %v", err) + } + + if _, err := os.Stat(queuePath); !os.IsNotExist(err) { + t.Fatalf("expected queue file removed after draining, got %v", err) + } +} diff --git a/internal/server/server.go b/internal/server/server.go new file mode 100644 index 0000000..1e7de06 --- /dev/null +++ b/internal/server/server.go @@ -0,0 +1,195 @@ +package server + +import ( + "context" + "encoding/json" + "errors" + "log" + "net/http" + "net/url" + "path/filepath" + "strings" + "time" + + "git.meatbag.se/varl/subsyt/internal/config" +) + +const defaultListen = "127.0.0.1:4416" + +type Server struct { + cfg config.Http_api + queue *Queue + defaultOutDir string + httpServer *http.Server + shutdownSignal chan struct{} +} + +func NewServer(cfg config.Http_api, defaultOutDir string, queue *Queue) *Server { + return &Server{ + cfg: cfg, + queue: queue, + defaultOutDir: defaultOutDir, + shutdownSignal: make(chan struct{}), + } +} + +func (s *Server) Start(ctx context.Context) error { + mux := http.NewServeMux() + mux.HandleFunc("/v1/videos", s.handleVideos) + mux.HandleFunc("/status", s.handleStatus) + + addr := s.cfg.Listen + if addr == "" { + addr = defaultListen + } + + s.httpServer = &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + + go func() { + select { + case <-ctx.Done(): + case <-s.shutdownSignal: + } + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if s.httpServer != nil { + if err := s.httpServer.Shutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Printf("http shutdown error: %v", err) + } + } + }() + + err := s.httpServer.ListenAndServe() + if err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + + return nil +} + +func (s *Server) Stop() { + close(s.shutdownSignal) +} + +func (s *Server) handleVideos(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.Header().Set("Allow", http.MethodPost) + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + if !s.authorize(r.Header.Get("Authorization")) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + + var payload VideoRequest + dec := json.NewDecoder(r.Body) + dec.DisallowUnknownFields() + if err := dec.Decode(&payload); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + + if payload.URL == "" { + http.Error(w, "url is required", http.StatusBadRequest) + return + } + + if _, err := url.ParseRequestURI(payload.URL); err != nil { + http.Error(w, "invalid url", http.StatusBadRequest) + return + } + + payload.OutDir = s.resolveOutDir(payload.OutDir) + if payload.OutDir == "" { + http.Error(w, "out_dir is required", http.StatusBadRequest) + return + } + + id, err := s.queue.Enqueue(payload) + if err != nil { + log.Printf("failed to enqueue request: %v", err) + http.Error(w, "failed to queue request", http.StatusInternalServerError) + return + } + + log.Printf("queued video %s -> %s (%s)", payload.URL, payload.OutDir, id) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) + _ = json.NewEncoder(w).Encode(map[string]string{ + "status": "queued", + "id": id, + }) +} + +func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("Allow", http.MethodGet) + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + if !s.authorize(r.Header.Get("Authorization")) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + + items := s.queue.Snapshot() + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]interface{}{ + "count": len(items), + "items": items, + }); err != nil { + log.Printf("status encode error: %v", err) + } +} + +func (s *Server) authorize(header string) bool { + required := strings.TrimSpace(s.cfg.Auth_token) + if required == "" { + return true + } + + if header == "" { + return false + } + + parts := strings.SplitN(header, " ", 2) + if len(parts) != 2 { + return false + } + + if !strings.EqualFold(parts[0], "Bearer") { + return false + } + + return strings.TrimSpace(parts[1]) == required +} + +func (s *Server) resolveOutDir(raw string) string { + root := s.defaultOutDir + if raw == "" { + return root + } + + if filepath.IsAbs(raw) { + return filepath.Clean(raw) + } + + if root == "" { + return filepath.Clean(raw) + } + + return filepath.Join(root, raw) +} diff --git a/internal/server/server_test.go b/internal/server/server_test.go new file mode 100644 index 0000000..d659061 --- /dev/null +++ b/internal/server/server_test.go @@ -0,0 +1,135 @@ +package server + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + "time" + + "git.meatbag.se/varl/subsyt/internal/config" +) + +func TestHandleVideosRequiresAuth(t *testing.T) { + queue, err := NewQueue("") + if err != nil { + t.Fatalf("new queue: %v", err) + } + + s := NewServer(config.Http_api{Auth_token: "secret"}, "/videos", queue) + + req := httptest.NewRequest(http.MethodPost, "/v1/videos", bytes.NewBufferString(`{"url":"https://example.com"}`)) + rec := httptest.NewRecorder() + + s.handleVideos(rec, req) + + if rec.Result().StatusCode != http.StatusUnauthorized { + t.Fatalf("expected unauthorized, got %d", rec.Result().StatusCode) + } +} + +func TestHandleVideosSuccess(t *testing.T) { + queue, err := NewQueue("") + if err != nil { + t.Fatalf("new queue: %v", err) + } + + s := NewServer(config.Http_api{}, "/videos", queue) + + payload := map[string]interface{}{ + "url": "https://example.com/watch?v=123", + "out_dir": "channel", + "metadata_only": true, + } + + body, err := json.Marshal(payload) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/v1/videos", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + s.handleVideos(rec, req) + + if rec.Result().StatusCode != http.StatusAccepted { + t.Fatalf("expected accepted, got %d", rec.Result().StatusCode) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + item, err := queue.Next(ctx) + if err != nil { + t.Fatalf("next: %v", err) + } + + expectedOutDir := filepath.Join("/videos", "channel") + if item.Request.OutDir != expectedOutDir { + t.Fatalf("expected out dir %s, got %s", expectedOutDir, item.Request.OutDir) + } + + if !item.Request.MetadataOnly { + t.Fatalf("expected metadata flag to be true") + } + + if err := queue.MarkDone(item.ID); err != nil { + t.Fatalf("mark done: %v", err) + } +} + +func TestHandleStatus(t *testing.T) { + queue, err := NewQueue("") + if err != nil { + t.Fatalf("new queue: %v", err) + } + + s := NewServer(config.Http_api{Auth_token: "secret"}, "/videos", queue) + + req := httptest.NewRequest(http.MethodGet, "/status", nil) + rec := httptest.NewRecorder() + + s.handleStatus(rec, req) + + if rec.Result().StatusCode != http.StatusUnauthorized { + t.Fatalf("expected unauthorized, got %d", rec.Result().StatusCode) + } + + if _, err := queue.Enqueue(VideoRequest{URL: "https://example.com/watch?v=1", OutDir: "/videos", MetadataOnly: false}); err != nil { + t.Fatalf("enqueue: %v", err) + } + + req = httptest.NewRequest(http.MethodGet, "/status", nil) + req.Header.Set("Authorization", "Bearer secret") + rec = httptest.NewRecorder() + + s.handleStatus(rec, req) + + if rec.Result().StatusCode != http.StatusOK { + t.Fatalf("expected ok, got %d", rec.Result().StatusCode) + } + + var payload struct { + Count int `json:"count"` + Items []queueItem `json:"items"` + } + + if err := json.NewDecoder(rec.Body).Decode(&payload); err != nil { + t.Fatalf("decode: %v", err) + } + + if payload.Count != 1 { + t.Fatalf("expected count 1, got %d", payload.Count) + } + + if len(payload.Items) != 1 { + t.Fatalf("expected one item, got %d", len(payload.Items)) + } + + if payload.Items[0].Request.URL == "" { + t.Fatalf("expected item url to be set") + } +} diff --git a/main.go b/main.go index 14a257b..ab34a8a 100644 --- a/main.go +++ b/main.go @@ -1,17 +1,21 @@ package main import ( + "context" + "errors" "flag" "fmt" "log" "os" "path/filepath" + "time" "git.meatbag.se/varl/subsyt/internal/config" "git.meatbag.se/varl/subsyt/internal/dl" "git.meatbag.se/varl/subsyt/internal/format" "git.meatbag.se/varl/subsyt/internal/metadata" "git.meatbag.se/varl/subsyt/internal/scheduler" + "git.meatbag.se/varl/subsyt/internal/server" ) func run(cfg config.Config) { @@ -69,6 +73,9 @@ func run(cfg config.Config) { } func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + configPtr := flag.String("config", "", "path to config file") flag.Parse() @@ -90,6 +97,17 @@ func main() { panic(err) } + provider, ok := cfg.Provider["youtube"] + if !ok { + log.Fatal("youtube provider configuration missing") + } + + if cfg.Http_api.Enable { + if err := setupAPIServer(ctx, cfg, provider); err != nil { + log.Fatalf("failed to start http api: %v", err) + } + } + if cfg.Daemon { log.Println("running with scheduler") s := scheduler.Scheduler{} @@ -99,3 +117,52 @@ func main() { run(cfg) } } + +func setupAPIServer(ctx context.Context, cfg config.Config, provider config.Provider) error { + queue, err := server.NewQueue(cfg.Http_api.Queue_file) + if err != nil { + return fmt.Errorf("load queue: %w", err) + } + + go queueWorker(ctx, queue, cfg, provider) + + srv := server.NewServer(cfg.Http_api, cfg.Out_dir, queue) + go func() { + if err := srv.Start(ctx); err != nil { + log.Printf("http api server stopped with error: %v", err) + } + }() + + return nil +} + +func queueWorker(ctx context.Context, q *server.Queue, cfg config.Config, provider config.Provider) { + for { + item, err := q.Next(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + log.Printf("queue wait error: %v", err) + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return + } + continue + } + + outPath := filepath.Join(cfg.Out_dir, "_misc") + + dl.Youtube(dl.Download{ + Url: item.Request.URL, + OutDir: outPath, + DryRun: cfg.Dry_run, + Metadata: false, + }, provider) + + if err := q.MarkDone(item.ID); err != nil { + log.Printf("queue mark done failed: %v", err) + } + } +}