libpqxx
pipeline.hxx
1 
13 #ifndef PQXX_H_PIPELINE
14 #define PQXX_H_PIPELINE
15 
16 #include "pqxx/compiler-public.hxx"
17 #include "pqxx/compiler-internal-pre.hxx"
18 
19 #include <limits>
20 #include <map>
21 #include <string>
22 
23 #include "pqxx/transaction_base.hxx"
24 
25 
26 // Methods tested in eg. test module test01 are marked with "//[t01]".
27 
28 namespace pqxx
29 {
30 
32 
48 class PQXX_LIBEXPORT pipeline : public internal::transactionfocus
49 {
50 public:
51  using query_id = long;
52 
53  pipeline(const pipeline &) =delete;
54  pipeline &operator=(const pipeline &) =delete;
55 
56  explicit pipeline( //[t69]
58  const std::string &Name=std::string{});
59 
60  ~pipeline() noexcept;
61 
63 
69  query_id insert(const std::string &); //[t69]
70 
72 
73  void complete(); //[t71]
74 
76 
85  void flush(); //[t70]
86 
88 
96  void cancel();
97 
99  bool is_finished(query_id) const; //[t71]
100 
102 
108  result retrieve(query_id qid) //[t71]
109  { return retrieve(m_queries.find(qid)).second; }
110 
112 
113  std::pair<query_id, result> retrieve(); //[t69]
114 
115  bool empty() const noexcept { return m_queries.empty(); } //[t69]
116 
118 
129  int retain(int retain_max=2); //[t70]
130 
131 
133  void resume(); //[t70]
134 
135 private:
136  class PQXX_PRIVATE Query
137  {
138  public:
139  explicit Query(const std::string &q) : m_query{q}, m_res{} {}
140 
141  const result &get_result() const noexcept { return m_res; }
142  void set_result(const result &r) noexcept { m_res = r; }
143  const std::string &get_query() const noexcept { return m_query; }
144 
145  private:
146  std::string m_query;
147  result m_res;
148  };
149 
150  using QueryMap = std::map<query_id,Query>;
151 
152  void attach();
153  void detach();
154 
156  static constexpr query_id qid_limit() noexcept
157  {
158  // Parenthesise this to work around an eternal Visual C++ problem:
159  // Without the extra parentheses, unless NOMINMAX is defined, the
160  // preprocessor will mistake this "max" for its annoying built-in macro
161  // of the same name.
162  return (std::numeric_limits<query_id>::max)();
163  }
164 
166  PQXX_PRIVATE query_id generate_id();
167 
168  bool have_pending() const noexcept
169  { return m_issuedrange.second != m_issuedrange.first; }
170 
171  PQXX_PRIVATE void issue();
172 
174  void set_error_at(query_id qid) noexcept
175  { if (qid < m_error) m_error = qid; }
176 
178  [[noreturn]] PQXX_PRIVATE void internal_error(const std::string &err);
179 
180  PQXX_PRIVATE bool obtain_result(bool expect_none=false);
181 
182  PQXX_PRIVATE void obtain_dummy();
183  PQXX_PRIVATE void get_further_available_results();
184  PQXX_PRIVATE void check_end_results();
185 
187  PQXX_PRIVATE void receive_if_available();
188 
190  PQXX_PRIVATE void receive(pipeline::QueryMap::const_iterator stop);
191  std::pair<pipeline::query_id, result>
192  retrieve(pipeline::QueryMap::iterator);
193 
194  QueryMap m_queries;
195  std::pair<QueryMap::iterator,QueryMap::iterator> m_issuedrange;
196  int m_retain = 0;
197  int m_num_waiting = 0;
198  query_id m_q_id = 0;
199 
201  bool m_dummy_pending = false;
202 
204  query_id m_error = qid_limit();
205 };
206 
207 } // namespace
208 
209 #include "pqxx/compiler-internal-post.hxx"
210 #endif
pqxx::result::size
PQXX_PURE size_type size() const noexcept
Definition: result.cxx:94
pqxx::pipeline::complete
void complete()
Wait for all ongoing or pending operations to complete.
Definition: pipeline.cxx:85
pqxx::internal
Private namespace for libpqxx's internal use; do not access.
Definition: connection_base.hxx:43
pqxx::pipeline::pipeline
pipeline(const pipeline &)=delete
pqxx::pipeline::empty
bool empty() const noexcept
Definition: pipeline.hxx:115
pqxx::pipeline
Processes several queries in FIFO manner, optimized for high throughput.
Definition: pipeline.hxx:48
pqxx::pipeline::is_finished
bool is_finished(query_id) const
Is result for given query available?
Definition: pipeline.cxx:123
pqxx::result
Result set containing data returned by a query or command.
Definition: result.hxx:69
pqxx::pipeline::resume
void resume()
Resume retained query emission (harmless when not needed)
Definition: pipeline.cxx:158
pqxx::pipeline::insert
query_id insert(const std::string &)
Add query to the pipeline.
Definition: pipeline.cxx:62
pqxx::pipeline::flush
void flush()
Forget all ongoing or pending operations and retrieved results.
Definition: pipeline.cxx:97
pqxx::sql_error
Exception class for failed queries.
Definition: except.hxx:130
pqxx::internal::enc_group
encoding_group enc_group(int libpq_enc_id)
Definition: encodings.cxx:637
pqxx::pipeline::retain
int retain(int retain_max=2)
Set maximum number of queries to retain before issuing them to the backend.
Definition: pipeline.cxx:142
pqxx::separated_list
std::string separated_list(const std::string &sep, ITER begin, ITER end, ACCESS access)
Represent sequence of values as a string, joined by a given separator.
Definition: util.hxx:95
pqxx::internal_error
Internal error in libpqxx library.
Definition: except.hxx:207
pqxx::broken_connection
Exception class for lost or failed backend connection.
Definition: except.hxx:118
pqxx::pipeline::retrieve
std::pair< query_id, result > retrieve()
Retrieve oldest unretrieved result (possibly wait for one)
Definition: pipeline.cxx:134
pqxx::result::at
const row at(size_type) const
Definition: result.cxx:131
pqxx::internal::namedclass
Helper base class: object descriptions for error messages and such.
Definition: util.hxx:233
pqxx::internal::transactionfocus
Definition: transaction_base.hxx:43
pqxx::to_string
std::string to_string(const field &Obj)
Convert a field to a string.
Definition: result.cxx:451
pqxx::pipeline::~pipeline
~pipeline() noexcept
Definition: pipeline.cxx:43
pqxx::pipeline::query_id
long query_id
Definition: pipeline.hxx:51
pqxx::range_error
Something is out of range, similar to std::out_of_range.
Definition: except.hxx:251
pqxx::transaction_base
Interface definition (and common code) for "transaction" classes.
Definition: transaction_base.hxx:136
pqxx
The home of all libpqxx classes, functions, templates, etc.
Definition: array.hxx:25
pqxx::pipeline::cancel
void cancel()
Cancel ongoing query, if any.
Definition: pipeline.cxx:111