summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--aclk/aclk-schemas/.gitignore11
-rw-r--r--aclk/aclk-schemas/.travis.yml4
-rw-r--r--aclk/aclk-schemas/LICENSE674
-rw-r--r--aclk/aclk-schemas/Makefile74
-rw-r--r--aclk/aclk-schemas/README.md2
-rw-r--r--aclk/aclk-schemas/buf.yml9
-rw-r--r--aclk/aclk-schemas/proto/aclk/v1/lib.proto22
-rw-r--r--aclk/aclk-schemas/proto/agent/v1/cmds.proto79
-rw-r--r--aclk/aclk-schemas/proto/agent/v1/connection.proto59
-rw-r--r--aclk/aclk-schemas/proto/agent/v1/disconnect.proto16
-rw-r--r--aclk/aclk-schemas/proto/alarm/v1/config.proto61
-rw-r--r--aclk/aclk-schemas/proto/alarm/v1/stream.proto148
-rw-r--r--aclk/aclk-schemas/proto/chart/v1/config.proto37
-rw-r--r--aclk/aclk-schemas/proto/chart/v1/dimension.proto24
-rw-r--r--aclk/aclk-schemas/proto/chart/v1/instance.proto32
-rw-r--r--aclk/aclk-schemas/proto/chart/v1/stream.proto86
-rw-r--r--aclk/aclk-schemas/proto/context/v1/context.proto57
-rw-r--r--aclk/aclk-schemas/proto/context/v1/stream.proto34
-rw-r--r--aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto37
-rw-r--r--aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto32
-rw-r--r--aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto148
-rw-r--r--aclk/aclk.c97
-rw-r--r--aclk/aclk.h2
-rw-r--r--aclk/aclk_query.c35
-rw-r--r--aclk/aclk_rx_msgs.c4
-rw-r--r--aclk/aclk_tx_msgs.c4
-rw-r--r--aclk/aclk_util.c4
-rw-r--r--aclk/aclk_util.h3
-rw-r--r--aclk/https_client.c119
-rw-r--r--aclk/https_client.h55
30 files changed, 1848 insertions, 121 deletions
diff --git a/aclk/aclk-schemas/.gitignore b/aclk/aclk-schemas/.gitignore
new file mode 100644
index 000000000..bd495e9a7
--- /dev/null
+++ b/aclk/aclk-schemas/.gitignore
@@ -0,0 +1,11 @@
+*.pb.go
+
+#Agent
+*.pb.cc
+*.pb.h
+*.pb.o
+*.pb.Po
+.dirstamp
+
+#Jetbrains
+.idea
diff --git a/aclk/aclk-schemas/.travis.yml b/aclk/aclk-schemas/.travis.yml
new file mode 100644
index 000000000..7c99550fe
--- /dev/null
+++ b/aclk/aclk-schemas/.travis.yml
@@ -0,0 +1,4 @@
+---
+language: minimal
+install: make deps
+script: make CI
diff --git a/aclk/aclk-schemas/LICENSE b/aclk/aclk-schemas/LICENSE
new file mode 100644
index 000000000..f288702d2
--- /dev/null
+++ b/aclk/aclk-schemas/LICENSE
@@ -0,0 +1,674 @@
+ GNU GENERAL PUBLIC LICENSE
+ Version 3, 29 June 2007
+
+ Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+ Preamble
+
+ The GNU General Public License is a free, copyleft license for
+software and other kinds of works.
+
+ The licenses for most software and other practical works are designed
+to take away your freedom to share and change the works. By contrast,
+the GNU General Public License is intended to guarantee your freedom to
+share and change all versions of a program--to make sure it remains free
+software for all its users. We, the Free Software Foundation, use the
+GNU General Public License for most of our software; it applies also to
+any other work released this way by its authors. You can apply it to
+your programs, too.
+
+ When we speak of free software, we are referring to freedom, not
+price. Our General Public Licenses are designed to make sure that you
+have the freedom to distribute copies of free software (and charge for
+them if you wish), that you receive source code or can get it if you
+want it, that you can change the software or use pieces of it in new
+free programs, and that you know you can do these things.
+
+ To protect your rights, we need to prevent others from denying you
+these rights or asking you to surrender the rights. Therefore, you have
+certain responsibilities if you distribute copies of the software, or if
+you modify it: responsibilities to respect the freedom of others.
+
+ For example, if you distribute copies of such a program, whether
+gratis or for a fee, you must pass on to the recipients the same
+freedoms that you received. You must make sure that they, too, receive
+or can get the source code. And you must show them these terms so they
+know their rights.
+
+ Developers that use the GNU GPL protect your rights with two steps:
+(1) assert copyright on the software, and (2) offer you this License
+giving you legal permission to copy, distribute and/or modify it.
+
+ For the developers' and authors' protection, the GPL clearly explains
+that there is no warranty for this free software. For both users' and
+authors' sake, the GPL requires that modified versions be marked as
+changed, so that their problems will not be attributed erroneously to
+authors of previous versions.
+
+ Some devices are designed to deny users access to install or run
+modified versions of the software inside them, although the manufacturer
+can do so. This is fundamentally incompatible with the aim of
+protecting users' freedom to change the software. The systematic
+pattern of such abuse occurs in the area of products for individuals to
+use, which is precisely where it is most unacceptable. Therefore, we
+have designed this version of the GPL to prohibit the practice for those
+products. If such problems arise substantially in other domains, we
+stand ready to extend this provision to those domains in future versions
+of the GPL, as needed to protect the freedom of users.
+
+ Finally, every program is threatened constantly by software patents.
+States should not allow patents to restrict development and use of
+software on general-purpose computers, but in those that do, we wish to
+avoid the special danger that patents applied to a free program could
+make it effectively proprietary. To prevent this, the GPL assures that
+patents cannot be used to render the program non-free.
+
+ The precise terms and conditions for copying, distribution and
+modification follow.
+
+ TERMS AND CONDITIONS
+
+ 0. Definitions.
+
+ "This License" refers to version 3 of the GNU General Public License.
+
+ "Copyright" also means copyright-like laws that apply to other kinds of
+works, such as semiconductor masks.
+
+ "The Program" refers to any copyrightable work licensed under this
+License. Each licensee is addressed as "you". "Licensees" and
+"recipients" may be individuals or organizations.
+
+ To "modify" a work means to copy from or adapt all or part of the work
+in a fashion requiring copyright permission, other than the making of an
+exact copy. The resulting work is called a "modified version" of the
+earlier work or a work "based on" the earlier work.
+
+ A "covered work" means either the unmodified Program or a work based
+on the Program.
+
+ To "propagate" a work means to do anything with it that, without
+permission, would make you directly or secondarily liable for
+infringement under applicable copyright law, except executing it on a
+computer or modifying a private copy. Propagation includes copying,
+distribution (with or without modification), making available to the
+public, and in some countries other activities as well.
+
+ To "convey" a work means any kind of propagation that enables other
+parties to make or receive copies. Mere interaction with a user through
+a computer network, with no transfer of a copy, is not conveying.
+
+ An interactive user interface displays "Appropriate Legal Notices"
+to the extent that it includes a convenient and prominently visible
+feature that (1) displays an appropriate copyright notice, and (2)
+tells the user that there is no warranty for the work (except to the
+extent that warranties are provided), that licensees may convey the
+work under this License, and how to view a copy of this License. If
+the interface presents a list of user commands or options, such as a
+menu, a prominent item in the list meets this criterion.
+
+ 1. Source Code.
+
+ The "source code" for a work means the preferred form of the work
+for making modifications to it. "Object code" means any non-source
+form of a work.
+
+ A "Standard Interface" means an interface that either is an official
+standard defined by a recognized standards body, or, in the case of
+interfaces specified for a particular programming language, one that
+is widely used among developers working in that language.
+
+ The "System Libraries" of an executable work include anything, other
+than the work as a whole, that (a) is included in the normal form of
+packaging a Major Component, but which is not part of that Major
+Component, and (b) serves only to enable use of the work with that
+Major Component, or to implement a Standard Interface for which an
+implementation is available to the public in source code form. A
+"Major Component", in this context, means a major essential component
+(kernel, window system, and so on) of the specific operating system
+(if any) on which the executable work runs, or a compiler used to
+produce the work, or an object code interpreter used to run it.
+
+ The "Corresponding Source" for a work in object code form means all
+the source code needed to generate, install, and (for an executable
+work) run the object code and to modify the work, including scripts to
+control those activities. However, it does not include the work's
+System Libraries, or general-purpose tools or generally available free
+programs which are used unmodified in performing those activities but
+which are not part of the work. For example, Corresponding Source
+includes interface definition files associated with source files for
+the work, and the source code for shared libraries and dynamically
+linked subprograms that the work is specifically designed to require,
+such as by intimate data communication or control flow between those
+subprograms and other parts of the work.
+
+ The Corresponding Source need not include anything that users
+can regenerate automatically from other parts of the Corresponding
+Source.
+
+ The Corresponding Source for a work in source code form is that
+same work.
+
+ 2. Basic Permissions.
+
+ All rights granted under this License are granted for the term of
+copyright on the Program, and are irrevocable provided the stated
+conditions are met. This License explicitly affirms your unlimited
+permission to run the unmodified Program. The output from running a
+covered work is covered by this License only if the output, given its
+content, constitutes a covered work. This License acknowledges your
+rights of fair use or other equivalent, as provided by copyright law.
+
+ You may make, run and propagate covered works that you do not
+convey, without conditions so long as your license otherwise remains
+in force. You may convey covered works to others for the sole purpose
+of having them make modifications exclusively for you, or provide you
+with facilities for running those works, provided that you comply with
+the terms of this License in conveying all material for which you do
+not control copyright. Those thus making or running the covered works
+for you must do so exclusively on your behalf, under your direction
+and control, on terms that prohibit them from making any copies of
+your copyrighted material outside their relationship with you.
+
+ Conveying under any other circumstances is permitted solely under
+the conditions stated below. Sublicensing is not allowed; section 10
+makes it unnecessary.
+
+ 3. Protecting Users' Legal Rights From Anti-Circumvention Law.
+
+ No covered work shall be deemed part of an effective technological
+measure under any applicable law fulfilling obligations under article
+11 of the WIPO copyright treaty adopted on 20 December 1996, or
+similar laws prohibiting or restricting circumvention of such
+measures.
+
+ When you convey a covered work, you waive any legal power to forbid
+circumvention of technological measures to the extent such circumvention
+is effected by exercising rights under this License with respect to
+the covered work, and you disclaim any intention to limit operation or
+modification of the work as a means of enforcing, against the work's
+users, your or third parties' legal rights to forbid circumvention of
+technological measures.
+
+ 4. Conveying Verbatim Copies.
+
+ You may convey verbatim copies of the Program's source code as you
+receive it, in any medium, provided that you conspicuously and
+appropriately publish on each copy an appropriate copyright notice;
+keep intact all notices stating that this License and any
+non-permissive terms added in accord with section 7 apply to the code;
+keep intact all notices of the absence of any warranty; and give all
+recipients a copy of this License along with the Program.
+
+ You may charge any price or no price for each copy that you convey,
+and you may offer support or warranty protection for a fee.
+
+ 5. Conveying Modified Source Versions.
+
+ You may convey a work based on the Program, or the modifications to
+produce it from the Program, in the form of source code under the
+terms of section 4, provided that you also meet all of these conditions:
+
+ a) The work must carry prominent notices stating that you modified
+ it, and giving a relevant date.
+
+ b) The work must carry prominent notices stating that it is
+ released under this License and any conditions added under section
+ 7. This requirement modifies the requirement in section 4 to
+ "keep intact all notices".
+
+ c) You must license the entire work, as a whole, under this
+ License to anyone who comes into possession of a copy. This
+ License will therefore apply, along with any applicable section 7
+ additional terms, to the whole of the work, and all its parts,
+ regardless of how they are packaged. This License gives no
+ permission to license the work in any other way, but it does not
+ invalidate such permission if you have separately received it.
+
+ d) If the work has interactive user interfaces, each must display
+ Appropriate Legal Notices; however, if the Program has interactive
+ interfaces that do not display Appropriate Legal Notices, your
+ work need not make them do so.
+
+ A compilation of a covered work with other separate and independent
+works, which are not by their nature extensions of the covered work,
+and which are not combined with it such as to form a larger program,
+in or on a volume of a storage or distribution medium, is called an
+"aggregate" if the compilation and its resulting copyright are not
+used to limit the access or legal rights of the compilation's users
+beyond what the individual works permit. Inclusion of a covered work
+in an aggregate does not cause this License to apply to the other
+parts of the aggregate.
+
+ 6. Conveying Non-Source Forms.
+
+ You may convey a covered work in object code form under the terms
+of sections 4 and 5, provided that you also convey the
+machine-readable Corresponding Source under the terms of this License,
+in one of these ways:
+
+ a) Convey the object code in, or embodied in, a physical product
+ (including a physical distribution medium), accompanied by the
+ Corresponding Source fixed on a durable physical medium
+ customarily used for software interchange.
+
+ b) Convey the object code in, or embodied in, a physical product
+ (including a physical distribution medium), accompanied by a
+ written offer, valid for at least three years and valid for as
+ long as you offer spare parts or customer support for that product
+ model, to give anyone who possesses the object code either (1) a
+ copy of the Corresponding Source for all the software in the
+ product that is covered by this License, on a durable physical
+ medium customarily used for software interchange, for a price no
+ more than your reasonable cost of physically performing this
+ conveying of source, or (2) access to copy the
+ Corresponding Source from a network server at no charge.
+
+ c) Convey individual copies of the object code with a copy of the
+ written offer to provide the Corresponding Source. This
+ alternative is allowed only occasionally and noncommercially, and
+ only if you received the object code with such an offer, in accord
+ with subsection 6b.
+
+ d) Convey the object code by offering access from a designated
+ place (gratis or for a charge), and offer equivalent access to the
+ Corresponding Source in the same way through the same place at no
+ further charge. You need not require recipients to copy the
+ Corresponding Source along with the object code. If the place to
+ copy the object code is a network server, the Corresponding Source
+ may be on a different server (operated by you or a third party)
+ that supports equivalent copying facilities, provided you maintain
+ clear directions next to the object code saying where to find the
+ Corresponding Source. Regardless of what server hosts the
+ Corresponding Source, you remain obligated to ensure that it is
+ available for as long as needed to satisfy these requirements.
+
+ e) Convey the object code using peer-to-peer transmission, provided
+ you inform other peers where the object code and Corresponding
+ Source of the work are being offered to the general public at no
+ charge under subsection 6d.
+
+ A separable portion of the object code, whose source code is excluded
+from the Corresponding Source as a System Library, need not be
+included in conveying the object code work.
+
+ A "User Product" is either (1) a "consumer product", which means any
+tangible personal property which is normally used for personal, family,
+or household purposes, or (2) anything designed or sold for incorporation
+into a dwelling. In determining whether a product is a consumer product,
+doubtful cases shall be resolved in favor of coverage. For a particular
+product received by a particular user, "normally used" refers to a
+typical or common use of that class of product, regardless of the status
+of the particular user or of the way in which the particular user
+actually uses, or expects or is expected to use, the product. A product
+is a consumer product regardless of whether the product has substantial
+commercial, industrial or non-consumer uses, unless such uses represent
+the only significant mode of use of the product.
+
+ "Installation Information" for a User Product means any methods,
+procedures, authorization keys, or other information required to install
+and execute modified versions of a covered work in that User Product from
+a modified version of its Corresponding Source. The information must
+suffice to ensure that the continued functioning of the modified object
+code is in no case prevented or interfered with solely because
+modification has been made.
+
+ If you convey an object code work under this section in, or with, or
+specifically for use in, a User Product, and the conveying occurs as
+part of a transaction in which the right of possession and use of the
+User Product is transferred to the recipient in perpetuity or for a
+fixed term (regardless of how the transaction is characterized), the
+Corresponding Source conveyed under this section must be accompanied
+by the Installation Information. But this requirement does not apply
+if neither you nor any third party retains the ability to install
+modified object code on the User Product (for example, the work has
+been installed in ROM).
+
+ The requirement to provide Installation Information does not include a
+requirement to continue to provide support service, warranty, or updates
+for a work that has been modified or installed by the recipient, or for
+the User Product in which it has been modified or installed. Access to a
+network may be denied when the modification itself materially and
+adversely affects the operation of the network or violates the rules and
+protocols for communication across the network.
+
+ Corresponding Source conveyed, and Installation Information provided,
+in accord with this section must be in a format that is publicly
+documented (and with an implementation available to the public in
+source code form), and must require no special password or key for
+unpacking, reading or copying.
+
+ 7. Additional Terms.
+
+ "Additional permissions" are terms that supplement the terms of this
+License by making exceptions from one or more of its conditions.
+Additional permissions that are applicable to the entire Program shall
+be treated as though they were included in this License, to the extent
+that they are valid under applicable law. If additional permissions
+apply only to part of the Program, that part may be used separately
+under those permissions, but the entire Program remains governed by
+this License without regard to the additional permissions.
+
+ When you convey a copy of a covered work, you may at your option
+remove any additional permissions from that copy, or from any part of
+it. (Additional permissions may be written to require their own
+removal in certain cases when you modify the work.) You may place
+additional permissions on material, added by you to a covered work,
+for which you have or can give appropriate copyright permission.
+
+ Notwithstanding any other provision of this License, for material you
+add to a covered work, you may (if authorized by the copyright holders of
+that material) supplement the terms of this License with terms:
+
+ a) Disclaiming warranty or limiting liability differently from the
+ terms of sections 15 and 16 of this License; or
+
+ b) Requiring preservation of specified reasonable legal notices or
+ author attributions in that material or in the Appropriate Legal
+ Notices displayed by works containing it; or
+
+ c) Prohibiting misrepresentation of the origin of that material, or
+ requiring that modified versions of such material be marked in
+ reasonable ways as different from the original version; or
+
+ d) Limiting the use for publicity purposes of names of licensors or
+ authors of the material; or
+
+ e) Declining to grant rights under trademark law for use of some
+ trade names, trademarks, or service marks; or
+
+ f) Requiring indemnification of licensors and authors of that
+ material by anyone who conveys the material (or modified versions of
+ it) with contractual assumptions of liability to the recipient, for
+ any liability that these contractual assumptions directly impose on
+ those licensors and authors.
+
+ All other non-permissive additional terms are considered "further
+restrictions" within the meaning of section 10. If the Program as you
+received it, or any part of it, contains a notice stating that it is
+governed by this License along with a term that is a further
+restriction, you may remove that term. If a license document contains
+a further restriction but permits relicensing or conveying under this
+License, you may add to a covered work material governed by the terms
+of that license document, provided that the further restriction does
+not survive such relicensing or conveying.
+
+ If you add terms to a covered work in accord with this section, you
+must place, in the relevant source files, a statement of the
+additional terms that apply to those files, or a notice indicating
+where to find the applicable terms.
+
+ Additional terms, permissive or non-permissive, may be stated in the
+form of a separately written license, or stated as exceptions;
+the above requirements apply either way.
+
+ 8. Termination.
+
+ You may not propagate or modify a covered work except as expressly
+provided under this License. Any attempt otherwise to propagate or
+modify it is void, and will automatically terminate your rights under
+this License (including any patent licenses granted under the third
+paragraph of section 11).
+
+ However, if you cease all violation of this License, then your
+license from a particular copyright holder is reinstated (a)
+provisionally, unless and until the copyright holder explicitly and
+finally terminates your license, and (b) permanently, if the copyright
+holder fails to notify you of the violation by some reasonable means
+prior to 60 days after the cessation.
+
+ Moreover, your license from a particular copyright holder is
+reinstated permanently if the copyright holder notifies you of the
+violation by some reasonable means, this is the first time you have
+received notice of violation of this License (for any work) from that
+copyright holder, and you cure the violation prior to 30 days after
+your receipt of the notice.
+
+ Termination of your rights under this section does not terminate the
+licenses of parties who have received copies or rights from you under
+this License. If your rights have been terminated and not permanently
+reinstated, you do not qualify to receive new licenses for the same
+material under section 10.
+
+ 9. Acceptance Not Required for Having Copies.
+
+ You are not required to accept this License in order to receive or
+run a copy of the Program. Ancillary propagation of a covered work
+occurring solely as a consequence of using peer-to-peer transmission
+to receive a copy likewise does not require acceptance. However,
+nothing other than this License grants you permission to propagate or
+modify any covered work. These actions infringe copyright if you do
+not accept this License. Therefore, by modifying or propagating a
+covered work, you indicate your acceptance of this License to do so.
+
+ 10. Automatic Licensing of Downstream Recipients.
+
+ Each time you convey a covered work, the recipient automatically
+receives a license from the original licensors, to run, modify and
+propagate that work, subject to this License. You are not responsible
+for enforcing compliance by third parties with this License.
+
+ An "entity transaction" is a transaction transferring control of an
+organization, or substantially all assets of one, or subdividing an
+organization, or merging organizations. If propagation of a covered
+work results from an entity transaction, each party to that
+transaction who receives a copy of the work also receives whatever
+licenses to the work the party's predecessor in interest had or could
+give under the previous paragraph, plus a right to possession of the
+Corresponding Source of the work from the predecessor in interest, if
+the predecessor has it or can get it with reasonable efforts.
+
+ You may not impose any further restrictions on the exercise of the
+rights granted or affirmed under this License. For example, you may
+not impose a license fee, royalty, or other charge for exercise of
+rights granted under this License, and you may not initiate litigation
+(including a cross-claim or counterclaim in a lawsuit) alleging that
+any patent claim is infringed by making, using, selling, offering for
+sale, or importing the Program or any portion of it.
+
+ 11. Patents.
+
+ A "contributor" is a copyright holder who authorizes use under this
+License of the Program or a work on which the Program is based. The
+work thus licensed is called the contributor's "contributor version".
+
+ A contributor's "essential patent claims" are all patent claims
+owned or controlled by the contributor, whether already acquired or
+hereafter acquired, that would be infringed by some manner, permitted
+by this License, of making, using, or selling its contributor version,
+but do not include claims that would be infringed only as a
+consequence of further modification of the contributor version. For
+purposes of this definition, "control" includes the right to grant
+patent sublicenses in a manner consistent with the requirements of
+this License.
+
+ Each contributor grants you a non-exclusive, worldwide, royalty-free
+patent license under the contributor's essential patent claims, to
+make, use, sell, offer for sale, import and otherwise run, modify and
+propagate the contents of its contributor version.
+
+ In the following three paragraphs, a "patent license" is any express
+agreement or commitment, however denominated, not to enforce a patent
+(such as an express permission to practice a patent or covenant not to
+sue for patent infringement). To "grant" such a patent license to a
+party means to make such an agreement or commitment not to enforce a
+patent against the party.
+
+ If you convey a covered work, knowingly relying on a patent license,
+and the Corresponding Source of the work is not available for anyone
+to copy, free of charge and under the terms of this License, through a
+publicly available network server or other readily accessible means,
+then you must either (1) cause the Corresponding Source to be so
+available, or (2) arrange to deprive yourself of the benefit of the
+patent license for this particular work, or (3) arrange, in a manner
+consistent with the requirements of this License, to extend the patent
+license to downstream recipients. "Knowingly relying" means you have
+actual knowledge that, but for the patent license, your conveying the
+covered work in a country, or your recipient's use of the covered work
+in a country, would infringe one or more identifiable patents in that
+country that you have reason to believe are valid.
+
+ If, pursuant to or in connection with a single transaction or
+arrangement, you convey, or propagate by procuring conveyance of, a
+covered work, and grant a patent license to some of the parties
+receiving the covered work authorizing them to use, propagate, modify
+or convey a specific copy of the covered work, then the patent license
+you grant is automatically extended to all recipients of the covered
+work and works based on it.
+
+ A patent license is "discriminatory" if it does not include within
+the scope of its coverage, prohibits the exercise of, or is
+conditioned on the non-exercise of one or more of the rights that are
+specifically granted under this License. You may not convey a covered
+work if you are a party to an arrangement with a third party that is
+in the business of distributing software, under which you make payment
+to the third party based on the extent of your activity of conveying
+the work, and under which the third party grants, to any of the
+parties who would receive the covered work from you, a discriminatory
+patent license (a) in connection with copies of the covered work
+conveyed by you (or copies made from those copies), or (b) primarily
+for and in connection with specific products or compilations that
+contain the covered work, unless you entered into that arrangement,
+or that patent license was granted, prior to 28 March 2007.
+
+ Nothing in this License shall be construed as excluding or limiting
+any implied license or other defenses to infringement that may
+otherwise be available to you under applicable patent law.
+
+ 12. No Surrender of Others' Freedom.
+
+ If conditions are imposed on you (whether by court order, agreement or
+otherwise) that contradict the conditions of this License, they do not
+excuse you from the conditions of this License. If you cannot convey a
+covered work so as to satisfy simultaneously your obligations under this
+License and any other pertinent obligations, then as a consequence you may
+not convey it at all. For example, if you agree to terms that obligate you
+to collect a royalty for further conveying from those to whom you convey
+the Program, the only way you could satisfy both those terms and this
+License would be to refrain entirely from conveying the Program.
+
+ 13. Use with the GNU Affero General Public License.
+
+ Notwithstanding any other provision of this License, you have
+permission to link or combine any covered work with a work licensed
+under version 3 of the GNU Affero General Public License into a single
+combined work, and to convey the resulting work. The terms of this
+License will continue to apply to the part which is the covered work,
+but the special requirements of the GNU Affero General Public License,
+section 13, concerning interaction through a network will apply to the
+combination as such.
+
+ 14. Revised Versions of this License.
+
+ The Free Software Foundation may publish revised and/or new versions of
+the GNU General Public License from time to time. Such new versions will
+be similar in spirit to the present version, but may differ in detail to
+address new problems or concerns.
+
+ Each version is given a distinguishing version number. If the
+Program specifies that a certain numbered version of the GNU General
+Public License "or any later version" applies to it, you have the
+option of following the terms and conditions either of that numbered
+version or of any later version published by the Free Software
+Foundation. If the Program does not specify a version number of the
+GNU General Public License, you may choose any version ever published
+by the Free Software Foundation.
+
+ If the Program specifies that a proxy can decide which future
+versions of the GNU General Public License can be used, that proxy's
+public statement of acceptance of a version permanently authorizes you
+to choose that version for the Program.
+
+ Later license versions may give you additional or different
+permissions. However, no additional obligations are imposed on any
+author or copyright holder as a result of your choosing to follow a
+later version.
+
+ 15. Disclaimer of Warranty.
+
+ THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY
+APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT
+HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY
+OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM
+IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF
+ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
+
+ 16. Limitation of Liability.
+
+ IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
+WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS
+THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY
+GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE
+USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF
+DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD
+PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS),
+EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGES.
+
+ 17. Interpretation of Sections 15 and 16.
+
+ If the disclaimer of warranty and limitation of liability provided
+above cannot be given local legal effect according to their terms,
+reviewing courts shall apply local law that most closely approximates
+an absolute waiver of all civil liability in connection with the
+Program, unless a warranty or assumption of liability accompanies a
+copy of the Program in return for a fee.
+
+ END OF TERMS AND CONDITIONS
+
+ How to Apply These Terms to Your New Programs
+
+ If you develop a new program, and you want it to be of the greatest
+possible use to the public, the best way to achieve this is to make it
+free software which everyone can redistribute and change under these terms.
+
+ To do so, attach the following notices to the program. It is safest
+to attach them to the start of each source file to most effectively
+state the exclusion of warranty; and each file should have at least
+the "copyright" line and a pointer to where the full notice is found.
+
+ <one line to give the program's name and a brief idea of what it does.>
+ Copyright (C) <year> <name of author>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Also add information on how to contact you by electronic and paper mail.
+
+ If the program does terminal interaction, make it output a short
+notice like this when it starts in an interactive mode:
+
+ <program> Copyright (C) <year> <name of author>
+ This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
+ This is free software, and you are welcome to redistribute it
+ under certain conditions; type `show c' for details.
+
+The hypothetical commands `show w' and `show c' should show the appropriate
+parts of the General Public License. Of course, your program's commands
+might be different; for a GUI interface, you would use an "about box".
+
+ You should also get your employer (if you work as a programmer) or school,
+if any, to sign a "copyright disclaimer" for the program, if necessary.
+For more information on this, and how to apply and follow the GNU GPL, see
+<https://www.gnu.org/licenses/>.
+
+ The GNU General Public License does not permit incorporating your program
+into proprietary programs. If your program is a subroutine library, you
+may consider it more useful to permit linking proprietary applications with
+the library. If this is what you want to do, use the GNU Lesser General
+Public License instead of this License. But first, please read
+<https://www.gnu.org/licenses/why-not-lgpl.html>.
diff --git a/aclk/aclk-schemas/Makefile b/aclk/aclk-schemas/Makefile
new file mode 100644
index 000000000..8f4003070
--- /dev/null
+++ b/aclk/aclk-schemas/Makefile
@@ -0,0 +1,74 @@
+SHELL := /usr/bin/env bash -o pipefail
+
+# This controls the location of the cache.
+PROJECT := cloud-schemas
+# This controls the remote HTTPS git location to compare against for breaking changes in CI.
+#
+# Most CI providers only clone the branch under test and to a certain depth, so when
+# running buf check breaking in CI, it is generally preferable to compare against
+# the remote repository directly.
+#
+# Basic authentication is available, see https://buf.build/docs/inputs#https for more details.
+HTTPS_GIT := https://github.com/netdata/cloud-schemas.git
+
+# This controls the version of buf to install and use.
+BUF_VERSION := 0.6.0
+
+### Everything below this line is meant to be static, i.e. only adjust the above variables. ###
+
+UNAME_OS := $(shell uname -s)
+UNAME_ARCH := $(shell uname -m)
+# Buf will be cached to ~/.cache/buf-example.
+CACHE_BASE := $(HOME)/.cache/$(PROJECT)
+# This allows switching between i.e a Docker container and your local setup without overwriting.
+CACHE := $(CACHE_BASE)/$(UNAME_OS)/$(UNAME_ARCH)
+# The location where buf will be installed.
+CACHE_BIN := $(CACHE)/bin
+# Marker files are put into this directory to denote the current version of binaries that are installed.
+CACHE_VERSIONS := $(CACHE)/versions
+
+# Update the $PATH so we can use buf directly
+export PATH := $(abspath $(CACHE_BIN)):$(PATH)
+
+# BUF points to the marker file for the installed version.
+#
+# If BUF_VERSION is changed, the binary will be re-downloaded.
+BUF := $(CACHE_VERSIONS)/buf/$(BUF_VERSION)
+$(BUF):
+ @rm -f $(CACHE_BIN)/buf
+ @mkdir -p $(CACHE_BIN)
+ curl -sSL \
+ "https://github.com/bufbuild/buf/releases/download/v$(BUF_VERSION)/buf-$(UNAME_OS)-$(UNAME_ARCH)" \
+ -o "$(CACHE_BIN)/buf"
+ chmod +x "$(CACHE_BIN)/buf"
+ @rm -rf $(dir $(BUF))
+ @mkdir -p $(dir $(BUF))
+ @touch $(BUF)
+
+.DEFAULT_GOAL := local
+
+# deps allows us to install deps without running any checks.
+
+.PHONY: deps
+deps: $(BUF)
+
+# local is what we run when testing locally.
+# This does breaking change detection against our local git repository.
+
+.PHONY: local
+local: $(BUF)
+ buf check lint ./proto
+ buf check breaking --against '.git#branch=master'
+
+# https is what we run when testing in most CI providers.
+# This does breaking change detection against our git repository.
+
+.PHONY: CI
+CI: $(BUF)
+ buf check lint ./proto
+ buf check breaking --against ".git#branch=master"
+
+.PHONY: clean
+clean:
+ git clean -xdf
+ rm -rf $(CACHE_BASE) \ No newline at end of file
diff --git a/aclk/aclk-schemas/README.md b/aclk/aclk-schemas/README.md
new file mode 100644
index 000000000..aa6188977
--- /dev/null
+++ b/aclk/aclk-schemas/README.md
@@ -0,0 +1,2 @@
+# aclk-schemas
+Protobuf schemas used in ACLK connection
diff --git a/aclk/aclk-schemas/buf.yml b/aclk/aclk-schemas/buf.yml
new file mode 100644
index 000000000..532053d91
--- /dev/null
+++ b/aclk/aclk-schemas/buf.yml
@@ -0,0 +1,9 @@
+build:
+ roots:
+ - proto
+lint:
+ use:
+ - DEFAULT
+breaking:
+ use:
+ - FILE
diff --git a/aclk/aclk-schemas/proto/aclk/v1/lib.proto b/aclk/aclk-schemas/proto/aclk/v1/lib.proto
new file mode 100644
index 000000000..f32c32c6e
--- /dev/null
+++ b/aclk/aclk-schemas/proto/aclk/v1/lib.proto
@@ -0,0 +1,22 @@
+syntax = "proto3";
+
+package aclk_lib.v1;
+
+import "google/protobuf/timestamp.proto";
+
+option go_package = "aclk_lib/v1;aclklib";
+
+// ACLKMessagePosition is used by sequenced messages to define their exact position
+message ACLKMessagePosition {
+ uint64 sequence_id = 1;
+ // auto generated in Agent's DB upon sequence_id creation
+ google.protobuf.Timestamp seq_id_created_at = 2;
+ uint64 previous_sequence_id = 3;
+}
+
+message Capability {
+ string name = 1;
+ uint32 version = 2;
+ // version == 0 is equivalent to not having the capability at all
+ bool enabled = 3;
+}
diff --git a/aclk/aclk-schemas/proto/agent/v1/cmds.proto b/aclk/aclk-schemas/proto/agent/v1/cmds.proto
new file mode 100644
index 000000000..c37c00c3a
--- /dev/null
+++ b/aclk/aclk-schemas/proto/agent/v1/cmds.proto
@@ -0,0 +1,79 @@
+syntax = "proto3";
+option go_package = "agent/v1;agent";
+
+package agent.v1;
+
+import "google/protobuf/timestamp.proto";
+import "proto/aclk/v1/lib.proto";
+
+message CancelPendingRequest {
+ // must match the ID sent with the request originally made
+ // other than this agent will not put conditions on it
+ // and will treat it as opaque string (it simply has to match)
+ // However this doesn't mean there are no conditions on the id
+ // made on the request side
+ string request_id = 1;
+
+ // time when the cancellation request was generated
+ google.protobuf.Timestamp timestamp = 2;
+
+ // optional might be useful for debugging purposes
+ string trace_id = 3;
+}
+
+// AgentCommand is sent from the Cloud to the Agent at `/agent/{claim_id}/inbound/v1/cmd/AgentCommand`
+// the message includes the resource that the Cloud needs to GET from the Agent HTTP API along with related metadata
+message AgentCommand {
+ // the topic to which the Cloud awaits for the AgentCommandResponse.
+ // example: `/svc/agent-data-ctrl/2d7b7edd-561e-4aec-8ac1-466a585520f5/resp`
+ string callback_topic = 1;
+ // the topic to which the Cloud awaits for the AgentCommandAck.
+ // example: `/svc/agent-data-ctrl/2d7b7edd-561e-4aec-8ac1-466a585520f5/resp`
+ string ack_topic = 2;
+ // unique identifier for the AgentCommand
+ // example: `617038b3-7c2a-4617-a78f-ab37bd820198`
+ string message_id = 3;
+ // defined in milliseconds, the time the Agent has to respond before Cloud
+ // considering the request as timed-out
+ // example: `60000`
+ uint64 timeout = 4;
+ // defined in milliseconds, the time the Agent has to send back to the Cloud
+ // an AgentCommandAck message signaling that is still working on the request
+ // example: `3000`
+ uint64 ack_timeout = 5;
+ // the requested Agent resource
+ // example: `/api/v2/data?query_params_go_here`
+ string resource = 6;
+}
+
+// AgentCommandAck is sent from the Agent to the Cloud at predefined intervals (`AgentCommand.ack_timeout`) to predefined topic (`AgentCommand.ack_topic`)
+// signaling that the Agent is still working to serve an AgentCommand (referenced by the message_id) that the Cloud sent
+message AgentCommandAck {
+ // unique identifier to reference AgentCommand on which the Agent is still working on serving
+ // example: `617038b3-7c2a-4617-a78f-ab37bd820198`
+ string message_id = 1;
+ // the timestamp when the Agent created this AgentCommandAck message
+ google.protobuf.Timestamp created_at = 2;
+ // integer revealing the progress of completion to serve the AgentCommand with the given message_id
+ // example: `25`
+ uint32 progress_percent = 3;
+}
+
+// AgentCommandResponse is sent from the Agent to the Cloud at `/agent/{claim_id}/inbound/v1/cmd/AgentCommand`
+// the message includes the resource that the Cloud needs to GET from the Agent HTTP API along with related metadata
+message AgentCommandResponse {
+ // unique identifier for the AgentCommand
+ // example: `617038b3-7c2a-4617-a78f-ab37bd820198`
+ string message_id = 1;
+ // the (http) status code of the Agent's API response
+ // example: `200`
+ uint32 status_code = 2;
+ // the dumped raw (http) response the Agent's API returned
+ bytes response = 3;
+ // the Agent's timestamp (aka legacy `timestamp`)
+ google.protobuf.Timestamp timestamp = 4;
+ // the timestamp when the Agent received the AgentCommand for execution (aka legacy `t-rx`)
+ google.protobuf.Timestamp received_at = 5;
+ // the amount of microseconds the Agent needed to execute the HTTP request of the AgentCommand (aka legacy`t-exec`)
+ uint64 exec_time = 6;
+}
diff --git a/aclk/aclk-schemas/proto/agent/v1/connection.proto b/aclk/aclk-schemas/proto/agent/v1/connection.proto
new file mode 100644
index 000000000..4321b0b90
--- /dev/null
+++ b/aclk/aclk-schemas/proto/agent/v1/connection.proto
@@ -0,0 +1,59 @@
+syntax = "proto3";
+option go_package = "agent/v1;agent";
+
+package agent.v1;
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/duration.proto";
+import "proto/aclk/v1/lib.proto";
+
+message UpdateAgentConnection {
+ string claim_id = 1;
+ bool reachable = 2;
+
+ int64 session_id = 3;
+
+ ConnectionUpdateSource update_source = 4;
+
+ // mqtt_broker_addr shard to use for reaching the agent
+ // cloud injects this information
+ string mqtt_broker_addr = 5;
+
+ google.protobuf.Timestamp updated_at = 6;
+
+ // vmq_instance_id broker shard to use for reaching the agent
+ // cloud injects this information
+ int32 vmq_instance_id = 7;
+
+ // > 15 optional fields:
+ // How long the system was running until connection (only applicable when reachable=true)
+ google.protobuf.Duration system_uptime = 15;
+
+ // How long the netdata agent was running until connection (only applicable when reachable=true)
+ google.protobuf.Duration agent_uptime = 16;
+
+ repeated aclk_lib.v1.Capability capabilities = 17;
+}
+
+message SendNodeInstances {
+ string claim_id = 1;
+ Config config = 2;
+}
+
+// ConnectionUpdateSource is to determine whether the connection update was issued
+enum ConnectionUpdateSource {
+ // CONNECTION_UPDATE_SOURCE_UNSPECIFIED acts as default value for protobuf and is never specified
+ CONNECTION_UPDATE_SOURCE_UNSPECIFIED = 0;
+ // CONNECTION_UPDATE_SOURCE_AGENT A direct message from an agent
+ CONNECTION_UPDATE_SOURCE_AGENT = 1;
+ // CONNECTION_UPDATE_SOURCE_LWT message delivered as the Last Will and Testiment from MQTT broker if an agent connection with the broker is lost
+ CONNECTION_UPDATE_SOURCE_LWT = 2;
+ // CONNECTION_UPDATE_SOURCE_HEURISTIC A cloud generated message to sanitize incorrect internal state
+ CONNECTION_UPDATE_SOURCE_HEURISTIC = 3;
+}
+
+message Config {
+ bool bearer_protection = 1;
+ bool cloud_only_notifications = 2;
+ bool universal_dashboards = 3;
+}
diff --git a/aclk/aclk-schemas/proto/agent/v1/disconnect.proto b/aclk/aclk-schemas/proto/agent/v1/disconnect.proto
new file mode 100644
index 000000000..852ef702a
--- /dev/null
+++ b/aclk/aclk-schemas/proto/agent/v1/disconnect.proto
@@ -0,0 +1,16 @@
+syntax = "proto3";
+
+package agent.v1;
+
+import "google/protobuf/timestamp.proto";
+
+option go_package = "agent/v1;agent";
+
+// Sent by Cloud to instruct Agent to disconnect ASAP
+message DisconnectReq {
+ uint64 reconnect_after_seconds = 1;
+ bool permaban = 2;
+ google.protobuf.Timestamp created_at = 3;
+ uint32 error_code = 4;
+ string error_description = 5;
+}
diff --git a/aclk/aclk-schemas/proto/alarm/v1/config.proto b/aclk/aclk-schemas/proto/alarm/v1/config.proto
new file mode 100644
index 000000000..430078fcf
--- /dev/null
+++ b/aclk/aclk-schemas/proto/alarm/v1/config.proto
@@ -0,0 +1,61 @@
+syntax = "proto3";
+
+package alarms.v1;
+
+option go_package = "alarms/v1;alarms";
+
+message SendAlarmConfiguration{
+ string config_hash = 1;
+}
+
+message ProvideAlarmConfiguration {
+ string config_hash = 1;
+ AlarmConfiguration config = 2;
+}
+
+message AlarmConfiguration{
+ string alarm = 1;
+ string template = 2;
+ string on_chart = 3;
+
+ string classification = 4;
+ string type = 5;
+ string component = 6;
+
+ string os = 7;
+ string hosts = 8;
+ string plugin = 9;
+ string module = 10;
+ string charts = 11;
+ string families = 12;
+ string lookup = 13;
+ string every = 14;
+ string units = 15;
+
+ string green = 16;
+ string red = 17;
+
+ string calculation_expr = 18;
+ string warning_expr = 19;
+ string critical_expr = 20;
+
+ string recipient = 21;
+ string exec = 22;
+ string delay = 23;
+ string repeat = 24;
+ string info = 25;
+ string options = 26;
+ string host_labels = 27;
+
+ //parsed values from above config values
+ //indicated by p_
+ int32 p_db_lookup_after = 28;
+ int32 p_db_lookup_before = 29;
+ string p_db_lookup_dimensions = 30;
+ string p_db_lookup_method = 31;
+ string p_db_lookup_options = 32;
+ int32 p_update_every = 33;
+
+ string chart_labels = 34;
+ string summary = 35;
+}
diff --git a/aclk/aclk-schemas/proto/alarm/v1/stream.proto b/aclk/aclk-schemas/proto/alarm/v1/stream.proto
new file mode 100644
index 000000000..44f190682
--- /dev/null
+++ b/aclk/aclk-schemas/proto/alarm/v1/stream.proto
@@ -0,0 +1,148 @@
+syntax = "proto3";
+
+package alarms.v1;
+
+import "google/protobuf/timestamp.proto";
+
+option go_package = "alarms/v1;alarms";
+
+message SendAlarmLogHealth {
+ string node_id = 1;
+}
+
+message AlarmLogHealth {
+ string claim_id = 1;
+ string node_id = 2;
+ bool enabled = 3;
+ AlarmLogStatus status = 4;
+ LogEntries log_entries = 5;
+}
+
+message LogEntries {
+ int64 first_sequence_id = 1;
+ google.protobuf.Timestamp first_when = 2;
+
+ int64 last_sequence_id = 3;
+ google.protobuf.Timestamp last_when = 4;
+}
+
+enum AlarmLogStatus {
+ ALARM_LOG_STATUS_UNSPECIFIED = 0;
+ ALARM_LOG_STATUS_RUNNING = 1;
+ ALARM_LOG_STATUS_IDLE = 2;
+}
+
+message StartAlarmStreaming {
+ string node_id = 1;
+ uint64 batch_id = 2 [deprecated=true];
+ uint64 start_sequnce_id = 3 [deprecated=true];
+ // Instructs the agent to sync all configured alarms
+ bool resets = 4;
+}
+
+message SendAlarmCheckpoint {
+ string node_id = 1;
+ string claim_id = 2;
+}
+
+message AlarmCheckpoint {
+ string node_id = 1;
+ string claim_id = 2;
+ bytes checksum = 3;
+}
+
+message AlarmLogEntry {
+ string node_id = 1;
+ string claim_id = 2;
+
+ // The chart's id field
+ string chart = 3;
+ string name = 4;
+ string family = 5;
+ uint64 batch_id = 6 [deprecated=true];
+ uint64 sequence_id = 7 [deprecated=true];
+ uint64 when = 8;
+
+ string config_hash = 9;
+
+ int32 utc_offset = 10;
+ string timezone = 11;
+
+ // Paths that can be custom for the same alarm, but depend on installation path for each user. Should be here or in config ?
+ string exec_path = 12;
+ string conf_source = 13;
+ string command = 14;
+
+ // In seconds, uint32 is safe ?
+ uint32 duration = 15;
+ uint32 non_clear_duration = 16;
+
+ AlarmStatus status = 17;
+ AlarmStatus old_status = 18;
+ uint64 delay = 19;
+ uint64 delay_up_to_timestamp = 20;
+ // Todo: verify that we need these. sequence_id doesn't suffice?
+ // uint64 updated_by_id = 12;
+ // uint64 updates_id = 13;
+ uint64 last_repeat = 21;
+ bool silenced = 22;
+
+ // Check if string values are needed
+ string value_string = 23;
+ string old_value_string = 24;
+
+ double value = 25;
+ double old_value = 26;
+
+ // Updated alarm entry, when the status of the alarm has been updated by a later entry
+ bool updated = 27;
+
+ // Rendered_info
+ string rendered_info = 28;
+
+ // The chart's context field
+ string chart_context = 29;
+
+ // Counter of alert transitions for this alert chain
+ uint64 event_id = 30;
+
+ // A unique uuid for this alert event
+ string transition_id = 31;
+
+ // The chart's name field
+ string chart_name = 32;
+
+ // The rendered summary
+ string summary = 33;
+}
+
+enum AlarmStatus {
+ ALARM_STATUS_NULL = 0;
+ ALARM_STATUS_UNKNOWN = 1;
+ ALARM_STATUS_REMOVED = 2;
+ ALARM_STATUS_NOT_A_NUMBER = 3;
+ ALARM_STATUS_CLEAR = 4;
+ ALARM_STATUS_WARNING = 5;
+ ALARM_STATUS_CRITICAL = 6;
+}
+
+// SendAlarmSnapshot: send from cloud to the agent, to initiate an AlarmSnapshot image of current alarms back to the cloud
+message SendAlarmSnapshot {
+ string node_id = 1;
+ string claim_id = 2;
+ uint64 snapshot_id = 3 [deprecated=true];
+ uint64 sequence_id = 4 [deprecated=true];
+ string snapshot_uuid = 5;
+}
+
+// Agent responds with AlarmSnapshot to a SendAlarmSnapshot message
+message AlarmSnapshot{
+ string node_id = 1;
+ string claim_id = 2;
+ uint64 snapshot_id = 3 [deprecated=true]; // Same id from SendAlarmSnapshot message
+ uint32 chunks = 4; // In case full snapshot can not fit in a single message, indicates the total number of messages for this snapshot_id
+ uint32 chunk_size = 5; // How many alerts this chunk contains
+ uint32 chunk = 6; // Chunk index of this message
+ repeated AlarmLogEntry alarms = 7; // a list of AlarmLogEntry's
+ string snapshot_uuid = 8;
+}
diff --git a/aclk/aclk-schemas/proto/chart/v1/config.proto b/aclk/aclk-schemas/proto/chart/v1/config.proto
new file mode 100644
index 000000000..f0c5e3a35
--- /dev/null
+++ b/aclk/aclk-schemas/proto/chart/v1/config.proto
@@ -0,0 +1,37 @@
+syntax = "proto3";
+
+package chart.v1;
+
+option go_package = "chart/config/v1;chartconfig";
+
+// UpdateChartConfigs command contains the list of missing chart configs from the cloud to agent
+message UpdateChartConfigs {
+ // claim_id, node_id pair is used to identify the Node Instance
+ string claim_id = 1;
+ string node_id = 2;
+ // list of config hashes missing from cloud and requested from the agent
+ repeated string config_hashes = 3;
+}
+
+message ChartConfigsUpdated {
+ repeated ChartConfigUpdated configs = 1;
+}
+
+message ChartConfigUpdated {
+ string type = 1;
+ string family = 2;
+ string context = 3;
+ string title = 4;
+ uint64 priority = 5;
+ string plugin = 6;
+ string module = 7;
+ ChartType chart_type = 8;
+ string units = 9;
+ string config_hash = 10;
+}
+
+enum ChartType {
+ LINE = 0;
+ AREA = 1;
+ STACKED = 2;
+}
diff --git a/aclk/aclk-schemas/proto/chart/v1/dimension.proto b/aclk/aclk-schemas/proto/chart/v1/dimension.proto
new file mode 100644
index 000000000..8bcb564b8
--- /dev/null
+++ b/aclk/aclk-schemas/proto/chart/v1/dimension.proto
@@ -0,0 +1,24 @@
+syntax = "proto3";
+
+package chart.v1;
+
+import "google/protobuf/timestamp.proto";
+
+import "proto/aclk/v1/lib.proto";
+
+option go_package = "chart/dimension/v1;chartdimension";
+
+// ChartDimensionUpdated is a single event sent from the Agent to the Cloud containing chart dimension data.
+//
+// ChartDimensionUpdated messages are dispatched in bulk to the Cloud wrapped in ChartsAndDimensionsUpdated messages.
+message ChartDimensionUpdated {
+ string id = 1;
+ string chart_id = 2;
+ string node_id = 3;
+ string claim_id = 4;
+ string name = 5;
+ google.protobuf.Timestamp created_at = 6;
+ // null value means that the dimension is currently collected (live)
+ google.protobuf.Timestamp last_timestamp = 7;
+ aclk_lib.v1.ACLKMessagePosition position = 8;
+}
diff --git a/aclk/aclk-schemas/proto/chart/v1/instance.proto b/aclk/aclk-schemas/proto/chart/v1/instance.proto
new file mode 100644
index 000000000..25c99e7c7
--- /dev/null
+++ b/aclk/aclk-schemas/proto/chart/v1/instance.proto
@@ -0,0 +1,32 @@
+syntax = "proto3";
+
+package chart.v1;
+
+import "proto/aclk/v1/lib.proto";
+
+option go_package = "chart/instance/v1;chartinstance";
+
+// ChartInstanceUpdated is a single event sent from the Agent to the Cloud containing chart instance data.
+//
+// ChartInstanceUpdated messages are dispatched in bulk to the Cloud wrapped in ChartsAndDimensionsUpdated messages.
+message ChartInstanceUpdated {
+ string id = 1;
+ string claim_id = 2;
+ string node_id = 3;
+ string name = 4;
+ map<string, string> chart_labels = 5;
+ MemoryMode memory_mode = 6;
+ // in seconds
+ uint32 update_every_interval = 7;
+ string config_hash = 8;
+ aclk_lib.v1.ACLKMessagePosition position = 9;
+}
+
+enum MemoryMode {
+ NONE = 0;
+ RAM = 1;
+ MAP = 2;
+ SAVE = 3;
+ ALLOC = 4;
+ DB_ENGINE = 5;
+}
diff --git a/aclk/aclk-schemas/proto/chart/v1/stream.proto b/aclk/aclk-schemas/proto/chart/v1/stream.proto
new file mode 100644
index 000000000..9473538f2
--- /dev/null
+++ b/aclk/aclk-schemas/proto/chart/v1/stream.proto
@@ -0,0 +1,86 @@
+syntax = "proto3";
+
+package chart.v1;
+
+import "google/protobuf/timestamp.proto";
+
+import "proto/chart/v1/instance.proto";
+import "proto/chart/v1/dimension.proto";
+
+option go_package = "chart/stream/v1;chartstream";
+
+// StreamChartsAndDimensions is a Command produced by the Cloud, consumed by the Agent.
+//
+// It instructs the Agent to start sending ChartsAndDimensionsUpdated messages for a NodeInstance
+// after the last sequence_id that the Cloud has successfully ingested.
+message StreamChartsAndDimensions {
+ // claim_id, node_id pair is used to identify the Node Instance
+ string claim_id = 1;
+ string node_id = 2;
+
+ // sequence_id last verified sequence sent by the Agent
+ uint64 sequence_id = 3;
+ // batch_id identifies the stream_id and gets incremented every time the Cloud sends a new StreamChartsAndDimensions command
+ uint64 batch_id = 4;
+ // seq_id_created_at autogenerated timestamp in Agent's DB upon sequence_id creation
+ google.protobuf.Timestamp seq_id_created_at = 5;
+}
+
+
+// ChartsAndDimensionsAck is an Event produced by the Cloud, consumed by the Agent.
+//
+// This Event is an acknowledgment from the Cloud side that Chart messages up to a specific last_sequence_id
+// have been successfully ingested, and could be potentially deleted from the Agent's DB.
+message ChartsAndDimensionsAck {
+ string claim_id = 1;
+ string node_id = 2;
+ // the last verified stored message's seq_id
+ uint64 last_sequence_id = 3;
+}
+
+// ResetChartMessages is a Command produced by the Agent, consumed by the Cloud.
+//
+// This Command instructs the Cloud to clear its Chart state for a specific NodeInstance and re-sync
+// because of a ResetReason.
+message ResetChartMessages {
+ // claim_id, node_id pair is used to identify the Node Instance
+ string claim_id = 1;
+ string node_id = 2;
+
+ ResetReason reason = 3;
+}
+
+enum ResetReason {
+ DB_EMPTY = 0;
+ SEQ_ID_NOT_EXISTS = 1;
+ TIMESTAMP_MISMATCH = 2;
+}
+
+// ChartsAndDimensionsUpdated is a wrapper Event (`fat` message) produced by the Agent, consumed by the Cloud.
+//
+// It potentially includes a collection of ChartInstanceUpdated messages and|or a collection of ChartDimensionUpdated messages.
+message ChartsAndDimensionsUpdated {
+ repeated chart.v1.ChartInstanceUpdated charts = 1;
+ repeated chart.v1.ChartDimensionUpdated dimensions = 2;
+ uint64 batch_id = 3;
+}
+
+// RetentionUpdated includes the available retentions (in seconds) of the dimensions - of a specific node instance and memory-mode -
+// on a per update_every level.
+// This message is sent over upon Agent Database rotation events to inform the Cloud in total about the newly updated data retentions
+// of a node instance's dimensions.
+message RetentionUpdated {
+ // claim_id, node_id pair is used to identify the Node Instance
+ string claim_id = 1;
+ string node_id = 2;
+ // the memory_mode used by the node instance's chart instances
+ chart.v1.MemoryMode memory_mode = 3;
+ // this mapping identifies the newly updated available retention (in seconds) of the node instance's dimensions
+ // the keys are the update_every categories of various dimensions (1, 2, 4, 10 etc.),
+ // and the values are the available retention (in seconds) of each dimension belonging to the update_every category
+ // denoted by the key
+ map<uint32, uint32> interval_durations = 4;
+ // the timestamp when the db rotation event took place. Can be used in conjunction with the interval_durations
+ // to compute the beginning of each `updated_every` group's retention
+ google.protobuf.Timestamp rotation_timestamp = 5;
+}
diff --git a/aclk/aclk-schemas/proto/context/v1/context.proto b/aclk/aclk-schemas/proto/context/v1/context.proto
new file mode 100644
index 000000000..eb771f8eb
--- /dev/null
+++ b/aclk/aclk-schemas/proto/context/v1/context.proto
@@ -0,0 +1,57 @@
+syntax = "proto3";
+
+package context.v1;
+
+option go_package = "context/v1;context";
+
+// ContextsUpdated is an Event produced by the Agent, consumed by the Cloud.
+//
+// it contains a collection of ContextUpdated messages for a specific NodeInstance.
+message ContextsUpdated {
+ // contexUpdates contains the collection of context updates
+ repeated ContextUpdated contextUpdates = 1;
+ // claim_id, node_id pair identifies the node instance
+ string claim_id = 2;
+ string node_id = 3;
+ // version_hash is the contexts version_hash result the cloud should
+ // get after applying this message updates.
+ uint64 version_hash = 4;
+ // it's and always increasing number to compare
+ // which version_hash is more recent between multiple
+ // ContextsUpdated messages. Bigger means more recent.
+ uint64 created_at = 5;
+}
+
+// ContextUpdated contains context data.
+message ContextUpdated {
+ // context id
+ string id = 1;
+ // context version is an epoch in seconds.
+ uint64 version = 2;
+ // first_entry, last_entry are epochs in seconds
+ uint64 first_entry = 3;
+ uint64 last_entry = 4;
+ // deleted flag is used to signal a context deletion
+ bool deleted = 5;
+ // context configuration fields
+ string title = 6;
+ uint64 priority = 7;
+ string chart_type = 8;
+ string units = 9;
+ string family = 10;
+}
+
+// ContextsSnapshot is an Event produced by the Agent, consumed by the Cloud.
+//
+// it contains a snapshot of the existing contexts on the Agent.
+// snapshot version and context versions are epochs in seconds so we can
+// identify if a context version was generated after a specific snapshot.
+message ContextsSnapshot {
+ // contexts contains the collection of existing contexts
+ repeated ContextUpdated contexts = 1;
+ // claim_id, node_id pair identifies the node instance
+ string claim_id = 2;
+ string node_id = 3;
+ // version is an epoch in seconds
+ uint64 version = 4;
+}
diff --git a/aclk/aclk-schemas/proto/context/v1/stream.proto b/aclk/aclk-schemas/proto/context/v1/stream.proto
new file mode 100644
index 000000000..a6e7e3abf
--- /dev/null
+++ b/aclk/aclk-schemas/proto/context/v1/stream.proto
@@ -0,0 +1,34 @@
+syntax = "proto3";
+
+package context.v1;
+
+option go_package = "context/v1;context";
+
+// ContextsCheckpoint is a Command produced by the Cloud, consumed by the Agent.
+//
+// It informs the Agent the contexts' version_hash that the cloud has for a specific NodeInstance.
+message ContextsCheckpoint {
+ // claim_id, node_id pair is used to identify the NodeInstance.
+ string claim_id = 1;
+ string node_id = 2;
+ // version_hash tells the Agent the current version hash for the contexts received
+ // if the version hash calculated by the Agent is different, Agent will request
+ // to re-sync all contexts.
+ uint64 version_hash= 3;
+}
+
+// StopStreamingContexts is a Command produced by the Cloud, consumed by the Agent.
+//
+// It instructs the Agent to stop sending ContextsUpdated messages for a NodeInstance
+// due to a reason.
+message StopStreamingContexts {
+ // claim_id, node_id pair is used to identify the node instance
+ string claim_id = 1;
+ string node_id = 2;
+
+ StopStreamingContextsReason reason = 3;
+}
+
+enum StopStreamingContextsReason {
+ RATE_LIMIT_EXCEEDED = 0;
+}
diff --git a/aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto b/aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto
new file mode 100644
index 000000000..f0c02461e
--- /dev/null
+++ b/aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto
@@ -0,0 +1,37 @@
+syntax = "proto3";
+option go_package = "nodeinstance/connection/v1;nodeinstanceconnection";
+
+package nodeinstance.v1;
+
+import "google/protobuf/timestamp.proto";
+import "proto/aclk/v1/lib.proto";
+
+message UpdateNodeInstanceConnection {
+ string claim_id = 1;
+ string node_id = 2;
+
+ // liveness whether node data are actively streamed to the agent.
+ bool liveness = 3;
+
+ // queryable whether the agent has data about the node.
+ bool queryable = 4;
+
+ int64 session_id = 5;
+
+ google.protobuf.Timestamp updated_at = 6;
+
+ // mqtt_broker_addr shard to use for reaching the agent
+ // cloud injects this information.
+ string mqtt_broker_addr = 7;
+
+ // vmq_instance_id broker shard to use for reaching the agent
+ // cloud injects this information.
+ int32 vmq_instance_id = 8;
+
+ // hops is the number of streaming hops between collection of node data
+ // and the claimed agent. Zero if no streaming is involved.
+ int32 hops = 9;
+
+ // capabilities of node instance NOT the NODE or agent!!!
+ repeated aclk_lib.v1.Capability capabilities = 10;
+}
diff --git a/aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto b/aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto
new file mode 100644
index 000000000..922337154
--- /dev/null
+++ b/aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto
@@ -0,0 +1,32 @@
+syntax = "proto3";
+option go_package = "node_instance/creation/v1;node_instancecreation";
+
+package nodeinstance.create.v1;
+
+message CreateNodeInstance {
+ // Claim ID of the Agent the Node Instance belongs to.
+ // Eventually, the NodeInstance will be identified by the compilation of
+ // this claim_id and NodeID returned by `CreateNodeInstanceResult`
+ string claim_id = 1;
+ // Machine GUID of the Machine the request comes from
+ // Used to look for an existing NodeID in the space claim_id belongs to
+ string machine_guid = 2;
+ string hostname = 3;
+
+ // vmq_instance_id broker shard to use for reaching the agent
+ // cloud injects this information.
+ int32 vmq_instance_id = 4;
+ // mqtt_broker_addr shard to use for reaching the agent
+ // cloud injects this information.
+ string mqtt_broker_addr = 5;
+
+ // hops is the number of streaming hops between collection of node data
+ // and the claimed agent. Zero if no streaming is involved.
+ int32 hops = 6;
+}
+
+message CreateNodeInstanceResult {
+ string node_id = 1;
+ string machine_guid = 2;
+}
+
diff --git a/aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto b/aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto
new file mode 100644
index 000000000..7aa9d0448
--- /dev/null
+++ b/aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto
@@ -0,0 +1,148 @@
+syntax = "proto3";
+option go_package = "node_instance/info/v1;nodeinstanceinfo";
+
+package nodeinstance.info.v1;
+
+import "google/protobuf/timestamp.proto";
+import "proto/aclk/v1/lib.proto";
+
+// UpdateNodeInfo (Command)
+//
+// pulsar topic: `UpdateNodeInfo` (sharded)
+//
+// key: `claim_id,node_id`
+//
+// Publishers: `netdata/agent`
+// Subscribers: `cloud-node-mqtt-output-service`
+//
+// When:
+// On nodeinstance connect
+//
+message UpdateNodeInfo {
+ string node_id = 7;
+
+ string claim_id = 1;
+
+ NodeInfo data = 2;
+ // to be obsoleted in future
+ // all new fields should go into node_info
+ // or node_instance_info respectively
+
+ google.protobuf.Timestamp updated_at = 3;
+
+ int64 session_id = 4;
+
+ string machine_guid = 5;
+
+ bool child = 6;
+
+ MachineLearningInfo ml_info = 8;
+ // to be obsoleted in far future
+
+ NodeInfo2 node_info = 9;
+ // node_info shows data about actual node
+ // for example feature (ml) for this
+ // node (child) might be available/enabled on the node (child) directly
+ // but not available trough the parent (node_instance)
+
+ NodeInstanceInfo node_instance_info = 10;
+ // info specific to the node_instance for this node available trough agent
+ // who sends this message.
+ // e.g. machine learning is enabled for this node and processing is done
+ // by the actual agent (parent). (child itself might or might not be
+ // ml ml_capable by itself (see node_info))
+}
+
+message NodeInfo2 {
+ repeated aclk_lib.v1.Capability capabilities = 1;
+}
+
+message NodeInstanceInfo {
+ repeated aclk_lib.v1.Capability capabilities = 1;
+}
+
+// NodeInfo describes the metadata of a node
+message NodeInfo {
+ string name = 1;
+
+ string os = 2;
+ string os_name = 3;
+ string os_version = 4;
+
+ string kernel_name = 5;
+ string kernel_version = 6;
+
+ string architecture = 7;
+
+ // number of cpu cores in the node
+ uint32 cpus = 8;
+
+ // human readable (value + unit) frequency of cpu
+ string cpu_frequency = 9;
+
+ // human readable (value + unit) size of node's memory
+ string memory = 10;
+
+ // human readable (value + unit) size of all (sum) node's disks
+ string disk_space = 11;
+
+ // version of the netdata agent
+ string version = 12;
+
+ // release channel of netdata agent (example: nightly)
+ string release_channel = 13;
+
+ string timezone = 14;
+
+ // virtualization_type example: kvm (optional)
+ string virtualization_type = 15;
+
+ // container_type example: docker (optional)
+ string container_type = 16;
+
+ string custom_info = 17;
+
+ // [Obsolete] repeated string services = 18;
+ reserved 18;
+
+ string machine_guid = 19;
+
+ // [Obsolete] repeated MirroredHostStatus mirrored_hosts_status = 20;
+ reserved 20;
+
+ map<string, string> host_labels = 21;
+
+ MachineLearningInfo ml_info = 22;
+
+ // [Obsolete] repeated string collectors = 23;
+ reserved 23;
+}
+
+message MachineLearningInfo {
+ // have ML capability
+ bool ml_capable = 1;
+
+ // runs ML functionality
+ bool ml_enabled = 2;
+}
+
+// UpdateNodeCollectors (Command)
+//
+// key: `claim_id,node_id`
+//
+// Publishers: `netdata/agent`
+//
+// When:
+// On nodeinstance connect (after agent settles) and on detection of change of collectors
+//
+
+message CollectorInfo {
+ string module = 1;
+ string plugin = 2;
+}
+
+message UpdateNodeCollectors {
+ string claim_id = 1;
+ string node_id = 2;
+ repeated CollectorInfo collectors = 3;
+}
diff --git a/aclk/aclk.c b/aclk/aclk.c
index 854408ce6..e95d7d6ab 100644
--- a/aclk/aclk.c
+++ b/aclk/aclk.c
@@ -154,7 +154,9 @@ biofailed:
static int wait_till_cloud_enabled()
{
- netdata_log_info("Waiting for Cloud to be enabled");
+ nd_log(NDLS_DAEMON, NDLP_INFO,
+ "Waiting for Cloud to be enabled");
+
while (!netdata_cloud_enabled) {
sleep_usec(USEC_PER_SEC * 1);
if (!service_running(SERVICE_ACLK))
@@ -233,17 +235,22 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
switch(log_type) {
case MQTT_WSS_LOG_ERROR:
case MQTT_WSS_LOG_FATAL:
+ nd_log(NDLS_DAEMON, NDLP_ERR, "%s", str);
+ return;
+
case MQTT_WSS_LOG_WARN:
- error_report("%s", str);
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "%s", str);
return;
+
case MQTT_WSS_LOG_INFO:
- netdata_log_info("%s", str);
+ nd_log(NDLS_DAEMON, NDLP_INFO, "%s", str);
return;
+
case MQTT_WSS_LOG_DEBUG:
- netdata_log_debug(D_ACLK, "%s", str);
return;
+
default:
- netdata_log_error("Unknown log type from mqtt_wss");
+ nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown log type from mqtt_wss");
}
}
@@ -297,7 +304,9 @@ static void puback_callback(uint16_t packet_id)
#endif
if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) {
- netdata_log_info("Shutdown message has been acknowledged by the cloud. Exiting gracefully");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Shutdown message has been acknowledged by the cloud. Exiting gracefully");
+
aclk_shared_state.mqtt_shutdown_msg_rcvd = 1;
}
}
@@ -335,9 +344,11 @@ static int handle_connection(mqtt_wss_client client)
}
if (disconnect_req || aclk_kill_link) {
- netdata_log_info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
- disconnect_req ? "true" : "false",
- aclk_kill_link ? "true" : "false");
+ nd_log(NDLS_DAEMON, NDLP_NOTICE,
+ "Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)",
+ disconnect_req ? "true" : "false",
+ aclk_kill_link ? "true" : "false");
+
disconnect_req = 0;
aclk_kill_link = 0;
aclk_graceful_disconnect(client);
@@ -390,7 +401,9 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
void aclk_graceful_disconnect(mqtt_wss_client client)
{
- netdata_log_info("Preparing to gracefully shutdown ACLK connection");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Preparing to gracefully shutdown ACLK connection");
+
aclk_queue_lock();
aclk_queue_flush();
@@ -403,17 +416,22 @@ void aclk_graceful_disconnect(mqtt_wss_client client)
break;
}
if (aclk_shared_state.mqtt_shutdown_msg_rcvd) {
- netdata_log_info("MQTT App Layer `disconnect` message sent successfully");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "MQTT App Layer `disconnect` message sent successfully");
break;
}
}
- netdata_log_info("ACLK link is down");
- netdata_log_access("ACLK DISCONNECTED");
+
+ nd_log(NDLS_DAEMON, NDLP_WARNING, "ACLK link is down");
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED");
+
aclk_stats_upd_online(0);
last_disconnect_time = now_realtime_sec();
aclk_connected = 0;
- netdata_log_info("Attempting to gracefully shutdown the MQTT/WSS connection");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Attempting to gracefully shutdown the MQTT/WSS connection");
+
mqtt_wss_disconnect(client, 1000);
}
@@ -455,7 +473,9 @@ static int aclk_block_till_recon_allowed() {
next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC);
last_backoff_value = (float)recon_delay / MSEC_PER_SEC;
- netdata_log_info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC);
+
// we want to wake up from time to time to check netdata_exit
while (recon_delay)
{
@@ -593,7 +613,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
return 1;
}
- netdata_log_info("Attempting connection now");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Attempting connection now");
+
memset(&base_url, 0, sizeof(url_t));
if (url_parse(aclk_cloud_base_url, &base_url)) {
aclk_status = ACLK_STATUS_INVALID_CLOUD_URL;
@@ -680,7 +702,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
error_report("Can't use encoding=proto without at least \"proto\" capability.");
continue;
}
- netdata_log_info("New ACLK protobuf protocol negotiated successfully (/env response).");
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "New ACLK protobuf protocol negotiated successfully (/env response).");
memset(&auth_url, 0, sizeof(url_t));
if (url_parse(aclk_env->auth_endpoint, &auth_url)) {
@@ -750,9 +774,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
if (!ret) {
last_conn_time_mqtt = now_realtime_sec();
- netdata_log_info("ACLK connection successfully established");
+ nd_log(NDLS_DAEMON, NDLP_INFO, "ACLK connection successfully established");
aclk_status = ACLK_STATUS_CONNECTED;
- netdata_log_access("ACLK CONNECTED");
+ nd_log(NDLS_ACCESS, NDLP_INFO, "ACLK CONNECTED");
mqtt_connected_actions(client);
return 0;
}
@@ -798,7 +822,9 @@ void *aclk_main(void *ptr)
netdata_thread_disable_cancelability();
#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK )
- netdata_log_info("Killing ACLK thread -> cloud functionality has been disabled");
+ nd_log(NDLS_DAEMON, NDLP_INFO,
+ "Killing ACLK thread -> cloud functionality has been disabled");
+
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
return NULL;
#endif
@@ -857,7 +883,7 @@ void *aclk_main(void *ptr)
aclk_stats_upd_online(0);
last_disconnect_time = now_realtime_sec();
aclk_connected = 0;
- netdata_log_access("ACLK DISCONNECTED");
+ nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED");
}
} while (service_running(SERVICE_ACLK));
@@ -891,7 +917,7 @@ exit:
return NULL;
}
-void aclk_host_state_update(RRDHOST *host, int cmd)
+void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
{
uuid_t node_id;
int ret = 0;
@@ -924,7 +950,9 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
rrdhost_aclk_state_unlock(localhost);
create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
create_query->data.bin_payload.msg_name = "CreateNodeInstance";
- netdata_log_info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops);
+
aclk_queue_query(create_query);
return;
}
@@ -934,7 +962,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
node_instance_connection_t node_state_update = {
.hops = host->system_info->hops,
.live = cmd,
- .queryable = 1,
+ .queryable = queryable,
.session_id = aclk_session_newarch
};
node_state_update.node_id = mallocz(UUID_STR_LEN);
@@ -947,8 +975,9 @@ void aclk_host_state_update(RRDHOST *host, int cmd)
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- netdata_log_info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd,
- host->system_info->hops);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Queuing status update for node=%s, live=%d, hops=%u, queryable=%d",
+ (char*)node_state_update.node_id, cmd, host->system_info->hops, queryable);
freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
@@ -990,9 +1019,10 @@ void aclk_send_node_instances()
node_state_update.claim_id = localhost->aclk_state.claimed_id;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
rrdhost_aclk_state_unlock(localhost);
- netdata_log_info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id,
- list->live,
- list->hops);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Queuing status update for node=%s, live=%d, hops=%d, queryable=1",
+ (char*)node_state_update.node_id, list->live, list->hops);
freez((void*)node_state_update.capabilities);
freez((void*)node_state_update.node_id);
@@ -1014,8 +1044,11 @@ void aclk_send_node_instances()
node_instance_creation.claim_id = localhost->aclk_state.claimed_id,
create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);
rrdhost_aclk_state_unlock(localhost);
- netdata_log_info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid,
- list->hops);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "Queuing registration for host=%s, hops=%d",
+ (char*)node_instance_creation.machine_guid, list->hops);
+
freez((void *)node_instance_creation.machine_guid);
aclk_queue_query(create_query);
}
@@ -1322,7 +1355,7 @@ void add_aclk_host_labels(void) {
void aclk_queue_node_info(RRDHOST *host, bool immediate)
{
- struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
if (likely(wc))
wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec();
}
diff --git a/aclk/aclk.h b/aclk/aclk.h
index 0badc1a62..72d1a2e11 100644
--- a/aclk/aclk.h
+++ b/aclk/aclk.h
@@ -75,7 +75,7 @@ extern struct aclk_shared_state {
int mqtt_shutdown_msg_rcvd;
} aclk_shared_state;
-void aclk_host_state_update(RRDHOST *host, int cmd);
+void aclk_host_state_update(RRDHOST *host, int cmd, int queryable);
void aclk_send_node_instances(void);
void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index abacbca83..5e3574b97 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -6,7 +6,6 @@
#include "../../web/server/web_client_cache.h"
#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
-#define ACLK_MAX_WEB_RESPONSE_SIZE (30 * 1024 * 1024)
pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
@@ -90,6 +89,12 @@ static bool aclk_web_client_interrupt_cb(struct web_client *w __maybe_unused, vo
}
static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_SRC_TRANSPORT, "aclk"),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
int retval = 0;
BUFFER *local_buffer = NULL;
size_t size = 0;
@@ -110,7 +115,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
usec_t t;
web_client_timeout_checkpoint_set(w, query->timeout);
if(web_client_timeout_checkpoint_and_check(w, &t)) {
- netdata_log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %llu ms (LIMIT %d ms)", t / USEC_PER_MS, query->timeout);
+ nd_log(NDLS_ACCESS, NDLP_ERR, "QUERY CANCELED: QUEUE TIME EXCEEDED %llu ms (LIMIT %d ms)", t / USEC_PER_MS, query->timeout);
retval = 1;
w->response.code = HTTP_RESP_SERVICE_UNAVAILABLE;
aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0);
@@ -131,13 +136,6 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
w->response.code = (short)web_client_api_request_with_node_selection(localhost, w, path);
web_client_timeout_checkpoint_response_ready(w, &t);
- if(buffer_strlen(w->response.data) > ACLK_MAX_WEB_RESPONSE_SIZE) {
- buffer_flush(w->response.data);
- buffer_strcat(w->response.data, "response is too big");
- w->response.data->content_type = CT_TEXT_PLAIN;
- w->response.code = HTTP_RESP_CONTENT_TOO_LONG;
- }
-
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.cloud_q_process_total += t;
@@ -217,25 +215,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
// send msg.
w->response.code = aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len);
- struct timeval tv;
-
cleanup:
- now_monotonic_high_precision_timeval(&tv);
- netdata_log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
- w->id
- , gettid()
- , query_thr->idx
- , "DATA"
- , sent
- , size
- , size > sent ? -(((size - sent) / (double)size) * 100.0) : ((size > 0) ? (((sent - size ) / (double)size) * 100.0) : 0.0)
- , dt_usec(&w->timings.tv_ready, &w->timings.tv_in) / 1000.0
- , dt_usec(&tv, &w->timings.tv_ready) / 1000.0
- , dt_usec(&tv, &w->timings.tv_in) / 1000.0
- , w->response.code
- , strip_control_characters((char *)buffer_tostring(w->url_as_received))
- );
-
+ web_client_log_completed_request(w, false);
web_client_release_to_cache(w);
pending_req_list_rm(query->msg_id);
diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c
index 6e4cd93fb..0e91e28c0 100644
--- a/aclk/aclk_rx_msgs.c
+++ b/aclk/aclk_rx_msgs.c
@@ -108,7 +108,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
}
start = payload + 4;
- if(!(end = strstr(payload, " HTTP/1.1\x0D\x0A"))) {
+ if(!(end = strstr(payload, HTTP_1_1 HTTP_ENDL))) {
errno = 0;
netdata_log_error("Doesn't look like HTTP GET request.");
return 1;
@@ -455,7 +455,7 @@ int cancel_pending_req(const char *msg, size_t msg_len)
return 1;
}
- netdata_log_access("ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id);
+ nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id);
if (mark_pending_req_cancelled(cmd.request_id))
error_report("CancelPending Request for %s failed. No such pending request.", cmd.request_id);
diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c
index 4102c818d..0e4182a72 100644
--- a/aclk/aclk_tx_msgs.c
+++ b/aclk/aclk_tx_msgs.c
@@ -86,7 +86,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec
int rc = mqtt_wss_publish5(client, (char*)topic, NULL, full_msg, &freez_aclk_publish5b, full_msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
if (rc == MQTT_WSS_ERR_TOO_BIG_FOR_SERVER)
- return HTTP_RESP_FORBIDDEN;
+ return HTTP_RESP_CONTENT_TOO_LONG;
#ifdef NETDATA_INTERNAL_CHECKS
aclk_stats_msg_published(packet_id);
@@ -194,7 +194,7 @@ int aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_
int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
switch (rc) {
- case HTTP_RESP_FORBIDDEN:
+ case HTTP_RESP_CONTENT_TOO_LONG:
aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, NULL, 0);
break;
case HTTP_RESP_INTERNAL_SERVER_ERROR:
diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c
index 00920e069..3bf2e3f18 100644
--- a/aclk/aclk_util.c
+++ b/aclk/aclk_util.c
@@ -1,6 +1,9 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "aclk_util.h"
+
+#ifdef ENABLE_ACLK
+
#include "aclk_proxy.h"
#include "daemon/common.h"
@@ -437,6 +440,7 @@ void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt
freez(proxy);
}
+#endif /* ENABLE_ACLK */
#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void)
diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h
index 6b7e4e9c2..38ef5b0bc 100644
--- a/aclk/aclk_util.h
+++ b/aclk/aclk_util.h
@@ -3,6 +3,8 @@
#define ACLK_UTIL_H
#include "libnetdata/libnetdata.h"
+
+#ifdef ENABLE_ACLK
#include "mqtt_wss_client.h"
#define CLOUD_EC_MALFORMED_NODE_ID 1
@@ -112,6 +114,7 @@ unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, un
#define aclk_tbeb_reset(x) aclk_tbeb_delay(1, 0, 0, 0)
void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt_wss_proxy_type *type);
+#endif /* ENABLE_ACLK */
int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len);
diff --git a/aclk/https_client.c b/aclk/https_client.c
index 623082027..5385786b8 100644
--- a/aclk/https_client.c
+++ b/aclk/https_client.c
@@ -4,20 +4,12 @@
#include "https_client.h"
-#include "mqtt_websockets/c-rbuf/include/ringbuffer.h"
-
#include "aclk_util.h"
#include "daemon/global_statistics.h"
#define DEFAULT_CHUNKED_RESPONSE_BUFFER_SIZE (4096)
-enum http_parse_state {
- HTTP_PARSE_INITIAL = 0,
- HTTP_PARSE_HEADERS,
- HTTP_PARSE_CONTENT
-};
-
static const char *http_req_type_to_str(http_req_type_t req) {
switch (req) {
case HTTP_REQ_GET:
@@ -33,39 +25,33 @@ static const char *http_req_type_to_str(http_req_type_t req) {
#define TRANSFER_ENCODING_CHUNKED (-2)
-typedef struct {
- enum http_parse_state state;
- int content_length;
- int http_code;
-
- // for chunked data only
- char *chunked_response;
- size_t chunked_response_size;
- size_t chunked_response_written;
-
- enum chunked_content_state {
- CHUNKED_CONTENT_CHUNK_SIZE = 0,
- CHUNKED_CONTENT_CHUNK_DATA,
- CHUNKED_CONTENT_CHUNK_END_CRLF,
- CHUNKED_CONTENT_FINAL_CRLF
- } chunked_content_state;
-
- size_t chunk_size;
- size_t chunk_got;
-} http_parse_ctx;
-
#define HTTP_PARSE_CTX_INITIALIZER { .state = HTTP_PARSE_INITIAL, .content_length = -1, .http_code = 0 }
-static inline void http_parse_ctx_clear(http_parse_ctx *ctx) {
+void http_parse_ctx_create(http_parse_ctx *ctx)
+{
ctx->state = HTTP_PARSE_INITIAL;
ctx->content_length = -1;
ctx->http_code = 0;
+ ctx->headers = c_rhash_new(0);
+ ctx->flags = HTTP_PARSE_FLAGS_DEFAULT;
+}
+
+void http_parse_ctx_destroy(http_parse_ctx *ctx)
+{
+ c_rhash_iter_t iter;
+ const char *key;
+
+ c_rhash_iter_t_initialize(&iter);
+ while ( !c_rhash_iter_str_keys(ctx->headers, &iter, &key) ) {
+ void *val;
+ c_rhash_get_ptr_by_str(ctx->headers, key, &val);
+ freez(val);
+ }
+
+ c_rhash_destroy(ctx->headers);
}
#define POLL_TO_MS 100
-#define NEED_MORE_DATA 0
-#define PARSE_SUCCESS 1
-#define PARSE_ERROR -1
#define HTTP_LINE_TERM "\x0D\x0A"
#define RESP_PROTO "HTTP/1.1 "
#define HTTP_KEYVAL_SEPARATOR ": "
@@ -76,7 +62,7 @@ static int process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const ch
{
// currently we care only about specific headers
// we can skip the rest
- if (!strcmp("content-length", key)) {
+ if (parse_ctx->content_length < 0 && !strcmp("content-length", key)) {
if (parse_ctx->content_length == TRANSFER_ENCODING_CHUNKED) {
netdata_log_error("Content-length and transfer-encoding: chunked headers are mutually exclusive");
return 1;
@@ -85,7 +71,7 @@ static int process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const ch
netdata_log_error("Duplicate content-length header");
return 1;
}
- parse_ctx->content_length = atoi(val);
+ parse_ctx->content_length = str2u(val);
if (parse_ctx->content_length < 0) {
netdata_log_error("Invalid content-length %d", parse_ctx->content_length);
return 1;
@@ -102,9 +88,20 @@ static int process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const ch
}
return 0;
}
+ char *val_cpy = strdupz(val);
+ c_rhash_insert_str_ptr(parse_ctx->headers, key, val_cpy);
return 0;
}
+const char *get_http_header_by_name(http_parse_ctx *ctx, const char *name)
+{
+ const char *ret;
+ if (c_rhash_get_ptr_by_str(ctx->headers, name, (void**)&ret))
+ return NULL;
+
+ return ret;
+}
+
static int parse_http_hdr(rbuf_t buf, http_parse_ctx *parse_ctx)
{
int idx, idx_end;
@@ -169,8 +166,8 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx)
case CHUNKED_CONTENT_CHUNK_SIZE:
if (!rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx)) {
if (rbuf_bytes_available(buf) >= rbuf_get_capacity(buf))
- return PARSE_ERROR;
- return NEED_MORE_DATA;
+ return HTTP_PARSE_ERROR;
+ return HTTP_PARSE_NEED_MORE_DATA;
}
if (idx == 0) {
parse_ctx->chunked_content_state = CHUNKED_CONTENT_FINAL_CRLF;
@@ -178,7 +175,7 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx)
}
if (idx >= HTTP_HDR_BUFFER_SIZE) {
netdata_log_error("Chunk size is too long");
- return PARSE_ERROR;
+ return HTTP_PARSE_ERROR;
}
char buf_size[HTTP_HDR_BUFFER_SIZE];
rbuf_pop(buf, buf_size, idx);
@@ -186,13 +183,13 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx)
long chunk_size = strtol(buf_size, NULL, 16);
if (chunk_size < 0 || chunk_size == LONG_MAX) {
netdata_log_error("Chunk size out of range");
- return PARSE_ERROR;
+ return HTTP_PARSE_ERROR;
}
parse_ctx->chunk_size = chunk_size;
if (parse_ctx->chunk_size == 0) {
if (errno == EINVAL) {
netdata_log_error("Invalid chunk size");
- return PARSE_ERROR;
+ return HTTP_PARSE_ERROR;
}
parse_ctx->chunked_content_state = CHUNKED_CONTENT_CHUNK_END_CRLF;
continue;
@@ -204,7 +201,7 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx)
// fallthrough
case CHUNKED_CONTENT_CHUNK_DATA:
if (!(bytes_to_copy = rbuf_bytes_available(buf)))
- return NEED_MORE_DATA;
+ return HTTP_PARSE_NEED_MORE_DATA;
if (bytes_to_copy > parse_ctx->chunk_size - parse_ctx->chunk_got)
bytes_to_copy = parse_ctx->chunk_size - parse_ctx->chunk_got;
rbuf_pop(buf, parse_ctx->chunked_response + parse_ctx->chunked_response_written, bytes_to_copy);
@@ -217,19 +214,19 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx)
case CHUNKED_CONTENT_FINAL_CRLF:
case CHUNKED_CONTENT_CHUNK_END_CRLF:
if (rbuf_bytes_available(buf) < strlen(HTTP_LINE_TERM))
- return NEED_MORE_DATA;
+ return HTTP_PARSE_NEED_MORE_DATA;
char buf_crlf[strlen(HTTP_LINE_TERM)];
rbuf_pop(buf, buf_crlf, strlen(HTTP_LINE_TERM));
if (memcmp(buf_crlf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM))) {
netdata_log_error("CRLF expected");
- return PARSE_ERROR;
+ return HTTP_PARSE_ERROR;
}
if (parse_ctx->chunked_content_state == CHUNKED_CONTENT_FINAL_CRLF) {
if (parse_ctx->chunked_response_size != parse_ctx->chunked_response_written)
netdata_log_error("Chunked response size mismatch");
chunked_response_buffer_grow_by(parse_ctx, 1);
parse_ctx->chunked_response[parse_ctx->chunked_response_written] = 0;
- return PARSE_SUCCESS;
+ return HTTP_PARSE_SUCCESS;
}
if (parse_ctx->chunk_size == 0) {
parse_ctx->chunked_content_state = CHUNKED_CONTENT_FINAL_CRLF;
@@ -241,34 +238,34 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx)
} while(1);
}
-static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx)
+http_parse_rc parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx)
{
int idx;
char rc[4];
do {
if (parse_ctx->state != HTTP_PARSE_CONTENT && !rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx))
- return NEED_MORE_DATA;
+ return HTTP_PARSE_NEED_MORE_DATA;
switch (parse_ctx->state) {
case HTTP_PARSE_INITIAL:
if (rbuf_memcmp_n(buf, RESP_PROTO, strlen(RESP_PROTO))) {
netdata_log_error("Expected response to start with \"%s\"", RESP_PROTO);
- return PARSE_ERROR;
+ return HTTP_PARSE_ERROR;
}
rbuf_bump_tail(buf, strlen(RESP_PROTO));
if (rbuf_pop(buf, rc, 4) != 4) {
netdata_log_error("Expected HTTP status code");
- return PARSE_ERROR;
+ return HTTP_PARSE_ERROR;
}
if (rc[3] != ' ') {
netdata_log_error("Expected space after HTTP return code");
- return PARSE_ERROR;
+ return HTTP_PARSE_ERROR;
}
rc[3] = 0;
parse_ctx->http_code = atoi(rc);
if (parse_ctx->http_code < 100 || parse_ctx->http_code >= 600) {
netdata_log_error("HTTP code not in range 100 to 599");
- return PARSE_ERROR;
+ return HTTP_PARSE_ERROR;
}
rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx);
@@ -284,7 +281,7 @@ static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx)
break;
}
if (parse_http_hdr(buf, parse_ctx))
- return PARSE_ERROR;
+ return HTTP_PARSE_ERROR;
rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx);
rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM));
break;
@@ -294,11 +291,14 @@ static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx)
return process_chunked_content(buf, parse_ctx);
if (parse_ctx->content_length < 0)
- return PARSE_SUCCESS;
+ return HTTP_PARSE_SUCCESS;
+
+ if (parse_ctx->flags & HTTP_PARSE_FLAG_DONT_WAIT_FOR_CONTENT)
+ return HTTP_PARSE_SUCCESS;
if (rbuf_bytes_available(buf) >= (size_t)parse_ctx->content_length)
- return PARSE_SUCCESS;
- return NEED_MORE_DATA;
+ return HTTP_PARSE_SUCCESS;
+ return HTTP_PARSE_NEED_MORE_DATA;
}
} while(1);
}
@@ -486,7 +486,7 @@ static int read_parse_response(https_req_ctx_t *ctx) {
} while (ctx->poll_fd.events == 0 && rbuf_bytes_free(ctx->buf_rx) > 0);
} while (!(ret = parse_http_response(ctx->buf_rx, &ctx->parse_ctx)));
- if (ret != PARSE_SUCCESS) {
+ if (ret != HTTP_PARSE_SUCCESS) {
netdata_log_error("Error parsing HTTP response");
return 1;
}
@@ -500,7 +500,7 @@ static int handle_http_request(https_req_ctx_t *ctx) {
BUFFER *hdr = buffer_create(TX_BUFFER_SIZE, &netdata_buffers_statistics.buffers_aclk);
int rc = 0;
- http_parse_ctx_clear(&ctx->parse_ctx);
+ http_parse_ctx_create(&ctx->parse_ctx);
// Prepare data to send
switch (ctx->request->request_type) {
@@ -526,7 +526,7 @@ static int handle_http_request(https_req_ctx_t *ctx) {
buffer_strcat(hdr, ctx->request->url);
}
- buffer_strcat(hdr, " HTTP/1.1\x0D\x0A");
+ buffer_strcat(hdr, HTTP_1_1 HTTP_ENDL);
//TODO Headers!
if (ctx->request->request_type != HTTP_REQ_CONNECT) {
@@ -661,12 +661,15 @@ int https_request(https_req_t *request, https_req_response_t *response) {
ctx->request = &req;
if (handle_http_request(ctx)) {
netdata_log_error("Failed to CONNECT with proxy");
+ http_parse_ctx_destroy(&ctx->parse_ctx);
goto exit_sock;
}
if (ctx->parse_ctx.http_code != 200) {
netdata_log_error("Proxy didn't return 200 OK (got %d)", ctx->parse_ctx.http_code);
+ http_parse_ctx_destroy(&ctx->parse_ctx);
goto exit_sock;
}
+ http_parse_ctx_destroy(&ctx->parse_ctx);
netdata_log_info("Proxy accepted CONNECT upgrade");
}
ctx->request = request;
@@ -713,8 +716,10 @@ int https_request(https_req_t *request, https_req_response_t *response) {
// The actual request here
if (handle_http_request(ctx)) {
netdata_log_error("Couldn't process request");
+ http_parse_ctx_destroy(&ctx->parse_ctx);
goto exit_SSL;
}
+ http_parse_ctx_destroy(&ctx->parse_ctx);
response->http_code = ctx->parse_ctx.http_code;
if (ctx->parse_ctx.content_length == TRANSFER_ENCODING_CHUNKED) {
response->payload_size = ctx->parse_ctx.chunked_response_size;
diff --git a/aclk/https_client.h b/aclk/https_client.h
index daf4766f8..0b97fbb02 100644
--- a/aclk/https_client.h
+++ b/aclk/https_client.h
@@ -5,6 +5,9 @@
#include "libnetdata/libnetdata.h"
+#include "mqtt_websockets/c-rbuf/include/ringbuffer.h"
+#include "mqtt_websockets/c_rhash/include/c_rhash.h"
+
typedef enum http_req_type {
HTTP_REQ_GET = 0,
HTTP_REQ_POST,
@@ -77,4 +80,56 @@ void https_req_response_init(https_req_response_t *res);
int https_request(https_req_t *request, https_req_response_t *response);
+// we expose previously internal parser as this is usefull also from
+// other parts of the code
+enum http_parse_state {
+ HTTP_PARSE_INITIAL = 0,
+ HTTP_PARSE_HEADERS,
+ HTTP_PARSE_CONTENT
+};
+
+typedef uint32_t parse_ctx_flags_t;
+
+#define HTTP_PARSE_FLAG_DONT_WAIT_FOR_CONTENT ((parse_ctx_flags_t)0x01)
+
+#define HTTP_PARSE_FLAGS_DEFAULT ((parse_ctx_flags_t)0)
+
+typedef struct {
+ parse_ctx_flags_t flags;
+
+ enum http_parse_state state;
+ int content_length;
+ int http_code;
+
+ c_rhash headers;
+
+ // for chunked data only
+ char *chunked_response;
+ size_t chunked_response_size;
+ size_t chunked_response_written;
+
+ enum chunked_content_state {
+ CHUNKED_CONTENT_CHUNK_SIZE = 0,
+ CHUNKED_CONTENT_CHUNK_DATA,
+ CHUNKED_CONTENT_CHUNK_END_CRLF,
+ CHUNKED_CONTENT_FINAL_CRLF
+ } chunked_content_state;
+
+ size_t chunk_size;
+ size_t chunk_got;
+} http_parse_ctx;
+
+void http_parse_ctx_create(http_parse_ctx *ctx);
+void http_parse_ctx_destroy(http_parse_ctx *ctx);
+
+typedef enum {
+ HTTP_PARSE_ERROR = -1,
+ HTTP_PARSE_NEED_MORE_DATA = 0,
+ HTTP_PARSE_SUCCESS = 1
+} http_parse_rc;
+
+http_parse_rc parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx);
+
+const char *get_http_header_by_name(http_parse_ctx *ctx, const char *name);
+
#endif /* NETDATA_HTTPS_CLIENT_H */