Micro-service for file storage and processing written in Go
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

149 lines
3.7 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "github.com/geplauder/lithium/auth"
  6. "github.com/geplauder/lithium/pipelines"
  7. "github.com/geplauder/lithium/settings"
  8. "github.com/geplauder/lithium/storage"
  9. "github.com/gorilla/mux"
  10. "github.com/spf13/afero"
  11. "io"
  12. "net/http"
  13. "strings"
  14. )
  15. const Name string = "Lithium"
  16. const Version string = "0.1.0"
  17. type Metadata struct {
  18. Name string `json:"name"`
  19. Version string `json:"version"`
  20. }
  21. func PipelineHandler(pipeline pipelines.IPipeline, storageProvider storage.IStorageProvider, w http.ResponseWriter, r *http.Request) {
  22. w.Header().Set("Content-Type", "application/json")
  23. err := json.NewEncoder(w).Encode(pipeline)
  24. if err != nil {
  25. w.WriteHeader(http.StatusInternalServerError)
  26. }
  27. }
  28. func IndexHandler(w http.ResponseWriter, r *http.Request) {
  29. w.Header().Set("Content-Type", "application/json")
  30. err := json.NewEncoder(w).Encode(Metadata{Name, Version})
  31. if err != nil {
  32. w.WriteHeader(http.StatusInternalServerError)
  33. }
  34. }
  35. func writeError(w http.ResponseWriter, status int, errStr string) {
  36. w.WriteHeader(status)
  37. json.NewEncoder(w).Encode(struct {
  38. Error string `json:"error"`
  39. }{errStr})
  40. }
  41. func UploadHandler(w http.ResponseWriter, r *http.Request, pipes []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
  42. // open file handler
  43. formFile, handler, err := r.FormFile("file")
  44. if err != nil {
  45. writeError(w, http.StatusUnprocessableEntity, err.Error())
  46. return
  47. }
  48. defer formFile.Close()
  49. // check pipelines form param
  50. formPipelines := strings.Split(r.FormValue("pipelines"), ",")
  51. if len(formPipelines) == 0 {
  52. writeError(w, http.StatusUnprocessableEntity, "pipeline parameter missing")
  53. return
  54. }
  55. bucket := r.FormValue("bucket")
  56. if bucket == "" {
  57. writeError(w, http.StatusUnprocessableEntity, "bucket parameter missing")
  58. return
  59. }
  60. // open file
  61. file, err := handler.Open()
  62. if err != nil {
  63. writeError(w, http.StatusInternalServerError, "error reading uploaded file")
  64. return
  65. }
  66. defer file.Close()
  67. // read file to buffer
  68. buf := bytes.NewBuffer(nil)
  69. _, err = io.Copy(buf, file)
  70. if err != nil {
  71. writeError(w, http.StatusInternalServerError, "error reading file from buffer")
  72. return
  73. }
  74. // store uploaded file
  75. _, err = storageProvider.StoreRaw(bucket, "source.jpg", buf.Bytes())
  76. if err != nil {
  77. return
  78. }
  79. w.Header().Set("Content-Type", "application/json")
  80. err = json.NewEncoder(w).Encode(struct {
  81. Message string `json:"message"`
  82. }{"ok"})
  83. if err != nil {
  84. w.WriteHeader(http.StatusInternalServerError)
  85. return
  86. }
  87. }
  88. func RegisterRoutes(r *mux.Router, pipelines []pipelines.IPipeline, storageProvider storage.IStorageProvider) {
  89. r.HandleFunc("/", IndexHandler).Methods("GET")
  90. r.HandleFunc("/upload", func(w http.ResponseWriter, r *http.Request) {
  91. UploadHandler(w, r, pipelines, storageProvider)
  92. }).Methods("POST")
  93. r.HandleFunc("/pipelines/{pipeline}", func(w http.ResponseWriter, r *http.Request) {
  94. for _, pipeline := range pipelines {
  95. if pipeline.GetSlug() == mux.Vars(r)["pipeline"] {
  96. PipelineHandler(pipeline, storageProvider, w, r)
  97. return
  98. }
  99. }
  100. w.WriteHeader(404)
  101. }).Methods("GET")
  102. }
  103. func main() {
  104. appSettings, err := settings.LoadSettings(afero.NewOsFs())
  105. if err != nil {
  106. panic(err)
  107. }
  108. var storageProvider storage.IStorageProvider
  109. if appSettings.StorageProvider.Type == 0 {
  110. storageProvider = storage.GetFileSystemStorageProvider(appSettings.StorageProvider.BasePath, "")
  111. } else {
  112. panic("Invalid file system provided!")
  113. }
  114. pipes := pipelines.LoadPipelines()
  115. authMiddleware := auth.CreateAuthenticationMiddleware(appSettings.Token)
  116. r := mux.NewRouter()
  117. r.Use(authMiddleware.Middleware)
  118. RegisterRoutes(r, pipes, storageProvider)
  119. err = http.ListenAndServe(appSettings.Endpoint, r)
  120. if err != nil {
  121. panic(err)
  122. }
  123. }