Micro-service for file storage and processing written in Go
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

169 lines
4.2 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "github.com/geplauder/lithium/auth"
  6. "github.com/geplauder/lithium/pipelines"
  7. "github.com/geplauder/lithium/settings"
  8. "github.com/geplauder/lithium/storage"
  9. "github.com/gorilla/mux"
  10. "github.com/spf13/afero"
  11. "io"
  12. "net/http"
  13. )
  14. const Name string = "Lithium"
  15. const Version string = "0.1.0"
  16. type Metadata struct {
  17. Name string `json:"name"`
  18. Version string `json:"version"`
  19. }
  20. func PipelineHandler(pipeline pipelines.IPipeline, storageProvider storage.IStorageProvider, w http.ResponseWriter, r *http.Request) {
  21. w.Header().Set("Content-Type", "application/json")
  22. err := json.NewEncoder(w).Encode(pipeline)
  23. if err != nil {
  24. w.WriteHeader(http.StatusInternalServerError)
  25. }
  26. }
  27. func IndexHandler(w http.ResponseWriter, r *http.Request) {
  28. w.Header().Set("Content-Type", "application/json")
  29. err := json.NewEncoder(w).Encode(Metadata{Name, Version})
  30. if err != nil {
  31. w.WriteHeader(http.StatusInternalServerError)
  32. }
  33. }
  34. func writeError(w http.ResponseWriter, status int, errStr string) {
  35. w.WriteHeader(status)
  36. json.NewEncoder(w).Encode(struct {
  37. Error string `json:"error"`
  38. }{errStr})
  39. }
  40. func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
  41. // open file handler
  42. formFile, handler, err := r.FormFile("file")
  43. if err != nil {
  44. writeError(w, http.StatusUnprocessableEntity, err.Error())
  45. return
  46. }
  47. defer formFile.Close()
  48. // check pipelines form param
  49. formPipeline := r.FormValue("pipeline")
  50. if formPipeline == "" {
  51. writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing")
  52. return
  53. }
  54. var execPipe pipelines.IPipeline
  55. for _, pipe := range pipes {
  56. if formPipeline == pipe.GetSlug() {
  57. execPipe = pipe
  58. break
  59. }
  60. }
  61. if execPipe == nil {
  62. writeError(w, http.StatusUnprocessableEntity, "pipeline not found")
  63. return
  64. }
  65. bucket := r.FormValue("bucket")
  66. if bucket == "" {
  67. writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing")
  68. return
  69. }
  70. // open file
  71. file, err := handler.Open()
  72. if err != nil {
  73. writeError(w, http.StatusInternalServerError, "error reading uploaded file")
  74. return
  75. }
  76. defer file.Close()
  77. // read file to buffer
  78. buf := bytes.NewBuffer(nil)
  79. _, err = io.Copy(buf, file)
  80. if err != nil {
  81. writeError(w, http.StatusInternalServerError, "error reading file from buffer")
  82. return
  83. }
  84. // store uploaded file
  85. _, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes())
  86. if err != nil {
  87. return
  88. }
  89. // execute pipeline
  90. output, err := execPipe.Run("source.jpg", bucket, storageProvider)
  91. if err != nil {
  92. writeError(w, http.StatusInternalServerError, "error executing pipeline")
  93. return
  94. }
  95. w.Header().Set("Content-Type", "application/json")
  96. err = json.NewEncoder(w).Encode(struct {
  97. Message string `json:"message"`
  98. OutputFiles []string `json:"output_files"`
  99. }{"ok", []string{output}})
  100. if err != nil {
  101. w.WriteHeader(http.StatusInternalServerError)
  102. }
  103. }
  104. func RegisterRoutes(r *mux.Router, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
  105. r.HandleFunc("/", IndexHandler).Methods("GET")
  106. r.HandleFunc("/upload", func(w http.ResponseWriter, r *http.Request) {
  107. UploadHandler(w, r, pipelines, storageProvider)
  108. }).Methods("POST")
  109. r.HandleFunc("/pipelines/{pipeline}", func(w http.ResponseWriter, r *http.Request) {
  110. for _, pipeline := range pipelines {
  111. if pipeline.GetSlug() == mux.Vars(r)["pipeline"] {
  112. PipelineHandler(pipeline, storageProvider, w, r)
  113. return
  114. }
  115. }
  116. w.WriteHeader(404)
  117. }).Methods("GET")
  118. }
  119. func main() {
  120. appSettings, err := settings.LoadSettings(afero.NewOsFs())
  121. if err != nil {
  122. panic(err)
  123. }
  124. var storageProvider storage.IStorageProvider
  125. if appSettings.StorageProvider.Type == 0 {
  126. storageProvider = storage.GetFileSystemStorageProvider(appSettings.StorageProvider.BasePath, "")
  127. } else {
  128. panic("Invalid file system provided!")
  129. }
  130. pipes := pipelines.LoadPipelines()
  131. authMiddleware := auth.CreateAuthenticationMiddleware(appSettings.Token)
  132. r := mux.NewRouter()
  133. r.Use(authMiddleware.Middleware)
  134. RegisterRoutes(r, pipes, storageProvider)
  135. err = http.ListenAndServe(appSettings.Endpoint, r)
  136. if err != nil {
  137. panic(err)
  138. }
  139. }