338 lines
8.3 KiB
Go
338 lines
8.3 KiB
Go
package crowdsec
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// LAPIClient is a thread-safe HTTP client for the CrowdSec Local API.
|
|
type LAPIClient struct {
|
|
baseURL string
|
|
login string
|
|
password string
|
|
|
|
mu sync.RWMutex
|
|
token string // JWT, refreshed on 401
|
|
|
|
http *http.Client
|
|
}
|
|
|
|
// NewLAPIClient creates a new client. Call Login() before other methods.
|
|
func NewLAPIClient(baseURL, login, password string) *LAPIClient {
|
|
return &LAPIClient{
|
|
baseURL: baseURL,
|
|
login: login,
|
|
password: password,
|
|
http: &http.Client{
|
|
Timeout: 15 * time.Second,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Login authenticates with the LAPI and stores the JWT token.
|
|
func (c *LAPIClient) Login(ctx context.Context) error {
|
|
payload := LoginRequest{
|
|
MachineID: c.login,
|
|
Password: c.password,
|
|
}
|
|
|
|
body, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return fmt.Errorf("lapi login marshal: %w", err)
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
|
c.baseURL+"/v1/watchers/login", bytes.NewReader(body))
|
|
if err != nil {
|
|
return fmt.Errorf("lapi login request: %w", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
resp, err := c.http.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("lapi login: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
data, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("lapi login: status %d: %s", resp.StatusCode, string(data))
|
|
}
|
|
|
|
var lr LoginResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&lr); err != nil {
|
|
return fmt.Errorf("lapi login decode: %w", err)
|
|
}
|
|
|
|
c.mu.Lock()
|
|
c.token = lr.Token
|
|
c.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// IsHealthy returns true if the LAPI responds to a ping.
|
|
func (c *LAPIClient) IsHealthy(ctx context.Context) bool {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/v1/decisions", nil)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
c.setAuth(req)
|
|
req.URL.RawQuery = "limit=1"
|
|
|
|
resp, err := c.http.Do(req)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
resp.Body.Close()
|
|
return resp.StatusCode < 500
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Decisions
|
|
// -----------------------------------------------------------------------
|
|
|
|
// DecisionFilter contains optional filters for listing decisions.
|
|
type DecisionFilter struct {
|
|
Limit int
|
|
Offset int
|
|
Type string // ban, captcha, etc.
|
|
Scope string // Ip, Range, Country
|
|
Value string // specific IP/value
|
|
Origin string
|
|
}
|
|
|
|
// ListDecisions returns decisions matching the filter.
|
|
func (c *LAPIClient) ListDecisions(ctx context.Context, f DecisionFilter) ([]Decision, error) {
|
|
q := url.Values{}
|
|
q.Set("limit", strconv.Itoa(max(f.Limit, 100)))
|
|
if f.Offset > 0 {
|
|
q.Set("offset", strconv.Itoa(f.Offset))
|
|
}
|
|
if f.Type != "" {
|
|
q.Set("type", f.Type)
|
|
}
|
|
if f.Scope != "" {
|
|
q.Set("scope", f.Scope)
|
|
}
|
|
if f.Value != "" {
|
|
q.Set("value", f.Value)
|
|
}
|
|
if f.Origin != "" {
|
|
q.Set("origin", f.Origin)
|
|
}
|
|
|
|
var decisions []Decision
|
|
err := c.doJSON(ctx, http.MethodGet, "/v1/decisions?"+q.Encode(), nil, &decisions)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if decisions == nil {
|
|
decisions = []Decision{}
|
|
}
|
|
return decisions, nil
|
|
}
|
|
|
|
// DeleteDecision deletes a decision by ID.
|
|
func (c *LAPIClient) DeleteDecision(ctx context.Context, id int64) error {
|
|
return c.doJSON(ctx, http.MethodDelete,
|
|
fmt.Sprintf("/v1/decisions/%d", id), nil, nil)
|
|
}
|
|
|
|
// DeleteAllDecisions deletes all active decisions.
|
|
func (c *LAPIClient) DeleteAllDecisions(ctx context.Context) error {
|
|
return c.doJSON(ctx, http.MethodDelete, "/v1/decisions", nil, nil)
|
|
}
|
|
|
|
// AddDecision posts one manual decision to the LAPI.
|
|
func (c *LAPIClient) AddDecision(ctx context.Context, d DecisionInput) error {
|
|
payload := struct {
|
|
Decisions []DecisionInput `json:"decisions"`
|
|
}{
|
|
Decisions: []DecisionInput{d},
|
|
}
|
|
// The LAPI wraps decisions in an alert object
|
|
type alertPayload struct {
|
|
Capacity int32 `json:"capacity"`
|
|
Decisions []DecisionInput `json:"decisions"`
|
|
Events []interface{} `json:"events"`
|
|
EventsCount int32 `json:"events_count"`
|
|
Leakspeed string `json:"leakspeed"`
|
|
Message string `json:"message"`
|
|
ScenarioHash string `json:"scenario_hash"`
|
|
ScenarioVersion string `json:"scenario_version"`
|
|
Simulated bool `json:"simulated"`
|
|
Source AlertSource `json:"source"`
|
|
StartAt string `json:"start_at"`
|
|
StopAt string `json:"stop_at"`
|
|
}
|
|
|
|
now := time.Now().UTC().Format(time.RFC3339)
|
|
ap := alertPayload{
|
|
Capacity: 0,
|
|
Decisions: payload.Decisions,
|
|
Events: []interface{}{},
|
|
EventsCount: 1,
|
|
Leakspeed: "0",
|
|
Message: fmt.Sprintf("manual decision for %s", d.Value),
|
|
Simulated: false,
|
|
Source: AlertSource{
|
|
Scope: d.Scope,
|
|
Value: d.Value,
|
|
IP: d.Value,
|
|
},
|
|
StartAt: now,
|
|
StopAt: now,
|
|
}
|
|
|
|
return c.doJSON(ctx, http.MethodPost, "/v1/alerts", []alertPayload{ap}, nil)
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Alerts
|
|
// -----------------------------------------------------------------------
|
|
|
|
// AlertFilter contains optional filters for listing alerts.
|
|
type AlertFilter struct {
|
|
Limit int
|
|
Offset int
|
|
Scenario string
|
|
IP string
|
|
Since string // duration string e.g. "24h"
|
|
}
|
|
|
|
// ListAlerts returns alerts matching the filter.
|
|
func (c *LAPIClient) ListAlerts(ctx context.Context, f AlertFilter) ([]Alert, error) {
|
|
q := url.Values{}
|
|
q.Set("limit", strconv.Itoa(max(f.Limit, 100)))
|
|
if f.Offset > 0 {
|
|
q.Set("offset", strconv.Itoa(f.Offset))
|
|
}
|
|
if f.Scenario != "" {
|
|
q.Set("scenario", f.Scenario)
|
|
}
|
|
if f.IP != "" {
|
|
q.Set("ip", f.IP)
|
|
}
|
|
if f.Since != "" {
|
|
q.Set("since", f.Since)
|
|
}
|
|
|
|
var alerts []Alert
|
|
err := c.doJSON(ctx, http.MethodGet, "/v1/alerts?"+q.Encode(), nil, &alerts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if alerts == nil {
|
|
alerts = []Alert{}
|
|
}
|
|
return alerts, nil
|
|
}
|
|
|
|
// DeleteAlert deletes an alert by ID.
|
|
func (c *LAPIClient) DeleteAlert(ctx context.Context, id int64) error {
|
|
return c.doJSON(ctx, http.MethodDelete,
|
|
fmt.Sprintf("/v1/alerts/%d", id), nil, nil)
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Internal helpers
|
|
// -----------------------------------------------------------------------
|
|
|
|
// doJSON performs an authenticated JSON request, automatically re-logging on 401.
|
|
func (c *LAPIClient) doJSON(ctx context.Context, method, path string, reqBody, respBody any) error {
|
|
err := c.doJSONOnce(ctx, method, path, reqBody, respBody)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
// If 401, try refreshing token once
|
|
if isUnauthorized(err) {
|
|
if loginErr := c.Login(ctx); loginErr != nil {
|
|
return fmt.Errorf("lapi re-login failed: %w", loginErr)
|
|
}
|
|
return c.doJSONOnce(ctx, method, path, reqBody, respBody)
|
|
}
|
|
return err
|
|
}
|
|
|
|
type lapiError struct {
|
|
status int
|
|
body string
|
|
}
|
|
|
|
func (e *lapiError) Error() string {
|
|
return fmt.Sprintf("lapi status %d: %s", e.status, e.body)
|
|
}
|
|
|
|
func isUnauthorized(err error) bool {
|
|
if e, ok := err.(*lapiError); ok {
|
|
return e.status == http.StatusUnauthorized
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (c *LAPIClient) doJSONOnce(ctx context.Context, method, path string, reqBody, respBody any) error {
|
|
var bodyReader io.Reader
|
|
if reqBody != nil {
|
|
b, err := json.Marshal(reqBody)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal request: %w", err)
|
|
}
|
|
bodyReader = bytes.NewReader(b)
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, bodyReader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("Accept", "application/json")
|
|
c.setAuth(req)
|
|
|
|
resp, err := c.http.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("lapi %s %s: %w", method, path, err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
data, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return fmt.Errorf("lapi read body: %w", err)
|
|
}
|
|
|
|
if resp.StatusCode >= 400 {
|
|
return &lapiError{status: resp.StatusCode, body: string(data)}
|
|
}
|
|
|
|
if respBody != nil && len(data) > 0 && string(data) != "null" {
|
|
if err := json.Unmarshal(data, respBody); err != nil {
|
|
return fmt.Errorf("lapi decode response: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *LAPIClient) setAuth(r *http.Request) {
|
|
c.mu.RLock()
|
|
token := c.token
|
|
c.mu.RUnlock()
|
|
if token != "" {
|
|
r.Header.Set("Authorization", "Bearer "+token)
|
|
}
|
|
}
|
|
|
|
func max(a, b int) int {
|
|
if a > b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|