|
|
package main
import ( "bytes" "encoding/json" "io" "net/http"
"github.com/geplauder/lithium/middlewares" "github.com/geplauder/lithium/pipelines" "github.com/geplauder/lithium/settings" "github.com/geplauder/lithium/storage" "github.com/gorilla/mux" "github.com/spf13/afero" )
// Application identity values; IndexHandler reports them in its Metadata
// response.
const (
	// Name is the application name.
	Name string = "Lithium"
	// Version is the application version string.
	Version string = "0.1.0"
)

// GitCommit is the commit hash reported alongside Name and Version.
// Nothing in this file assigns it, so it is the empty string unless set
// elsewhere — presumably injected at build time (e.g. via -ldflags -X);
// TODO confirm against the build scripts.
var GitCommit string
// Metadata is the JSON document that describes the running application.
// Field order is significant: IndexHandler builds it positionally.
type Metadata struct {
	// Name is serialized as "name".
	Name string `json:"name"`
	// Version is serialized as "version".
	Version string `json:"version"`
	// CommitHash is serialized as "commit_hash".
	CommitHash string `json:"commit_hash"`
}
func PipelineHandler(pipeline pipelines.IPipeline, storageProvider storage.IStorageProvider, w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") err := json.NewEncoder(w).Encode(pipeline) if err != nil { w.WriteHeader(http.StatusInternalServerError) } }
func IndexHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") err := json.NewEncoder(w).Encode(Metadata{Name, Version, GitCommit}) if err != nil { w.WriteHeader(http.StatusInternalServerError) } }
// writeError sends a JSON error response of the form {"error": "<errStr>"}
// with the given HTTP status code.
//
// The Content-Type header is set before WriteHeader is called: header changes
// made after the status line has been written are not sent to the client, so
// setting it here is the only way to label the body as JSON for callers that
// have not set it themselves.
func writeError(w http.ResponseWriter, status int, errStr string) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)

	payload := struct {
		Error string `json:"error"`
	}{Error: errStr}

	// The status line is already committed at this point, so an encoding or
	// write failure cannot be reported to the client; the error is
	// deliberately discarded.
	_ = json.NewEncoder(w).Encode(payload)
}
func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) { // open file handler
formFile, handler, err := r.FormFile("file") if err != nil { writeError(w, http.StatusUnprocessableEntity, err.Error()) return }
defer formFile.Close()
// check pipelines form param
formPipeline := r.FormValue("pipeline") if formPipeline == "" { writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing") return }
var execPipe pipelines.IPipeline for _, pipe := range pipes { if formPipeline == pipe.GetSlug() { execPipe = pipe break } }
if execPipe == nil { writeError(w, http.StatusUnprocessableEntity, "pipeline not found") return }
bucket := r.FormValue("bucket") if bucket == "" { writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing") return }
// open file
file, err := handler.Open() if err != nil { writeError(w, http.StatusInternalServerError, "error reading uploaded file") return }
defer file.Close()
// read file to buffer
buf := bytes.NewBuffer(nil) _, err = io.Copy(buf, file) if err != nil { writeError(w, http.StatusInternalServerError, "error reading file from buffer") return }
// store uploaded file
_, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes()) if err != nil { return }
// execute pipeline
output, err := execPipe.Run("source.jpg", bucket, storageProvider) if err != nil { writeError(w, http.StatusInternalServerError, "error executing pipeline") return }
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(struct { Message string `json:"message"` OutputFiles []string `json:"output_files"` }{"ok", []string{output}})
if err != nil { w.WriteHeader(http.StatusInternalServerError) } }
func RegisterRoutes(r *mux.Router, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) { r.HandleFunc("/", IndexHandler).Methods("GET") r.HandleFunc("/upload", func(w http.ResponseWriter, r *http.Request) { UploadHandler(w, r, pipelines, storageProvider) }).Methods("POST") r.HandleFunc("/pipelines/{pipeline}", func(w http.ResponseWriter, r *http.Request) { for _, pipeline := range pipelines { if pipeline.GetSlug() == mux.Vars(r)["pipeline"] { PipelineHandler(pipeline, storageProvider, w, r) return } }
w.WriteHeader(404) }).Methods("GET") }
func main() { appSettings, err := settings.LoadSettings(afero.NewOsFs()) if err != nil { panic(err) }
var storageProvider storage.IStorageProvider
if appSettings.StorageProvider.Type == 0 { storageProvider = storage.GetFileSystemStorageProvider(appSettings.StorageProvider.BasePath, "") } else { panic("Invalid file system provided!") }
pipes := pipelines.LoadPipelines()
r := mux.NewRouter()
if appSettings.Authentication.Enabled { authMiddleware := middlewares.CreateAuthenticationMiddleware(appSettings.Authentication.Token)
r.Use(authMiddleware.Middleware) }
if appSettings.RateLimiter.Enabled { rateLimiterMiddleware, err := middlewares.CreateRateLimiterMiddleware(appSettings.RateLimiter.RequestsPerMinute, appSettings.RateLimiter.AllowedBurst) if err != nil { panic(err) }
r.Use(rateLimiterMiddleware.Middleware) }
RegisterRoutes(r, pipes, storageProvider)
err = http.ListenAndServe(appSettings.Endpoint, r) if err != nil { panic(err) } }
|