Micro-service for file storage and processing written in Go
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

147 lines
2.9 KiB

  1. package pipelines
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/fs"
  8. "log"
  9. "os"
  10. "path/filepath"
  11. "github.com/disintegration/imaging"
  12. "github.com/geplauder/lithium/storage"
  13. )
  14. // Pipelines
  15. const (
  16. Image PipelineType = iota
  17. Video
  18. )
  19. type PipelineType int
  20. type IPipeline interface {
  21. GetName() string
  22. GetSlug() string
  23. GetType() PipelineType
  24. GetSteps() []Step
  25. Run(string, string, storage.IStorageProvider) (string, error)
  26. }
  27. type Pipeline struct {
  28. Name string `json:"name" faker:"name"`
  29. Slug string `json:"slug" faker:"word"`
  30. Type PipelineType `json:"type" faker:"-"`
  31. RemoveMetadata bool `json:"remove_metadata" faker:"-"`
  32. Steps []Step `json:"steps" faker:"-"`
  33. Output struct {
  34. Format int `json:"format"`
  35. Quality int `json:"quality"`
  36. } `json:"output" faker:"-"`
  37. }
  38. func (p Pipeline) Run(srcPath, bucketName string, storageProvider storage.IStorageProvider) (string, error) {
  39. src, err := imaging.Open(storageProvider.GetPath(bucketName, srcPath))
  40. if err != nil {
  41. return "", errors.New(fmt.Sprintf("error opening file for processing: %s", err))
  42. }
  43. for _, step := range p.GetSteps() {
  44. runner, err := step.GetExecutable()
  45. if err != nil {
  46. return "", err
  47. }
  48. src, err = runner.Execute(src)
  49. if err != nil {
  50. return "", err
  51. }
  52. }
  53. format := imaging.Format(p.Output.Format)
  54. var options []imaging.EncodeOption
  55. if p.Output.Quality != 0 {
  56. options = append(options, imaging.JPEGQuality(p.Output.Quality))
  57. }
  58. // encode image to io buffer
  59. buffer := new(bytes.Buffer)
  60. if err := imaging.Encode(buffer, src, format, options...); err != nil {
  61. return "", err
  62. }
  63. const fileName = "output.jpg" // TODO make variable
  64. _, err = storageProvider.StoreRaw(bucketName, fileName, buffer.Bytes())
  65. if err != nil {
  66. return "", err
  67. }
  68. return fileName, nil
  69. }
  70. func (p Pipeline) GetName() string {
  71. return p.Name
  72. }
  73. func (p Pipeline) GetSlug() string {
  74. return p.Slug
  75. }
  76. func (p Pipeline) GetType() PipelineType {
  77. return p.Type
  78. }
  79. func (p Pipeline) GetSteps() []Step {
  80. return p.Steps
  81. }
  82. type ImagePipeline struct {
  83. Pipeline
  84. }
  85. type VideoPipeline struct {
  86. Pipeline
  87. }
  88. // Deserialization
  89. func DeserializePipelines(pipelines [][]byte) []IPipeline {
  90. var values []IPipeline
  91. for _, pipeline := range pipelines {
  92. var deserializedObject Pipeline
  93. err := json.Unmarshal(pipeline, &deserializedObject)
  94. if err != nil {
  95. log.Fatalf("Could not deserialize pipelines config: %s", err)
  96. }
  97. values = append(values, deserializedObject)
  98. }
  99. return values
  100. }
  101. func LoadPipelines() []IPipeline {
  102. var files [][]byte
  103. path, _ := os.Getwd()
  104. err := filepath.Walk(path+"/config", func(path string, info fs.FileInfo, err error) error {
  105. if err == nil && info.IsDir() == false {
  106. data, _ := os.ReadFile(path)
  107. files = append(files, data)
  108. }
  109. return nil
  110. })
  111. if err != nil {
  112. panic(err)
  113. }
  114. return DeserializePipelines(files)
  115. }