Skip to content

Instantly share code, notes, and snippets.

@cwurm
Created November 1, 2018 10:56
Show Gist options
  • Save cwurm/71bcbcd5f1949386285085c780dcfb60 to your computer and use it in GitHub Desktop.
Save cwurm/71bcbcd5f1949386285085c780dcfb60 to your computer and use it in GitHub Desktop.
metricbeat/module/system/socket/connection.go - refactoring out reusable functionality from socket.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// +build linux
package socket
import (
"fmt"
"net"
"os"
"path/filepath"
"syscall"
"time"
"github.com/joeshaw/multierror"
"github.com/elastic/gosigar"
"github.com/elastic/gosigar/sys/linux"
"github.com/pkg/errors"
)
// Values assigned to Connection.ProcessType by EnrichConnection.
const (
// ProcessType_PROCESS marks a socket whose owner was found in the process table.
ProcessType_PROCESS = "process"
// ProcessType_RPC marks a socket whose local or remote port matched an entry
// in the RPC portmap.
ProcessType_RPC = "rpc"
)
// Connection describes one socket. The address, state, inode and UID fields
// are copied from an inet_diag message by NewConnection; the remaining fields
// are optional enrichments filled in by ConnectionList.EnrichConnection.
type Connection struct {
Family linux.AddressFamily
LocalIP net.IP
LocalPort int
RemoteIP net.IP
RemotePort int
State linux.TCPState
Direction Direction // Set by EnrichConnection when direction enrichment is enabled.
DestHost string // Reverse lookup of dest IP.
DestHostETLDPlusOne string // Derived from DestHost via etldPlusOne.
DestHostError error // Resolver error.
// Process identifiers.
Inode uint32 // Inode of the socket.
ProcessPID int // PID of the socket owner; NewConnection initializes it to -1.
ProcessExe string // Absolute path to the executable.
ProcessCommand string // Command, or RPC program name
ProcessCmdLine string // Full command line with arguments.
ProcessType string // Process or RPC service (ProcessType_PROCESS / ProcessType_RPC).
ProcessError error // Reason process info is unavailable.
// User identifiers.
UID uint32 // UID of the socket owner.
Username string // Username of the socket.
}
// ConnectionList holds the most recently fetched socket list together with
// the lookup tables used to enrich individual connections. Each enrichment is
// optional: its table stays nil until the matching EnableEnrichmentWith*
// method is called, and EnrichConnection skips enrichments whose table is nil.
type ConnectionList struct {
diags []*linux.InetDiagMsg // Socket list stored by Refresh().
netlink *NetlinkSession // Used by Refresh() to fetch the socket list.
ptable *ProcTable // Set by EnableEnrichmentWithProcess().
reverseLookup *ReverseLookupCache // Set by EnableEnrichmentWithReverseLookup().
listeners *ListenerTable // Set by EnableEnrichmentWithDirection().
users *UserCache // Set by EnableEnrichmentWithUser().
rpcPortmap *RpcPortmap // Set by EnableEnrichmentWithRPC().
euid int // Effective UID captured by NewConnectionList().
// Signals that the above enrichment data structures need a refresh
// before being used again. Set by Refresh() and checked by
// EnrichConnection(), which refreshes the enrichment data when it is set.
staleEnrichments bool
}
// NewConnectionList returns a ConnectionList with a fresh netlink session and
// the effective UID of the current process recorded. No enrichment is enabled
// yet; use the EnableEnrichmentWith* methods to turn them on.
func NewConnectionList() *ConnectionList {
	cl := new(ConnectionList)
	cl.netlink = NewNetlinkSession()
	cl.euid = os.Geteuid()
	return cl
}
// EnableEnrichmentWithDirection turns on direction enrichment (Listening,
// Incoming, or Outgoing) by installing a new, empty listener table.
func (cl *ConnectionList) EnableEnrichmentWithDirection() {
	table := NewListenerTable()
	cl.listeners = table
}
// EnableEnrichmentWithUser turns on username enrichment: EnrichConnection
// will resolve each socket's UID through a newly created user cache.
func (cl *ConnectionList) EnableEnrichmentWithUser() {
	cache := NewUserCache()
	cl.users = &cache
}
// EnableEnrichmentWithProcess turns on process enrichment, which maps a
// socket's inode to the process holding it. The process table is read from
// the proc directory (gosigar.Procd) located under hostFS.
//
// It returns a wrapped error if the process table cannot be created.
func (cl *ConnectionList) EnableEnrichmentWithProcess(hostFS string) (err error) {
	procDir := filepath.Join(hostFS, gosigar.Procd)
	if cl.ptable, err = NewProcTable(procDir); err != nil {
		return errors.Wrap(err, "error when creating process table")
	}
	return nil
}
// EnableEnrichmentWithRPC turns on RPC enrichment, which looks up a socket's
// ports in the rpcbind portmap. Process enrichment must already be enabled,
// because the portmap is only consulted for sockets whose inode has no entry
// in the process table.
//
// It returns an error if process enrichment is not enabled or the portmap
// cannot be fetched.
func (cl *ConnectionList) EnableEnrichmentWithRPC() (err error) {
	if cl.ptable == nil {
		return errors.New("cannot enable RPC enrichment without process enrichment")
	}
	if cl.rpcPortmap, err = GetRpcPortmap(); err != nil {
		return errors.Wrap(err, "error getting RPC portmap")
	}
	return nil
}
// EnableEnrichmentWithReverseLookup turns on reverse DNS lookups of a
// socket's remote IP, backed by a cache created with the given TTLs for
// successful and failed lookups. Direction enrichment must already be
// enabled, since lookups are skipped for listening sockets.
//
// It returns an error if direction enrichment is not enabled.
func (cl *ConnectionList) EnableEnrichmentWithReverseLookup(successTTL, failureTTL time.Duration) error {
	if cl.listeners != nil {
		cl.reverseLookup = NewReverseLookupCache(successTTL, failureTTL)
		return nil
	}
	return errors.New("cannot enable reverse lookup enrichment without direction enrichment")
}
// Refresh fetches the current socket list over the netlink session and
// stores it on the list. On success the enrichment data is marked stale so
// that EnrichConnection refreshes it before it is used again.
//
// It returns a wrapped error if the socket list cannot be fetched; the stale
// marker is left unchanged in that case.
func (cl *ConnectionList) Refresh() (err error) {
	if cl.diags, err = cl.netlink.GetSocketList(); err != nil {
		return errors.Wrap(err, "error getting sockets")
	}
	cl.staleEnrichments = true
	return nil
}
// GetDiags returns the socket list stored by the last call to Refresh. The
// returned slice is the list's own slice, not a copy.
func (cl *ConnectionList) GetDiags() []*linux.InetDiagMsg {
	diags := cl.diags
	return diags
}
// NewConnection builds a Connection from a single inet_diag message, copying
// its address family, TCP state, local and remote endpoints, socket inode and
// owner UID. ProcessPID starts at -1; all enrichment fields are left at their
// zero values.
func NewConnection(diag *linux.InetDiagMsg) *Connection {
	conn := new(Connection)
	conn.Family = linux.AddressFamily(diag.Family)
	conn.State = linux.TCPState(diag.State)
	conn.LocalIP, conn.LocalPort = diag.SrcIP(), diag.SrcPort()
	conn.RemoteIP, conn.RemotePort = diag.DstIP(), diag.DstPort()
	conn.Inode = diag.Inode
	conn.UID = diag.UID
	conn.ProcessPID = -1
	return conn
}
// refreshEnrichments rebuilds the lookup tables of every enabled enrichment
// from the current socket list. Each step runs even if an earlier one failed;
// all failures are collected and returned as one combined error, or nil if
// every step succeeded.
func (cl *ConnectionList) refreshEnrichments() error {
	var errs multierror.Errors

	// Register all listening sockets: those with no destination port.
	if cl.listeners != nil {
		cl.listeners.Reset()
		for _, diag := range cl.diags {
			if diag.DstPort() == 0 {
				cl.listeners.Put(uint8(syscall.IPPROTO_TCP), diag.SrcIP(), diag.SrcPort())
			}
		}
	}

	// Refresh inode to process mapping (must be root).
	if cl.ptable != nil {
		if err := cl.ptable.Refresh(); err != nil {
			errs = append(errs, errors.Wrap(err, "error when refreshing process table"))
		}
	}

	// Refresh RPC portmap. The result is only stored on success: a non-nil
	// cl.rpcPortmap is what marks RPC enrichment as enabled, so overwriting
	// it with the result of a failed call could turn the enrichment off for
	// all later refreshes. On failure the previous portmap is kept.
	if cl.rpcPortmap != nil {
		portmap, err := GetRpcPortmap()
		if err != nil {
			errs = append(errs, errors.Wrap(err, "error getting RPC portmap"))
		} else {
			cl.rpcPortmap = portmap
		}
	}

	return errs.Err()
}
// EnrichConnection enriches the connection with any enabled enrichments.
//
// If Refresh has marked the enrichment data as stale, the data is rebuilt
// first and the stale marker is cleared, so the rebuild happens once per
// Refresh rather than once per connection. If the rebuild fails, the wrapped
// error is returned, c is left untouched, and the marker stays set so the
// next call tries again.
//
// Failures of an individual enrichment are recorded on the connection itself
// (DestHostError, ProcessError) and do not produce a returned error.
func (cl *ConnectionList) EnrichConnection(c *Connection) error {
	// Refresh was called to get new sockets, need to refresh enrichment data as well.
	if cl.staleEnrichments {
		if err := cl.refreshEnrichments(); err != nil {
			return errors.Wrap(err, "refreshing enrichments failed - skipping enrichment")
		}
		cl.staleEnrichments = false
	}

	// Enrichment: Username from UID.
	if cl.users != nil {
		c.Username = cl.users.LookupUID(int(c.UID))
	}

	// Enrichment: Determine direction (Listening, Incoming, or Outgoing).
	if cl.listeners != nil {
		c.Direction = cl.listeners.Direction(uint8(syscall.IPPROTO_TCP),
			c.LocalIP, c.LocalPort, c.RemoteIP, c.RemotePort)
	}

	// Enrichment: Reverse DNS lookup on the remote IP. Skipped for listening
	// sockets.
	if cl.reverseLookup != nil && c.Direction != Listening {
		hostname, err := cl.reverseLookup.Lookup(c.RemoteIP)
		if err != nil {
			c.DestHostError = err
		} else {
			c.DestHost = hostname
			// Best effort: the error from etldPlusOne is deliberately dropped.
			c.DestHostETLDPlusOne, _ = etldPlusOne(hostname)
		}
	}

	// Enrichment: Add process info by finding the process that holds the socket's inode.
	if cl.ptable != nil && c.Inode != 0 {
		proc := cl.ptable.ProcessBySocketInode(c.Inode)
		if proc != nil {
			c.ProcessPID = proc.PID
			c.ProcessExe = proc.Executable
			c.ProcessCommand = proc.Command
			c.ProcessCmdLine = proc.CmdLine
			c.ProcessType = ProcessType_PROCESS
		} else if cl.rpcPortmap != nil && len(*cl.rpcPortmap) > 0 {
			// No owning process: fall back to the RPC portmap, trying the
			// local port first and then the remote port.
			rpcService, found := (*cl.rpcPortmap)[uint32(c.LocalPort)]
			if !found {
				rpcService, found = (*cl.rpcPortmap)[uint32(c.RemotePort)]
			}
			if found {
				c.ProcessCommand = rpcService.programName
				c.ProcessType = ProcessType_RPC
			}
		}

		// Only report a missing process when running with effective UID 0.
		if c.ProcessType == "" && cl.euid == 0 {
			c.ProcessError = fmt.Errorf("process not found. inode=%v, tcp_state=%v",
				c.Inode, c.State)
		}
	} else if c.Inode == 0 {
		// NOTE(review): this branch is also taken when process enrichment is
		// disabled (cl.ptable == nil) and does not check euid - confirm that
		// is intended.
		c.ProcessError = fmt.Errorf("process has exited. inode=%v, tcp_state=%v",
			c.Inode, c.State)
	}

	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment