From 6be5df80301aa3a88766eb31e2d35222119aad5b Mon Sep 17 00:00:00 2001 From: Roman Zipp Date: Sun, 23 Jan 2022 17:10:26 +0100 Subject: [PATCH 1/6] Update route registration --- main.go | 7 ++++--- main_test.go | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index 33cf1f7..16ed52a 100644 --- a/main.go +++ b/main.go @@ -36,7 +36,9 @@ func IndexHandler(w http.ResponseWriter, r *http.Request) { } } -func RegisterPipelineRoutes(r *mux.Router, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) { +func RegisterRoutes(r *mux.Router, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) { + r.HandleFunc("/", IndexHandler) + for _, pipeline := range pipelines { r.HandleFunc("/"+pipeline.GetSlug(), func(w http.ResponseWriter, r *http.Request) { PipelineHandler(pipeline, storageProvider, w, r) @@ -64,9 +66,8 @@ func main() { r := mux.NewRouter() r.Use(authMiddleware.Middleware) - r.HandleFunc("/", IndexHandler) - RegisterPipelineRoutes(r, pipes, storageProvider) + RegisterRoutes(r, pipes, storageProvider) err = http.ListenAndServe(appSettings.Endpoint, r) if err != nil { diff --git a/main_test.go b/main_test.go index 0e2941d..cd2b0c1 100644 --- a/main_test.go +++ b/main_test.go @@ -34,7 +34,7 @@ func TestEndpointRoute(t *testing.T) { router := mux.NewRouter() fs := storage.GetMemoryStorageProvider() - RegisterPipelineRoutes(router, []pipelines.IPipeline{data}, fs) + RegisterRoutes(router, []pipelines.IPipeline{data}, fs) request, _ := http.NewRequest("GET", "/"+data.Slug, nil) responseRecorder := httptest.NewRecorder() From ed716435aca3145bf926e6876d3bc6bf54f912e0 Mon Sep 17 00:00:00 2001 From: Roman Zipp Date: Sun, 23 Jan 2022 17:14:43 +0100 Subject: [PATCH 2/6] Enhance router pipeline route matching --- main.go | 16 ++++++++++------ main_test.go | 4 ++-- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/main.go b/main.go index 16ed52a..2f7df81 100644 --- a/main.go +++ b/main.go @@ -38,12 +38,16 @@ func IndexHandler(w http.ResponseWriter, r *http.Request) { func RegisterRoutes(r *mux.Router, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) { r.HandleFunc("/", IndexHandler) - - for _, pipeline := range pipelines { - r.HandleFunc("/"+pipeline.GetSlug(), func(w http.ResponseWriter, r *http.Request) { - PipelineHandler(pipeline, storageProvider, w, r) - }) - } + r.HandleFunc("/pipelines/{pipeline}", func(w http.ResponseWriter, r *http.Request) { + for _, pipeline := range pipelines { + if pipeline.GetSlug() == mux.Vars(r)["pipeline"] { + PipelineHandler(pipeline, storageProvider, w, r) + return + } + } + + w.WriteHeader(404) + }) } func main() { diff --git a/main_test.go b/main_test.go index cd2b0c1..e97f4ef 100644 --- a/main_test.go +++ b/main_test.go @@ -36,7 +36,7 @@ func TestEndpointRoute(t *testing.T) { RegisterRoutes(router, []pipelines.IPipeline{data}, fs) - request, _ := http.NewRequest("GET", "/"+data.Slug, nil) + request, _ := http.NewRequest("GET", "/pipelines/"+data.Slug, nil) responseRecorder := httptest.NewRecorder() router.ServeHTTP(responseRecorder, request) @@ -49,7 +49,7 @@ func TestEndpointRoute(t *testing.T) { t.Run("Unregistered pipelines return 404", func(t *testing.T) { router := mux.NewRouter() - request, _ := http.NewRequest("GET", "/"+data.Slug, nil) + request, _ := http.NewRequest("GET", "/pipelines/"+data.Slug, nil) responseRecorder := httptest.NewRecorder() router.ServeHTTP(responseRecorder, request) From 304be7397d05c46b9a768554146291b33eba8027 Mon Sep 17 00:00:00 2001 From: Roman Zipp Date: Sun, 23 Jan 2022 18:19:10 +0100 Subject: [PATCH 3/6] Add image upload endpoint --- README.md | 2 +- main.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++++--- main_test.go | 60 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 7101b4e..b7db386 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ Authorization: Bearer } ``` -### `GET` `/{pipeline}` +### `GET` `/pipelines/{pipeline}` Show pipeline information. diff --git a/main.go b/main.go index 2f7df81..67f8a20 100644 --- a/main.go +++ b/main.go @@ -1,15 +1,17 @@ package main import ( + "bytes" "encoding/json" - "net/http" - "github.com/geplauder/lithium/auth" "github.com/geplauder/lithium/pipelines" "github.com/geplauder/lithium/settings" "github.com/geplauder/lithium/storage" "github.com/gorilla/mux" "github.com/spf13/afero" + "io" + "net/http" + "strings" ) const Name string = "Lithium" @@ -36,8 +38,75 @@ func IndexHandler(w http.ResponseWriter, r *http.Request) { } } +func writeError(w http.ResponseWriter, status int, errStr string) { + w.WriteHeader(status) + json.NewEncoder(w).Encode(struct { + Error string `json:"error"` + }{errStr}) +} + +func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) { + // open file handler + formFile, handler, err := r.FormFile("file") + if err != nil { + writeError(w, http.StatusUnprocessableEntity, err.Error()) + return + } + + defer formFile.Close() + + // check pipelines form param + formPipelines := strings.Split(r.FormValue("pipelines"), ",") + if len(formPipelines) == 0 { + writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing") + return + } + + bucket := r.FormValue("bucket") + if bucket == "" { + writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing") + return + } + + // open file + file, err := handler.Open() + if err != nil { + writeError(w, http.StatusInternalServerError, "error reading uploaded file") + return + } + + defer file.Close() + + // read file to buffer + buf := bytes.NewBuffer(nil) + _, err = io.Copy(buf, file) + if err != nil { + writeError(w, http.StatusInternalServerError, "error reading file from buffer") + return + } + + // store uploaded file + _, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes()) + if err != nil { + return + } + + w.Header().Set("Content-Type", "application/json") + + err = json.NewEncoder(w).Encode(struct { + Message string `json:"message"` + }{"ok"}) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } +} + func RegisterRoutes(r *mux.Router, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) { - r.HandleFunc("/", IndexHandler) + r.HandleFunc("/", IndexHandler).Methods("GET") + r.HandleFunc("/upload", func(w http.ResponseWriter, r *http.Request) { + UploadHandler(w, r, pipelines, storageProvider) + }).Methods("POST") r.HandleFunc("/pipelines/{pipeline}", func(w http.ResponseWriter, r *http.Request) { for _, pipeline := range pipelines { if pipeline.GetSlug() == mux.Vars(r)["pipeline"] { @@ -47,7 +116,7 @@ func RegisterRoutes(r *mux.Router, pipelines []pipelines.IPipeline, storageProvi } w.WriteHeader(404) - }) + }).Methods("GET") } func main() { diff --git a/main_test.go b/main_test.go index e97f4ef..bcc73f4 100644 --- a/main_test.go +++ b/main_test.go @@ -1,7 +1,9 @@ package main import ( + "encoding/base64" "encoding/json" + "fmt" "net/http" "net/http/httptest" "testing" @@ -57,3 +59,61 @@ func TestEndpointRoute(t *testing.T) { assert.Equal(t, 404, responseRecorder.Code) }) } + +func TestUploadRoute(t *testing.T) { + t.Run("Test uploads missing multipart boundary", func(t *testing.T) { + router := mux.NewRouter() + fs := storage.GetMemoryStorageProvider() + + RegisterRoutes(router, []pipelines.IPipeline{pipelines.Pipeline{ + Name: "", + Slug: "", + Type: 0, + RemoveMetadata: false, + Steps: []pipelines.Step{}, + Output: struct { + Format string `json:"format"` + Quality int `json:"quality"` + }{"jpeg", 10}, + }}, fs) + + request, _ := http.NewRequest("POST", "/upload", nil) + request.Header["Content-Type"] = []string{"multipart/form-data"} + responseRecorder := httptest.NewRecorder() + + router.ServeHTTP(responseRecorder, request) + + assert.Equal(t, 0x1A6, responseRecorder.Code) + str, _ := base64.StdEncoding.DecodeString("eyJlcnJvciI6Im5" + + "vIG11bHRpcGFydCBib3VuZGFyeSBwYXJhbSBpbiBDb250ZW50LVR5cGUifQ==") + assert.JSONEq(t, string(str), responseRecorder.Body.String()) + }) + + t.Run("Test uploads missing multipart boundary", func(t *testing.T) { + router := mux.NewRouter() + fs := storage.GetMemoryStorageProvider() + + RegisterRoutes(router, []pipelines.IPipeline{pipelines.Pipeline{ + Name: "", + Slug: "", + Type: 0, + RemoveMetadata: false, + Steps: []pipelines.Step{}, + Output: struct { + Format string `json:"format"` + Quality int `json:"quality"` + }{"jpeg", 10}, + }}, fs) + + request, _ := http.NewRequest("POST", "/upload", nil) + request.Header["Content-Type"] = []string{"multipart/form-data", "boundary=X-INSOMNIA-BOUNDARY"} + responseRecorder := httptest.NewRecorder() + + router.ServeHTTP(responseRecorder, request) + + assert.Equal(t, 0x1A6, responseRecorder.Code) + str, _ := base64.StdEncoding.DecodeString("eyJlcnJvciI6Im5vIG11bHRpcGFydCBib3VuZGFyeSBwYXJhbSBpbiBDb250ZW50LVR5cGUifQ==") + assert.JSONEq(t, string(str), responseRecorder.Body.String()) + fmt.Println(responseRecorder.Body.String()) + }) +} From 5d5c6a1ac6bf66b9154b73a2e84ee9151aeec6fb Mon Sep 17 00:00:00 2001 From: Roman Zipp Date: Sun, 23 Jan 2022 18:19:40 +0100 Subject: [PATCH 4/6] Remove unneeded return --- main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/main.go b/main.go index 67f8a20..5a01895 100644 --- a/main.go +++ b/main.go @@ -98,7 +98,6 @@ func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPi }{"ok"}) if err != nil { w.WriteHeader(http.StatusInternalServerError) - return } } From af04ff1dc9e684412f8be462969da747e9809119 Mon Sep 17 00:00:00 2001 From: Roman Zipp Date: Sun, 23 Jan 2022 18:24:34 +0100 Subject: [PATCH 5/6] Update upload pipeline validation --- main.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index 5a01895..704652b 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,6 @@ import ( "github.com/spf13/afero" "io" "net/http" - "strings" ) const Name string = "Lithium" @@ -56,12 +55,25 @@ func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPi defer formFile.Close() // check pipelines form param - formPipelines := strings.Split(r.FormValue("pipelines"), ",") - if len(formPipelines) == 0 { + formPipeline := r.FormValue("pipeline") + if formPipeline == "" { writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing") return } + var execPipe pipelines.IPipeline + for _, pipe := range pipes { + if formPipeline == pipe.GetSlug() { + execPipe = pipe + break + } + } + + if execPipe == nil { + writeError(w, http.StatusUnprocessableEntity, "pipeline not found") + return + } + bucket := r.FormValue("bucket") if bucket == "" { writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing") From 25318799090b3002471378875abac324dbfeb720 Mon Sep 17 00:00:00 2001 From: Roman Zipp Date: Sun, 23 Jan 2022 18:28:14 +0100 Subject: [PATCH 6/6] Add processing pipeline on image upload --- main.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 704652b..91e208b 100644 --- a/main.go +++ b/main.go @@ -103,11 +103,20 @@ func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPi return } + // execute pipeline + output, err := execPipe.Run("source.jpg", bucket, storageProvider) + if err != nil { + writeError(w, http.StatusInternalServerError, "error executing pipeline") + return + } + w.Header().Set("Content-Type", "application/json") err = json.NewEncoder(w).Encode(struct { - Message string `json:"message"` - }{"ok"}) + Message string `json:"message"` + OutputFiles []string `json:"output_files"` + }{"ok", []string{output}}) + if err != nil { w.WriteHeader(http.StatusInternalServerError) }