3 Commits

  1. 13
      controllers/controllers.go
  2. 30
      controllers/index.go
  3. 16
      controllers/pipeline.go
  4. 91
      controllers/upload.go
  5. 11
      lithium.log
  6. 127
      main.go
  7. 7
      main_test.go

13
controllers/controllers.go

@ -0,0 +1,13 @@
package controllers
import (
"encoding/json"
"net/http"
)
func writeError(w http.ResponseWriter, status int, errStr string) {
w.WriteHeader(status)
json.NewEncoder(w).Encode(struct {
Error string `json:"error"`
}{errStr})
}

30
controllers/index.go

@ -0,0 +1,30 @@
package controllers
import (
"encoding/json"
"github.com/geplauder/lithium/pipelines"
"net/http"
)
type Metadata struct {
Name string `json:"name"`
Version string `json:"version"`
CommitHash string `json:"commit_hash"`
Pipelines []string `json:"pipelines"`
}
const Name string = "Lithium"
const Version string = "0.1.0"
func IndexHandler(pipelines []pipelines.IPipeline, w http.ResponseWriter, r *http.Request, gitCommit string) {
var pipelineNames []string
for _, x := range pipelines {
pipelineNames = append(pipelineNames, x.GetSlug())
}
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(Metadata{Name, Version, gitCommit, pipelineNames})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}

16
controllers/pipeline.go

@ -0,0 +1,16 @@
package controllers
import (
"encoding/json"
"github.com/geplauder/lithium/pipelines"
"github.com/geplauder/lithium/storage"
"net/http"
)
func PipelineHandler(pipeline pipelines.IPipeline, storageProvider storage.IStorageProvider, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(pipeline)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}

91
controllers/upload.go

@ -0,0 +1,91 @@
package controllers
import (
"bytes"
"encoding/json"
"github.com/geplauder/lithium/pipelines"
"github.com/geplauder/lithium/storage"
"github.com/sirupsen/logrus"
"io"
"net/http"
)
func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
// open file handler
formFile, handler, err := r.FormFile("file")
if err != nil {
writeError(w, http.StatusUnprocessableEntity, err.Error())
return
}
defer formFile.Close()
// check pipelines form param
formPipeline := r.FormValue("pipeline")
if formPipeline == "" {
writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing")
return
}
var execPipe pipelines.IPipeline
for _, pipe := range pipes {
if formPipeline == pipe.GetSlug() {
execPipe = pipe
break
}
}
if execPipe == nil {
writeError(w, http.StatusUnprocessableEntity, "pipeline not found")
return
}
bucket := r.FormValue("bucket")
if bucket == "" {
writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing")
return
}
// open file
file, err := handler.Open()
if err != nil {
writeError(w, http.StatusInternalServerError, "error reading uploaded file")
return
}
defer file.Close()
// read file to buffer
buf := bytes.NewBuffer(nil)
_, err = io.Copy(buf, file)
if err != nil {
writeError(w, http.StatusInternalServerError, "error reading file from buffer")
return
}
// store uploaded file
_, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes())
if err != nil {
return
}
// execute pipeline
output, err := execPipe.Run("source.jpg", bucket, storageProvider)
if err != nil {
writeError(w, http.StatusInternalServerError, "error executing pipeline")
return
}
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(struct {
Message string `json:"message"`
OutputFiles []string `json:"output_files"`
}{"ok", []string{output}})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
logrus.Info("Pipeline routes registered successfully")
}

11
lithium.log

@ -1,11 +0,0 @@
{"level":"info","msg":"Pipeline routes registered successfully","time":"2022-01-23T18:28:12+01:00"}
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-01-23T18:28:12+01:00"}
{"level":"info","msg":"Pipeline routes registered successfully","time":"2022-01-23T18:28:17+01:00"}
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-01-23T18:28:17+01:00"}
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-01-26T17:37:28+01:00"}
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-01-26T17:47:13+01:00"}
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-01-26T17:48:17+01:00"}
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-02-06T15:30:47+01:00"}
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-02-06T15:38:27+01:00"}
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-02-06T15:45:25+01:00"}
{"level":"info","msg":"Lithium started, listening for requests...","time":"2022-02-06T16:03:30+01:00"}

127
main.go

@ -1,8 +1,7 @@
package main
import (
"bytes"
"encoding/json"
"github.com/geplauder/lithium/controllers"
"net/http"
"os"
@ -17,142 +16,24 @@ import (
"github.com/spf13/afero"
)
const Name string = "Lithium"
const Version string = "0.1.0"
var GitCommit string
type Metadata struct {
Name string `json:"name"`
Version string `json:"version"`
CommitHash string `json:"commit_hash"`
Pipelines []string `json:"pipelines"`
}
func PipelineHandler(pipeline pipelines.IPipeline, storageProvider storage.IStorageProvider, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(pipeline)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}
func IndexHandler(pipelines []pipelines.IPipeline, w http.ResponseWriter, r *http.Request) {
var pipelineNames []string
for _, x := range pipelines {
pipelineNames = append(pipelineNames, x.GetSlug())
}
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(Metadata{Name, Version, GitCommit, pipelineNames})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}
func writeError(w http.ResponseWriter, status int, errStr string) {
w.WriteHeader(status)
json.NewEncoder(w).Encode(struct {
Error string `json:"error"`
}{errStr})
}
func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
// open file handler
formFile, handler, err := r.FormFile("file")
if err != nil {
writeError(w, http.StatusUnprocessableEntity, err.Error())
return
}
defer formFile.Close()
// check pipelines form param
formPipeline := r.FormValue("pipeline")
if formPipeline == "" {
writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing")
return
}
var execPipe pipelines.IPipeline
for _, pipe := range pipes {
if formPipeline == pipe.GetSlug() {
execPipe = pipe
break
}
}
if execPipe == nil {
writeError(w, http.StatusUnprocessableEntity, "pipeline not found")
return
}
bucket := r.FormValue("bucket")
if bucket == "" {
writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing")
return
}
// open file
file, err := handler.Open()
if err != nil {
writeError(w, http.StatusInternalServerError, "error reading uploaded file")
return
}
defer file.Close()
// read file to buffer
buf := bytes.NewBuffer(nil)
_, err = io.Copy(buf, file)
if err != nil {
writeError(w, http.StatusInternalServerError, "error reading file from buffer")
return
}
// store uploaded file
_, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes())
if err != nil {
return
}
// execute pipeline
output, err := execPipe.Run("source.jpg", bucket, storageProvider)
if err != nil {
writeError(w, http.StatusInternalServerError, "error executing pipeline")
return
}
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(struct {
Message string `json:"message"`
OutputFiles []string `json:"output_files"`
}{"ok", []string{output}})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
logrus.Info("Pipeline routes registered successfully")
}
func RegisterRoutes(r *mux.Router, appSettings settings.Settings, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
index := r.Methods(http.MethodGet).Subrouter()
index.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
IndexHandler(pipelines, w, r)
controllers.IndexHandler(pipelines, w, r, GitCommit)
})
upload := r.Methods(http.MethodPost).Subrouter()
upload.HandleFunc("/upload", func(w http.ResponseWriter, r *http.Request) {
UploadHandler(w, r, pipelines, storageProvider)
controllers.UploadHandler(w, r, pipelines, storageProvider)
})
pipeline := r.Methods(http.MethodGet).Subrouter()
pipeline.HandleFunc("/pipelines/{pipeline}", func(w http.ResponseWriter, r *http.Request) {
for _, pipeline := range pipelines {
if pipeline.GetSlug() == mux.Vars(r)["pipeline"] {
PipelineHandler(pipeline, storageProvider, w, r)
controllers.PipelineHandler(pipeline, storageProvider, w, r)
return
}
}

7
main_test.go

@ -4,6 +4,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/geplauder/lithium/controllers"
"net/http"
"net/http/httptest"
"testing"
@ -26,7 +27,7 @@ func TestIndexRoute(t *testing.T) {
request := httptest.NewRequest(http.MethodGet, "/", nil)
responseRecorder := httptest.NewRecorder()
IndexHandler([]pipelines.IPipeline{}, responseRecorder, request)
controllers.IndexHandler([]pipelines.IPipeline{}, responseRecorder, request, "")
assert.Equal(t, 200, responseRecorder.Code, "Response code should be 200")
assert.NotNil(t, responseRecorder.Body, "Response should contain body")
@ -36,11 +37,11 @@ func TestIndexRoute(t *testing.T) {
request := httptest.NewRequest(http.MethodGet, "/", nil)
responseRecorder := httptest.NewRecorder()
IndexHandler([]pipelines.IPipeline{data}, responseRecorder, request)
controllers.IndexHandler([]pipelines.IPipeline{data}, responseRecorder, request, "")
assert.Equal(t, 200, responseRecorder.Code, "Response code should be 200")
var body = Metadata{}
var body = controllers.Metadata{}
err = json.Unmarshal(responseRecorder.Body.Bytes(), &body)
assert.Nil(t, err)
assert.Equal(t, 1, len(body.Pipelines))

Loading…
Cancel
Save