Micro-service for file storage and processing written in Go
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

149 lines
3.0 KiB

  1. package pipelines
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/disintegration/imaging"
  8. "github.com/geplauder/lithium/storage"
  9. "io/fs"
  10. "log"
  11. "os"
  12. "path/filepath"
  13. )
  // Pipelines

  // PipelineType enumerates the kinds of pipeline a configuration can declare.
  // It is stored as a plain integer (see the "type" JSON field of Pipeline).
  type PipelineType int

  // Known pipeline types. The numeric values are assigned by iota, so Image is
  // 0 (the zero value of PipelineType) and Video is 1.
  const (
  	// Image is the first pipeline type (value 0).
  	Image PipelineType = iota
  	// Video is the second pipeline type (value 1).
  	Video
  )
  20. type IPipeline interface {
  21. GetName() string
  22. GetSlug() string
  23. GetType() PipelineType
  24. GetSteps() []Step
  25. Run(string, string, storage.IStorageProvider) (string, error)
  26. }
  27. type Pipeline struct {
  28. Name string `json:"name" faker:"name"`
  29. Slug string `json:"slug" faker:"word"`
  30. Type PipelineType `json:"type" faker:"-"`
  31. RemoveMetadata bool `json:"remove_metadata" faker:"-"`
  32. Steps []Step `json:"steps" faker:"-"`
  33. Output struct {
  34. Format int `json:"format"`
  35. Quality int `json:"quality"`
  36. } `json:"output" faker:"-"`
  37. }
  38. func (p Pipeline) Run(srcPath, bucketName string, storageProvider storage.IStorageProvider) (string, error) {
  39. fmt.Println("path: ", storageProvider.GetPath(bucketName, srcPath))
  40. src, err := imaging.Open(storageProvider.GetPath(bucketName, srcPath))
  41. if err != nil {
  42. return "", errors.New(fmt.Sprintf("error opening file for processing: %s", err))
  43. }
  44. for _, step := range p.GetSteps() {
  45. runner, err := step.GetExecutable()
  46. if err != nil {
  47. return "", err
  48. }
  49. src, err = runner.Execute(src)
  50. if err != nil {
  51. return "", err
  52. }
  53. }
  54. format := imaging.Format(p.Output.Format)
  55. var options []imaging.EncodeOption
  56. if p.Output.Quality != 0 {
  57. options = append(options, imaging.JPEGQuality(p.Output.Quality))
  58. }
  59. // encode image to io buffer
  60. buffer := new(bytes.Buffer)
  61. if err := imaging.Encode(buffer, src, format, options...); err != nil {
  62. return "", err
  63. }
  64. const fileName = "output.jpg" // TODO make variable
  65. _, err = storageProvider.StoreRaw(bucketName, fileName, buffer.Bytes())
  66. if err != nil {
  67. return "", err
  68. }
  69. return fileName, nil
  70. }
  71. func (p Pipeline) GetName() string {
  72. return p.Name
  73. }
  74. func (p Pipeline) GetSlug() string {
  75. return p.Slug
  76. }
  77. func (p Pipeline) GetType() PipelineType {
  78. return p.Type
  79. }
  80. func (p Pipeline) GetSteps() []Step {
  81. return p.Steps
  82. }
  83. type ImagePipeline struct {
  84. Pipeline
  85. }
  86. type VideoPipeline struct {
  87. Pipeline
  88. }
  89. // Deserialization
  90. func DeserializePipelines(pipelines [][]byte) []IPipeline {
  91. var values []IPipeline
  92. for _, pipeline := range pipelines {
  93. var deserializedObject Pipeline
  94. err := json.Unmarshal(pipeline, &deserializedObject)
  95. if err != nil {
  96. log.Fatalf("Could not deserialize pipelines config: %s", err)
  97. }
  98. values = append(values, deserializedObject)
  99. }
  100. return values
  101. }
  102. func LoadPipelines() []IPipeline {
  103. var files [][]byte
  104. path, _ := os.Getwd()
  105. err := filepath.Walk(path+"/config", func(path string, info fs.FileInfo, err error) error {
  106. if err == nil && info.IsDir() == false {
  107. fmt.Println(path)
  108. data, _ := os.ReadFile(path)
  109. files = append(files, data)
  110. }
  111. return nil
  112. })
  113. if err != nil {
  114. panic(err)
  115. }
  116. return DeserializePipelines(files)
  117. }