diff options
Diffstat (limited to 'aclk')
30 files changed, 1848 insertions, 121 deletions
diff --git a/aclk/aclk-schemas/.gitignore b/aclk/aclk-schemas/.gitignore new file mode 100644 index 000000000..bd495e9a7 --- /dev/null +++ b/aclk/aclk-schemas/.gitignore @@ -0,0 +1,11 @@ +*.pb.go + +#Agent +*.pb.cc +*.pb.h +*.pb.o +*.pb.Po +.dirstamp + +#Jetbrains +.idea diff --git a/aclk/aclk-schemas/.travis.yml b/aclk/aclk-schemas/.travis.yml new file mode 100644 index 000000000..7c99550fe --- /dev/null +++ b/aclk/aclk-schemas/.travis.yml @@ -0,0 +1,4 @@ +--- +language: minimal +install: make deps +script: make CI diff --git a/aclk/aclk-schemas/LICENSE b/aclk/aclk-schemas/LICENSE new file mode 100644 index 000000000..f288702d2 --- /dev/null +++ b/aclk/aclk-schemas/LICENSE @@ -0,0 +1,674 @@ + GNU GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/> + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The GNU General Public License is a free, copyleft license for +software and other kinds of works. + + The licenses for most software and other practical works are designed +to take away your freedom to share and change the works. By contrast, +the GNU General Public License is intended to guarantee your freedom to +share and change all versions of a program--to make sure it remains free +software for all its users. We, the Free Software Foundation, use the +GNU General Public License for most of our software; it applies also to +any other work released this way by its authors. You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +them if you wish), that you receive source code or can get it if you +want it, that you can change the software or use pieces of it in new +free programs, and that you know you can do these things. + + To protect your rights, we need to prevent others from denying you +these rights or asking you to surrender the rights. Therefore, you have +certain responsibilities if you distribute copies of the software, or if +you modify it: responsibilities to respect the freedom of others. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must pass on to the recipients the same +freedoms that you received. You must make sure that they, too, receive +or can get the source code. And you must show them these terms so they +know their rights. + + Developers that use the GNU GPL protect your rights with two steps: +(1) assert copyright on the software, and (2) offer you this License +giving you legal permission to copy, distribute and/or modify it. + + For the developers' and authors' protection, the GPL clearly explains +that there is no warranty for this free software. For both users' and +authors' sake, the GPL requires that modified versions be marked as +changed, so that their problems will not be attributed erroneously to +authors of previous versions. + + Some devices are designed to deny users access to install or run +modified versions of the software inside them, although the manufacturer +can do so. This is fundamentally incompatible with the aim of +protecting users' freedom to change the software. The systematic +pattern of such abuse occurs in the area of products for individuals to +use, which is precisely where it is most unacceptable. Therefore, we +have designed this version of the GPL to prohibit the practice for those +products. If such problems arise substantially in other domains, we +stand ready to extend this provision to those domains in future versions +of the GPL, as needed to protect the freedom of users. + + Finally, every program is threatened constantly by software patents. +States should not allow patents to restrict development and use of +software on general-purpose computers, but in those that do, we wish to +avoid the special danger that patents applied to a free program could +make it effectively proprietary. To prevent this, the GPL assures that +patents cannot be used to render the program non-free. + + The precise terms and conditions for copying, distribution and +modification follow. + + TERMS AND CONDITIONS + + 0. Definitions. + + "This License" refers to version 3 of the GNU General Public License. + + "Copyright" also means copyright-like laws that apply to other kinds of +works, such as semiconductor masks. + + "The Program" refers to any copyrightable work licensed under this +License. Each licensee is addressed as "you". "Licensees" and +"recipients" may be individuals or organizations. + + To "modify" a work means to copy from or adapt all or part of the work +in a fashion requiring copyright permission, other than the making of an +exact copy. The resulting work is called a "modified version" of the +earlier work or a work "based on" the earlier work. + + A "covered work" means either the unmodified Program or a work based +on the Program. + + To "propagate" a work means to do anything with it that, without +permission, would make you directly or secondarily liable for +infringement under applicable copyright law, except executing it on a +computer or modifying a private copy. Propagation includes copying, +distribution (with or without modification), making available to the +public, and in some countries other activities as well. + + To "convey" a work means any kind of propagation that enables other +parties to make or receive copies. Mere interaction with a user through +a computer network, with no transfer of a copy, is not conveying. + + An interactive user interface displays "Appropriate Legal Notices" +to the extent that it includes a convenient and prominently visible +feature that (1) displays an appropriate copyright notice, and (2) +tells the user that there is no warranty for the work (except to the +extent that warranties are provided), that licensees may convey the +work under this License, and how to view a copy of this License. If +the interface presents a list of user commands or options, such as a +menu, a prominent item in the list meets this criterion. + + 1. Source Code. + + The "source code" for a work means the preferred form of the work +for making modifications to it. "Object code" means any non-source +form of a work. + + A "Standard Interface" means an interface that either is an official +standard defined by a recognized standards body, or, in the case of +interfaces specified for a particular programming language, one that +is widely used among developers working in that language. + + The "System Libraries" of an executable work include anything, other +than the work as a whole, that (a) is included in the normal form of +packaging a Major Component, but which is not part of that Major +Component, and (b) serves only to enable use of the work with that +Major Component, or to implement a Standard Interface for which an +implementation is available to the public in source code form. A +"Major Component", in this context, means a major essential component +(kernel, window system, and so on) of the specific operating system +(if any) on which the executable work runs, or a compiler used to +produce the work, or an object code interpreter used to run it. + + The "Corresponding Source" for a work in object code form means all +the source code needed to generate, install, and (for an executable +work) run the object code and to modify the work, including scripts to +control those activities. However, it does not include the work's +System Libraries, or general-purpose tools or generally available free +programs which are used unmodified in performing those activities but +which are not part of the work. For example, Corresponding Source +includes interface definition files associated with source files for +the work, and the source code for shared libraries and dynamically +linked subprograms that the work is specifically designed to require, +such as by intimate data communication or control flow between those +subprograms and other parts of the work. + + The Corresponding Source need not include anything that users +can regenerate automatically from other parts of the Corresponding +Source. + + The Corresponding Source for a work in source code form is that +same work. + + 2. Basic Permissions. + + All rights granted under this License are granted for the term of +copyright on the Program, and are irrevocable provided the stated +conditions are met. This License explicitly affirms your unlimited +permission to run the unmodified Program. The output from running a +covered work is covered by this License only if the output, given its +content, constitutes a covered work. This License acknowledges your +rights of fair use or other equivalent, as provided by copyright law. + + You may make, run and propagate covered works that you do not +convey, without conditions so long as your license otherwise remains +in force. You may convey covered works to others for the sole purpose +of having them make modifications exclusively for you, or provide you +with facilities for running those works, provided that you comply with +the terms of this License in conveying all material for which you do +not control copyright. Those thus making or running the covered works +for you must do so exclusively on your behalf, under your direction +and control, on terms that prohibit them from making any copies of +your copyrighted material outside their relationship with you. + + Conveying under any other circumstances is permitted solely under +the conditions stated below. Sublicensing is not allowed; section 10 +makes it unnecessary. + + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. + + No covered work shall be deemed part of an effective technological +measure under any applicable law fulfilling obligations under article +11 of the WIPO copyright treaty adopted on 20 December 1996, or +similar laws prohibiting or restricting circumvention of such +measures. + + When you convey a covered work, you waive any legal power to forbid +circumvention of technological measures to the extent such circumvention +is effected by exercising rights under this License with respect to +the covered work, and you disclaim any intention to limit operation or +modification of the work as a means of enforcing, against the work's +users, your or third parties' legal rights to forbid circumvention of +technological measures. + + 4. Conveying Verbatim Copies. + + You may convey verbatim copies of the Program's source code as you +receive it, in any medium, provided that you conspicuously and +appropriately publish on each copy an appropriate copyright notice; +keep intact all notices stating that this License and any +non-permissive terms added in accord with section 7 apply to the code; +keep intact all notices of the absence of any warranty; and give all +recipients a copy of this License along with the Program. + + You may charge any price or no price for each copy that you convey, +and you may offer support or warranty protection for a fee. + + 5. Conveying Modified Source Versions. + + You may convey a work based on the Program, or the modifications to +produce it from the Program, in the form of source code under the +terms of section 4, provided that you also meet all of these conditions: + + a) The work must carry prominent notices stating that you modified + it, and giving a relevant date. + + b) The work must carry prominent notices stating that it is + released under this License and any conditions added under section + 7. This requirement modifies the requirement in section 4 to + "keep intact all notices". + + c) You must license the entire work, as a whole, under this + License to anyone who comes into possession of a copy. This + License will therefore apply, along with any applicable section 7 + additional terms, to the whole of the work, and all its parts, + regardless of how they are packaged. This License gives no + permission to license the work in any other way, but it does not + invalidate such permission if you have separately received it. + + d) If the work has interactive user interfaces, each must display + Appropriate Legal Notices; however, if the Program has interactive + interfaces that do not display Appropriate Legal Notices, your + work need not make them do so. + + A compilation of a covered work with other separate and independent +works, which are not by their nature extensions of the covered work, +and which are not combined with it such as to form a larger program, +in or on a volume of a storage or distribution medium, is called an +"aggregate" if the compilation and its resulting copyright are not +used to limit the access or legal rights of the compilation's users +beyond what the individual works permit. Inclusion of a covered work +in an aggregate does not cause this License to apply to the other +parts of the aggregate. + + 6. Conveying Non-Source Forms. + + You may convey a covered work in object code form under the terms +of sections 4 and 5, provided that you also convey the +machine-readable Corresponding Source under the terms of this License, +in one of these ways: + + a) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by the + Corresponding Source fixed on a durable physical medium + customarily used for software interchange. + + b) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by a + written offer, valid for at least three years and valid for as + long as you offer spare parts or customer support for that product + model, to give anyone who possesses the object code either (1) a + copy of the Corresponding Source for all the software in the + product that is covered by this License, on a durable physical + medium customarily used for software interchange, for a price no + more than your reasonable cost of physically performing this + conveying of source, or (2) access to copy the + Corresponding Source from a network server at no charge. + + c) Convey individual copies of the object code with a copy of the + written offer to provide the Corresponding Source. This + alternative is allowed only occasionally and noncommercially, and + only if you received the object code with such an offer, in accord + with subsection 6b. + + d) Convey the object code by offering access from a designated + place (gratis or for a charge), and offer equivalent access to the + Corresponding Source in the same way through the same place at no + further charge. You need not require recipients to copy the + Corresponding Source along with the object code. If the place to + copy the object code is a network server, the Corresponding Source + may be on a different server (operated by you or a third party) + that supports equivalent copying facilities, provided you maintain + clear directions next to the object code saying where to find the + Corresponding Source. Regardless of what server hosts the + Corresponding Source, you remain obligated to ensure that it is + available for as long as needed to satisfy these requirements. + + e) Convey the object code using peer-to-peer transmission, provided + you inform other peers where the object code and Corresponding + Source of the work are being offered to the general public at no + charge under subsection 6d. + + A separable portion of the object code, whose source code is excluded +from the Corresponding Source as a System Library, need not be +included in conveying the object code work. + + A "User Product" is either (1) a "consumer product", which means any +tangible personal property which is normally used for personal, family, +or household purposes, or (2) anything designed or sold for incorporation +into a dwelling. In determining whether a product is a consumer product, +doubtful cases shall be resolved in favor of coverage. For a particular +product received by a particular user, "normally used" refers to a +typical or common use of that class of product, regardless of the status +of the particular user or of the way in which the particular user +actually uses, or expects or is expected to use, the product. A product +is a consumer product regardless of whether the product has substantial +commercial, industrial or non-consumer uses, unless such uses represent +the only significant mode of use of the product. + + "Installation Information" for a User Product means any methods, +procedures, authorization keys, or other information required to install +and execute modified versions of a covered work in that User Product from +a modified version of its Corresponding Source. The information must +suffice to ensure that the continued functioning of the modified object +code is in no case prevented or interfered with solely because +modification has been made. + + If you convey an object code work under this section in, or with, or +specifically for use in, a User Product, and the conveying occurs as +part of a transaction in which the right of possession and use of the +User Product is transferred to the recipient in perpetuity or for a +fixed term (regardless of how the transaction is characterized), the +Corresponding Source conveyed under this section must be accompanied +by the Installation Information. But this requirement does not apply +if neither you nor any third party retains the ability to install +modified object code on the User Product (for example, the work has +been installed in ROM). + + The requirement to provide Installation Information does not include a +requirement to continue to provide support service, warranty, or updates +for a work that has been modified or installed by the recipient, or for +the User Product in which it has been modified or installed. Access to a +network may be denied when the modification itself materially and +adversely affects the operation of the network or violates the rules and +protocols for communication across the network. + + Corresponding Source conveyed, and Installation Information provided, +in accord with this section must be in a format that is publicly +documented (and with an implementation available to the public in +source code form), and must require no special password or key for +unpacking, reading or copying. + + 7. Additional Terms. + + "Additional permissions" are terms that supplement the terms of this +License by making exceptions from one or more of its conditions. +Additional permissions that are applicable to the entire Program shall +be treated as though they were included in this License, to the extent +that they are valid under applicable law. If additional permissions +apply only to part of the Program, that part may be used separately +under those permissions, but the entire Program remains governed by +this License without regard to the additional permissions. + + When you convey a copy of a covered work, you may at your option +remove any additional permissions from that copy, or from any part of +it. (Additional permissions may be written to require their own +removal in certain cases when you modify the work.) You may place +additional permissions on material, added by you to a covered work, +for which you have or can give appropriate copyright permission. + + Notwithstanding any other provision of this License, for material you +add to a covered work, you may (if authorized by the copyright holders of +that material) supplement the terms of this License with terms: + + a) Disclaiming warranty or limiting liability differently from the + terms of sections 15 and 16 of this License; or + + b) Requiring preservation of specified reasonable legal notices or + author attributions in that material or in the Appropriate Legal + Notices displayed by works containing it; or + + c) Prohibiting misrepresentation of the origin of that material, or + requiring that modified versions of such material be marked in + reasonable ways as different from the original version; or + + d) Limiting the use for publicity purposes of names of licensors or + authors of the material; or + + e) Declining to grant rights under trademark law for use of some + trade names, trademarks, or service marks; or + + f) Requiring indemnification of licensors and authors of that + material by anyone who conveys the material (or modified versions of + it) with contractual assumptions of liability to the recipient, for + any liability that these contractual assumptions directly impose on + those licensors and authors. + + All other non-permissive additional terms are considered "further +restrictions" within the meaning of section 10. If the Program as you +received it, or any part of it, contains a notice stating that it is +governed by this License along with a term that is a further +restriction, you may remove that term. If a license document contains +a further restriction but permits relicensing or conveying under this +License, you may add to a covered work material governed by the terms +of that license document, provided that the further restriction does +not survive such relicensing or conveying. + + If you add terms to a covered work in accord with this section, you +must place, in the relevant source files, a statement of the +additional terms that apply to those files, or a notice indicating +where to find the applicable terms. + + Additional terms, permissive or non-permissive, may be stated in the +form of a separately written license, or stated as exceptions; +the above requirements apply either way. + + 8. Termination. + + You may not propagate or modify a covered work except as expressly +provided under this License. Any attempt otherwise to propagate or +modify it is void, and will automatically terminate your rights under +this License (including any patent licenses granted under the third +paragraph of section 11). + + However, if you cease all violation of this License, then your +license from a particular copyright holder is reinstated (a) +provisionally, unless and until the copyright holder explicitly and +finally terminates your license, and (b) permanently, if the copyright +holder fails to notify you of the violation by some reasonable means +prior to 60 days after the cessation. + + Moreover, your license from a particular copyright holder is +reinstated permanently if the copyright holder notifies you of the +violation by some reasonable means, this is the first time you have +received notice of violation of this License (for any work) from that +copyright holder, and you cure the violation prior to 30 days after +your receipt of the notice. + + Termination of your rights under this section does not terminate the +licenses of parties who have received copies or rights from you under +this License. If your rights have been terminated and not permanently +reinstated, you do not qualify to receive new licenses for the same +material under section 10. + + 9. Acceptance Not Required for Having Copies. + + You are not required to accept this License in order to receive or +run a copy of the Program. Ancillary propagation of a covered work +occurring solely as a consequence of using peer-to-peer transmission +to receive a copy likewise does not require acceptance. However, +nothing other than this License grants you permission to propagate or +modify any covered work. These actions infringe copyright if you do +not accept this License. Therefore, by modifying or propagating a +covered work, you indicate your acceptance of this License to do so. + + 10. Automatic Licensing of Downstream Recipients. + + Each time you convey a covered work, the recipient automatically +receives a license from the original licensors, to run, modify and +propagate that work, subject to this License. You are not responsible +for enforcing compliance by third parties with this License. + + An "entity transaction" is a transaction transferring control of an +organization, or substantially all assets of one, or subdividing an +organization, or merging organizations. If propagation of a covered +work results from an entity transaction, each party to that +transaction who receives a copy of the work also receives whatever +licenses to the work the party's predecessor in interest had or could +give under the previous paragraph, plus a right to possession of the +Corresponding Source of the work from the predecessor in interest, if +the predecessor has it or can get it with reasonable efforts. + + You may not impose any further restrictions on the exercise of the +rights granted or affirmed under this License. For example, you may +not impose a license fee, royalty, or other charge for exercise of +rights granted under this License, and you may not initiate litigation +(including a cross-claim or counterclaim in a lawsuit) alleging that +any patent claim is infringed by making, using, selling, offering for +sale, or importing the Program or any portion of it. + + 11. Patents. + + A "contributor" is a copyright holder who authorizes use under this +License of the Program or a work on which the Program is based. The +work thus licensed is called the contributor's "contributor version". + + A contributor's "essential patent claims" are all patent claims +owned or controlled by the contributor, whether already acquired or +hereafter acquired, that would be infringed by some manner, permitted +by this License, of making, using, or selling its contributor version, +but do not include claims that would be infringed only as a +consequence of further modification of the contributor version. For +purposes of this definition, "control" includes the right to grant +patent sublicenses in a manner consistent with the requirements of +this License. + + Each contributor grants you a non-exclusive, worldwide, royalty-free +patent license under the contributor's essential patent claims, to +make, use, sell, offer for sale, import and otherwise run, modify and +propagate the contents of its contributor version. + + In the following three paragraphs, a "patent license" is any express +agreement or commitment, however denominated, not to enforce a patent +(such as an express permission to practice a patent or covenant not to +sue for patent infringement). To "grant" such a patent license to a +party means to make such an agreement or commitment not to enforce a +patent against the party. + + If you convey a covered work, knowingly relying on a patent license, +and the Corresponding Source of the work is not available for anyone +to copy, free of charge and under the terms of this License, through a +publicly available network server or other readily accessible means, +then you must either (1) cause the Corresponding Source to be so +available, or (2) arrange to deprive yourself of the benefit of the +patent license for this particular work, or (3) arrange, in a manner +consistent with the requirements of this License, to extend the patent +license to downstream recipients. "Knowingly relying" means you have +actual knowledge that, but for the patent license, your conveying the +covered work in a country, or your recipient's use of the covered work +in a country, would infringe one or more identifiable patents in that +country that you have reason to believe are valid. + + If, pursuant to or in connection with a single transaction or +arrangement, you convey, or propagate by procuring conveyance of, a +covered work, and grant a patent license to some of the parties +receiving the covered work authorizing them to use, propagate, modify +or convey a specific copy of the covered work, then the patent license +you grant is automatically extended to all recipients of the covered +work and works based on it. + + A patent license is "discriminatory" if it does not include within +the scope of its coverage, prohibits the exercise of, or is +conditioned on the non-exercise of one or more of the rights that are +specifically granted under this License. You may not convey a covered +work if you are a party to an arrangement with a third party that is +in the business of distributing software, under which you make payment +to the third party based on the extent of your activity of conveying +the work, and under which the third party grants, to any of the +parties who would receive the covered work from you, a discriminatory +patent license (a) in connection with copies of the covered work +conveyed by you (or copies made from those copies), or (b) primarily +for and in connection with specific products or compilations that +contain the covered work, unless you entered into that arrangement, +or that patent license was granted, prior to 28 March 2007. + + Nothing in this License shall be construed as excluding or limiting +any implied license or other defenses to infringement that may +otherwise be available to you under applicable patent law. + + 12. No Surrender of Others' Freedom. + + If conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot convey a +covered work so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you may +not convey it at all. For example, if you agree to terms that obligate you +to collect a royalty for further conveying from those to whom you convey +the Program, the only way you could satisfy both those terms and this +License would be to refrain entirely from conveying the Program. + + 13. Use with the GNU Affero General Public License. + + Notwithstanding any other provision of this License, you have +permission to link or combine any covered work with a work licensed +under version 3 of the GNU Affero General Public License into a single +combined work, and to convey the resulting work. The terms of this +License will continue to apply to the part which is the covered work, +but the special requirements of the GNU Affero General Public License, +section 13, concerning interaction through a network will apply to the +combination as such. + + 14. Revised Versions of this License. + + The Free Software Foundation may publish revised and/or new versions of +the GNU General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + + Each version is given a distinguishing version number. If the +Program specifies that a certain numbered version of the GNU General +Public License "or any later version" applies to it, you have the +option of following the terms and conditions either of that numbered +version or of any later version published by the Free Software +Foundation. If the Program does not specify a version number of the +GNU General Public License, you may choose any version ever published +by the Free Software Foundation. + + If the Program specifies that a proxy can decide which future +versions of the GNU General Public License can be used, that proxy's +public statement of acceptance of a version permanently authorizes you +to choose that version for the Program. + + Later license versions may give you additional or different +permissions. However, no additional obligations are imposed on any +author or copyright holder as a result of your choosing to follow a +later version. + + 15. Disclaimer of Warranty. + + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY +APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT +HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY +OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM +IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF +ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. Limitation of Liability. + + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS +THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY +GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE +USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF +DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD +PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), +EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF +SUCH DAMAGES. + + 17. Interpretation of Sections 15 and 16. + + If the disclaimer of warranty and limitation of liability provided +above cannot be given local legal effect according to their terms, +reviewing courts shall apply local law that most closely approximates +an absolute waiver of all civil liability in connection with the +Program, unless a warranty or assumption of liability accompanies a +copy of the Program in return for a fee. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +state the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + <one line to give the program's name and a brief idea of what it does.> + Copyright (C) <year> <name of author> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + +Also add information on how to contact you by electronic and paper mail. + + If the program does terminal interaction, make it output a short +notice like this when it starts in an interactive mode: + + <program> Copyright (C) <year> <name of author> + This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, your program's commands +might be different; for a GUI interface, you would use an "about box". + + You should also get your employer (if you work as a programmer) or school, +if any, to sign a "copyright disclaimer" for the program, if necessary. +For more information on this, and how to apply and follow the GNU GPL, see +<https://www.gnu.org/licenses/>. + + The GNU General Public License does not permit incorporating your program +into proprietary programs. If your program is a subroutine library, you +may consider it more useful to permit linking proprietary applications with +the library. If this is what you want to do, use the GNU Lesser General +Public License instead of this License. But first, please read +<https://www.gnu.org/licenses/why-not-lgpl.html>. diff --git a/aclk/aclk-schemas/Makefile b/aclk/aclk-schemas/Makefile new file mode 100644 index 000000000..8f4003070 --- /dev/null +++ b/aclk/aclk-schemas/Makefile @@ -0,0 +1,74 @@ +SHELL := /usr/bin/env bash -o pipefail + +# This controls the location of the cache. +PROJECT := cloud-schemas +# This controls the remote HTTPS git location to compare against for breaking changes in CI. +# +# Most CI providers only clone the branch under test and to a certain depth, so when +# running buf check breaking in CI, it is generally preferable to compare against +# the remote repository directly. +# +# Basic authentication is available, see https://buf.build/docs/inputs#https for more details. +HTTPS_GIT := https://github.com/netdata/cloud-schemas.git + +# This controls the version of buf to install and use. +BUF_VERSION := 0.6.0 + +### Everything below this line is meant to be static, i.e. only adjust the above variables. ### + +UNAME_OS := $(shell uname -s) +UNAME_ARCH := $(shell uname -m) +# Buf will be cached to ~/.cache/buf-example. +CACHE_BASE := $(HOME)/.cache/$(PROJECT) +# This allows switching between i.e a Docker container and your local setup without overwriting. +CACHE := $(CACHE_BASE)/$(UNAME_OS)/$(UNAME_ARCH) +# The location where buf will be installed. +CACHE_BIN := $(CACHE)/bin +# Marker files are put into this directory to denote the current version of binaries that are installed. +CACHE_VERSIONS := $(CACHE)/versions + +# Update the $PATH so we can use buf directly +export PATH := $(abspath $(CACHE_BIN)):$(PATH) + +# BUF points to the marker file for the installed version. +# +# If BUF_VERSION is changed, the binary will be re-downloaded. +BUF := $(CACHE_VERSIONS)/buf/$(BUF_VERSION) +$(BUF): + @rm -f $(CACHE_BIN)/buf + @mkdir -p $(CACHE_BIN) + curl -sSL \ + "https://github.com/bufbuild/buf/releases/download/v$(BUF_VERSION)/buf-$(UNAME_OS)-$(UNAME_ARCH)" \ + -o "$(CACHE_BIN)/buf" + chmod +x "$(CACHE_BIN)/buf" + @rm -rf $(dir $(BUF)) + @mkdir -p $(dir $(BUF)) + @touch $(BUF) + +.DEFAULT_GOAL := local + +# deps allows us to install deps without running any checks. + +.PHONY: deps +deps: $(BUF) + +# local is what we run when testing locally. +# This does breaking change detection against our local git repository. + +.PHONY: local +local: $(BUF) + buf check lint ./proto + buf check breaking --against '.git#branch=master' + +# https is what we run when testing in most CI providers. +# This does breaking change detection against our git repository. + +.PHONY: CI +CI: $(BUF) + buf check lint ./proto + buf check breaking --against ".git#branch=master" + +.PHONY: clean +clean: + git clean -xdf + rm -rf $(CACHE_BASE)
\ No newline at end of file diff --git a/aclk/aclk-schemas/README.md b/aclk/aclk-schemas/README.md new file mode 100644 index 000000000..aa6188977 --- /dev/null +++ b/aclk/aclk-schemas/README.md @@ -0,0 +1,2 @@ +# aclk-schemas +Protobuf schemas used in ACLK connection diff --git a/aclk/aclk-schemas/buf.yml b/aclk/aclk-schemas/buf.yml new file mode 100644 index 000000000..532053d91 --- /dev/null +++ b/aclk/aclk-schemas/buf.yml @@ -0,0 +1,9 @@ +build: + roots: + - proto +lint: + use: + - DEFAULT +breaking: + use: + - FILE diff --git a/aclk/aclk-schemas/proto/aclk/v1/lib.proto b/aclk/aclk-schemas/proto/aclk/v1/lib.proto new file mode 100644 index 000000000..f32c32c6e --- /dev/null +++ b/aclk/aclk-schemas/proto/aclk/v1/lib.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package aclk_lib.v1; + +import "google/protobuf/timestamp.proto"; + +option go_package = "aclk_lib/v1;aclklib"; + +// ACLKMessagePosition is used by sequenced messages to define their exact position +message ACLKMessagePosition { + uint64 sequence_id = 1; + // auto generated in Agent's DB upon sequence_id creation + google.protobuf.Timestamp seq_id_created_at = 2; + uint64 previous_sequence_id = 3; +} + +message Capability { + string name = 1; + uint32 version = 2; + // version == 0 is equivalent to not having the capability at all + bool enabled = 3; +} diff --git a/aclk/aclk-schemas/proto/agent/v1/cmds.proto b/aclk/aclk-schemas/proto/agent/v1/cmds.proto new file mode 100644 index 000000000..c37c00c3a --- /dev/null +++ b/aclk/aclk-schemas/proto/agent/v1/cmds.proto @@ -0,0 +1,79 @@ +syntax = "proto3"; +option go_package = "agent/v1;agent"; + +package agent.v1; + +import "google/protobuf/timestamp.proto"; +import "proto/aclk/v1/lib.proto"; + +message CancelPendingRequest { + // must match the ID sent with the request originally made + // other than this agent will not put conditions on it + // and will treat it as opaque string (it simply has to match) + // However this doesn't mean there are no conditions on the id + // made on the request side + string request_id = 1; + + // time when the cancellation request was generated + google.protobuf.Timestamp timestamp = 2; + + // optional might be useful for debugging purposes + string trace_id = 3; +} + +// AgentCommand is sent from the Cloud to the Agent at `/agent/{claim_id}/inbound/v1/cmd/AgentCommand` +// the message includes the resource that the Cloud needs to GET from the Agent HTTP API along with related metadata +message AgentCommand { + // the topic to which the Cloud awaits for the AgentCommandResponse. + // example: `/svc/agent-data-ctrl/2d7b7edd-561e-4aec-8ac1-466a585520f5/resp` + string callback_topic = 1; + // the topic to which the Cloud awaits for the AgentCommandAck. + // example: `/svc/agent-data-ctrl/2d7b7edd-561e-4aec-8ac1-466a585520f5/resp` + string ack_topic = 2; + // unique identifier for the AgentCommand + // example: `617038b3-7c2a-4617-a78f-ab37bd820198` + string message_id = 3; + // defined in milliseconds, the time the Agent has to respond before Cloud + // considering the request as timed-out + // example: `60000` + uint64 timeout = 4; + // defined in milliseconds, the time the Agent has to send back to the Cloud + // an AgentCommandAck message signaling that is still working on the request + // example: `3000` + uint64 ack_timeout = 5; + // the requested Agent resource + // example: `/api/v2/data?query_params_go_here` + string resource = 6; +} + +// AgentCommandAck is sent from the Agent to the Cloud at predefined intervals (`AgentCommand.ack_timeout`) to predefined topic (`AgentCommand.ack_topic`) +// signaling that the Agent is still working to serve an AgentCommand (referenced by the message_id) that the Cloud sent +message AgentCommandAck { + // unique identifier to reference AgentCommand on which the Agent is still working on serving + // example: `617038b3-7c2a-4617-a78f-ab37bd820198` + string message_id = 1; + // the timestamp when the Agent created this AgentCommandAck message + google.protobuf.Timestamp created_at = 2; + // integer revealing the progress of completion to serve the AgentCommand with the given message_id + // example: `25` + uint32 progress_percent = 3; +} + +// AgentCommandResponse is sent from the Agent to the Cloud at `/agent/{claim_id}/inbound/v1/cmd/AgentCommand` +// the message includes the resource that the Cloud needs to GET from the Agent HTTP API along with related metadata +message AgentCommandResponse { + // unique identifier for the AgentCommand + // example: `617038b3-7c2a-4617-a78f-ab37bd820198` + string message_id = 1; + // the (http) status code of the Agent's API response + // example: `200` + uint32 status_code = 2; + // the dumped raw (http) response the Agent's API returned + bytes response = 3; + // the Agent's timestamp (aka legacy `timestamp`) + google.protobuf.Timestamp timestamp = 4; + // the timestamp when the Agent received the AgentCommand for execution (aka legacy `t-rx`) + google.protobuf.Timestamp received_at = 5; + // the amount of microseconds the Agent needed to execute the HTTP request of the AgentCommand (aka legacy`t-exec`) + uint64 exec_time = 6; +} diff --git a/aclk/aclk-schemas/proto/agent/v1/connection.proto b/aclk/aclk-schemas/proto/agent/v1/connection.proto new file mode 100644 index 000000000..4321b0b90 --- /dev/null +++ b/aclk/aclk-schemas/proto/agent/v1/connection.proto @@ -0,0 +1,59 @@ +syntax = "proto3"; +option go_package = "agent/v1;agent"; + +package agent.v1; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "proto/aclk/v1/lib.proto"; + +message UpdateAgentConnection { + string claim_id = 1; + bool reachable = 2; + + int64 session_id = 3; + + ConnectionUpdateSource update_source = 4; + + // mqtt_broker_addr shard to use for reaching the agent + // cloud injects this information + string mqtt_broker_addr = 5; + + google.protobuf.Timestamp updated_at = 6; + + // vmq_instance_id broker shard to use for reaching the agent + // cloud injects this information + int32 vmq_instance_id = 7; + + // > 15 optional fields: + // How long the system was running until connection (only applicable when reachable=true) + google.protobuf.Duration system_uptime = 15; + + // How long the netdata agent was running until connection (only applicable when reachable=true) + google.protobuf.Duration agent_uptime = 16; + + repeated aclk_lib.v1.Capability capabilities = 17; +} + +message SendNodeInstances { + string claim_id = 1; + Config config = 2; +} + +// ConnectionUpdateSource is to determine whether the connection update was issued +enum ConnectionUpdateSource { + // CONNECTION_UPDATE_SOURCE_UNSPECIFIED acts as default value for protobuf and is never specified + CONNECTION_UPDATE_SOURCE_UNSPECIFIED = 0; + // CONNECTION_UPDATE_SOURCE_AGENT A direct message from an agent + CONNECTION_UPDATE_SOURCE_AGENT = 1; + // CONNECTION_UPDATE_SOURCE_LWT message delivered as the Last Will and Testiment from MQTT broker if an agent connection with the broker is lost + CONNECTION_UPDATE_SOURCE_LWT = 2; + // CONNECTION_UPDATE_SOURCE_HEURISTIC A cloud generated message to sanitize incorrect internal state + CONNECTION_UPDATE_SOURCE_HEURISTIC = 3; +} + +message Config { + bool bearer_protection = 1; + bool cloud_only_notifications = 2; + bool universal_dashboards = 3; +} diff --git a/aclk/aclk-schemas/proto/agent/v1/disconnect.proto b/aclk/aclk-schemas/proto/agent/v1/disconnect.proto new file mode 100644 index 000000000..852ef702a --- /dev/null +++ b/aclk/aclk-schemas/proto/agent/v1/disconnect.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package agent.v1; + +import "google/protobuf/timestamp.proto"; + +option go_package = "agent/v1;agent"; + +// Sent by Cloud to instruct Agent to disconnect ASAP +message DisconnectReq { + uint64 reconnect_after_seconds = 1; + bool permaban = 2; + google.protobuf.Timestamp created_at = 3; + uint32 error_code = 4; + string error_description = 5; +} diff --git a/aclk/aclk-schemas/proto/alarm/v1/config.proto b/aclk/aclk-schemas/proto/alarm/v1/config.proto new file mode 100644 index 000000000..430078fcf --- /dev/null +++ b/aclk/aclk-schemas/proto/alarm/v1/config.proto @@ -0,0 +1,61 @@ +syntax = "proto3"; + +package alarms.v1; + +option go_package = "alarms/v1;alarms"; + +message SendAlarmConfiguration{ + string config_hash = 1; +} + +message ProvideAlarmConfiguration { + string config_hash = 1; + AlarmConfiguration config = 2; +} + +message AlarmConfiguration{ + string alarm = 1; + string template = 2; + string on_chart = 3; + + string classification = 4; + string type = 5; + string component = 6; + + string os = 7; + string hosts = 8; + string plugin = 9; + string module = 10; + string charts = 11; + string families = 12; + string lookup = 13; + string every = 14; + string units = 15; + + string green = 16; + string red = 17; + + string calculation_expr = 18; + string warning_expr = 19; + string critical_expr = 20; + + string recipient = 21; + string exec = 22; + string delay = 23; + string repeat = 24; + string info = 25; + string options = 26; + string host_labels = 27; + + //parsed values from above config values + //indicated by p_ + int32 p_db_lookup_after = 28; + int32 p_db_lookup_before = 29; + string p_db_lookup_dimensions = 30; + string p_db_lookup_method = 31; + string p_db_lookup_options = 32; + int32 p_update_every = 33; + + string chart_labels = 34; + string summary = 35; +} diff --git a/aclk/aclk-schemas/proto/alarm/v1/stream.proto b/aclk/aclk-schemas/proto/alarm/v1/stream.proto new file mode 100644 index 000000000..44f190682 --- /dev/null +++ b/aclk/aclk-schemas/proto/alarm/v1/stream.proto @@ -0,0 +1,148 @@ +syntax = "proto3"; + +package alarms.v1; + +import "google/protobuf/timestamp.proto"; + +option go_package = "alarms/v1;alarms"; + +message SendAlarmLogHealth { + string node_id = 1; +} + +message AlarmLogHealth { + string claim_id = 1; + string node_id = 2; + bool enabled = 3; + AlarmLogStatus status = 4; + LogEntries log_entries = 5; +} + +message LogEntries { + int64 first_sequence_id = 1; + google.protobuf.Timestamp first_when = 2; + + int64 last_sequence_id = 3; + google.protobuf.Timestamp last_when = 4; +} + +enum AlarmLogStatus { + ALARM_LOG_STATUS_UNSPECIFIED = 0; + ALARM_LOG_STATUS_RUNNING = 1; + ALARM_LOG_STATUS_IDLE = 2; +} + +message StartAlarmStreaming { + string node_id = 1; + uint64 batch_id = 2 [deprecated=true]; + uint64 start_sequnce_id = 3 [deprecated=true]; + // Instructs the agent to sync all configured alarms + bool resets = 4; +} + +message SendAlarmCheckpoint { + string node_id = 1; + string claim_id = 2; +} + +message AlarmCheckpoint { + string node_id = 1; + string claim_id = 2; + bytes checksum = 3; +} + +message AlarmLogEntry { + string node_id = 1; + string claim_id = 2; + + // The chart's id field + string chart = 3; + string name = 4; + string family = 5; + uint64 batch_id = 6 [deprecated=true]; + uint64 sequence_id = 7 [deprecated=true]; + uint64 when = 8; + + string config_hash = 9; + + int32 utc_offset = 10; + string timezone = 11; + + // Paths that can be custom for the same alarm, but depend on installation path for each user. Should be here or in config ? + string exec_path = 12; + string conf_source = 13; + string command = 14; + + // In seconds, uint32 is safe ? + uint32 duration = 15; + uint32 non_clear_duration = 16; + + AlarmStatus status = 17; + AlarmStatus old_status = 18; + uint64 delay = 19; + uint64 delay_up_to_timestamp = 20; + // Todo: verify that we need these. sequence_id doesn't suffice? + // uint64 updated_by_id = 12; + // uint64 updates_id = 13; + uint64 last_repeat = 21; + bool silenced = 22; + + // Check if string values are needed + string value_string = 23; + string old_value_string = 24; + + double value = 25; + double old_value = 26; + + // Updated alarm entry, when the status of the alarm has been updated by a later entry + bool updated = 27; + + // Rendered_info + string rendered_info = 28; + + // The chart's context field + string chart_context = 29; + + // Counter of alert transitions for this alert chain + uint64 event_id = 30; + + // A unique uuid for this alert event + string transition_id = 31; + + // The chart's name field + string chart_name = 32; + + // The rendered summary + string summary = 33; +} + +enum AlarmStatus { + ALARM_STATUS_NULL = 0; + ALARM_STATUS_UNKNOWN = 1; + ALARM_STATUS_REMOVED = 2; + ALARM_STATUS_NOT_A_NUMBER = 3; + ALARM_STATUS_CLEAR = 4; + ALARM_STATUS_WARNING = 5; + ALARM_STATUS_CRITICAL = 6; +} + +// SendAlarmSnapshot: send from cloud to the agent, to initiate an AlarmSnapshot image of current alarms back to the cloud +message SendAlarmSnapshot { + string node_id = 1; + string claim_id = 2; + uint64 snapshot_id = 3 [deprecated=true]; + uint64 sequence_id = 4 [deprecated=true]; + string snapshot_uuid = 5; +} + +// Agent responds with AlarmSnapshot to a SendAlarmSnapshot message +message AlarmSnapshot{ + string node_id = 1; + string claim_id = 2; + uint64 snapshot_id = 3 [deprecated=true]; // Same id from SendAlarmSnapshot message + uint32 chunks = 4; // In case full snapshot can not fit in a single message, indicates the total number of messages for this snapshot_id + uint32 chunk_size = 5; // How many alerts this chunk contains + uint32 chunk = 6; // Chunk index of this message + repeated AlarmLogEntry alarms = 7; // a list of AlarmLogEntry's + string snapshot_uuid = 8; +} diff --git a/aclk/aclk-schemas/proto/chart/v1/config.proto b/aclk/aclk-schemas/proto/chart/v1/config.proto new file mode 100644 index 000000000..f0c5e3a35 --- /dev/null +++ b/aclk/aclk-schemas/proto/chart/v1/config.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; + +package chart.v1; + +option go_package = "chart/config/v1;chartconfig"; + +// UpdateChartConfigs command contains the list of missing chart configs from the cloud to agent +message UpdateChartConfigs { + // claim_id, node_id pair is used to identify the Node Instance + string claim_id = 1; + string node_id = 2; + // list of config hashes missing from cloud and requested from the agent + repeated string config_hashes = 3; +} + +message ChartConfigsUpdated { + repeated ChartConfigUpdated configs = 1; +} + +message ChartConfigUpdated { + string type = 1; + string family = 2; + string context = 3; + string title = 4; + uint64 priority = 5; + string plugin = 6; + string module = 7; + ChartType chart_type = 8; + string units = 9; + string config_hash = 10; +} + +enum ChartType { + LINE = 0; + AREA = 1; + STACKED = 2; +} diff --git a/aclk/aclk-schemas/proto/chart/v1/dimension.proto b/aclk/aclk-schemas/proto/chart/v1/dimension.proto new file mode 100644 index 000000000..8bcb564b8 --- /dev/null +++ b/aclk/aclk-schemas/proto/chart/v1/dimension.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +package chart.v1; + +import "google/protobuf/timestamp.proto"; + +import "proto/aclk/v1/lib.proto"; + +option go_package = "chart/dimension/v1;chartdimension"; + +// ChartDimensionUpdated is a single event sent from the Agent to the Cloud containing chart dimension data. +// +// ChartDimensionUpdated messages are dispatched in bulk to the Cloud wrapped in ChartsAndDimensionsUpdated messages. +message ChartDimensionUpdated { + string id = 1; + string chart_id = 2; + string node_id = 3; + string claim_id = 4; + string name = 5; + google.protobuf.Timestamp created_at = 6; + // null value means that the dimension is currently collected (live) + google.protobuf.Timestamp last_timestamp = 7; + aclk_lib.v1.ACLKMessagePosition position = 8; +} diff --git a/aclk/aclk-schemas/proto/chart/v1/instance.proto b/aclk/aclk-schemas/proto/chart/v1/instance.proto new file mode 100644 index 000000000..25c99e7c7 --- /dev/null +++ b/aclk/aclk-schemas/proto/chart/v1/instance.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package chart.v1; + +import "proto/aclk/v1/lib.proto"; + +option go_package = "chart/instance/v1;chartinstance"; + +// ChartInstanceUpdated is a single event sent from the Agent to the Cloud containing chart instance data. +// +// ChartInstanceUpdated messages are dispatched in bulk to the Cloud wrapped in ChartsAndDimensionsUpdated messages. +message ChartInstanceUpdated { + string id = 1; + string claim_id = 2; + string node_id = 3; + string name = 4; + map<string, string> chart_labels = 5; + MemoryMode memory_mode = 6; + // in seconds + uint32 update_every_interval = 7; + string config_hash = 8; + aclk_lib.v1.ACLKMessagePosition position = 9; +} + +enum MemoryMode { + NONE = 0; + RAM = 1; + MAP = 2; + SAVE = 3; + ALLOC = 4; + DB_ENGINE = 5; +} diff --git a/aclk/aclk-schemas/proto/chart/v1/stream.proto b/aclk/aclk-schemas/proto/chart/v1/stream.proto new file mode 100644 index 000000000..9473538f2 --- /dev/null +++ b/aclk/aclk-schemas/proto/chart/v1/stream.proto @@ -0,0 +1,86 @@ +syntax = "proto3"; + +package chart.v1; + +import "google/protobuf/timestamp.proto"; + +import "proto/chart/v1/instance.proto"; +import "proto/chart/v1/dimension.proto"; + +option go_package = "chart/stream/v1;chartstream"; + +// StreamChartsAndDimensions is a Command produced by the Cloud, consumed by the Agent. +// +// It instructs the Agent to start sending ChartsAndDimensionsUpdated messages for a NodeInstance +// after the last sequence_id that the Cloud has successfully ingested. +message StreamChartsAndDimensions { + // claim_id, node_id pair is used to identify the Node Instance + string claim_id = 1; + string node_id = 2; + + // sequence_id last verified sequence sent by the Agent + uint64 sequence_id = 3; + // batch_id identifies the stream_id and gets incremented every time the Cloud sends a new StreamChartsAndDimensions command + uint64 batch_id = 4; + // seq_id_created_at autogenerated timestamp in Agent's DB upon sequence_id creation + google.protobuf.Timestamp seq_id_created_at = 5; +} + + +// ChartsAndDimensionsAck is an Event produced by the Cloud, consumed by the Agent. +// +// This Event is an acknowledgment from the Cloud side that Chart messages up to a specific last_sequence_id +// have been successfully ingested, and could be potentially deleted from the Agent's DB. +message ChartsAndDimensionsAck { + string claim_id = 1; + string node_id = 2; + // the last verified stored message's seq_id + uint64 last_sequence_id = 3; +} + +// ResetChartMessages is a Command produced by the Agent, consumed by the Cloud. +// +// This Command instructs the Cloud to clear its Chart state for a specific NodeInstance and re-sync +// because of a ResetReason. +message ResetChartMessages { + // claim_id, node_id pair is used to identify the Node Instance + string claim_id = 1; + string node_id = 2; + + ResetReason reason = 3; +} + +enum ResetReason { + DB_EMPTY = 0; + SEQ_ID_NOT_EXISTS = 1; + TIMESTAMP_MISMATCH = 2; +} + +// ChartsAndDimensionsUpdated is a wrapper Event (`fat` message) produced by the Agent, consumed by the Cloud. +// +// It potentially includes a collection of ChartInstanceUpdated messages and|or a collection of ChartDimensionUpdated messages. +message ChartsAndDimensionsUpdated { + repeated chart.v1.ChartInstanceUpdated charts = 1; + repeated chart.v1.ChartDimensionUpdated dimensions = 2; + uint64 batch_id = 3; +} + +// RetentionUpdated includes the available retentions (in seconds) of the dimensions - of a specific node instance and memory-mode - +// on a per update_every level. +// This message is sent over upon Agent Database rotation events to inform the Cloud in total about the newly updated data retentions +// of a node instance's dimensions. +message RetentionUpdated { + // claim_id, node_id pair is used to identify the Node Instance + string claim_id = 1; + string node_id = 2; + // the memory_mode used by the node instance's chart instances + chart.v1.MemoryMode memory_mode = 3; + // this mapping identifies the newly updated available retention (in seconds) of the node instance's dimensions + // the keys are the update_every categories of various dimensions (1, 2, 4, 10 etc.), + // and the values are the available retention (in seconds) of each dimension belonging to the update_every category + // denoted by the key + map<uint32, uint32> interval_durations = 4; + // the timestamp when the db rotation event took place. Can be used in conjunction with the interval_durations + // to compute the beginning of each `updated_every` group's retention + google.protobuf.Timestamp rotation_timestamp = 5; +} diff --git a/aclk/aclk-schemas/proto/context/v1/context.proto b/aclk/aclk-schemas/proto/context/v1/context.proto new file mode 100644 index 000000000..eb771f8eb --- /dev/null +++ b/aclk/aclk-schemas/proto/context/v1/context.proto @@ -0,0 +1,57 @@ +syntax = "proto3"; + +package context.v1; + +option go_package = "context/v1;context"; + +// ContextsUpdated is an Event produced by the Agent, consumed by the Cloud. +// +// it contains a collection of ContextUpdated messages for a specific NodeInstance. +message ContextsUpdated { + // contexUpdates contains the collection of context updates + repeated ContextUpdated contextUpdates = 1; + // claim_id, node_id pair identifies the node instance + string claim_id = 2; + string node_id = 3; + // version_hash is the contexts version_hash result the cloud should + // get after applying this message updates. + uint64 version_hash = 4; + // it's and always increasing number to compare + // which version_hash is more recent between multiple + // ContextsUpdated messages. Bigger means more recent. + uint64 created_at = 5; +} + +// ContextUpdated contains context data. +message ContextUpdated { + // context id + string id = 1; + // context version is an epoch in seconds. + uint64 version = 2; + // first_entry, last_entry are epochs in seconds + uint64 first_entry = 3; + uint64 last_entry = 4; + // deleted flag is used to signal a context deletion + bool deleted = 5; + // context configuration fields + string title = 6; + uint64 priority = 7; + string chart_type = 8; + string units = 9; + string family = 10; +} + +// ContextsSnapshot is an Event produced by the Agent, consumed by the Cloud. +// +// it contains a snapshot of the existing contexts on the Agent. +// snapshot version and context versions are epochs in seconds so we can +// identify if a context version was generated after a specific snapshot. +message ContextsSnapshot { + // contexts contains the collection of existing contexts + repeated ContextUpdated contexts = 1; + // claim_id, node_id pair identifies the node instance + string claim_id = 2; + string node_id = 3; + // version is an epoch in seconds + uint64 version = 4; +} diff --git a/aclk/aclk-schemas/proto/context/v1/stream.proto b/aclk/aclk-schemas/proto/context/v1/stream.proto new file mode 100644 index 000000000..a6e7e3abf --- /dev/null +++ b/aclk/aclk-schemas/proto/context/v1/stream.proto @@ -0,0 +1,34 @@ +syntax = "proto3"; + +package context.v1; + +option go_package = "context/v1;context"; + +// ContextsCheckpoint is a Command produced by the Cloud, consumed by the Agent. +// +// It informs the Agent the contexts' version_hash that the cloud has for a specific NodeInstance. +message ContextsCheckpoint { + // claim_id, node_id pair is used to identify the NodeInstance. + string claim_id = 1; + string node_id = 2; + // version_hash tells the Agent the current version hash for the contexts received + // if the version hash calculated by the Agent is different, Agent will request + // to re-sync all contexts. + uint64 version_hash= 3; +} + +// StopStreamingContexts is a Command produced by the Cloud, consumed by the Agent. +// +// It instructs the Agent to stop sending ContextsUpdated messages for a NodeInstance +// due to a reason. +message StopStreamingContexts { + // claim_id, node_id pair is used to identify the node instance + string claim_id = 1; + string node_id = 2; + + StopStreamingContextsReason reason = 3; +} + +enum StopStreamingContextsReason { + RATE_LIMIT_EXCEEDED = 0; +} diff --git a/aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto b/aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto new file mode 100644 index 000000000..f0c02461e --- /dev/null +++ b/aclk/aclk-schemas/proto/nodeinstance/connection/v1/connection.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; +option go_package = "nodeinstance/connection/v1;nodeinstanceconnection"; + +package nodeinstance.v1; + +import "google/protobuf/timestamp.proto"; +import "proto/aclk/v1/lib.proto"; + +message UpdateNodeInstanceConnection { + string claim_id = 1; + string node_id = 2; + + // liveness whether node data are actively streamed to the agent. + bool liveness = 3; + + // queryable whether the agent has data about the node. + bool queryable = 4; + + int64 session_id = 5; + + google.protobuf.Timestamp updated_at = 6; + + // mqtt_broker_addr shard to use for reaching the agent + // cloud injects this information. + string mqtt_broker_addr = 7; + + // vmq_instance_id broker shard to use for reaching the agent + // cloud injects this information. + int32 vmq_instance_id = 8; + + // hops is the number of streaming hops between collection of node data + // and the claimed agent. Zero if no streaming is involved. + int32 hops = 9; + + // capabilities of node instance NOT the NODE or agent!!! + repeated aclk_lib.v1.Capability capabilities = 10; +} diff --git a/aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto b/aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto new file mode 100644 index 000000000..922337154 --- /dev/null +++ b/aclk/aclk-schemas/proto/nodeinstance/create/v1/creation.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; +option go_package = "node_instance/creation/v1;node_instancecreation"; + +package nodeinstance.create.v1; + +message CreateNodeInstance { + // Claim ID of the Agent the Node Instance belongs to. + // Eventually, the NodeInstance will be identified by the compilation of + // this claim_id and NodeID returned by `CreateNodeInstanceResult` + string claim_id = 1; + // Machine GUID of the Machine the request comes from + // Used to look for an existing NodeID in the space claim_id belongs to + string machine_guid = 2; + string hostname = 3; + + // vmq_instance_id broker shard to use for reaching the agent + // cloud injects this information. + int32 vmq_instance_id = 4; + // mqtt_broker_addr shard to use for reaching the agent + // cloud injects this information. + string mqtt_broker_addr = 5; + + // hops is the number of streaming hops between collection of node data + // and the claimed agent. Zero if no streaming is involved. + int32 hops = 6; +} + +message CreateNodeInstanceResult { + string node_id = 1; + string machine_guid = 2; +} + diff --git a/aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto b/aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto new file mode 100644 index 000000000..7aa9d0448 --- /dev/null +++ b/aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto @@ -0,0 +1,148 @@ +syntax = "proto3"; +option go_package = "node_instance/info/v1;nodeinstanceinfo"; + +package nodeinstance.info.v1; + +import "google/protobuf/timestamp.proto"; +import "proto/aclk/v1/lib.proto"; + +// UpdateNodeInfo (Command) +// +// pulsar topic: `UpdateNodeInfo` (sharded) +// +// key: `claim_id,node_id` +// +// Publishers: `netdata/agent` +// Subscribers: `cloud-node-mqtt-output-service` +// +// When: +// On nodeinstance connect +// +message UpdateNodeInfo { + string node_id = 7; + + string claim_id = 1; + + NodeInfo data = 2; + // to be obsoleted in future + // all new fields should go into node_info + // or node_instance_info respectively + + google.protobuf.Timestamp updated_at = 3; + + int64 session_id = 4; + + string machine_guid = 5; + + bool child = 6; + + MachineLearningInfo ml_info = 8; + // to be obsoleted in far future + + NodeInfo2 node_info = 9; + // node_info shows data about actual node + // for example feature (ml) for this + // node (child) might be available/enabled on the node (child) directly + // but not available trough the parent (node_instance) + + NodeInstanceInfo node_instance_info = 10; + // info specific to the node_instance for this node available trough agent + // who sends this message. + // e.g. machine learning is enabled for this node and processing is done + // by the actual agent (parent). (child itself might or might not be + // ml ml_capable by itself (see node_info)) +} + +message NodeInfo2 { + repeated aclk_lib.v1.Capability capabilities = 1; +} + +message NodeInstanceInfo { + repeated aclk_lib.v1.Capability capabilities = 1; +} + +// NodeInfo describes the metadata of a node +message NodeInfo { + string name = 1; + + string os = 2; + string os_name = 3; + string os_version = 4; + + string kernel_name = 5; + string kernel_version = 6; + + string architecture = 7; + + // number of cpu cores in the node + uint32 cpus = 8; + + // human readable (value + unit) frequency of cpu + string cpu_frequency = 9; + + // human readable (value + unit) size of node's memory + string memory = 10; + + // human readable (value + unit) size of all (sum) node's disks + string disk_space = 11; + + // version of the netdata agent + string version = 12; + + // release channel of netdata agent (example: nightly) + string release_channel = 13; + + string timezone = 14; + + // virtualization_type example: kvm (optional) + string virtualization_type = 15; + + // container_type example: docker (optional) + string container_type = 16; + + string custom_info = 17; + + // [Obsolete] repeated string services = 18; + reserved 18; + + string machine_guid = 19; + + // [Obsolete] repeated MirroredHostStatus mirrored_hosts_status = 20; + reserved 20; + + map<string, string> host_labels = 21; + + MachineLearningInfo ml_info = 22; + + // [Obsolete] repeated string collectors = 23; + reserved 23; +} + +message MachineLearningInfo { + // have ML capability + bool ml_capable = 1; + + // runs ML functionality + bool ml_enabled = 2; +} + +// UpdateNodeCollectors (Command) +// +// key: `claim_id,node_id` +// +// Publishers: `netdata/agent` +// +// When: +// On nodeinstance connect (after agent settles) and on detection of change of collectors +// + +message CollectorInfo { + string module = 1; + string plugin = 2; +} + +message UpdateNodeCollectors { + string claim_id = 1; + string node_id = 2; + repeated CollectorInfo collectors = 3; +} diff --git a/aclk/aclk.c b/aclk/aclk.c index 854408ce6..e95d7d6ab 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -154,7 +154,9 @@ biofailed: static int wait_till_cloud_enabled() { - netdata_log_info("Waiting for Cloud to be enabled"); + nd_log(NDLS_DAEMON, NDLP_INFO, + "Waiting for Cloud to be enabled"); + while (!netdata_cloud_enabled) { sleep_usec(USEC_PER_SEC * 1); if (!service_running(SERVICE_ACLK)) @@ -233,17 +235,22 @@ void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) switch(log_type) { case MQTT_WSS_LOG_ERROR: case MQTT_WSS_LOG_FATAL: + nd_log(NDLS_DAEMON, NDLP_ERR, "%s", str); + return; + case MQTT_WSS_LOG_WARN: - error_report("%s", str); + nd_log(NDLS_DAEMON, NDLP_WARNING, "%s", str); return; + case MQTT_WSS_LOG_INFO: - netdata_log_info("%s", str); + nd_log(NDLS_DAEMON, NDLP_INFO, "%s", str); return; + case MQTT_WSS_LOG_DEBUG: - netdata_log_debug(D_ACLK, "%s", str); return; + default: - netdata_log_error("Unknown log type from mqtt_wss"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown log type from mqtt_wss"); } } @@ -297,7 +304,9 @@ static void puback_callback(uint16_t packet_id) #endif if (aclk_shared_state.mqtt_shutdown_msg_id == (int)packet_id) { - netdata_log_info("Shutdown message has been acknowledged by the cloud. Exiting gracefully"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "Shutdown message has been acknowledged by the cloud. Exiting gracefully"); + aclk_shared_state.mqtt_shutdown_msg_rcvd = 1; } } @@ -335,9 +344,11 @@ static int handle_connection(mqtt_wss_client client) } if (disconnect_req || aclk_kill_link) { - netdata_log_info("Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)", - disconnect_req ? "true" : "false", - aclk_kill_link ? "true" : "false"); + nd_log(NDLS_DAEMON, NDLP_NOTICE, + "Going to restart connection due to disconnect_req=%s (cloud req), aclk_kill_link=%s (reclaim)", + disconnect_req ? "true" : "false", + aclk_kill_link ? "true" : "false"); + disconnect_req = 0; aclk_kill_link = 0; aclk_graceful_disconnect(client); @@ -390,7 +401,9 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) void aclk_graceful_disconnect(mqtt_wss_client client) { - netdata_log_info("Preparing to gracefully shutdown ACLK connection"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "Preparing to gracefully shutdown ACLK connection"); + aclk_queue_lock(); aclk_queue_flush(); @@ -403,17 +416,22 @@ void aclk_graceful_disconnect(mqtt_wss_client client) break; } if (aclk_shared_state.mqtt_shutdown_msg_rcvd) { - netdata_log_info("MQTT App Layer `disconnect` message sent successfully"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "MQTT App Layer `disconnect` message sent successfully"); break; } } - netdata_log_info("ACLK link is down"); - netdata_log_access("ACLK DISCONNECTED"); + + nd_log(NDLS_DAEMON, NDLP_WARNING, "ACLK link is down"); + nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED"); + aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); aclk_connected = 0; - netdata_log_info("Attempting to gracefully shutdown the MQTT/WSS connection"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "Attempting to gracefully shutdown the MQTT/WSS connection"); + mqtt_wss_disconnect(client, 1000); } @@ -455,7 +473,9 @@ static int aclk_block_till_recon_allowed() { next_connection_attempt = now_realtime_sec() + (recon_delay / MSEC_PER_SEC); last_backoff_value = (float)recon_delay / MSEC_PER_SEC; - netdata_log_info("Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "Wait before attempting to reconnect in %.3f seconds", recon_delay / (float)MSEC_PER_SEC); + // we want to wake up from time to time to check netdata_exit while (recon_delay) { @@ -593,7 +613,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) return 1; } - netdata_log_info("Attempting connection now"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "Attempting connection now"); + memset(&base_url, 0, sizeof(url_t)); if (url_parse(aclk_cloud_base_url, &base_url)) { aclk_status = ACLK_STATUS_INVALID_CLOUD_URL; @@ -680,7 +702,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) error_report("Can't use encoding=proto without at least \"proto\" capability."); continue; } - netdata_log_info("New ACLK protobuf protocol negotiated successfully (/env response)."); + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "New ACLK protobuf protocol negotiated successfully (/env response)."); memset(&auth_url, 0, sizeof(url_t)); if (url_parse(aclk_env->auth_endpoint, &auth_url)) { @@ -750,9 +774,9 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) if (!ret) { last_conn_time_mqtt = now_realtime_sec(); - netdata_log_info("ACLK connection successfully established"); + nd_log(NDLS_DAEMON, NDLP_INFO, "ACLK connection successfully established"); aclk_status = ACLK_STATUS_CONNECTED; - netdata_log_access("ACLK CONNECTED"); + nd_log(NDLS_ACCESS, NDLP_INFO, "ACLK CONNECTED"); mqtt_connected_actions(client); return 0; } @@ -798,7 +822,9 @@ void *aclk_main(void *ptr) netdata_thread_disable_cancelability(); #if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK ) - netdata_log_info("Killing ACLK thread -> cloud functionality has been disabled"); + nd_log(NDLS_DAEMON, NDLP_INFO, + "Killing ACLK thread -> cloud functionality has been disabled"); + static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; return NULL; #endif @@ -857,7 +883,7 @@ void *aclk_main(void *ptr) aclk_stats_upd_online(0); last_disconnect_time = now_realtime_sec(); aclk_connected = 0; - netdata_log_access("ACLK DISCONNECTED"); + nd_log(NDLS_ACCESS, NDLP_WARNING, "ACLK DISCONNECTED"); } } while (service_running(SERVICE_ACLK)); @@ -891,7 +917,7 @@ exit: return NULL; } -void aclk_host_state_update(RRDHOST *host, int cmd) +void aclk_host_state_update(RRDHOST *host, int cmd, int queryable) { uuid_t node_id; int ret = 0; @@ -924,7 +950,9 @@ void aclk_host_state_update(RRDHOST *host, int cmd) rrdhost_aclk_state_unlock(localhost); create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE; create_query->data.bin_payload.msg_name = "CreateNodeInstance"; - netdata_log_info("Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "Registering host=%s, hops=%u", host->machine_guid, host->system_info->hops); + aclk_queue_query(create_query); return; } @@ -934,7 +962,7 @@ void aclk_host_state_update(RRDHOST *host, int cmd) node_instance_connection_t node_state_update = { .hops = host->system_info->hops, .live = cmd, - .queryable = 1, + .queryable = queryable, .session_id = aclk_session_newarch }; node_state_update.node_id = mallocz(UUID_STR_LEN); @@ -947,8 +975,9 @@ void aclk_host_state_update(RRDHOST *host, int cmd) query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - netdata_log_info("Queuing status update for node=%s, live=%d, hops=%u",(char*)node_state_update.node_id, cmd, - host->system_info->hops); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "Queuing status update for node=%s, live=%d, hops=%u, queryable=%d", + (char*)node_state_update.node_id, cmd, host->system_info->hops, queryable); freez((void*)node_state_update.node_id); query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection"; query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN; @@ -990,9 +1019,10 @@ void aclk_send_node_instances() node_state_update.claim_id = localhost->aclk_state.claimed_id; query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update); rrdhost_aclk_state_unlock(localhost); - netdata_log_info("Queuing status update for node=%s, live=%d, hops=%d",(char*)node_state_update.node_id, - list->live, - list->hops); + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "Queuing status update for node=%s, live=%d, hops=%d, queryable=1", + (char*)node_state_update.node_id, list->live, list->hops); freez((void*)node_state_update.capabilities); freez((void*)node_state_update.node_id); @@ -1014,8 +1044,11 @@ void aclk_send_node_instances() node_instance_creation.claim_id = localhost->aclk_state.claimed_id, create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation); rrdhost_aclk_state_unlock(localhost); - netdata_log_info("Queuing registration for host=%s, hops=%d",(char*)node_instance_creation.machine_guid, - list->hops); + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "Queuing registration for host=%s, hops=%d", + (char*)node_instance_creation.machine_guid, list->hops); + freez((void *)node_instance_creation.machine_guid); aclk_queue_query(create_query); } @@ -1322,7 +1355,7 @@ void add_aclk_host_labels(void) { void aclk_queue_node_info(RRDHOST *host, bool immediate) { - struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = host->aclk_config; if (likely(wc)) wc->node_info_send_time = (host == localhost || immediate) ? 1 : now_realtime_sec(); } diff --git a/aclk/aclk.h b/aclk/aclk.h index 0badc1a62..72d1a2e11 100644 --- a/aclk/aclk.h +++ b/aclk/aclk.h @@ -75,7 +75,7 @@ extern struct aclk_shared_state { int mqtt_shutdown_msg_rcvd; } aclk_shared_state; -void aclk_host_state_update(RRDHOST *host, int cmd); +void aclk_host_state_update(RRDHOST *host, int cmd, int queryable); void aclk_send_node_instances(void); void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname); diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index abacbca83..5e3574b97 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -6,7 +6,6 @@ #include "../../web/server/web_client_cache.h" #define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" -#define ACLK_MAX_WEB_RESPONSE_SIZE (30 * 1024 * 1024) pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; @@ -90,6 +89,12 @@ static bool aclk_web_client_interrupt_cb(struct web_client *w __maybe_unused, vo } static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_SRC_TRANSPORT, "aclk"), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + int retval = 0; BUFFER *local_buffer = NULL; size_t size = 0; @@ -110,7 +115,7 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) usec_t t; web_client_timeout_checkpoint_set(w, query->timeout); if(web_client_timeout_checkpoint_and_check(w, &t)) { - netdata_log_access("QUERY CANCELED: QUEUE TIME EXCEEDED %llu ms (LIMIT %d ms)", t / USEC_PER_MS, query->timeout); + nd_log(NDLS_ACCESS, NDLP_ERR, "QUERY CANCELED: QUEUE TIME EXCEEDED %llu ms (LIMIT %d ms)", t / USEC_PER_MS, query->timeout); retval = 1; w->response.code = HTTP_RESP_SERVICE_UNAVAILABLE; aclk_http_msg_v2_err(query_thr->client, query->callback_topic, query->msg_id, w->response.code, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, NULL, 0); @@ -131,13 +136,6 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) w->response.code = (short)web_client_api_request_with_node_selection(localhost, w, path); web_client_timeout_checkpoint_response_ready(w, &t); - if(buffer_strlen(w->response.data) > ACLK_MAX_WEB_RESPONSE_SIZE) { - buffer_flush(w->response.data); - buffer_strcat(w->response.data, "response is too big"); - w->response.data->content_type = CT_TEXT_PLAIN; - w->response.code = HTTP_RESP_CONTENT_TOO_LONG; - } - if (aclk_stats_enabled) { ACLK_STATS_LOCK; aclk_metrics_per_sample.cloud_q_process_total += t; @@ -217,25 +215,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) // send msg. w->response.code = aclk_http_msg_v2(query_thr->client, query->callback_topic, query->msg_id, t, query->created, w->response.code, local_buffer->buffer, local_buffer->len); - struct timeval tv; - cleanup: - now_monotonic_high_precision_timeval(&tv); - netdata_log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'", - w->id - , gettid() - , query_thr->idx - , "DATA" - , sent - , size - , size > sent ? -(((size - sent) / (double)size) * 100.0) : ((size > 0) ? (((sent - size ) / (double)size) * 100.0) : 0.0) - , dt_usec(&w->timings.tv_ready, &w->timings.tv_in) / 1000.0 - , dt_usec(&tv, &w->timings.tv_ready) / 1000.0 - , dt_usec(&tv, &w->timings.tv_in) / 1000.0 - , w->response.code - , strip_control_characters((char *)buffer_tostring(w->url_as_received)) - ); - + web_client_log_completed_request(w, false); web_client_release_to_cache(w); pending_req_list_rm(query->msg_id); diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index 6e4cd93fb..0e91e28c0 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -108,7 +108,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur } start = payload + 4; - if(!(end = strstr(payload, " HTTP/1.1\x0D\x0A"))) { + if(!(end = strstr(payload, HTTP_1_1 HTTP_ENDL))) { errno = 0; netdata_log_error("Doesn't look like HTTP GET request."); return 1; @@ -455,7 +455,7 @@ int cancel_pending_req(const char *msg, size_t msg_len) return 1; } - netdata_log_access("ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id); + nd_log(NDLS_ACCESS, NDLP_NOTICE, "ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id); if (mark_pending_req_cancelled(cmd.request_id)) error_report("CancelPending Request for %s failed. No such pending request.", cmd.request_id); diff --git a/aclk/aclk_tx_msgs.c b/aclk/aclk_tx_msgs.c index 4102c818d..0e4182a72 100644 --- a/aclk/aclk_tx_msgs.c +++ b/aclk/aclk_tx_msgs.c @@ -86,7 +86,7 @@ static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_objec int rc = mqtt_wss_publish5(client, (char*)topic, NULL, full_msg, &freez_aclk_publish5b, full_msg_len, MQTT_WSS_PUB_QOS1, &packet_id); if (rc == MQTT_WSS_ERR_TOO_BIG_FOR_SERVER) - return HTTP_RESP_FORBIDDEN; + return HTTP_RESP_CONTENT_TOO_LONG; #ifdef NETDATA_INTERNAL_CHECKS aclk_stats_msg_published(packet_id); @@ -194,7 +194,7 @@ int aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_ int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len); switch (rc) { - case HTTP_RESP_FORBIDDEN: + case HTTP_RESP_CONTENT_TOO_LONG: aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, NULL, 0); break; case HTTP_RESP_INTERNAL_SERVER_ERROR: diff --git a/aclk/aclk_util.c b/aclk/aclk_util.c index 00920e069..3bf2e3f18 100644 --- a/aclk/aclk_util.c +++ b/aclk/aclk_util.c @@ -1,6 +1,9 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "aclk_util.h" + +#ifdef ENABLE_ACLK + #include "aclk_proxy.h" #include "daemon/common.h" @@ -437,6 +440,7 @@ void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt freez(proxy); } +#endif /* ENABLE_ACLK */ #if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110 static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void) diff --git a/aclk/aclk_util.h b/aclk/aclk_util.h index 6b7e4e9c2..38ef5b0bc 100644 --- a/aclk/aclk_util.h +++ b/aclk/aclk_util.h @@ -3,6 +3,8 @@ #define ACLK_UTIL_H #include "libnetdata/libnetdata.h" + +#ifdef ENABLE_ACLK #include "mqtt_wss_client.h" #define CLOUD_EC_MALFORMED_NODE_ID 1 @@ -112,6 +114,7 @@ unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, un #define aclk_tbeb_reset(x) aclk_tbeb_delay(1, 0, 0, 0) void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt_wss_proxy_type *type); +#endif /* ENABLE_ACLK */ int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len); diff --git a/aclk/https_client.c b/aclk/https_client.c index 623082027..5385786b8 100644 --- a/aclk/https_client.c +++ b/aclk/https_client.c @@ -4,20 +4,12 @@ #include "https_client.h" -#include "mqtt_websockets/c-rbuf/include/ringbuffer.h" - #include "aclk_util.h" #include "daemon/global_statistics.h" #define DEFAULT_CHUNKED_RESPONSE_BUFFER_SIZE (4096) -enum http_parse_state { - HTTP_PARSE_INITIAL = 0, - HTTP_PARSE_HEADERS, - HTTP_PARSE_CONTENT -}; - static const char *http_req_type_to_str(http_req_type_t req) { switch (req) { case HTTP_REQ_GET: @@ -33,39 +25,33 @@ static const char *http_req_type_to_str(http_req_type_t req) { #define TRANSFER_ENCODING_CHUNKED (-2) -typedef struct { - enum http_parse_state state; - int content_length; - int http_code; - - // for chunked data only - char *chunked_response; - size_t chunked_response_size; - size_t chunked_response_written; - - enum chunked_content_state { - CHUNKED_CONTENT_CHUNK_SIZE = 0, - CHUNKED_CONTENT_CHUNK_DATA, - CHUNKED_CONTENT_CHUNK_END_CRLF, - CHUNKED_CONTENT_FINAL_CRLF - } chunked_content_state; - - size_t chunk_size; - size_t chunk_got; -} http_parse_ctx; - #define HTTP_PARSE_CTX_INITIALIZER { .state = HTTP_PARSE_INITIAL, .content_length = -1, .http_code = 0 } -static inline void http_parse_ctx_clear(http_parse_ctx *ctx) { +void http_parse_ctx_create(http_parse_ctx *ctx) +{ ctx->state = HTTP_PARSE_INITIAL; ctx->content_length = -1; ctx->http_code = 0; + ctx->headers = c_rhash_new(0); + ctx->flags = HTTP_PARSE_FLAGS_DEFAULT; +} + +void http_parse_ctx_destroy(http_parse_ctx *ctx) +{ + c_rhash_iter_t iter; + const char *key; + + c_rhash_iter_t_initialize(&iter); + while ( !c_rhash_iter_str_keys(ctx->headers, &iter, &key) ) { + void *val; + c_rhash_get_ptr_by_str(ctx->headers, key, &val); + freez(val); + } + + c_rhash_destroy(ctx->headers); } #define POLL_TO_MS 100 -#define NEED_MORE_DATA 0 -#define PARSE_SUCCESS 1 -#define PARSE_ERROR -1 #define HTTP_LINE_TERM "\x0D\x0A" #define RESP_PROTO "HTTP/1.1 " #define HTTP_KEYVAL_SEPARATOR ": " @@ -76,7 +62,7 @@ static int process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const ch { // currently we care only about specific headers // we can skip the rest - if (!strcmp("content-length", key)) { + if (parse_ctx->content_length < 0 && !strcmp("content-length", key)) { if (parse_ctx->content_length == TRANSFER_ENCODING_CHUNKED) { netdata_log_error("Content-length and transfer-encoding: chunked headers are mutually exclusive"); return 1; @@ -85,7 +71,7 @@ static int process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const ch netdata_log_error("Duplicate content-length header"); return 1; } - parse_ctx->content_length = atoi(val); + parse_ctx->content_length = str2u(val); if (parse_ctx->content_length < 0) { netdata_log_error("Invalid content-length %d", parse_ctx->content_length); return 1; @@ -102,9 +88,20 @@ static int process_http_hdr(http_parse_ctx *parse_ctx, const char *key, const ch } return 0; } + char *val_cpy = strdupz(val); + c_rhash_insert_str_ptr(parse_ctx->headers, key, val_cpy); return 0; } +const char *get_http_header_by_name(http_parse_ctx *ctx, const char *name) +{ + const char *ret; + if (c_rhash_get_ptr_by_str(ctx->headers, name, (void**)&ret)) + return NULL; + + return ret; +} + static int parse_http_hdr(rbuf_t buf, http_parse_ctx *parse_ctx) { int idx, idx_end; @@ -169,8 +166,8 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx) case CHUNKED_CONTENT_CHUNK_SIZE: if (!rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx)) { if (rbuf_bytes_available(buf) >= rbuf_get_capacity(buf)) - return PARSE_ERROR; - return NEED_MORE_DATA; + return HTTP_PARSE_ERROR; + return HTTP_PARSE_NEED_MORE_DATA; } if (idx == 0) { parse_ctx->chunked_content_state = CHUNKED_CONTENT_FINAL_CRLF; @@ -178,7 +175,7 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx) } if (idx >= HTTP_HDR_BUFFER_SIZE) { netdata_log_error("Chunk size is too long"); - return PARSE_ERROR; + return HTTP_PARSE_ERROR; } char buf_size[HTTP_HDR_BUFFER_SIZE]; rbuf_pop(buf, buf_size, idx); @@ -186,13 +183,13 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx) long chunk_size = strtol(buf_size, NULL, 16); if (chunk_size < 0 || chunk_size == LONG_MAX) { netdata_log_error("Chunk size out of range"); - return PARSE_ERROR; + return HTTP_PARSE_ERROR; } parse_ctx->chunk_size = chunk_size; if (parse_ctx->chunk_size == 0) { if (errno == EINVAL) { netdata_log_error("Invalid chunk size"); - return PARSE_ERROR; + return HTTP_PARSE_ERROR; } parse_ctx->chunked_content_state = CHUNKED_CONTENT_CHUNK_END_CRLF; continue; @@ -204,7 +201,7 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx) // fallthrough case CHUNKED_CONTENT_CHUNK_DATA: if (!(bytes_to_copy = rbuf_bytes_available(buf))) - return NEED_MORE_DATA; + return HTTP_PARSE_NEED_MORE_DATA; if (bytes_to_copy > parse_ctx->chunk_size - parse_ctx->chunk_got) bytes_to_copy = parse_ctx->chunk_size - parse_ctx->chunk_got; rbuf_pop(buf, parse_ctx->chunked_response + parse_ctx->chunked_response_written, bytes_to_copy); @@ -217,19 +214,19 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx) case CHUNKED_CONTENT_FINAL_CRLF: case CHUNKED_CONTENT_CHUNK_END_CRLF: if (rbuf_bytes_available(buf) < strlen(HTTP_LINE_TERM)) - return NEED_MORE_DATA; + return HTTP_PARSE_NEED_MORE_DATA; char buf_crlf[strlen(HTTP_LINE_TERM)]; rbuf_pop(buf, buf_crlf, strlen(HTTP_LINE_TERM)); if (memcmp(buf_crlf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM))) { netdata_log_error("CRLF expected"); - return PARSE_ERROR; + return HTTP_PARSE_ERROR; } if (parse_ctx->chunked_content_state == CHUNKED_CONTENT_FINAL_CRLF) { if (parse_ctx->chunked_response_size != parse_ctx->chunked_response_written) netdata_log_error("Chunked response size mismatch"); chunked_response_buffer_grow_by(parse_ctx, 1); parse_ctx->chunked_response[parse_ctx->chunked_response_written] = 0; - return PARSE_SUCCESS; + return HTTP_PARSE_SUCCESS; } if (parse_ctx->chunk_size == 0) { parse_ctx->chunked_content_state = CHUNKED_CONTENT_FINAL_CRLF; @@ -241,34 +238,34 @@ static int process_chunked_content(rbuf_t buf, http_parse_ctx *parse_ctx) } while(1); } -static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx) +http_parse_rc parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx) { int idx; char rc[4]; do { if (parse_ctx->state != HTTP_PARSE_CONTENT && !rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx)) - return NEED_MORE_DATA; + return HTTP_PARSE_NEED_MORE_DATA; switch (parse_ctx->state) { case HTTP_PARSE_INITIAL: if (rbuf_memcmp_n(buf, RESP_PROTO, strlen(RESP_PROTO))) { netdata_log_error("Expected response to start with \"%s\"", RESP_PROTO); - return PARSE_ERROR; + return HTTP_PARSE_ERROR; } rbuf_bump_tail(buf, strlen(RESP_PROTO)); if (rbuf_pop(buf, rc, 4) != 4) { netdata_log_error("Expected HTTP status code"); - return PARSE_ERROR; + return HTTP_PARSE_ERROR; } if (rc[3] != ' ') { netdata_log_error("Expected space after HTTP return code"); - return PARSE_ERROR; + return HTTP_PARSE_ERROR; } rc[3] = 0; parse_ctx->http_code = atoi(rc); if (parse_ctx->http_code < 100 || parse_ctx->http_code >= 600) { netdata_log_error("HTTP code not in range 100 to 599"); - return PARSE_ERROR; + return HTTP_PARSE_ERROR; } rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx); @@ -284,7 +281,7 @@ static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx) break; } if (parse_http_hdr(buf, parse_ctx)) - return PARSE_ERROR; + return HTTP_PARSE_ERROR; rbuf_find_bytes(buf, HTTP_LINE_TERM, strlen(HTTP_LINE_TERM), &idx); rbuf_bump_tail(buf, idx + strlen(HTTP_LINE_TERM)); break; @@ -294,11 +291,14 @@ static int parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx) return process_chunked_content(buf, parse_ctx); if (parse_ctx->content_length < 0) - return PARSE_SUCCESS; + return HTTP_PARSE_SUCCESS; + + if (parse_ctx->flags & HTTP_PARSE_FLAG_DONT_WAIT_FOR_CONTENT) + return HTTP_PARSE_SUCCESS; if (rbuf_bytes_available(buf) >= (size_t)parse_ctx->content_length) - return PARSE_SUCCESS; - return NEED_MORE_DATA; + return HTTP_PARSE_SUCCESS; + return HTTP_PARSE_NEED_MORE_DATA; } } while(1); } @@ -486,7 +486,7 @@ static int read_parse_response(https_req_ctx_t *ctx) { } while (ctx->poll_fd.events == 0 && rbuf_bytes_free(ctx->buf_rx) > 0); } while (!(ret = parse_http_response(ctx->buf_rx, &ctx->parse_ctx))); - if (ret != PARSE_SUCCESS) { + if (ret != HTTP_PARSE_SUCCESS) { netdata_log_error("Error parsing HTTP response"); return 1; } @@ -500,7 +500,7 @@ static int handle_http_request(https_req_ctx_t *ctx) { BUFFER *hdr = buffer_create(TX_BUFFER_SIZE, &netdata_buffers_statistics.buffers_aclk); int rc = 0; - http_parse_ctx_clear(&ctx->parse_ctx); + http_parse_ctx_create(&ctx->parse_ctx); // Prepare data to send switch (ctx->request->request_type) { @@ -526,7 +526,7 @@ static int handle_http_request(https_req_ctx_t *ctx) { buffer_strcat(hdr, ctx->request->url); } - buffer_strcat(hdr, " HTTP/1.1\x0D\x0A"); + buffer_strcat(hdr, HTTP_1_1 HTTP_ENDL); //TODO Headers! if (ctx->request->request_type != HTTP_REQ_CONNECT) { @@ -661,12 +661,15 @@ int https_request(https_req_t *request, https_req_response_t *response) { ctx->request = &req; if (handle_http_request(ctx)) { netdata_log_error("Failed to CONNECT with proxy"); + http_parse_ctx_destroy(&ctx->parse_ctx); goto exit_sock; } if (ctx->parse_ctx.http_code != 200) { netdata_log_error("Proxy didn't return 200 OK (got %d)", ctx->parse_ctx.http_code); + http_parse_ctx_destroy(&ctx->parse_ctx); goto exit_sock; } + http_parse_ctx_destroy(&ctx->parse_ctx); netdata_log_info("Proxy accepted CONNECT upgrade"); } ctx->request = request; @@ -713,8 +716,10 @@ int https_request(https_req_t *request, https_req_response_t *response) { // The actual request here if (handle_http_request(ctx)) { netdata_log_error("Couldn't process request"); + http_parse_ctx_destroy(&ctx->parse_ctx); goto exit_SSL; } + http_parse_ctx_destroy(&ctx->parse_ctx); response->http_code = ctx->parse_ctx.http_code; if (ctx->parse_ctx.content_length == TRANSFER_ENCODING_CHUNKED) { response->payload_size = ctx->parse_ctx.chunked_response_size; diff --git a/aclk/https_client.h b/aclk/https_client.h index daf4766f8..0b97fbb02 100644 --- a/aclk/https_client.h +++ b/aclk/https_client.h @@ -5,6 +5,9 @@ #include "libnetdata/libnetdata.h" +#include "mqtt_websockets/c-rbuf/include/ringbuffer.h" +#include "mqtt_websockets/c_rhash/include/c_rhash.h" + typedef enum http_req_type { HTTP_REQ_GET = 0, HTTP_REQ_POST, @@ -77,4 +80,56 @@ void https_req_response_init(https_req_response_t *res); int https_request(https_req_t *request, https_req_response_t *response); +// we expose previously internal parser as this is usefull also from +// other parts of the code +enum http_parse_state { + HTTP_PARSE_INITIAL = 0, + HTTP_PARSE_HEADERS, + HTTP_PARSE_CONTENT +}; + +typedef uint32_t parse_ctx_flags_t; + +#define HTTP_PARSE_FLAG_DONT_WAIT_FOR_CONTENT ((parse_ctx_flags_t)0x01) + +#define HTTP_PARSE_FLAGS_DEFAULT ((parse_ctx_flags_t)0) + +typedef struct { + parse_ctx_flags_t flags; + + enum http_parse_state state; + int content_length; + int http_code; + + c_rhash headers; + + // for chunked data only + char *chunked_response; + size_t chunked_response_size; + size_t chunked_response_written; + + enum chunked_content_state { + CHUNKED_CONTENT_CHUNK_SIZE = 0, + CHUNKED_CONTENT_CHUNK_DATA, + CHUNKED_CONTENT_CHUNK_END_CRLF, + CHUNKED_CONTENT_FINAL_CRLF + } chunked_content_state; + + size_t chunk_size; + size_t chunk_got; +} http_parse_ctx; + +void http_parse_ctx_create(http_parse_ctx *ctx); +void http_parse_ctx_destroy(http_parse_ctx *ctx); + +typedef enum { + HTTP_PARSE_ERROR = -1, + HTTP_PARSE_NEED_MORE_DATA = 0, + HTTP_PARSE_SUCCESS = 1 +} http_parse_rc; + +http_parse_rc parse_http_response(rbuf_t buf, http_parse_ctx *parse_ctx); + +const char *get_http_header_by_name(http_parse_ctx *ctx, const char *name); + #endif /* NETDATA_HTTPS_CLIENT_H */ |