Browse Source
Merge commit '47295de3fed8fb23db46eca4cff096ee3209f883' into HEAD
pr-feature/enhance-api-responses
Merge commit '47295de3fed8fb23db46eca4cff096ee3209f883' into HEAD
pr-feature/enhance-api-responses
Jenkins
3 years ago
7 changed files with 158 additions and 137 deletions
-
13controllers/controllers.go
-
30controllers/index.go
-
16controllers/pipeline.go
-
91controllers/upload.go
-
11lithium.log
-
127main.go
-
7main_test.go
@ -0,0 +1,13 @@ |
|||
package controllers |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"net/http" |
|||
) |
|||
|
|||
func writeError(w http.ResponseWriter, status int, errStr string) { |
|||
w.WriteHeader(status) |
|||
json.NewEncoder(w).Encode(struct { |
|||
Error string `json:"error"` |
|||
}{errStr}) |
|||
} |
@ -0,0 +1,30 @@ |
|||
package controllers |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"github.com/geplauder/lithium/pipelines" |
|||
"net/http" |
|||
) |
|||
|
|||
type Metadata struct { |
|||
Name string `json:"name"` |
|||
Version string `json:"version"` |
|||
CommitHash string `json:"commit_hash"` |
|||
Pipelines []string `json:"pipelines"` |
|||
} |
|||
|
|||
const Name string = "Lithium" |
|||
const Version string = "0.1.0" |
|||
|
|||
func IndexHandler(pipelines []pipelines.IPipeline, w http.ResponseWriter, r *http.Request, gitCommit string) { |
|||
var pipelineNames []string |
|||
for _, x := range pipelines { |
|||
pipelineNames = append(pipelineNames, x.GetSlug()) |
|||
} |
|||
|
|||
w.Header().Set("Content-Type", "application/json") |
|||
err := json.NewEncoder(w).Encode(Metadata{Name, Version, gitCommit, pipelineNames}) |
|||
if err != nil { |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
} |
|||
} |
@ -0,0 +1,16 @@ |
|||
package controllers |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"github.com/geplauder/lithium/pipelines" |
|||
"github.com/geplauder/lithium/storage" |
|||
"net/http" |
|||
) |
|||
|
|||
func PipelineHandler(pipeline pipelines.IPipeline, storageProvider storage.IStorageProvider, w http.ResponseWriter, r *http.Request) { |
|||
w.Header().Set("Content-Type", "application/json") |
|||
err := json.NewEncoder(w).Encode(pipeline) |
|||
if err != nil { |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
} |
|||
} |
@ -0,0 +1,91 @@ |
|||
package controllers |
|||
|
|||
import ( |
|||
"bytes" |
|||
"encoding/json" |
|||
"github.com/geplauder/lithium/pipelines" |
|||
"github.com/geplauder/lithium/storage" |
|||
"github.com/sirupsen/logrus" |
|||
"io" |
|||
"net/http" |
|||
) |
|||
|
|||
func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) { |
|||
// open file handler
|
|||
formFile, handler, err := r.FormFile("file") |
|||
if err != nil { |
|||
writeError(w, http.StatusUnprocessableEntity, err.Error()) |
|||
return |
|||
} |
|||
|
|||
defer formFile.Close() |
|||
|
|||
// check pipelines form param
|
|||
formPipeline := r.FormValue("pipeline") |
|||
if formPipeline == "" { |
|||
writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing") |
|||
return |
|||
} |
|||
|
|||
var execPipe pipelines.IPipeline |
|||
for _, pipe := range pipes { |
|||
if formPipeline == pipe.GetSlug() { |
|||
execPipe = pipe |
|||
break |
|||
} |
|||
} |
|||
|
|||
if execPipe == nil { |
|||
writeError(w, http.StatusUnprocessableEntity, "pipeline not found") |
|||
return |
|||
} |
|||
|
|||
bucket := r.FormValue("bucket") |
|||
if bucket == "" { |
|||
writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing") |
|||
return |
|||
} |
|||
|
|||
// open file
|
|||
file, err := handler.Open() |
|||
if err != nil { |
|||
writeError(w, http.StatusInternalServerError, "error reading uploaded file") |
|||
return |
|||
} |
|||
|
|||
defer file.Close() |
|||
|
|||
// read file to buffer
|
|||
buf := bytes.NewBuffer(nil) |
|||
_, err = io.Copy(buf, file) |
|||
if err != nil { |
|||
writeError(w, http.StatusInternalServerError, "error reading file from buffer") |
|||
return |
|||
} |
|||
|
|||
// store uploaded file
|
|||
_, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes()) |
|||
if err != nil { |
|||
return |
|||
} |
|||
|
|||
// execute pipeline
|
|||
output, err := execPipe.Run("source.jpg", bucket, storageProvider) |
|||
if err != nil { |
|||
writeError(w, http.StatusInternalServerError, "error executing pipeline") |
|||
return |
|||
} |
|||
|
|||
w.Header().Set("Content-Type", "application/json") |
|||
|
|||
err = json.NewEncoder(w).Encode(struct { |
|||
Message string `json:"message"` |
|||
OutputFiles []string `json:"output_files"` |
|||
}{"ok", []string{output}}) |
|||
|
|||
if err != nil { |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
} |
|||
|
|||
logrus.Info("Pipeline routes registered successfully") |
|||
} |
@ -1,11 +0,0 @@ |
|||
{"level":"info","msg":"Pipeline routes registered successfully","time":"2022-01-23T18:28:12+01:00"} |
|||
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-01-23T18:28:12+01:00"} |
|||
{"level":"info","msg":"Pipeline routes registered successfully","time":"2022-01-23T18:28:17+01:00"} |
|||
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-01-23T18:28:17+01:00"} |
|||
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-01-26T17:37:28+01:00"} |
|||
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-01-26T17:47:13+01:00"} |
|||
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-01-26T17:48:17+01:00"} |
|||
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-02-06T15:30:47+01:00"} |
|||
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-02-06T15:38:27+01:00"} |
|||
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-02-06T15:45:25+01:00"} |
|||
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-02-06T16:03:30+01:00"} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue