// Command ytsync publishes youtube channels into the LBRY network automatically.
package main

import (
	"fmt"
	"math/rand"
	"os"
	"time"

	"github.com/lbryio/lbry.go/extras/errors"
	"github.com/lbryio/lbry.go/extras/util"
	"github.com/lbryio/ytsync/manager"
	"github.com/lbryio/ytsync/sdk"
	ytUtils "github.com/lbryio/ytsync/util"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/cobra"
)

// defaultMaxTries is the default value of the --max-tries flag. ytSync also
// compares maxTries against it to detect whether --max-tries was combined
// with --stop-on-error.
const defaultMaxTries = 3

// Version is an exported version string; nothing in this file assigns it.
// NOTE(review): presumably injected at build time (e.g. via -ldflags) — confirm.
var Version string

// Command-line options of the ytsync command. main binds each variable to
// the flag named in its group comment; ytSync validates them and forwards
// them to the sync manager.
var (
	// Failure handling: --stop-on-error, --max-tries.
	stopOnError bool
	maxTries    int

	// Job selection: --takeover-existing-channel, --limit, --update,
	// --status, --channelID, --after, --before (the last two are Unix times).
	takeOverExistingChannel bool
	limit                   int
	syncUpdate              bool
	syncStatus              string
	channelID               string
	syncFrom                int64
	syncUntil               int64

	// Process behaviour: --skip-space-check, --run-once, --concurrent-jobs.
	skipSpaceCheck bool
	singleRun      bool
	concurrentJobs int

	// Per-channel and per-video limits: --videos-limit, --max-size (in MB),
	// --max-length (in hours).
	videosLimit    int
	maxVideoSize   int
	maxVideoLength float64

	// Maintenance switches: --remove-db-unpublished, --upgrade-metadata.
	removeDBUnpublished bool
	upgradeMetadata     bool

	// refill is forwarded to manager.NewSyncManager, but no flag binds it in
	// the visible code, so it keeps its zero value.
	refill int
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetLevel(log.DebugLevel)

	cmd := &cobra.Command{
		Use:   "ytsync",
		Short: "Publish youtube channels into LBRY network automatically.",
		Run:   ytSync,
		Args:  cobra.RangeArgs(0, 0),
	}

	cmd.Flags().BoolVar(&stopOnError, "stop-on-error", false, "If a publish fails, stop all publishing and exit")
	cmd.Flags().IntVar(&maxTries, "max-tries", defaultMaxTries, "Number of times to try a publish that fails")
	cmd.Flags().BoolVar(&takeOverExistingChannel, "takeover-existing-channel", false, "If channel exists and we don't own it, take over the channel")
	cmd.Flags().IntVar(&limit, "limit", 0, "limit the amount of channels to sync")
	cmd.Flags().BoolVar(&skipSpaceCheck, "skip-space-check", false, "Do not perform free space check on startup")
	cmd.Flags().BoolVar(&syncUpdate, "update", false, "Update previously synced channels instead of syncing new ones")
	cmd.Flags().BoolVar(&singleRun, "run-once", false, "Whether the process should be stopped after one cycle or not")
ci's avatar
ci committed
62
63
	cmd.Flags().BoolVar(&removeDBUnpublished, "remove-db-unpublished", false, "Remove videos from the database that are marked as published but aren't really published")
	cmd.Flags().BoolVar(&upgradeMetadata, "upgrade-metadata", false, "Upgrade videos if they're on the old metadata version")
64
65
66
	cmd.Flags().StringVar(&syncStatus, "status", "", "Specify which queue to pull from. Overrides --update")
	cmd.Flags().StringVar(&channelID, "channelID", "", "If specified, only this channel will be synced.")
	cmd.Flags().Int64Var(&syncFrom, "after", time.Unix(0, 0).Unix(), "Specify from when to pull jobs [Unix time](Default: 0)")
ci's avatar
ci committed
67
	cmd.Flags().Int64Var(&syncUntil, "before", time.Now().AddDate(1, 0, 0).Unix(), "Specify until when to pull jobs [Unix time](Default: current Unix time)")
68
69
70
	cmd.Flags().IntVar(&concurrentJobs, "concurrent-jobs", 1, "how many jobs to process concurrently")
	cmd.Flags().IntVar(&videosLimit, "videos-limit", 1000, "how many videos to process per channel")
	cmd.Flags().IntVar(&maxVideoSize, "max-size", 2048, "Maximum video size to process (in MB)")
ci's avatar
ci committed
71
	cmd.Flags().Float64Var(&maxVideoLength, "max-length", 2.0, "Maximum video length to process (in hours)")
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90

	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

func ytSync(cmd *cobra.Command, args []string) {
	var hostname string
	slackToken := os.Getenv("SLACK_TOKEN")
	if slackToken == "" {
		log.Error("A slack token was not present in env vars! Slack messages disabled!")
	} else {
		var err error
		hostname, err = os.Hostname()
		if err != nil {
			log.Error("could not detect system hostname")
			hostname = "ytsync-unknown"
		}
Mark Beamer Jr's avatar
ci test    
Mark Beamer Jr committed
91
92
		if len(hostname) > 30 {
			hostname = hostname[0:30]
Mark Beamer Jr's avatar
ci test    
Mark Beamer Jr committed
93
94
		}

95
96
97
		util.InitSlack(os.Getenv("SLACK_TOKEN"), os.Getenv("SLACK_CHANNEL"), hostname)
	}

98
99
	if syncStatus != "" && !util.InSlice(syncStatus, manager.SyncStatuses) {
		log.Errorf("status must be one of the following: %v\n", manager.SyncStatuses)
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
		return
	}

	if stopOnError && maxTries != defaultMaxTries {
		log.Errorln("--stop-on-error and --max-tries are mutually exclusive")
		return
	}
	if maxTries < 1 {
		log.Errorln("setting --max-tries less than 1 doesn't make sense")
		return
	}

	if limit < 0 {
		log.Errorln("setting --limit less than 0 (unlimited) doesn't make sense")
		return
	}

ci's avatar
ci committed
117
	apiURL := os.Getenv("LBRY_WEB_API")
118
119
120
121
122
123
124
125
	apiToken := os.Getenv("LBRY_API_TOKEN")
	youtubeAPIKey := os.Getenv("YOUTUBE_API_KEY")
	lbrycrdString := os.Getenv("LBRYCRD_STRING")
	awsS3ID := os.Getenv("AWS_S3_ID")
	awsS3Secret := os.Getenv("AWS_S3_SECRET")
	awsS3Region := os.Getenv("AWS_S3_REGION")
	awsS3Bucket := os.Getenv("AWS_S3_BUCKET")
	if apiURL == "" {
ci's avatar
ci committed
126
		log.Errorln("An API URL was not defined. Please set the environment variable LBRY_WEB_API")
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
		return
	}
	if apiToken == "" {
		log.Errorln("An API Token was not defined. Please set the environment variable LBRY_API_TOKEN")
		return
	}
	if youtubeAPIKey == "" {
		log.Errorln("A Youtube API key was not defined. Please set the environment variable YOUTUBE_API_KEY")
		return
	}
	if awsS3ID == "" {
		log.Errorln("AWS S3 ID credentials were not defined. Please set the environment variable AWS_S3_ID")
		return
	}
	if awsS3Secret == "" {
		log.Errorln("AWS S3 Secret credentials were not defined. Please set the environment variable AWS_S3_SECRET")
		return
	}
	if awsS3Region == "" {
		log.Errorln("AWS S3 Region was not defined. Please set the environment variable AWS_S3_REGION")
		return
	}
	if awsS3Bucket == "" {
		log.Errorln("AWS S3 Bucket was not defined. Please set the environment variable AWS_S3_BUCKET")
		return
	}
	if lbrycrdString == "" {
		log.Infoln("Using default (local) lbrycrd instance. Set LBRYCRD_STRING if you want to use something else")
	}
156
157

	blobsDir := ytUtils.GetBlobsDir()
158
159
160
161
162
163
164
165
166
167
168
169

	syncProperties := &sdk.SyncProperties{
		SyncFrom:         syncFrom,
		SyncUntil:        syncUntil,
		YoutubeChannelID: channelID,
	}
	apiConfig := &sdk.APIConfig{
		YoutubeAPIKey: youtubeAPIKey,
		ApiURL:        apiURL,
		ApiToken:      apiToken,
		HostName:      hostname,
	}
170
	sm := manager.NewSyncManager(
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
		stopOnError,
		maxTries,
		takeOverExistingChannel,
		refill,
		limit,
		skipSpaceCheck,
		syncUpdate,
		concurrentJobs,
		concurrentJobs,
		blobsDir,
		videosLimit,
		maxVideoSize,
		lbrycrdString,
		awsS3ID,
		awsS3Secret,
		awsS3Region,
		awsS3Bucket,
		syncStatus,
		singleRun,
		syncProperties,
		apiConfig,
ci's avatar
ci committed
192
		maxVideoLength,
ci's avatar
ci committed
193
		removeDBUnpublished,
ci's avatar
ci committed
194
		upgradeMetadata,
195
196
197
	)
	err := sm.Start()
	if err != nil {
ci's avatar
ci committed
198
		ytUtils.SendErrorToSlack(errors.FullTrace(err))
199
	}
200
	ytUtils.SendInfoToSlack("Syncing process terminated!")
201
}