package main import ( "bytes" "encoding/json" "io" "net/http" "os" "github.com/geplauder/lithium/middlewares" "github.com/geplauder/lithium/pipelines" "github.com/geplauder/lithium/settings" "github.com/geplauder/lithium/storage" "github.com/gorilla/mux" "github.com/sirupsen/logrus" "github.com/spf13/afero" ) const Name string = "Lithium" const Version string = "0.1.0" var GitCommit string type Metadata struct { Name string `json:"name"` Version string `json:"version"` CommitHash string `json:"commit_hash"` } func PipelineHandler(pipeline pipelines.IPipeline, storageProvider storage.IStorageProvider, w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") err := json.NewEncoder(w).Encode(pipeline) if err != nil { w.WriteHeader(http.StatusInternalServerError) } } func IndexHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") err := json.NewEncoder(w).Encode(Metadata{Name, Version, GitCommit}) if err != nil { w.WriteHeader(http.StatusInternalServerError) } } func writeError(w http.ResponseWriter, status int, errStr string) { w.WriteHeader(status) json.NewEncoder(w).Encode(struct { Error string `json:"error"` }{errStr}) } func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) { // open file handler formFile, handler, err := r.FormFile("file") if err != nil { writeError(w, http.StatusUnprocessableEntity, err.Error()) return } defer formFile.Close() // check pipelines form param formPipeline := r.FormValue("pipeline") if formPipeline == "" { writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing") return } var execPipe pipelines.IPipeline for _, pipe := range pipes { if formPipeline == pipe.GetSlug() { execPipe = pipe break } } if execPipe == nil { writeError(w, http.StatusUnprocessableEntity, "pipeline not found") return } bucket := r.FormValue("bucket") if bucket == "" { writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing") return } // open file file, err := handler.Open() if err != nil { writeError(w, http.StatusInternalServerError, "error reading uploaded file") return } defer file.Close() // read file to buffer buf := bytes.NewBuffer(nil) _, err = io.Copy(buf, file) if err != nil { writeError(w, http.StatusInternalServerError, "error reading file from buffer") return } // store uploaded file _, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes()) if err != nil { return } // execute pipeline output, err := execPipe.Run("source.jpg", bucket, storageProvider) if err != nil { writeError(w, http.StatusInternalServerError, "error executing pipeline") return } w.Header().Set("Content-Type", "application/json") err = json.NewEncoder(w).Encode(struct { Message string `json:"message"` OutputFiles []string `json:"output_files"` }{"ok", []string{output}}) if err != nil { w.WriteHeader(http.StatusInternalServerError) } logrus.Info("Pipeline routes registered successfully") } func RegisterRoutes(r *mux.Router, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) { index := r.Methods(http.MethodGet).Subrouter() index.HandleFunc("/", IndexHandler) upload := r.Methods(http.MethodPost).Subrouter() upload.HandleFunc("/upload", func(w http.ResponseWriter, r *http.Request) { UploadHandler(w, r, pipelines, storageProvider) }) pipeline := r.Methods(http.MethodGet).Subrouter() pipeline.HandleFunc("/pipelines/{pipeline}", func(w http.ResponseWriter, r *http.Request) { for _, pipeline := range pipelines { if pipeline.GetSlug() == mux.Vars(r)["pipeline"] { PipelineHandler(pipeline, storageProvider, w, r) return } } w.WriteHeader(404) }) } func main() { logFile, err := os.OpenFile("lithium.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err == nil { multiWriter := io.MultiWriter(os.Stdout, logFile) logrus.SetOutput(multiWriter) } else { logrus.SetOutput(os.Stdout) } logrus.SetFormatter(&logrus.JSONFormatter{}) appSettings, err := settings.LoadSettings(afero.NewOsFs()) if err != nil { logrus.Fatal("Unexpected error while loading settings: ", err) } var storageProvider storage.IStorageProvider if appSettings.StorageProvider.Type == 0 { storageProvider = storage.GetFileSystemStorageProvider(appSettings.StorageProvider.BasePath, "") } else { logrus.WithFields(logrus.Fields{ "StorageProviderType": appSettings.StorageProvider.Type, }).Fatal("Invalid file system provided") } pipes := pipelines.LoadPipelines() r := mux.NewRouter() if appSettings.Authentication.Enabled { authMiddleware := middlewares.CreateAuthenticationMiddleware(appSettings.Authentication.Token) r.Use(authMiddleware.Middleware) } if appSettings.RateLimiter.Enabled { rateLimiterMiddleware, err := middlewares.CreateRateLimiterMiddleware(appSettings.RateLimiter.RequestsPerMinute, appSettings.RateLimiter.AllowedBurst) if err != nil { logrus.Fatal("Unexpected error while creating rate limiter middleware: ", err) } r.Use(rateLimiterMiddleware.Middleware) } RegisterRoutes(r, pipes, storageProvider) logrus.Info("Lithium started, listening for requests...") err = http.ListenAndServe(appSettings.Endpoint, r) if err != nil { logrus.Fatal("Unexpected error while serving http server: ", err) } }