Micro-service for file storage and processing written in Go
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

156 lines
3.2 KiB

  1. package pipelines
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/fs"
  8. "os"
  9. "path/filepath"
  10. "github.com/disintegration/imaging"
  11. "github.com/geplauder/lithium/storage"
  12. "github.com/sirupsen/logrus"
  13. )
  14. // Pipelines
  15. const (
  16. Image PipelineType = iota
  17. Video
  18. )
  19. type PipelineType int
  20. type IPipeline interface {
  21. GetName() string
  22. GetSlug() string
  23. GetType() PipelineType
  24. GetSteps() []Step
  25. Run(string, string, storage.IStorageProvider) (string, error)
  26. }
// Pipeline is the JSON-decodable description of a pipeline: its identifying
// fields, the steps to apply, and how the result is encoded.
//
// The `faker` tags are presumably consumed by a fake-data generator used in
// tests; nothing in this file reads them.
type Pipeline struct {
	// Name is decoded from the "name" JSON key.
	Name string `json:"name" faker:"name"`
	// Slug is decoded from the "slug" JSON key.
	Slug string `json:"slug" faker:"word"`
	// Type is decoded from the "type" JSON key (an integer PipelineType).
	Type PipelineType `json:"type" faker:"-"`
	// RemoveMetadata is decoded from "remove_metadata". Run does not read it
	// in this file.
	RemoveMetadata bool `json:"remove_metadata" faker:"-"`
	// Steps are applied by Run in slice order.
	Steps []Step `json:"steps" faker:"-"`
	// Output controls how Run encodes the processed image.
	Output struct {
		// Format is a file extension understood by imaging.FormatFromExtension;
		// when empty, Run falls back to "jpg".
		Format string `json:"format"`
		// Quality, when non-zero, is passed by Run to imaging.JPEGQuality.
		Quality int `json:"quality"`
	} `json:"output" faker:"-"`
}
  38. func (p Pipeline) Run(srcPath, bucketName string, storageProvider storage.IStorageProvider) (string, error) {
  39. src, err := imaging.Open(storageProvider.GetPath(bucketName, srcPath))
  40. if err != nil {
  41. return "", errors.New(fmt.Sprintf("error opening file for processing: %s", err))
  42. }
  43. for _, step := range p.GetSteps() {
  44. runner, err := step.GetExecutable()
  45. if err != nil {
  46. return "", err
  47. }
  48. src, err = runner.Execute(src)
  49. if err != nil {
  50. return "", err
  51. }
  52. }
  53. outputFormat := p.Output.Format
  54. if outputFormat == "" {
  55. outputFormat = "jpg"
  56. }
  57. format, err := imaging.FormatFromExtension(outputFormat)
  58. if err != nil {
  59. return "", errors.New(fmt.Sprintf("output format '%s' is not supported", outputFormat))
  60. }
  61. var options []imaging.EncodeOption
  62. if p.Output.Quality != 0 {
  63. options = append(options, imaging.JPEGQuality(p.Output.Quality))
  64. }
  65. // encode image to io buffer
  66. buffer := new(bytes.Buffer)
  67. if err := imaging.Encode(buffer, src, format, options...); err != nil {
  68. return "", err
  69. }
  70. const fileName = "output.jpg" // TODO make variable
  71. _, err = storageProvider.StoreRaw(bucketName, fileName, buffer.Bytes())
  72. if err != nil {
  73. return "", err
  74. }
  75. return fileName, nil
  76. }
// GetName returns the pipeline's Name field.
func (p Pipeline) GetName() string {
	return p.Name
}
// GetSlug returns the pipeline's Slug field.
func (p Pipeline) GetSlug() string {
	return p.Slug
}
// GetType returns the pipeline's Type field.
func (p Pipeline) GetType() PipelineType {
	return p.Type
}
// GetSteps returns the pipeline's Steps slice as stored; the slice is not
// copied.
func (p Pipeline) GetSteps() []Step {
	return p.Steps
}
// ImagePipeline embeds Pipeline and therefore exposes all of its fields and
// methods; it adds nothing of its own in this file.
type ImagePipeline struct {
	Pipeline
}
// VideoPipeline embeds Pipeline and therefore exposes all of its fields and
// methods; it adds nothing of its own in this file.
type VideoPipeline struct {
	Pipeline
}
  95. // Deserialization
  96. func DeserializePipelines(pipelines [][]byte) []IPipeline {
  97. var values []IPipeline
  98. for _, pipeline := range pipelines {
  99. var deserializedObject Pipeline
  100. err := json.Unmarshal(pipeline, &deserializedObject)
  101. if err != nil {
  102. logrus.Fatalf("Could not deserialize pipelines config: %s", err)
  103. }
  104. values = append(values, deserializedObject)
  105. }
  106. return values
  107. }
  108. func LoadPipelines() []IPipeline {
  109. var files [][]byte
  110. path, _ := os.Getwd()
  111. err := filepath.Walk(path+"/config", func(path string, info fs.FileInfo, err error) error {
  112. if err == nil && info.IsDir() == false {
  113. data, _ := os.ReadFile(path)
  114. files = append(files, data)
  115. }
  116. return nil
  117. })
  118. if err != nil {
  119. logrus.Fatal("Unexpected error while loading pipelines: ", err)
  120. }
  121. return DeserializePipelines(files)
  122. }