Skip to content

Commit 6a5b84c

Browse files
authored
Merge pull request #6 from clue-labs/assoc
Add AssocDecoder to respect field names and parse CSV into assoc arrays
2 parents d0f7642 + a8b819b commit 6a5b84c

File tree

9 files changed

+446
-18
lines changed

9 files changed

+446
-18
lines changed

README.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ rows efficiently without having to load the whole file into memory at once.
2727
* [CSV format](#csv-format)
2828
* [Usage](#usage)
2929
* [Decoder](#decoder)
30+
* [AssocDecoder](#assocdecoder)
3031
* [Encoder](#encoder)
3132
* [Install](#install)
3233
* [Tests](#tests)
@@ -221,6 +222,54 @@ $stream->pipe($logger);
221222
For more details, see ReactPHP's
222223
[`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface).
223224

225+
### AssocDecoder
226+
227+
The `AssocDecoder` (parser) class can be used to make sure you only get back
228+
complete, valid CSV elements when reading from a stream.
229+
It wraps a given
230+
[`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface)
231+
and exposes its data through the same interface, but emits the CSV elements
232+
as parsed assoc arrays instead of just chunks of strings:
233+
234+
```
235+
name,id
236+
test,1
237+
"hello world",2
238+
```
239+
```php
240+
$stdin = new ReadableResourceStream(STDIN, $loop);
241+
242+
$stream = new Decoder($stdin);
243+
244+
$stream->on('data', function ($data) {
245+
// data is a parsed element from the CSV stream
246+
// line 1: $data = array('name' => 'test', 'id' => '1');
247+
// line 2: $data = array('name' => 'hello world', 'id' => '2');
248+
var_dump($data);
249+
});
250+
```
251+
252+
Whether field names are used is application-dependant, so this library makes no
253+
attempt at *guessing* whether the first line contains field names or field
254+
values. For many common use cases it's a good idea to include them and
255+
explicitly use this class instead of the underlying [`Decoder`](#decoder).
256+
257+
In fact, it uses the [`Decoder`](#decoder) class internally. The only difference
258+
is that this class requires the first line to include the name of headers and
259+
will use this as keys for all following row data which will be emitted as
260+
assoc arrays.
261+
262+
This implies that the input stream MUST start with one row of header names and
263+
MUST use the same number of columns for all records. If the input stream does
264+
not emit any data, if any row does not contain the same number of columns,
265+
if the input stream does not represent a valid CSV stream or if the input stream
266+
emits an `error` event, this decoder will emit an appropriate `error` event and
267+
close the input stream.
268+
269+
This class otherwise accepts the same arguments and follows the exact same
270+
behavior of the underlying [`Decoder`](#decoder) class. For more details, see
271+
the [`Decoder`](#decoder) class.
272+
224273
### Encoder
225274

226275
The `Encoder` (serializer) class can be used to make sure anything you write to

examples/01-count.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
<?php
22

3-
use Clue\React\Csv\Decoder;
3+
// $ php examples/01-count.php < examples/users.csv
4+
5+
use Clue\React\Csv\AssocDecoder;
46
use React\EventLoop\Factory;
57
use React\Stream\ReadableResourceStream;
68
use React\Stream\WritableResourceStream;
@@ -15,7 +17,7 @@
1517

1618
$delimiter = isset($argv[1]) ? $argv[1] : ',';
1719

18-
$decoder = new Decoder($in, $delimiter);
20+
$decoder = new AssocDecoder($in, $delimiter);
1921

2022
$count = 0;
2123
$decoder->on('data', function () use (&$count) {
@@ -32,7 +34,7 @@
3234
});
3335

3436
$info->write('You can pipe/write a valid CSV stream to STDIN' . PHP_EOL);
35-
$info->write('The resulting number of records (rows) will be printed to STDOUT' . PHP_EOL);
37+
$info->write('The resulting number of records (rows minus header row) will be printed to STDOUT' . PHP_EOL);
3638
$info->write('Invalid CSV will raise an error on STDERR and exit with code 1' . PHP_EOL);
3739

3840
$loop->run();

examples/02-validate.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
<?php
22

3+
// $ php examples/02-validate.php < examples/users.csv
4+
5+
use Clue\React\Csv\Decoder;
6+
use Clue\React\Csv\Encoder;
37
use React\EventLoop\Factory;
48
use React\Stream\ReadableResourceStream;
59
use React\Stream\WritableResourceStream;
6-
use Clue\React\Csv\Decoder;
7-
use Clue\React\Csv\Encoder;
810

911
require __DIR__ . '/../vendor/autoload.php';
1012

examples/11-csv2ndjson.php

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
<?php
22

3+
// $ php examples/11-csv2ndjson.php < examples/users.csv > examples/users.ndjson
4+
5+
use Clue\React\Csv\AssocDecoder;
36
use React\EventLoop\Factory;
47
use React\Stream\ReadableResourceStream;
58
use React\Stream\WritableResourceStream;
6-
use Clue\React\Csv\Decoder;
79
use React\Stream\ThroughStream;
810

911
require __DIR__ . '/../vendor/autoload.php';
@@ -17,20 +19,18 @@
1719

1820
$delimiter = isset($argv[1]) ? $argv[1] : ',';
1921

20-
$decoder = new Decoder($in, $delimiter);
22+
$decoder = new AssocDecoder($in, $delimiter);
2123

22-
$headers = array();
23-
$encoder = new ThroughStream(function ($data) use (&$headers) {
24-
return json_encode(array_combine($headers, $data), JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE) . "\n";
25-
});
26-
$encoder->pipe($out);
24+
$encoder = new ThroughStream(function ($data) {
25+
$data = \array_filter($data, function ($one) {
26+
return ($one !== '');
27+
});
2728

28-
// first row from decoder will be used as header values, then start piping to encoder
29-
$decoder->once('data', function ($data) use (&$headers, $decoder, $encoder) {
30-
$headers = $data;
31-
$decoder->pipe($encoder);
29+
return \json_encode($data, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE) . "\n";
3230
});
3331

32+
$decoder->pipe($encoder)->pipe($out);
33+
3434
$decoder->on('error', function (Exception $e) use ($info, &$exit) {
3535
$info->write('ERROR: ' . $e->getMessage() . PHP_EOL);
3636
$exit = 1;

examples/users.csv

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
name,birthday,ip
2+
first,2017-01-01,1.1.1.1
3+
second,2006-01-01,2.1.1.1
4+
third,1995-01-01,3.1.1.1
5+
fourth,1984-01-01,
6+
fifth,1973-01-01,5.1.1.1
7+
sixth,1962-01-01,6.1.1.1
8+
seventh,1951-01-01,7.1.1.1
9+
eighth,1940-01-01,8.1.1.1
10+
nineth,1939-01-01,9.1.1.1
11+
tenth,1928-01-01,

src/AssocDecoder.php

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
<?php
2+
3+
namespace Clue\React\Csv;
4+
5+
use Evenement\EventEmitter;
6+
use React\Stream\ReadableStreamInterface;
7+
use React\Stream\WritableStreamInterface;
8+
use React\Stream\Util;
9+
10+
/**
11+
* The AssocDecoder / Parser reads from a plain stream and emits assoc data arrays for each CSV record
12+
*/
13+
class AssocDecoder extends EventEmitter implements ReadableStreamInterface
14+
{
15+
private $input;
16+
private $expected;
17+
private $headers = array();
18+
private $closed = false;
19+
20+
/**
21+
* @param ReadableStreamInterface $input
22+
* @param string $delimiter
23+
* @param string $enclosure
24+
* @param string $escapeChar
25+
* @param int $maxlength
26+
*/
27+
public function __construct(ReadableStreamInterface $input, $delimiter = ',', $enclosure = '"', $escapeChar = '\\', $maxlength = 65536)
28+
{
29+
$this->input = new Decoder($input, $delimiter, $enclosure, $escapeChar, $maxlength);
30+
31+
if (!$input->isReadable()) {
32+
$this->close();
33+
return;
34+
}
35+
36+
$this->input->on('data', array($this, 'handleData'));
37+
$this->input->on('end', array($this, 'handleEnd'));
38+
$this->input->on('error', array($this, 'handleError'));
39+
$this->input->on('close', array($this, 'close'));
40+
}
41+
42+
public function isReadable()
43+
{
44+
return !$this->closed;
45+
}
46+
47+
public function close()
48+
{
49+
if ($this->closed) {
50+
return;
51+
}
52+
53+
$this->closed = true;
54+
$this->input->close();
55+
56+
$this->emit('close');
57+
$this->removeAllListeners();
58+
}
59+
60+
public function pause()
61+
{
62+
$this->input->pause();
63+
}
64+
65+
public function resume()
66+
{
67+
$this->input->resume();
68+
}
69+
70+
public function pipe(WritableStreamInterface $dest, array $options = array())
71+
{
72+
Util::pipe($this, $dest, $options);
73+
74+
return $dest;
75+
}
76+
77+
/** @internal */
78+
public function handleData($data)
79+
{
80+
if ($this->expected === null) {
81+
$this->headers = $data;
82+
$this->expected = \count($data);
83+
} else {
84+
if (\count($data) !== $this->expected) {
85+
$this->handleError(new \UnexpectedValueException(
86+
'Expected record with ' . $this->expected . ' columns, but got ' . \count($data) . ' instead')
87+
);
88+
return;
89+
}
90+
91+
$this->emit('data', array(
92+
\array_combine($this->headers, $data)
93+
));
94+
}
95+
}
96+
97+
/** @internal */
98+
public function handleEnd()
99+
{
100+
if ($this->headers === array()) {
101+
$this->handleError(new \UnderflowException('Stream ended without headers'));
102+
}
103+
104+
if (!$this->closed) {
105+
$this->emit('end');
106+
$this->close();
107+
}
108+
}
109+
110+
/** @internal */
111+
public function handleError(\Exception $error)
112+
{
113+
$this->emit('error', array($error));
114+
$this->close();
115+
}
116+
}

src/Decoder.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ class Decoder extends EventEmitter implements ReadableStreamInterface
2323
private $offset = 0;
2424
private $closed = false;
2525

26+
/**
27+
* @param ReadableStreamInterface $input
28+
* @param string $delimiter
29+
* @param string $enclosure
30+
* @param string $escapeChar
31+
* @param int $maxlength
32+
*/
2633
public function __construct(ReadableStreamInterface $input, $delimiter = ',', $enclosure = '"', $escapeChar = '\\', $maxlength = 65536)
2734
{
2835
$this->input = $input;
@@ -32,7 +39,8 @@ public function __construct(ReadableStreamInterface $input, $delimiter = ',', $e
3239
$this->maxlength = $maxlength;
3340

3441
if (!$input->isReadable()) {
35-
return $this->close();
42+
$this->close();
43+
return;
3644
}
3745

3846
$this->input->on('data', array($this, 'handleData'));

src/Encoder.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ class Encoder extends EventEmitter implements WritableStreamInterface
1818
private $enclosure;
1919
private $escapeChar;
2020

21+
/**
22+
* @param WritableStreamInterface $output
23+
* @param string $delimiter
24+
* @param string $enclosure
25+
* @param string $escapeChar
26+
* @throws \BadMethodCallException
27+
*/
2128
public function __construct(WritableStreamInterface $output, $delimiter = ',', $enclosure = '"', $escapeChar = '\\')
2229
{
2330
// @codeCoverageIgnoreStart
@@ -32,7 +39,8 @@ public function __construct(WritableStreamInterface $output, $delimiter = ',', $
3239
$this->escapeChar = $escapeChar;
3340

3441
if (!$output->isWritable()) {
35-
return $this->close();
42+
$this->close();
43+
return;
3644
}
3745

3846
$this->temp = fopen('php://memory', 'r+');

0 commit comments

Comments
 (0)