package controllers

import (
	"bytes"
	"encoding/json"
	"io"
	"net/http"

	"github.com/geplauder/lithium/pipelines"
	"github.com/geplauder/lithium/storage"
	"github.com/sirupsen/logrus"
)

// UploadHandler accepts a multipart upload, stores it in the requested bucket
// and runs the requested pipeline on it.
//
// Expected form fields:
//   - "file":     the uploaded file
//   - "pipeline": slug of the pipeline to execute; matched against GetSlug of
//     each entry in pipes
//   - "bucket":   bucket name passed to the storage provider and the pipeline
//
// Missing or unknown parameters are answered with 422 Unprocessable Entity;
// read, storage and pipeline failures are answered with 500 Internal Server
// Error. On success a JSON document with the fields "message" ("ok") and
// "output_files" (the single output returned by the pipeline) is written.
func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
	// FormFile returns the uploaded part already opened for reading, so there
	// is no need to open the file header a second time.
	formFile, _, err := r.FormFile("file")
	if err != nil {
		writeError(w, http.StatusUnprocessableEntity, err.Error())
		return
	}
	defer formFile.Close()

	// Resolve the requested pipeline by its slug.
	formPipeline := r.FormValue("pipeline")
	if formPipeline == "" {
		writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing")
		return
	}

	var execPipe pipelines.IPipeline
	for _, pipe := range pipes {
		if formPipeline == pipe.GetSlug() {
			execPipe = pipe
			break
		}
	}
	if execPipe == nil {
		writeError(w, http.StatusUnprocessableEntity, "pipeline not found")
		return
	}

	bucket := r.FormValue("bucket")
	if bucket == "" {
		writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing")
		return
	}

	// StoreRaw takes the content as a byte slice, so buffer the whole upload.
	var buf bytes.Buffer
	if _, err = io.Copy(&buf, formFile); err != nil {
		logrus.WithError(err).Error("reading uploaded file")
		writeError(w, http.StatusInternalServerError, "error reading uploaded file")
		return
	}

	// Store the uploaded file. A failure here must be reported to the client;
	// returning silently would result in an empty 200 response.
	if _, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes()); err != nil {
		logrus.WithError(err).Error("storing uploaded file")
		writeError(w, http.StatusInternalServerError, "error storing uploaded file")
		return
	}

	// Execute the pipeline on the stored source file.
	output, err := execPipe.Run("source.jpg", bucket, storageProvider)
	if err != nil {
		logrus.WithError(err).Error("executing pipeline")
		writeError(w, http.StatusInternalServerError, "error executing pipeline")
		return
	}

	w.Header().Set("Content-Type", "application/json")
	err = json.NewEncoder(w).Encode(struct {
		Message     string   `json:"message"`
		OutputFiles []string `json:"output_files"`
	}{"ok", []string{output}})
	if err != nil {
		logrus.WithError(err).Error("encoding upload response")
		// Only takes effect if nothing has been written to w yet.
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	logrus.Infof("pipeline %q executed for bucket %q", formPipeline, bucket)
}