Micro-service for file storage and processing written in Go
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

217 lines
5.7 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package main
import (
	"bytes"
	"encoding/json"
	"io"
	"net/http"
	"os"
	"time"

	"github.com/geplauder/lithium/middlewares"
	"github.com/geplauder/lithium/pipelines"
	"github.com/geplauder/lithium/settings"
	"github.com/geplauder/lithium/storage"
	"github.com/gorilla/mux"
	"github.com/sirupsen/logrus"
	"github.com/spf13/afero"
)
  16. const Name string = "Lithium"
  17. const Version string = "0.1.0"
  18. var GitCommit string
  19. type Metadata struct {
  20. Name string `json:"name"`
  21. Version string `json:"version"`
  22. CommitHash string `json:"commit_hash"`
  23. Pipelines []string `json:"pipelines"`
  24. }
  25. func PipelineHandler(pipeline pipelines.IPipeline, storageProvider storage.IStorageProvider, w http.ResponseWriter, r *http.Request) {
  26. w.Header().Set("Content-Type", "application/json")
  27. err := json.NewEncoder(w).Encode(pipeline)
  28. if err != nil {
  29. w.WriteHeader(http.StatusInternalServerError)
  30. }
  31. }
  32. func IndexHandler(pipelines []pipelines.IPipeline, w http.ResponseWriter, r *http.Request) {
  33. var pipelineNames []string
  34. for _, x := range pipelines {
  35. pipelineNames = append(pipelineNames, x.GetName())
  36. }
  37. w.Header().Set("Content-Type", "application/json")
  38. err := json.NewEncoder(w).Encode(Metadata{Name, Version, GitCommit, pipelineNames})
  39. if err != nil {
  40. w.WriteHeader(http.StatusInternalServerError)
  41. }
  42. }
  43. func writeError(w http.ResponseWriter, status int, errStr string) {
  44. w.WriteHeader(status)
  45. json.NewEncoder(w).Encode(struct {
  46. Error string `json:"error"`
  47. }{errStr})
  48. }
  49. func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
  50. // open file handler
  51. formFile, handler, err := r.FormFile("file")
  52. if err != nil {
  53. writeError(w, http.StatusUnprocessableEntity, err.Error())
  54. return
  55. }
  56. defer formFile.Close()
  57. // check pipelines form param
  58. formPipeline := r.FormValue("pipeline")
  59. if formPipeline == "" {
  60. writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing")
  61. return
  62. }
  63. var execPipe pipelines.IPipeline
  64. for _, pipe := range pipes {
  65. if formPipeline == pipe.GetSlug() {
  66. execPipe = pipe
  67. break
  68. }
  69. }
  70. if execPipe == nil {
  71. writeError(w, http.StatusUnprocessableEntity, "pipeline not found")
  72. return
  73. }
  74. bucket := r.FormValue("bucket")
  75. if bucket == "" {
  76. writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing")
  77. return
  78. }
  79. // open file
  80. file, err := handler.Open()
  81. if err != nil {
  82. writeError(w, http.StatusInternalServerError, "error reading uploaded file")
  83. return
  84. }
  85. defer file.Close()
  86. // read file to buffer
  87. buf := bytes.NewBuffer(nil)
  88. _, err = io.Copy(buf, file)
  89. if err != nil {
  90. writeError(w, http.StatusInternalServerError, "error reading file from buffer")
  91. return
  92. }
  93. // store uploaded file
  94. _, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes())
  95. if err != nil {
  96. return
  97. }
  98. // execute pipeline
  99. output, err := execPipe.Run("source.jpg", bucket, storageProvider)
  100. if err != nil {
  101. writeError(w, http.StatusInternalServerError, "error executing pipeline")
  102. return
  103. }
  104. w.Header().Set("Content-Type", "application/json")
  105. err = json.NewEncoder(w).Encode(struct {
  106. Message string `json:"message"`
  107. OutputFiles []string `json:"output_files"`
  108. }{"ok", []string{output}})
  109. if err != nil {
  110. w.WriteHeader(http.StatusInternalServerError)
  111. }
  112. logrus.Info("Pipeline routes registered successfully")
  113. }
  114. func RegisterRoutes(r *mux.Router, appSettings settings.Settings, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
  115. index := r.Methods(http.MethodGet).Subrouter()
  116. index.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  117. IndexHandler(pipelines, w, r)
  118. })
  119. upload := r.Methods(http.MethodPost).Subrouter()
  120. upload.HandleFunc("/upload", func(w http.ResponseWriter, r *http.Request) {
  121. UploadHandler(w, r, pipelines, storageProvider)
  122. })
  123. pipeline := r.Methods(http.MethodGet).Subrouter()
  124. pipeline.HandleFunc("/pipelines/{pipeline}", func(w http.ResponseWriter, r *http.Request) {
  125. for _, pipeline := range pipelines {
  126. if pipeline.GetSlug() == mux.Vars(r)["pipeline"] {
  127. PipelineHandler(pipeline, storageProvider, w, r)
  128. return
  129. }
  130. }
  131. w.WriteHeader(404)
  132. })
  133. if appSettings.Authentication.Enabled {
  134. authMiddleware := middlewares.CreateAuthenticationMiddleware(appSettings.Authentication.Token)
  135. upload.Use(authMiddleware.Middleware)
  136. pipeline.Use(authMiddleware.Middleware)
  137. }
  138. }
  139. func main() {
  140. logFile, err := os.OpenFile("lithium.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  141. if err == nil {
  142. multiWriter := io.MultiWriter(os.Stdout, logFile)
  143. logrus.SetOutput(multiWriter)
  144. } else {
  145. logrus.SetOutput(os.Stdout)
  146. }
  147. logrus.SetFormatter(&logrus.JSONFormatter{})
  148. appSettings, err := settings.LoadSettings(afero.NewOsFs())
  149. if err != nil {
  150. logrus.Fatal("Unexpected error while loading settings: ", err)
  151. }
  152. var storageProvider storage.IStorageProvider
  153. if appSettings.StorageProvider.Type == 0 {
  154. storageProvider = storage.GetFileSystemStorageProvider(appSettings.StorageProvider.BasePath, "")
  155. } else {
  156. logrus.WithFields(logrus.Fields{
  157. "StorageProviderType": appSettings.StorageProvider.Type,
  158. }).Fatal("Invalid file system provided")
  159. }
  160. pipes := pipelines.LoadPipelines()
  161. r := mux.NewRouter()
  162. if appSettings.RateLimiter.Enabled {
  163. rateLimiterMiddleware, err := middlewares.CreateRateLimiterMiddleware(appSettings.RateLimiter.RequestsPerMinute, appSettings.RateLimiter.AllowedBurst)
  164. if err != nil {
  165. logrus.Fatal("Unexpected error while creating rate limiter middleware: ", err)
  166. }
  167. r.Use(rateLimiterMiddleware.Middleware)
  168. }
  169. RegisterRoutes(r, appSettings, pipes, storageProvider)
  170. logrus.Info("Lithium started, listening for requests...")
  171. err = http.ListenAndServe(appSettings.Endpoint, r)
  172. if err != nil {
  173. logrus.Fatal("Unexpected error while serving http server: ", err)
  174. }
  175. }