@@ -32,8 +32,10 @@ namespace data {
3232
3333namespace  {
3434
35+ //  TODO(yye): implement this function and support reading parquet data in
36+ //  a streaming way.
3537Status OpenParquetFile (arrow::fs::FileSystem *fs,
36-                        arrow::io::RandomAccessFile *file, ) 
38+                        arrow::io::RandomAccessFile *file) {} 
3739
3840}  //  namespace
3941
@@ -616,55 +618,37 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase {
616618          : ArrowBaseIterator<Dataset>(params) {}
617619
618620     private: 
621+       //  TODO(yye): implement getting the first batch.
619622      Status SetupStreamsLocked (Env *env)
620623          TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override  {
621624        TF_RETURN_IF_ERROR (ReadFile (current_file_idx_));
622625
626+         //  Open and read parquet file.
623627        while  (record_batches_.empty () &&
624628               ++current_file_idx_ < dataset ()->file_paths_ .size ()) {
625629          TF_RETURN_IF_ERROR (ReadFile (current_file_idx_));
626630        }
627631
628-         if  (current_batch_idx_ < record_batches_.size ()) {
629-           current_batch_ = record_batches_[current_batch_idx_];
630-         }
631632        return  OkStatus ();
632633      }
633634
635+       //  TODO(yye): implement getting the next batch.
634636      Status NextStreamLocked (Env *env)
635637          TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override  {
636-         ArrowBaseIterator<Dataset>::NextStreamLocked (env);
637-         if  (++current_batch_idx_ < record_batches_.size ()) {
638-           current_batch_ = record_batches_[current_batch_idx_];
639-         } else  if  (++current_file_idx_ < dataset ()->file_paths_ .size ()) {
640-           current_batch_idx_ = 0 ;
641-           while  (record_batches_.empty () &&
642-                  ++current_file_idx_ < dataset ()->file_paths_ .size ()) {
643-             TF_RETURN_IF_ERROR (ReadFile (current_file_idx_));
644-           }
645- 
646-           if  (!record_batches_.empty ()) {
647-             current_batch_ = record_batches_[current_batch_idx_];
648-           } else  {
649-             current_batch_ = nullptr ;
650-           }
651-         }
652638        return  OkStatus ();
653639      }
654640
655641      void  ResetStreamsLocked () TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override  {
656642        ArrowBaseIterator<Dataset>::ResetStreamsLocked ();
657643        current_file_idx_ = 0 ;
658-         current_batch_idx_ = 0 ;
659644        record_batches_.clear ();
660645      }
661646
662647      Status ReadFile (int  file_index) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
663648        Status res = OkStatus ();
664649        do  {
665650          std::shared_ptr<arrow::io::RandomAccessFile> file;
666-           res = ArrowUtil::OpenParquetFile (&fs_, &file,
667-                                            dataset ()->file_paths_ [file_index]);
651+           res = OpenParquetFile (&fs_, file.get ());
668652          if  (!res.ok ()) {
669653            break ;
670654          }
@@ -682,7 +666,7 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase {
682666          std::unique_ptr<parquet::arrow::FileReader> reader;
683667          builder->properties (properties)->Build (&reader);
684668
685-           if  (column_indices_.empty () || ! dataset ()-> same_header_ ) {
669+           if  (column_indices_.empty ()) {
686670            column_indices_.clear ();
687671            std::shared_ptr<arrow::Schema> schema;
688672            reader->GetSchema (&schema);
@@ -741,9 +725,12 @@ class ArrowParquetDatasetOp : public ArrowOpKernelBase {
741725      }
742726
743727      size_t  current_file_idx_ TF_GUARDED_BY (mu_) = 0;
728+       //  TODO(yye): stop maintaining/holding all the record batches.
744729      std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches_
745730          TF_GUARDED_BY (mu_);
746731      std::shared_ptr<arrow::fs::FileSystem> fs_ TF_GUARDED_BY (mu_) = nullptr;
732+ 
733+       //  Maintains the indices of the columns to read.
747734      std::vector<int > column_indices_ TF_GUARDED_BY (mu_);
748735    };
749736
0 commit comments