Micro-service for file storage and processing written in Go
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

196 lines
5.1 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "io"
  6. "net/http"
  7. "os"
  8. "github.com/geplauder/lithium/middlewares"
  9. "github.com/geplauder/lithium/pipelines"
  10. "github.com/geplauder/lithium/settings"
  11. "github.com/geplauder/lithium/storage"
  12. "github.com/gorilla/mux"
  13. "github.com/sirupsen/logrus"
  14. "github.com/spf13/afero"
  15. )
  // Identity of the service as reported in the metadata response.
  const (
  	// Name is the service name.
  	Name string = "Lithium"
  	// Version is the service version string.
  	Version string = "0.1.0"
  )

  // GitCommit is the commit hash reported in the metadata response.
  // NOTE(review): nothing in this file assigns it; presumably injected at build
  // time (e.g. -ldflags "-X main.GitCommit=...") — confirm against the build.
  var GitCommit string
  // Metadata is the JSON document that describes the running service.
  //
  // Keep the field order stable: IndexHandler fills the struct with a
  // positional (unkeyed) literal.
  type Metadata struct {
  	// Name is serialized as "name".
  	Name string `json:"name"`
  	// Version is serialized as "version".
  	Version string `json:"version"`
  	// CommitHash is serialized as "commit_hash".
  	CommitHash string `json:"commit_hash"`
  }
  24. func PipelineHandler(pipeline pipelines.IPipeline, storageProvider storage.IStorageProvider, w http.ResponseWriter, r *http.Request) {
  25. w.Header().Set("Content-Type", "application/json")
  26. err := json.NewEncoder(w).Encode(pipeline)
  27. if err != nil {
  28. w.WriteHeader(http.StatusInternalServerError)
  29. }
  30. }
  31. func IndexHandler(w http.ResponseWriter, r *http.Request) {
  32. w.Header().Set("Content-Type", "application/json")
  33. err := json.NewEncoder(w).Encode(Metadata{Name, Version, GitCommit})
  34. if err != nil {
  35. w.WriteHeader(http.StatusInternalServerError)
  36. }
  37. }
  // writeError sends a JSON error response of the form {"error": "<errStr>"}
  // with the given HTTP status code.
  //
  // The Content-Type header is set before the status is written: headers
  // changed after WriteHeader are not sent, and without an explicit type
  // net/http would sniff the body and label the JSON as text/plain.
  func writeError(w http.ResponseWriter, status int, errStr string) {
  	w.Header().Set("Content-Type", "application/json")
  	w.WriteHeader(status)

  	payload := struct {
  		Error string `json:"error"`
  	}{errStr}

  	// The status line is already out, so a failed write cannot be reported to
  	// the client anymore; the encode error is deliberately dropped.
  	_ = json.NewEncoder(w).Encode(payload)
  }
  44. func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
  45. // open file handler
  46. formFile, handler, err := r.FormFile("file")
  47. if err != nil {
  48. writeError(w, http.StatusUnprocessableEntity, err.Error())
  49. return
  50. }
  51. defer formFile.Close()
  52. // check pipelines form param
  53. formPipeline := r.FormValue("pipeline")
  54. if formPipeline == "" {
  55. writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing")
  56. return
  57. }
  58. var execPipe pipelines.IPipeline
  59. for _, pipe := range pipes {
  60. if formPipeline == pipe.GetSlug() {
  61. execPipe = pipe
  62. break
  63. }
  64. }
  65. if execPipe == nil {
  66. writeError(w, http.StatusUnprocessableEntity, "pipeline not found")
  67. return
  68. }
  69. bucket := r.FormValue("bucket")
  70. if bucket == "" {
  71. writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing")
  72. return
  73. }
  74. // open file
  75. file, err := handler.Open()
  76. if err != nil {
  77. writeError(w, http.StatusInternalServerError, "error reading uploaded file")
  78. return
  79. }
  80. defer file.Close()
  81. // read file to buffer
  82. buf := bytes.NewBuffer(nil)
  83. _, err = io.Copy(buf, file)
  84. if err != nil {
  85. writeError(w, http.StatusInternalServerError, "error reading file from buffer")
  86. return
  87. }
  88. // store uploaded file
  89. _, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes())
  90. if err != nil {
  91. return
  92. }
  93. // execute pipeline
  94. output, err := execPipe.Run("source.jpg", bucket, storageProvider)
  95. if err != nil {
  96. writeError(w, http.StatusInternalServerError, "error executing pipeline")
  97. return
  98. }
  99. w.Header().Set("Content-Type", "application/json")
  100. err = json.NewEncoder(w).Encode(struct {
  101. Message string `json:"message"`
  102. OutputFiles []string `json:"output_files"`
  103. }{"ok", []string{output}})
  104. if err != nil {
  105. w.WriteHeader(http.StatusInternalServerError)
  106. }
  107. logrus.Info("Pipeline routes registered successfully")
  108. }
  109. func RegisterRoutes(r *mux.Router, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
  110. r.HandleFunc("/", IndexHandler).Methods("GET")
  111. r.HandleFunc("/upload", func(w http.ResponseWriter, r *http.Request) {
  112. UploadHandler(w, r, pipelines, storageProvider)
  113. }).Methods("POST")
  114. r.HandleFunc("/pipelines/{pipeline}", func(w http.ResponseWriter, r *http.Request) {
  115. for _, pipeline := range pipelines {
  116. if pipeline.GetSlug() == mux.Vars(r)["pipeline"] {
  117. PipelineHandler(pipeline, storageProvider, w, r)
  118. return
  119. }
  120. }
  121. w.WriteHeader(404)
  122. }).Methods("GET")
  123. }
  124. func main() {
  125. logrus.SetFormatter(&logrus.JSONFormatter{})
  126. logrus.SetOutput(os.Stdout)
  127. appSettings, err := settings.LoadSettings(afero.NewOsFs())
  128. if err != nil {
  129. logrus.Fatal("Unexpected error while loading settings: ", err)
  130. }
  131. var storageProvider storage.IStorageProvider
  132. if appSettings.StorageProvider.Type == 0 {
  133. storageProvider = storage.GetFileSystemStorageProvider(appSettings.StorageProvider.BasePath, "")
  134. } else {
  135. logrus.WithFields(logrus.Fields{
  136. "StorageProviderType": appSettings.StorageProvider.Type,
  137. }).Fatal("Invalid file system provided")
  138. }
  139. pipes := pipelines.LoadPipelines()
  140. r := mux.NewRouter()
  141. if appSettings.Authentication.Enabled {
  142. authMiddleware := middlewares.CreateAuthenticationMiddleware(appSettings.Authentication.Token)
  143. r.Use(authMiddleware.Middleware)
  144. }
  145. if appSettings.RateLimiter.Enabled {
  146. rateLimiterMiddleware, err := middlewares.CreateRateLimiterMiddleware(appSettings.RateLimiter.RequestsPerMinute, appSettings.RateLimiter.AllowedBurst)
  147. if err != nil {
  148. logrus.Fatal("Unexpected error while creating rate limiter middleware: ", err)
  149. }
  150. r.Use(rateLimiterMiddleware.Middleware)
  151. }
  152. RegisterRoutes(r, pipes, storageProvider)
  153. logrus.Info("Lithium started, listening for requests...")
  154. err = http.ListenAndServe(appSettings.Endpoint, r)
  155. if err != nil {
  156. logrus.Fatal("Unexpected error while serving http server: ", err)
  157. }
  158. }