     print(f"Results are a preview: {reader.is_preview}")
 """
 
-from io import BufferedReader, BytesIO
-
-
-import xml.etree.ElementTree as et
-
-from collections import OrderedDict
+from io import BufferedReader
 from json import loads as json_loads
 
-__all__ = ["ResultsReader", "Message", "JSONResultsReader"]
-
-import deprecation
-
 
 class Message:
     """This class represents informational messages that Splunk interleaves in the results stream.
@@ -70,205 +61,6 @@ def __hash__(self):
         return hash((self.type, self.message))
 
 
-class _ConcatenatedStream:
-    """Lazily concatenate zero or more streams into a stream.
-
-    As you read from the concatenated stream, you get bytes from
-    each stream passed to ``_ConcatenatedStream``, in order.
-
-    **Example**::
-
-        from io import BytesIO
-        s = _ConcatenatedStream(BytesIO(b"abc"), BytesIO(b"def"))
-        assert s.read() == b"abcdef"
84- """
85-
86- def __init__ (self , * streams ):
87- self .streams = list (streams )
88-
89- def read (self , n = None ):
90- """Read at most *n* characters from this stream.
91-
92- If *n* is ``None``, return all available characters.
93- """
-        response = b""
-        while len(self.streams) > 0 and (n is None or n > 0):
-            txt = self.streams[0].read(n)
-            response += txt
-            if n is not None:
-                n -= len(txt)
-            if n is None or n > 0:
-                del self.streams[0]
-        return response
-
-
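A quick sketch of the chunked-read semantics (assuming the class above; ``BytesIO`` stands in for a real response body)::

    from io import BytesIO

    s = _ConcatenatedStream(BytesIO(b"abc"), BytesIO(b"def"))
    assert s.read(2) == b"ab"    # a bounded read stops inside the first stream
    assert s.read() == b"cdef"   # an unbounded read drains the remaining streams
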
-class _XMLDTDFilter:
-    """Lazily remove all XML DTDs from a stream.
-
-    All substrings matching the regular expression <?[^>]*> are
-    removed in their entirety from the stream. No regular expressions
-    are used, however, so everything still streams properly.
-
-    **Example**::
-
-        from io import BytesIO
-        s = _XMLDTDFilter(BytesIO(b"<?xml abcd><element><?xml ...></element>"))
-        assert s.read() == b"<element></element>"
117- """
118-
119- def __init__ (self , stream ):
120- self .stream = stream
121-
122- def read (self , n = None ):
123- """Read at most *n* characters from this stream.
124-
125- If *n* is ``None``, return all available characters.
126- """
-        response = b""
-        while n is None or n > 0:
-            c = self.stream.read(1)
-            if c == b"":
-                break
-            if c == b"<":
-                c += self.stream.read(1)
-                if c == b"<?":
-                    while True:
-                        q = self.stream.read(1)
-                        if q == b">":
-                            break
-                else:
-                    response += c
-                    if n is not None:
-                        n -= len(c)
-            else:
-                response += c
-                if n is not None:
-                    n -= 1
-        return response
-
-
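How the two helpers compose (a minimal sketch with an invented export-style payload; ``ResultsReader.__init__`` below applies this same wrapping)::

    from io import BytesIO

    # Two back-to-back XML documents, each carrying its own prolog.
    raw = BytesIO(b'<?xml version="1.0"?><results/><?xml version="1.0"?><results/>')
    s = _ConcatenatedStream(BytesIO(b"<doc>"), _XMLDTDFilter(raw), BytesIO(b"</doc>"))
    assert s.read() == b"<doc><results/><results/></doc>"
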
-@deprecation.deprecated(
-    details="Use the JSONResultsReader function instead in conjunction with the 'output_mode' query param set to 'json'"
-)
-class ResultsReader:
-    """This class returns dictionaries and Splunk messages from an XML results
-    stream.
-
-    ``ResultsReader`` is iterable, and returns a ``dict`` for results, or a
-    :class:`Message` object for Splunk messages. This class has one field,
-    ``is_preview``, which is ``True`` when the results are a preview from a
-    running search, or ``False`` when the results are from a completed search.
-
-    This class has no network activity other than what is implicit in the
-    stream it operates on.
-
-    :param `stream`: The stream to read from (any object that supports
-        ``.read()``).
-
-    **Example**::
-
-        import results
-        response = ... # the body of an HTTP response
-        reader = results.ResultsReader(response)
-        for result in reader:
-            if isinstance(result, dict):
-                print(f"Result: {result}")
-            elif isinstance(result, results.Message):
-                print(f"Message: {result}")
-        print(f"is_preview = {reader.is_preview}")
-    """
-
-    # Be sure to update the docstrings of client.Jobs.oneshot,
-    # client.Job.results_preview and client.Job.results to match any
-    # changes made to ResultsReader.
-    #
-    # This wouldn't be a class, just the _parse_results function below,
-    # except that you cannot get the current generator inside the
-    # function creating that generator. Thus it's all wrapped up for
-    # the sake of one field.
-    def __init__(self, stream):
-        # The search/jobs/exports endpoint, when run with
-        # earliest_time=rt and latest_time=rt, streams a sequence of
-        # XML documents, each containing a result, as opposed to one
-        # results element containing lots of results. Python's XML
-        # parsers are broken, and instead of reading one full document
-        # and returning the stream that follows untouched, they
-        # destroy the stream and throw an error. To get around this,
-        # we remove all the DTD definitions inline, then wrap the
-        # fragments in a fictional <doc> element to make the parser happy.
-        stream = _XMLDTDFilter(stream)
-        stream = _ConcatenatedStream(BytesIO(b"<doc>"), stream, BytesIO(b"</doc>"))
-        self.is_preview = None
-        self._gen = self._parse_results(stream)
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        return next(self._gen)
-
-    def _parse_results(self, stream):
-        """Parse results and messages out of *stream*."""
-        result = None
-        values = None
-        try:
-            for event, elem in et.iterparse(stream, events=("start", "end")):
-                if elem.tag == "results" and event == "start":
-                    # The wrapper element is a <results preview="0|1">. We
-                    # don't care about it except to tell us whether these
-                    # are preview results, or the final results from the
-                    # search.
-                    is_preview = elem.attrib["preview"] == "1"
-                    self.is_preview = is_preview
-                if elem.tag == "result":
-                    if event == "start":
-                        result = OrderedDict()
-                    elif event == "end":
-                        yield result
-                        result = None
-                        elem.clear()
-
-                elif elem.tag == "field" and result is not None:
-                    # We need the 'result is not None' check because
-                    # 'field' is also the element name in the <meta>
-                    # header that gives field order, which is not what we
-                    # want at all.
-                    if event == "start":
-                        values = []
-                    elif event == "end":
-                        field_name = elem.attrib["k"]
-                        if len(values) == 1:
-                            result[field_name] = values[0]
-                        else:
-                            result[field_name] = values
-                        # Calling .clear() is necessary to let the
-                        # element be garbage collected. Otherwise
-                        # arbitrarily large result sets will use
-                        # arbitrarily large memory instead of
-                        # streaming.
-                        elem.clear()
-
-                elif elem.tag in ("text", "v") and event == "end":
-                    text = "".join(elem.itertext())
-                    values.append(text)
-                    elem.clear()
-
-                elif elem.tag == "msg":
-                    if event == "start":
-                        msg_type = elem.attrib["type"]
-                    elif event == "end":
-                        text = elem.text if elem.text is not None else ""
-                        yield Message(msg_type, text)
-                        elem.clear()
-        except SyntaxError as pe:
-            # This is here to handle the same incorrect return from
-            # splunk that is described in __init__.
-            if "no element found" in pe.msg:
-                return
-            else:
-                raise
-
-
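End to end, what the removed XML reader yielded, sketched against a hand-written results document (real bodies come from the Splunk REST API; the message text is invented)::

    from io import BytesIO

    body = BytesIO(
        b'<?xml version="1.0"?>'
        b'<results preview="0">'
        b'<msg type="DEBUG">base lispy: [ AND ]</msg>'
        b'<result><field k="host"><value><text>web-01</text></value></field></result>'
        b'</results>'
    )
    reader = ResultsReader(body)
    for item in reader:
        if isinstance(item, Message):
            print(item.type, item.message)   # DEBUG base lispy: [ AND ]
        else:
            print(dict(item))                # {'host': 'web-01'}
    print(reader.is_preview)                 # False
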
 class JSONResultsReader:
     """This class returns dictionaries and Splunk messages from a JSON results
     stream.
@@ -303,7 +95,7 @@ class JSONResultsReader:
     # except that you cannot get the current generator inside the
     # function creating that generator. Thus it's all wrapped up for
     # the sake of one field.
-    def __init__(self, stream):
+    def __init__(self, stream) -> None:
         # The search/jobs/exports endpoint, when run with
         # earliest_time=rt and latest_time=rt, output_mode=json, streams a sequence of
         # JSON documents, each containing a result, as opposed to one