Micro-service for file storage and processing written in Go
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

187 lines
4.6 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "io"
  6. "net/http"
  7. "github.com/geplauder/lithium/middlewares"
  8. "github.com/geplauder/lithium/pipelines"
  9. "github.com/geplauder/lithium/settings"
  10. "github.com/geplauder/lithium/storage"
  11. "github.com/gorilla/mux"
  12. "github.com/spf13/afero"
  13. )
  14. const Name string = "Lithium"
  15. const Version string = "0.1.0"
  16. var GitCommit string
  17. type Metadata struct {
  18. Name string `json:"name"`
  19. Version string `json:"version"`
  20. CommitHash string `json:"commit_hash"`
  21. }
  22. func PipelineHandler(pipeline pipelines.IPipeline, storageProvider storage.IStorageProvider, w http.ResponseWriter, r *http.Request) {
  23. w.Header().Set("Content-Type", "application/json")
  24. err := json.NewEncoder(w).Encode(pipeline)
  25. if err != nil {
  26. w.WriteHeader(http.StatusInternalServerError)
  27. }
  28. }
  29. func IndexHandler(w http.ResponseWriter, r *http.Request) {
  30. w.Header().Set("Content-Type", "application/json")
  31. err := json.NewEncoder(w).Encode(Metadata{Name, Version, GitCommit})
  32. if err != nil {
  33. w.WriteHeader(http.StatusInternalServerError)
  34. }
  35. }
  36. func writeError(w http.ResponseWriter, status int, errStr string) {
  37. w.WriteHeader(status)
  38. json.NewEncoder(w).Encode(struct {
  39. Error string `json:"error"`
  40. }{errStr})
  41. }
  42. func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
  43. // open file handler
  44. formFile, handler, err := r.FormFile("file")
  45. if err != nil {
  46. writeError(w, http.StatusUnprocessableEntity, err.Error())
  47. return
  48. }
  49. defer formFile.Close()
  50. // check pipelines form param
  51. formPipeline := r.FormValue("pipeline")
  52. if formPipeline == "" {
  53. writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing")
  54. return
  55. }
  56. var execPipe pipelines.IPipeline
  57. for _, pipe := range pipes {
  58. if formPipeline == pipe.GetSlug() {
  59. execPipe = pipe
  60. break
  61. }
  62. }
  63. if execPipe == nil {
  64. writeError(w, http.StatusUnprocessableEntity, "pipeline not found")
  65. return
  66. }
  67. bucket := r.FormValue("bucket")
  68. if bucket == "" {
  69. writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing")
  70. return
  71. }
  72. // open file
  73. file, err := handler.Open()
  74. if err != nil {
  75. writeError(w, http.StatusInternalServerError, "error reading uploaded file")
  76. return
  77. }
  78. defer file.Close()
  79. // read file to buffer
  80. buf := bytes.NewBuffer(nil)
  81. _, err = io.Copy(buf, file)
  82. if err != nil {
  83. writeError(w, http.StatusInternalServerError, "error reading file from buffer")
  84. return
  85. }
  86. // store uploaded file
  87. _, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes())
  88. if err != nil {
  89. return
  90. }
  91. // execute pipeline
  92. output, err := execPipe.Run("source.jpg", bucket, storageProvider)
  93. if err != nil {
  94. writeError(w, http.StatusInternalServerError, "error executing pipeline")
  95. return
  96. }
  97. w.Header().Set("Content-Type", "application/json")
  98. err = json.NewEncoder(w).Encode(struct {
  99. Message string `json:"message"`
  100. OutputFiles []string `json:"output_files"`
  101. }{"ok", []string{output}})
  102. if err != nil {
  103. w.WriteHeader(http.StatusInternalServerError)
  104. }
  105. }
  106. func RegisterRoutes(r *mux.Router, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
  107. r.HandleFunc("/", IndexHandler).Methods("GET")
  108. r.HandleFunc("/upload", func(w http.ResponseWriter, r *http.Request) {
  109. UploadHandler(w, r, pipelines, storageProvider)
  110. }).Methods("POST")
  111. r.HandleFunc("/pipelines/{pipeline}", func(w http.ResponseWriter, r *http.Request) {
  112. for _, pipeline := range pipelines {
  113. if pipeline.GetSlug() == mux.Vars(r)["pipeline"] {
  114. PipelineHandler(pipeline, storageProvider, w, r)
  115. return
  116. }
  117. }
  118. w.WriteHeader(404)
  119. }).Methods("GET")
  120. }
  121. func main() {
  122. appSettings, err := settings.LoadSettings(afero.NewOsFs())
  123. if err != nil {
  124. panic(err)
  125. }
  126. var storageProvider storage.IStorageProvider
  127. if appSettings.StorageProvider.Type == 0 {
  128. storageProvider = storage.GetFileSystemStorageProvider(appSettings.StorageProvider.BasePath, "")
  129. } else {
  130. panic("Invalid file system provided!")
  131. }
  132. pipes := pipelines.LoadPipelines()
  133. r := mux.NewRouter()
  134. if appSettings.Authentication.Enabled {
  135. authMiddleware := middlewares.CreateAuthenticationMiddleware(appSettings.Authentication.Token)
  136. r.Use(authMiddleware.Middleware)
  137. }
  138. if appSettings.RateLimiter.Enabled {
  139. rateLimiterMiddleware, err := middlewares.CreateRateLimiterMiddleware(appSettings.RateLimiter.RequestsPerMinute, appSettings.RateLimiter.AllowedBurst)
  140. if err != nil {
  141. panic(err)
  142. }
  143. r.Use(rateLimiterMiddleware.Middleware)
  144. }
  145. r.HandleFunc("/", IndexHandler)
  146. RegisterRoutes(r, pipes, storageProvider)
  147. err = http.ListenAndServe(appSettings.Endpoint, r)
  148. if err != nil {
  149. panic(err)
  150. }
  151. }