Micro-service for file storage and processing written in Go
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

138 lines
2.7 KiB

  1. package pipelines
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/disintegration/imaging"
  8. "github.com/geplauder/lithium/storage"
  9. "io/fs"
  10. "log"
  11. "os"
  12. "path/filepath"
  13. )
  // Pipelines

  // PipelineType enumerates the pipeline kinds declared below.
  type PipelineType int

  // Known pipeline kinds. Values are assigned by iota in declaration order.
  const (
  	Image PipelineType = iota // 0
  	Video                     // 1
  )
  20. type IPipeline interface {
  21. GetName() string
  22. GetSlug() string
  23. GetType() PipelineType
  24. GetSteps() []Step
  25. Run(string, string, storage.IStorageProvider) (string, error)
  26. }
  27. type Pipeline struct {
  28. Name string `json:"name" faker:"name"`
  29. Slug string `json:"slug" faker:"word"`
  30. Type PipelineType `json:"type" faker:"-"`
  31. RemoveMetadata bool `json:"remove_metadata" faker:"-"`
  32. Steps []Step `json:"steps" faker:"-"`
  33. }
  34. func (p Pipeline) Run(srcPath, bucketName string, storageProvider storage.IStorageProvider) (string, error) {
  35. fmt.Println("path: ", storageProvider.GetPath(bucketName, srcPath))
  36. src, err := imaging.Open(storageProvider.GetPath(bucketName, srcPath))
  37. if err != nil {
  38. return "", errors.New(fmt.Sprintf("error opening file for processing: %s", err))
  39. }
  40. for _, step := range p.GetSteps() {
  41. runner, err := step.GetExecutable()
  42. if err != nil {
  43. return "", err
  44. }
  45. src, err = runner.Execute(src)
  46. if err != nil {
  47. return "", err
  48. }
  49. }
  50. // encode image to io buffer
  51. buffer := new(bytes.Buffer)
  52. if err := imaging.Encode(buffer, src, imaging.JPEG); err != nil {
  53. return "", err
  54. }
  55. const fileName = "output.jpg" // TODO make variable
  56. _, err = storageProvider.StoreRaw(bucketName, fileName, buffer.Bytes())
  57. if err != nil {
  58. return "", err
  59. }
  60. return fileName, nil
  61. }
  62. func (p Pipeline) GetName() string {
  63. return p.Name
  64. }
  65. func (p Pipeline) GetSlug() string {
  66. return p.Slug
  67. }
  68. func (p Pipeline) GetType() PipelineType {
  69. return p.Type
  70. }
  71. func (p Pipeline) GetSteps() []Step {
  72. return p.Steps
  73. }
  74. type ImagePipeline struct {
  75. Pipeline
  76. }
  77. type VideoPipeline struct {
  78. Pipeline
  79. }
  80. // Deserialization
  81. func DeserializePipelines(pipelines [][]byte) []IPipeline {
  82. var values []IPipeline
  83. for _, pipeline := range pipelines {
  84. var deserializedObject Pipeline
  85. err := json.Unmarshal(pipeline, &deserializedObject)
  86. if err != nil {
  87. log.Fatalf("Could not deserialize pipelines config: %s", err)
  88. }
  89. values = append(values, deserializedObject)
  90. }
  91. return values
  92. }
  93. func LoadPipelines() []IPipeline {
  94. var files [][]byte
  95. path, _ := os.Getwd()
  96. err := filepath.Walk(path+"/config", func(path string, info fs.FileInfo, err error) error {
  97. if err == nil && info.IsDir() == false {
  98. fmt.Println(path)
  99. data, _ := os.ReadFile(path)
  100. files = append(files, data)
  101. }
  102. return nil
  103. })
  104. if err != nil {
  105. panic(err)
  106. }
  107. return DeserializePipelines(files)
  108. }