feat: add ability to download a specific video
Some checks are pending
build / build (push) Waiting to run
Some checks are pending
build / build (push) Waiting to run
This commit is contained in:
parent
02baccf0fc
commit
fa0d5183cd
25
AGENTS.md
Normal file
25
AGENTS.md
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
# Repository Guidelines
|
||||
|
||||
## Project Structure & Module Organization
|
||||
The Go entrypoint lives in `main.go`, delegating to packages inside `internal/`. `internal/config` loads JSON config, `internal/dl` wraps yt-dlp invocation, `internal/model` defines show metadata, and `internal/format`, `metadata`, `nfo`, and `scheduler` handle parsing, artwork, and task orchestration. Sample configuration, OPML exports, and binary artifacts are kept at the repo root (`config.json`, `youtube_subs.opml`, `subsyt`, `yt-dlp`). Treat `vids/` and `tmp/` as working directories; avoid checking large media into git.
|
||||
|
||||
## Build, Test, and Development Commands
|
||||
- `CGO_ENABLED=0 go build -o subsyt`: produce the statically linked binary expected by downstream deployments.
|
||||
- `go run . --config=/path/to/config.json`: execute against a config file; `CONFIG=/abs/config.json` also works.
|
||||
- `go test ./...`: run package tests; add `-run TestName` when iterating locally.
|
||||
- `go fmt ./... && go vet ./...`: format the tree and catch common correctness issues before review.
|
||||
|
||||
## Coding Style & Naming Conventions
|
||||
Follow idiomatic Go style enforced by `gofmt`. Packages stay lowercase (`internal/dl`), exported types use `PascalCase`, and helpers remain unexported unless they serve multiple packages. Keep configuration fields aligned with the JSON schema in `internal/config`; when introducing new options, document both the struct tags and sample configs.
|
||||
|
||||
## Testing Guidelines
|
||||
Write table-driven tests in `_test.go` files alongside the code under test, using Go’s standard `testing` package. Cover OPML parsing, identifier generation, and scheduler edge cases with realistic fixtures placed under `testdata/` (create it if missing). Target branch coverage for new logic, and ensure `go test ./...` passes without external network calls by mocking downloader interfaces.
|
||||
|
||||
## Commit & Pull Request Guidelines
|
||||
Commits follow a Conventional Commits prefix (`fix:`, `refactor:`) in imperative mood, mirroring existing history (`git log --oneline`). Group related changes, and keep generated files out of the diff. Pull requests should summarize behavioural changes, note config schema updates, link to any tracking issues, and include before/after snippets or logs when touching downloader flows. Confirm the static binary still builds before requesting review.
|
||||
|
||||
## Configuration & External Tools
|
||||
Document expected locations for `yt-dlp`, cookies, and OPML files when altering defaults. If a change requires bgutil or other external services, provide startup commands and update the sample `config.json`. Ensure new options degrade gracefully for existing configs and call out migration steps in the PR description.
|
||||
|
||||
## HTTP API
|
||||
Enable the intake server with `http_api.enable` (binds to `127.0.0.1:4416`). Set `auth_token` for bearer auth and `queue_file` to persist pending jobs. Example: `curl -H "Authorization: Bearer $TOKEN" -d '{"url":"https://youtu.be/ID","out_dir":"Channel"}' http://127.0.0.1:4416/v1/videos`. Inspect the queue with `curl -H "Authorization: Bearer $TOKEN" http://127.0.0.1:4416/status`.
|
||||
35
README.md
35
README.md
|
|
@ -263,6 +263,41 @@ services:
|
|||
- 4416:4416
|
||||
```
|
||||
|
||||
## http api
|
||||
|
||||
Enable the built-in intake server to queue ad-hoc videos without editing the
|
||||
OPML export.
|
||||
|
||||
```
|
||||
{
|
||||
"http_api": {
|
||||
"enable": true,
|
||||
"listen": "127.0.0.1:4416",
|
||||
"auth_token": "super-secret",
|
||||
"queue_file": "./tmp/api-queue.json"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Submit new downloads with bearer authentication:
|
||||
|
||||
```
|
||||
curl \
|
||||
-H "Authorization: Bearer super-secret" \
|
||||
-H "Content-Type: application/json" \
|
||||
--data '{"url":"https://youtu.be/VIDEO","out_dir":"Channel"}' \
|
||||
http://127.0.0.1:4416/v1/videos
|
||||
```
|
||||
|
||||
Requests reuse the configured yt-dlp binary, honor existing throttling, and
|
||||
persist through restarts when `queue_file` is provided.
|
||||
|
||||
Check the pending queue for debugging:
|
||||
|
||||
```
|
||||
curl -H "Authorization: Bearer super-secret" http://127.0.0.1:4416/status
|
||||
```
|
||||
|
||||
## result
|
||||
|
||||
```
|
||||
|
|
|
|||
|
|
@ -21,11 +21,19 @@ type Provider struct {
|
|||
Player_client string
|
||||
}
|
||||
|
||||
type Http_api struct {
|
||||
Enable bool
|
||||
Listen string
|
||||
Auth_token string
|
||||
Queue_file string
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Out_dir string
|
||||
Provider map[string]Provider
|
||||
Dry_run bool
|
||||
Daemon bool
|
||||
Http_api Http_api
|
||||
}
|
||||
|
||||
func Load(filepath string) (Config, error) {
|
||||
|
|
|
|||
|
|
@ -25,7 +25,12 @@ type Download struct {
|
|||
Metadata bool
|
||||
}
|
||||
|
||||
var upgradeMu sync.Mutex
|
||||
|
||||
func UpgradeYtDlp(cmd string) error {
|
||||
upgradeMu.Lock()
|
||||
defer upgradeMu.Unlock()
|
||||
|
||||
resolved, err := resolveYtDlpPath(cmd)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
|||
185
internal/server/queue.go
Normal file
185
internal/server/queue.go
Normal file
|
|
@ -0,0 +1,185 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type VideoRequest struct {
|
||||
URL string `json:"url"`
|
||||
OutDir string `json:"out_dir"`
|
||||
MetadataOnly bool `json:"metadata_only"`
|
||||
}
|
||||
|
||||
type queueItem struct {
|
||||
ID string `json:"id"`
|
||||
Request VideoRequest `json:"request"`
|
||||
}
|
||||
|
||||
// Queue provides a simple FIFO with optional on-disk persistence.
|
||||
type Queue struct {
|
||||
mu sync.Mutex
|
||||
items []queueItem
|
||||
persist string
|
||||
notifyChan chan struct{}
|
||||
}
|
||||
|
||||
func NewQueue(persistPath string) (*Queue, error) {
|
||||
q := &Queue{
|
||||
persist: persistPath,
|
||||
notifyChan: make(chan struct{}, 1),
|
||||
}
|
||||
|
||||
if persistPath != "" {
|
||||
if err := q.load(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return q, nil
|
||||
}
|
||||
|
||||
func (q *Queue) Enqueue(req VideoRequest) (string, error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
id := generateID()
|
||||
item := queueItem{ID: id, Request: req}
|
||||
q.items = append(q.items, item)
|
||||
|
||||
if err := q.persistLocked(); err != nil {
|
||||
q.items = q.items[:len(q.items)-1]
|
||||
return "", err
|
||||
}
|
||||
|
||||
q.signal()
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (q *Queue) Next(ctx context.Context) (queueItem, error) {
|
||||
for {
|
||||
q.mu.Lock()
|
||||
if len(q.items) > 0 {
|
||||
item := q.items[0]
|
||||
q.mu.Unlock()
|
||||
return item, nil
|
||||
}
|
||||
q.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return queueItem{}, ctx.Err()
|
||||
case <-q.notifyChan:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Queue) MarkDone(id string) error {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
if len(q.items) == 0 {
|
||||
return errors.New("queue is empty")
|
||||
}
|
||||
|
||||
if q.items[0].ID != id {
|
||||
// Find the item anywhere in the queue to avoid leaks if processing order is disrupted.
|
||||
index := -1
|
||||
for i, item := range q.items {
|
||||
if item.ID == id {
|
||||
index = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if index == -1 {
|
||||
return errors.New("queue item not found")
|
||||
}
|
||||
q.items = append(q.items[:index], q.items[index+1:]...)
|
||||
} else {
|
||||
q.items = q.items[1:]
|
||||
}
|
||||
|
||||
return q.persistLocked()
|
||||
}
|
||||
|
||||
func (q *Queue) signal() {
|
||||
select {
|
||||
case q.notifyChan <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Queue) Snapshot() []queueItem {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
items := make([]queueItem, len(q.items))
|
||||
copy(items, q.items)
|
||||
return items
|
||||
}
|
||||
|
||||
func (q *Queue) load() error {
|
||||
data, err := os.ReadFile(q.persist)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
var stored []queueItem
|
||||
if len(data) == 0 {
|
||||
q.items = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &stored); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
q.items = append(q.items, stored...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *Queue) persistLocked() error {
|
||||
if q.persist == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Dir(q.persist), 0o755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(q.items) == 0 {
|
||||
if err := os.Remove(q.persist); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := json.MarshalIndent(q.items, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tmp := q.persist + ".tmp"
|
||||
if err := os.WriteFile(tmp, data, 0o644); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return os.Rename(tmp, q.persist)
|
||||
}
|
||||
|
||||
var idCounter uint64
|
||||
|
||||
func generateID() string {
|
||||
count := atomic.AddUint64(&idCounter, 1)
|
||||
return fmt.Sprintf("%d-%d", time.Now().UTC().UnixNano(), count)
|
||||
}
|
||||
92
internal/server/queue_test.go
Normal file
92
internal/server/queue_test.go
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestQueueEnqueueNextMarkDone(t *testing.T) {
|
||||
q, err := NewQueue("")
|
||||
if err != nil {
|
||||
t.Fatalf("new queue: %v", err)
|
||||
}
|
||||
|
||||
req := VideoRequest{URL: "https://example.com/watch?v=1", OutDir: "/tmp/videos", MetadataOnly: false}
|
||||
id, err := q.Enqueue(req)
|
||||
if err != nil {
|
||||
t.Fatalf("enqueue: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
item, err := q.Next(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("next: %v", err)
|
||||
}
|
||||
|
||||
if item.ID != id {
|
||||
t.Fatalf("expected id %s, got %s", id, item.ID)
|
||||
}
|
||||
|
||||
if item.Request.URL != req.URL {
|
||||
t.Fatalf("unexpected url: %s", item.Request.URL)
|
||||
}
|
||||
|
||||
if err := q.MarkDone(item.ID); err != nil {
|
||||
t.Fatalf("mark done: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(20 * time.Millisecond):
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueuePersistence(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
queuePath := filepath.Join(tmpDir, "queue.json")
|
||||
|
||||
q, err := NewQueue(queuePath)
|
||||
if err != nil {
|
||||
t.Fatalf("new queue: %v", err)
|
||||
}
|
||||
|
||||
req := VideoRequest{URL: "https://example.com/watch?v=2", OutDir: "/data", MetadataOnly: true}
|
||||
id, err := q.Enqueue(req)
|
||||
if err != nil {
|
||||
t.Fatalf("enqueue: %v", err)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(queuePath); err != nil {
|
||||
t.Fatalf("queue file missing: %v", err)
|
||||
}
|
||||
|
||||
q2, err := NewQueue(queuePath)
|
||||
if err != nil {
|
||||
t.Fatalf("reload queue: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
item, err := q2.Next(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("next: %v", err)
|
||||
}
|
||||
|
||||
if item.ID != id {
|
||||
t.Fatalf("expected persisted id %s, got %s", id, item.ID)
|
||||
}
|
||||
|
||||
if err := q2.MarkDone(item.ID); err != nil {
|
||||
t.Fatalf("mark done: %v", err)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(queuePath); !os.IsNotExist(err) {
|
||||
t.Fatalf("expected queue file removed after draining, got %v", err)
|
||||
}
|
||||
}
|
||||
195
internal/server/server.go
Normal file
195
internal/server/server.go
Normal file
|
|
@ -0,0 +1,195 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.meatbag.se/varl/subsyt/internal/config"
|
||||
)
|
||||
|
||||
const defaultListen = "127.0.0.1:4416"
|
||||
|
||||
type Server struct {
|
||||
cfg config.Http_api
|
||||
queue *Queue
|
||||
defaultOutDir string
|
||||
httpServer *http.Server
|
||||
shutdownSignal chan struct{}
|
||||
}
|
||||
|
||||
func NewServer(cfg config.Http_api, defaultOutDir string, queue *Queue) *Server {
|
||||
return &Server{
|
||||
cfg: cfg,
|
||||
queue: queue,
|
||||
defaultOutDir: defaultOutDir,
|
||||
shutdownSignal: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Start(ctx context.Context) error {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/v1/videos", s.handleVideos)
|
||||
mux.HandleFunc("/status", s.handleStatus)
|
||||
|
||||
addr := s.cfg.Listen
|
||||
if addr == "" {
|
||||
addr = defaultListen
|
||||
}
|
||||
|
||||
s.httpServer = &http.Server{
|
||||
Addr: addr,
|
||||
Handler: mux,
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
ReadTimeout: 10 * time.Second,
|
||||
WriteTimeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-s.shutdownSignal:
|
||||
}
|
||||
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if s.httpServer != nil {
|
||||
if err := s.httpServer.Shutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
log.Printf("http shutdown error: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err := s.httpServer.ListenAndServe()
|
||||
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) Stop() {
|
||||
close(s.shutdownSignal)
|
||||
}
|
||||
|
||||
func (s *Server) handleVideos(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
w.Header().Set("Allow", http.MethodPost)
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
if !s.authorize(r.Header.Get("Authorization")) {
|
||||
http.Error(w, "unauthorized", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
var payload VideoRequest
|
||||
dec := json.NewDecoder(r.Body)
|
||||
dec.DisallowUnknownFields()
|
||||
if err := dec.Decode(&payload); err != nil {
|
||||
http.Error(w, "invalid json", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if payload.URL == "" {
|
||||
http.Error(w, "url is required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := url.ParseRequestURI(payload.URL); err != nil {
|
||||
http.Error(w, "invalid url", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
payload.OutDir = s.resolveOutDir(payload.OutDir)
|
||||
if payload.OutDir == "" {
|
||||
http.Error(w, "out_dir is required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
id, err := s.queue.Enqueue(payload)
|
||||
if err != nil {
|
||||
log.Printf("failed to enqueue request: %v", err)
|
||||
http.Error(w, "failed to queue request", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("queued video %s -> %s (%s)", payload.URL, payload.OutDir, id)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
_ = json.NewEncoder(w).Encode(map[string]string{
|
||||
"status": "queued",
|
||||
"id": id,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
w.Header().Set("Allow", http.MethodGet)
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
if !s.authorize(r.Header.Get("Authorization")) {
|
||||
http.Error(w, "unauthorized", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
items := s.queue.Snapshot()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
if err := json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"count": len(items),
|
||||
"items": items,
|
||||
}); err != nil {
|
||||
log.Printf("status encode error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) authorize(header string) bool {
|
||||
required := strings.TrimSpace(s.cfg.Auth_token)
|
||||
if required == "" {
|
||||
return true
|
||||
}
|
||||
|
||||
if header == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
parts := strings.SplitN(header, " ", 2)
|
||||
if len(parts) != 2 {
|
||||
return false
|
||||
}
|
||||
|
||||
if !strings.EqualFold(parts[0], "Bearer") {
|
||||
return false
|
||||
}
|
||||
|
||||
return strings.TrimSpace(parts[1]) == required
|
||||
}
|
||||
|
||||
func (s *Server) resolveOutDir(raw string) string {
|
||||
root := s.defaultOutDir
|
||||
if raw == "" {
|
||||
return root
|
||||
}
|
||||
|
||||
if filepath.IsAbs(raw) {
|
||||
return filepath.Clean(raw)
|
||||
}
|
||||
|
||||
if root == "" {
|
||||
return filepath.Clean(raw)
|
||||
}
|
||||
|
||||
return filepath.Join(root, raw)
|
||||
}
|
||||
135
internal/server/server_test.go
Normal file
135
internal/server/server_test.go
Normal file
|
|
@ -0,0 +1,135 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.meatbag.se/varl/subsyt/internal/config"
|
||||
)
|
||||
|
||||
func TestHandleVideosRequiresAuth(t *testing.T) {
|
||||
queue, err := NewQueue("")
|
||||
if err != nil {
|
||||
t.Fatalf("new queue: %v", err)
|
||||
}
|
||||
|
||||
s := NewServer(config.Http_api{Auth_token: "secret"}, "/videos", queue)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/videos", bytes.NewBufferString(`{"url":"https://example.com"}`))
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
s.handleVideos(rec, req)
|
||||
|
||||
if rec.Result().StatusCode != http.StatusUnauthorized {
|
||||
t.Fatalf("expected unauthorized, got %d", rec.Result().StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleVideosSuccess(t *testing.T) {
|
||||
queue, err := NewQueue("")
|
||||
if err != nil {
|
||||
t.Fatalf("new queue: %v", err)
|
||||
}
|
||||
|
||||
s := NewServer(config.Http_api{}, "/videos", queue)
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"url": "https://example.com/watch?v=123",
|
||||
"out_dir": "channel",
|
||||
"metadata_only": true,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal: %v", err)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/videos", bytes.NewReader(body))
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
s.handleVideos(rec, req)
|
||||
|
||||
if rec.Result().StatusCode != http.StatusAccepted {
|
||||
t.Fatalf("expected accepted, got %d", rec.Result().StatusCode)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
item, err := queue.Next(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("next: %v", err)
|
||||
}
|
||||
|
||||
expectedOutDir := filepath.Join("/videos", "channel")
|
||||
if item.Request.OutDir != expectedOutDir {
|
||||
t.Fatalf("expected out dir %s, got %s", expectedOutDir, item.Request.OutDir)
|
||||
}
|
||||
|
||||
if !item.Request.MetadataOnly {
|
||||
t.Fatalf("expected metadata flag to be true")
|
||||
}
|
||||
|
||||
if err := queue.MarkDone(item.ID); err != nil {
|
||||
t.Fatalf("mark done: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleStatus(t *testing.T) {
|
||||
queue, err := NewQueue("")
|
||||
if err != nil {
|
||||
t.Fatalf("new queue: %v", err)
|
||||
}
|
||||
|
||||
s := NewServer(config.Http_api{Auth_token: "secret"}, "/videos", queue)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/status", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
s.handleStatus(rec, req)
|
||||
|
||||
if rec.Result().StatusCode != http.StatusUnauthorized {
|
||||
t.Fatalf("expected unauthorized, got %d", rec.Result().StatusCode)
|
||||
}
|
||||
|
||||
if _, err := queue.Enqueue(VideoRequest{URL: "https://example.com/watch?v=1", OutDir: "/videos", MetadataOnly: false}); err != nil {
|
||||
t.Fatalf("enqueue: %v", err)
|
||||
}
|
||||
|
||||
req = httptest.NewRequest(http.MethodGet, "/status", nil)
|
||||
req.Header.Set("Authorization", "Bearer secret")
|
||||
rec = httptest.NewRecorder()
|
||||
|
||||
s.handleStatus(rec, req)
|
||||
|
||||
if rec.Result().StatusCode != http.StatusOK {
|
||||
t.Fatalf("expected ok, got %d", rec.Result().StatusCode)
|
||||
}
|
||||
|
||||
var payload struct {
|
||||
Count int `json:"count"`
|
||||
Items []queueItem `json:"items"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(rec.Body).Decode(&payload); err != nil {
|
||||
t.Fatalf("decode: %v", err)
|
||||
}
|
||||
|
||||
if payload.Count != 1 {
|
||||
t.Fatalf("expected count 1, got %d", payload.Count)
|
||||
}
|
||||
|
||||
if len(payload.Items) != 1 {
|
||||
t.Fatalf("expected one item, got %d", len(payload.Items))
|
||||
}
|
||||
|
||||
if payload.Items[0].Request.URL == "" {
|
||||
t.Fatalf("expected item url to be set")
|
||||
}
|
||||
}
|
||||
67
main.go
67
main.go
|
|
@ -1,17 +1,21 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.meatbag.se/varl/subsyt/internal/config"
|
||||
"git.meatbag.se/varl/subsyt/internal/dl"
|
||||
"git.meatbag.se/varl/subsyt/internal/format"
|
||||
"git.meatbag.se/varl/subsyt/internal/metadata"
|
||||
"git.meatbag.se/varl/subsyt/internal/scheduler"
|
||||
"git.meatbag.se/varl/subsyt/internal/server"
|
||||
)
|
||||
|
||||
func run(cfg config.Config) {
|
||||
|
|
@ -69,6 +73,9 @@ func run(cfg config.Config) {
|
|||
}
|
||||
|
||||
func main() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
configPtr := flag.String("config", "", "path to config file")
|
||||
flag.Parse()
|
||||
|
||||
|
|
@ -90,6 +97,17 @@ func main() {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
provider, ok := cfg.Provider["youtube"]
|
||||
if !ok {
|
||||
log.Fatal("youtube provider configuration missing")
|
||||
}
|
||||
|
||||
if cfg.Http_api.Enable {
|
||||
if err := setupAPIServer(ctx, cfg, provider); err != nil {
|
||||
log.Fatalf("failed to start http api: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.Daemon {
|
||||
log.Println("running with scheduler")
|
||||
s := scheduler.Scheduler{}
|
||||
|
|
@ -99,3 +117,52 @@ func main() {
|
|||
run(cfg)
|
||||
}
|
||||
}
|
||||
|
||||
func setupAPIServer(ctx context.Context, cfg config.Config, provider config.Provider) error {
|
||||
queue, err := server.NewQueue(cfg.Http_api.Queue_file)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load queue: %w", err)
|
||||
}
|
||||
|
||||
go queueWorker(ctx, queue, cfg, provider)
|
||||
|
||||
srv := server.NewServer(cfg.Http_api, cfg.Out_dir, queue)
|
||||
go func() {
|
||||
if err := srv.Start(ctx); err != nil {
|
||||
log.Printf("http api server stopped with error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func queueWorker(ctx context.Context, q *server.Queue, cfg config.Config, provider config.Provider) {
|
||||
for {
|
||||
item, err := q.Next(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
log.Printf("queue wait error: %v", err)
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
outPath := filepath.Join(cfg.Out_dir, "_misc")
|
||||
|
||||
dl.Youtube(dl.Download{
|
||||
Url: item.Request.URL,
|
||||
OutDir: outPath,
|
||||
DryRun: cfg.Dry_run,
|
||||
Metadata: false,
|
||||
}, provider)
|
||||
|
||||
if err := q.MarkDone(item.ID); err != nil {
|
||||
log.Printf("queue mark done failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue